Google Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow and operated using Python. It lets you author, schedule, and monitor workflows as directed acyclic graphs (DAGs) of tasks.
In this tutorial, we walk through how to connect from Google Cloud Composer to an on-premises Hive data source behind a private firewall using Progress DataDirect Hybrid Data Pipeline.
Follow these easy instructions to get started.
Once you have everything set up, navigate to http://<server-address>:8080/d2c-ui or https://<server-address>:8443/d2c-ui to view the Hybrid Data Pipeline UI.
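The DAG below looks up an Airflow JDBC connection named hdpjdbc, so that connection must exist in your Composer environment. You can create it in the Airflow web UI (Admin > Connections) or programmatically. The following is a minimal sketch of the programmatic approach, not part of the original tutorial: the JDBC URL, data source name, credentials, driver jar path, and driver class name are placeholders that you should replace with the values for your own Hybrid Data Pipeline server, and it assumes the DataDirect Hybrid Data Pipeline JDBC driver jar is available to the Airflow workers.

from airflow import settings
from airflow.models import Connection

# Hypothetical values -- substitute your own Hybrid Data Pipeline server,
# data source name, credentials, driver path, and driver class name.
hdp_conn = Connection(
    conn_id='hdpjdbc',
    conn_type='jdbc',
    host='jdbc:datadirect:ddhybrid://<server-address>:8080;hybridDataPipelineDataSource=<datasource>',
    login='<hdp-user>',
    password='<hdp-password>',
    extra='{"extra__jdbc__drv_path": "/path/to/ddhybrid.jar", '
          '"extra__jdbc__drv_clsname": "com.ddtek.jdbc.ddhybrid.DDHybridDriver"}'
)

session = settings.Session()
session.add(hdp_conn)
session.commit()

Once the connection is registered, the following DAG ties everything together: it opens a connection to Hive through Hybrid Data Pipeline, writes the query results to a CSV file, and prints them.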
from datetime import timedelta
import airflow
import os
import csv
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.jdbc_hook import JdbcHook

# Create a JDBC hook using the Airflow connection ID configured for Hybrid Data Pipeline
JdbcConn = JdbcHook(jdbc_conn_id='hdpjdbc')


def getconnection():
    # Verify that the JDBC connection can be resolved
    JdbcConn.get_connection('hdpjdbc')
    print("connected")


def writerrecords():
    # Query Hive through Hybrid Data Pipeline and write the results to a CSV file
    records = JdbcConn.get_records(sql="SELECT * FROM customer")
    with open('records.csv', 'w') as csvFile:
        writer = csv.writer(csvFile)
        writer.writerows(records)


def displayrecords():
    # Read the CSV file back and print each row
    with open('records.csv', 'rt') as f:
        data = csv.reader(f)
        for row in data:
            print(row)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('datadirect_sample', default_args=default_args, schedule_interval="@daily")

t1 = PythonOperator(
    task_id='getconnection',
    python_callable=getconnection,
    dag=dag,
)

t2 = PythonOperator(
    task_id='WriteRecords',
    python_callable=writerrecords,
    dag=dag,
)

t3 = PythonOperator(
    task_id='DisplayRecords',
    python_callable=displayrecords,
    dag=dag,
)

t1 >> t2 >> t3
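To run this DAG in Cloud Composer, save it as a .py file and upload it to the dags/ folder of your environment's Cloud Storage bucket. The bucket, environment, and location names below are placeholders for your own values:

gsutil cp datadirect_sample.py gs://<your-composer-bucket>/dags/

or, equivalently:

gcloud composer environments storage dags import --environment <your-environment> --location <your-location> --source datadirect_sample.py

Composer picks the file up automatically, and the DAG appears in the Airflow web UI, where you can trigger it manually or let the @daily schedule run it.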
We hope this tutorial helped you get started with accessing on-premises Hadoop data from Google Cloud Composer. Please contact us if you need any help or have any questions.