Import Salesforce Data into Google BigQuery Using Google Cloud Dataflow

Updated: 26 Feb 2021


Google Cloud Dataflow is a data processing service for both batch and real-time data streams. Dataflow lets you build pipelines that ingest data, then transform and process it according to your needs before making that data available to analysis tools.

In this tutorial, you'll learn how to extract, transform and load (ETL) Salesforce data into Google BigQuery using Google Cloud Dataflow and the DataDirect Salesforce JDBC driver. The tutorial below uses a Java project, but similar steps apply when using Apache Beam to read data from other JDBC data sources, including SQL Server, IBM DB2, Amazon Redshift, Eloqua, Hadoop Hive and more.

Install Progress DataDirect Salesforce JDBC driver

  1. Download DataDirect Salesforce JDBC driver from here.
  2. To install the driver, execute the downloaded .jar package, either by double-clicking it or by running `java -jar <downloaded jar file>` in a terminal.

  3. This launches an interactive Java installer that lets you install the Salesforce JDBC driver to your desired location as either a licensed or evaluation installation.
  4. Note that the installer places the Salesforce JDBC driver, along with several other drivers for trial purposes, in the same folder.

Setting up the Google Cloud Dataflow SDK and Project

  1. Complete the steps in the Before you begin section of this quick start from Google.
  2. To create a new project in Eclipse, go to File -> New -> Project.
  3. In the Google Cloud Platform directory, select Google Cloud Dataflow Java Project.


     new project for gc data flow java


  4. Fill in the Group ID and Artifact ID.
  5. Select Starter Project with a simple pipeline as the Project Template from the drop-down.
  6. Select Dataflow Version 2.2.0 or above.


     select version

  7. Click Next and the Project should be created.
  8. Add the JdbcIO library for Apache Beam from Maven and the DataDirect Salesforce JDBC driver to the build path. You can find the Salesforce JDBC driver jar in the install path.
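For reference, the Beam JdbcIO dependency can be pulled in with a Maven fragment like the one below. The version shown is illustrative; match it to the Dataflow/Beam SDK version your project uses.

```xml
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-jdbc</artifactId>
    <version>2.2.0</version>
</dependency>
```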

Creating the Pipeline

  1. In this tutorial, the main goal is to connect to Salesforce, read the data, apply a simple transformation and write it to BigQuery. The code for this project has been uploaded to GitHub for your reference.
  2. Open the StarterPipeline.java file and clear all the code in main function.
  3. The first thing you need to do is create the Pipeline:


    Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());
  4. This uses the program's input arguments to configure the Pipeline.
  5. Connect to Salesforce via the DataDirect Salesforce JDBC driver and read the data into a PCollection using the JdbcIO.<T>read() method, as shown below.

    PCollection<List<String>> rows = p.apply(JdbcIO.<List<String>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    "com.ddtek.jdbc.sforce.SForceDriver", "jdbc:datadirect:sforce://login.salesforce.com;SecurityToken=<Security Token>")
                    .withUsername("<username>")
                    .withPassword("<password>"))
                .withQuery("SELECT * FROM SFORCE.NOTE")
                .withRowMapper(new JdbcIO.RowMapper<List<String>>() {
                    public List<String> mapRow(ResultSet resultSet) throws Exception {
                        List<String> addRow = new ArrayList<String>();
                        //Get the schema for BigQuery
                        if (schema == null)
                            schema = getSchemaFromResultSet(resultSet);
                        //Create a List of Strings for each record that comes back from the JDBC driver
                        for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++)
                            addRow.add(i - 1, String.valueOf(resultSet.getObject(i)));
                        //LOG.info(String.join(",", addRow));
                        return addRow;
                    }
                })
                //JdbcIO needs a coder for the output PCollection
                .withCoder(ListCoder.of(StringUtf8Coder.of())));


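The getSchemaFromResultSet() helper used above is not shown in this tutorial (see the GitHub project for the full code). At its core it has to map JDBC column types to BigQuery column types; a minimal sketch of that mapping might look like the following, where the class and method names are illustrative, not the project's actual code:

```java
import java.sql.Types;

public class SchemaMapper {
    // Map a java.sql.Types code (from ResultSetMetaData.getColumnType)
    // to a BigQuery column type name
    public static String toBigQueryType(int sqlType) {
        switch (sqlType) {
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
            case Types.BIGINT:
                return "INTEGER";
            case Types.FLOAT:
            case Types.DOUBLE:
            case Types.DECIMAL:
            case Types.NUMERIC:
                return "FLOAT";
            case Types.BIT:
            case Types.BOOLEAN:
                return "BOOLEAN";
            case Types.DATE:
                return "DATE";
            case Types.TIMESTAMP:
                return "TIMESTAMP";
            default:
                return "STRING"; // fall back to STRING for text and unknown types
        }
    }
}
```

The real helper would iterate over the ResultSetMetaData columns and build a TableSchema of TableFieldSchema entries from these type names.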
  6. Once you have the data in a PCollection, apply a transform; for example, you can hash the data in any column (commented out below) by using a ParDo to iterate through all the items in the PCollection.


    .apply(ParDo.of(new DoFn<List<String>, List<String>>() {
                       //Apply transformation - mask the email addresses by hashing the value
                       @ProcessElement
                       public void processElement(ProcessContext c) {
                           List<String> record = c.element();
                           List<String> record_copy = new ArrayList<String>(record);
                           //String hashedEmail = hashemail(record.get(11));
                           //record_copy.set(11, hashedEmail);
                           c.output(record_copy);
                       }
                   }))
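The commented-out hashemail() call above could be backed by a small helper like the following sketch, which masks a value with a SHA-256 hex digest. The class and method names here are assumptions for illustration, not the project's actual code:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class EmailMasker {
    // Mask a column value (e.g. an email address) with a SHA-256 hex digest
    public static String hashEmail(String email) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] bytes = digest.digest(email.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : bytes) {
                hex.append(String.format("%02x", b)); // two lowercase hex chars per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e); // SHA-256 is always present on a standard JVM
        }
    }
}
```

Hashing is deterministic, so the masked column can still be used for joins and grouping while hiding the original value.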
  7. Then convert the PCollection, which holds each row as a List<String>, to the TableRow object of the BigQuery model.


    PCollection<TableRow> tableRows = rows.apply(ParDo.of(new DoFn<List<String>, TableRow>() {
             //Convert the rows to TableRows of BigQuery
             @ProcessElement
             public void processElement(ProcessContext c) {
                 TableRow tableRow = new TableRow();
                 List<TableFieldSchema> columnNames = schema.getFields();
                 List<String> rowValues = c.element();
                 for (int i = 0; i < columnNames.size(); i++)
                     tableRow.put(columnNames.get(i).get("name").toString(), rowValues.get(i));
                 c.output(tableRow);
             }
         }));


  8. Finally, write the data to BigQuery using BigQueryIO.writeTableRows() method as shown below.


    //Write Table Rows to BigQuery (replace the placeholders with your own table reference)
    tableRows.apply(BigQueryIO.writeTableRows()
            .to("<project-id>:<dataset>.<table-name>")
            .withSchema(schema));

    p.run();

    Note: Before you run the pipeline, go to the BigQuery console and create the table with the same schema as your Salesforce table.

  9. You can find all the code for this project in GitHub for your reference.

Running the Pipeline

  1. Go to Run -> Run Configurations. Under Pipeline Arguments, you should see two different options for running the pipeline.


    1. DirectRunner – Runs the pipeline locally
    2. DataflowRunner – Runs the pipeline on Google Cloud Dataflow


  2. To run locally, set the Runner to DirectRunner and run it. Once the pipeline has finished running, you should see your Salesforce data in Google BigQuery.
  3. To run the pipeline on Google Cloud Dataflow, set the Runner to DataflowRunner and make sure that you choose your account, project ID and a staging location as shown below.


    configure cloud storage

  4. Under Arguments -> Program Arguments, set the tempLocation path that the BigQuery write uses to store temporary files, as shown below.


    configure arguments for project4


  5. Click on Run and you should now see a new job in Google Data Flow console starting. You can track the progress by clicking on the job and you should see a flow chart to show status of each stage.


    job in cloud data flow console


  6. Once the pipeline has run successfully, you can go to the Google BigQuery console and run a query on the table to see all your data.

salesforce data in BigQuery
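Putting the runner, staging and temp settings from the steps above together, the program arguments for a Dataflow run might look like the fragment below; the project ID and bucket names are placeholders you must replace with your own:

```
--project=<your-project-id>
--runner=DataflowRunner
--stagingLocation=gs://<your-bucket>/staging
--tempLocation=gs://<your-bucket>/temp
```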



We hope this tutorial helped you get started with ETL of Salesforce data into Google BigQuery using Google Cloud Dataflow. You can use a similar process with any of the DataDirect JDBC drivers for Eloqua, Oracle Sales Cloud, Oracle Service Cloud, MongoDB, Cloudera, etc. Please contact us if you need any help or have any questions.

Connect any application to any data source anywhere

Explore all DataDirect Connectors

Need additional help with your product?

Get Customer Support