JDBC TUTORIAL

Import Salesforce Data into Google BigQuery using Google Cloud Dataflow

Updated: 26 Feb 2021

Introduction

Google Cloud Dataflow is a data processing service for both batch and real-time data streams. Dataflow lets you build pipelines that ingest data, then transform and process it according to your needs before making that data available to analysis tools.

In this tutorial, you'll learn how to extract, transform and load (ETL) Salesforce data into Google BigQuery using Google Cloud Dataflow and the DataDirect Salesforce JDBC driver. The tutorial below uses a Java project, but similar steps apply when using Apache Beam to read data from other JDBC data sources, including SQL Server, IBM DB2, Amazon Redshift, Eloqua, Hadoop Hive and more.

Install the Progress DataDirect Salesforce JDBC driver

  1. Download the DataDirect Salesforce JDBC driver from here.
  2. To install the driver, run the .jar package, either by executing the following command in a terminal or by double-clicking the jar file.
    java -jar PROGRESS_DATADIRECT_JDBC_SF_ALL.jar

  3. This launches an interactive Java installer that lets you install the Salesforce JDBC driver to the location of your choice, as either a licensed or evaluation installation.
  4. Note that this installs the Salesforce JDBC driver, along with a number of other drivers for trial purposes, in the same folder.

Setting up the Google Cloud Dataflow SDK and Project

  1. Complete the steps in the Before you begin section of this quickstart from Google.
  2. To create a new project in Eclipse, go to File -> New -> Project.
  3. In the Google Cloud Platform directory, select Google Cloud Dataflow Java Project.

     

     [Screenshot: New Google Cloud Dataflow Java Project in Eclipse]

     

  4. Fill in the Group ID and Artifact ID.
  5. Select Starter Project with a simple pipeline as the Project Template from the drop-down.
  6. Select Dataflow Version 2.2.0 or above.

     

     [Screenshot: Selecting the Dataflow version]

  7. Click Next and the project will be created.
  8. Add the Apache Beam JDBC IO library from Maven and the DataDirect Salesforce JDBC driver to the build path; you can find the Salesforce JDBC driver jar in the driver installation folder. A sample Maven dependency is sketched below.
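
     A sample Maven dependency for the Beam JDBC IO module is sketched below; the version shown is illustrative, so match it to the Beam/Dataflow SDK version your project uses. The Salesforce driver jar itself comes from the local driver installation rather than Maven.

    <!-- Apache Beam JDBC IO (illustrative version; match your Beam/Dataflow SDK version) -->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.2.0</version>
    </dependency>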

Creating the Pipeline

  1. In this tutorial the main goal is to connect to Salesforce, read the data, apply a simple transformation and write it to BigQuery. The code for this project has been uploaded to GitHub for your reference.
  2. Open the StarterPipeline.java file and clear all the code in the main function.
  3. The first thing you need to do is create the Pipeline:

     

    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());
  4. The pipeline is configured from the input arguments passed to the program.
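
     The snippets in the following steps assume imports along these lines (a sketch based on the Beam 2.x APIs used in this tutorial; adjust to your SDK version):

    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.ListCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;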
  5. Connect to Salesforce via the DataDirect Salesforce JDBC driver and read the data into a PCollection using the JdbcIO.<T>read() method, as shown below.

    PCollection<List<String>> rows = p.apply(JdbcIO.<List<String>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.ddtek.jdbc.sforce.SForceDriver", "jdbc:datadirect:sforce://login.salesforce.com;SecurityToken=<Security Token>")
                .withUsername("<username>")
                .withPassword("<password>"))
                .withQuery("SELECT * FROM SFORCE.NOTE")
                .withCoder(ListCoder.of(StringUtf8Coder.of()))
                .withRowMapper(new JdbcIO.RowMapper<List<String>>() {
                    public List<String> mapRow(ResultSet resultSet) throws Exception {
                         
                        List<String> addRow = new ArrayList<String>();
                        //Get the Schema for BigQuery
                        if(schema == null)
                        {
                             schema = getSchemaFromResultSet(resultSet);
                        }
                         
                        //Creating a List of Strings for each Record that comes back from JDBC Driver.
                        for(int i=1; i<= resultSet.getMetaData().getColumnCount(); i++ )
                        {
                            addRow.add(i-1, String.valueOf(resultSet.getObject(i)));
                        }
                         
                        //LOG.info(String.join(",", addRow));
                         
                        return addRow;
                    }
                })
    )
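
     The row mapper above refers to a class-level schema field and a getSchemaFromResultSet() helper that are not shown in the snippet. Here is a minimal sketch of what they might look like, declared at the class level of StarterPipeline.java and building a BigQuery TableSchema from the JDBC result set metadata (every column is mapped to a STRING field to match the List<String> rows; the implementation in the GitHub project may differ):

    //Class-level BigQuery schema, populated once inside the row mapper
    private static TableSchema schema;

    //Sketch: derive a BigQuery TableSchema from the JDBC result set metadata
    private static TableSchema getSchemaFromResultSet(ResultSet resultSet) throws SQLException {
        List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
        ResultSetMetaData metaData = resultSet.getMetaData();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            //All values are written as strings, matching the List<String> rows above
            fields.add(new TableFieldSchema().setName(metaData.getColumnName(i)).setType("STRING"));
        }
        return new TableSchema().setFields(fields);
    }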

     

  6. Once you have the data in a PCollection, apply a transform; for example, you can hash the data in any column (commented out in the snippet) by using ParDo to iterate through all the items in the PCollection, as shown below.

     

    .apply(ParDo.of(new DoFn<List<String>, List<String>>() {
                       @ProcessElement
                       //Apply Transformation - Mask the EmailAddresses by Hashing the value
                       public void processElement(ProcessContext c) {
                            
                           List<String> record = c.element();
          
                           List<String> record_copy = new ArrayList(record);
                           //String hashedEmail = hashemail(record.get(11));
                           //record_copy.set(11, hashedEmail);
                            
                           c.output(record_copy);
                            
                       }
    }));
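
     The commented-out hashemail() helper is not shown above; here is a minimal sketch, assuming the value is masked with a hex-encoded SHA-256 hash (the implementation in the GitHub project may differ):

    //Sketch: mask a value by replacing it with a hex-encoded SHA-256 hash
    private static String hashemail(String email) throws Exception {
        java.security.MessageDigest digest = java.security.MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(email.getBytes("UTF-8"));
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }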
  7. Then, convert the PCollection, which holds each row as a List<String>, to TableRow objects of the BigQuery model.

     

    PCollection<TableRow> tableRows =  rows.apply(ParDo.of(new DoFn<List<String>, TableRow>() {
             @ProcessElement
             //Convert the rows to TableRows of BigQuery
             public void processElement(ProcessContext c) {
                  
                 TableRow tableRow = new TableRow();
                 List<TableFieldSchema> columnNames = schema.getFields();
                 List<String> rowValues = c.element();
                 for(int i =0; i< columnNames.size(); i++)
                 {
                     tableRow.put(columnNames.get(i).get("name").toString(), rowValues.get(i));
                 }
     
                 c.output(tableRow);
             }
    }));

     

  8. Finally, write the data to BigQuery using the BigQueryIO.writeTableRows() method as shown below.

     

    //Write Table Rows to BigQuery
    tableRows.apply(BigQueryIO.writeTableRows()
            .withSchema(schema)
            .to("nodal-time-161120:Salesforce.NOTE")
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    Note: Before you run the pipeline, go to the BigQuery console and create the destination table with the same schema as your Salesforce table.

  9. You can find all the code for this project on GitHub for your reference.
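
     One last note before running: the transforms above only define the pipeline, and nothing executes until the pipeline is run at the end of the main function, for example:

    //Run the pipeline and (optionally) block until it finishes
    p.run().waitUntilFinish();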

Running the Pipeline

  1. Go to Run -> Run Configurations. Under Pipeline Arguments, you should see two options for running the pipeline:

     

    1. DirectRunner – runs the pipeline locally
    2. DataflowRunner – runs the pipeline on Google Cloud Dataflow

       

  2. To run locally, set the runner to DirectRunner and run it. Once the pipeline has finished, you should see your Salesforce data in Google BigQuery.
  3. To run the pipeline on Google Cloud Dataflow, set the runner to DataflowRunner and make sure you choose your account, project ID and a staging location as shown below.

     

    [Screenshot: Configuring the Cloud Storage staging location]

  4. Under Arguments -> Program Arguments, set the tempLocation path that the BigQuery write uses to store temporary files, as shown below.

     

    [Screenshot: Configuring the program arguments]
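
     For reference, the program arguments might look something like the line below (the bucket name is a placeholder; options such as the project and runner can also be passed here if they are not set on the Pipeline Arguments tab):

    --tempLocation=gs://<your-bucket>/temp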

     

  5. Click Run and you should see a new job starting in the Google Cloud Dataflow console. You can track its progress by clicking on the job; a flow chart shows the status of each stage.

     

    [Screenshot: The job in the Cloud Dataflow console]

     

  6. Once the pipeline has run successfully, you can go to the Google BigQuery console and run a query on the table to see all your data.

[Screenshot: Salesforce data in BigQuery]

 

 

We hope this tutorial helped you get started with ETL of Salesforce data into Google BigQuery using Google Cloud Dataflow. You can use a similar process with any of the DataDirect JDBC drivers for Eloqua, Oracle Sales Cloud, Oracle Service Cloud, MongoDB, Cloudera and more. Please contact us if you need any help or have any questions.
