




// Imports needed by this snippet (Apache Beam SDK plus the BigQuery API model classes):
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;

Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read from Oracle through the Hybrid Data Pipeline JDBC driver.
// 'schema', getSchemaFromResultSet and hashemail are class-level members (see the sketch below).
PCollection<List<String>> rows = p.apply(JdbcIO.<List<String>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.ddtek.jdbc.ddhybrid.DDHybridDriver",
                "jdbc:datadirect:ddhybrid://<Hybrid-Data-Pipeline-Server-Address>;hybridDataPipelineDataSource=<Data Source Name>")
                .withUsername("Username")
                .withPassword("Password"))
        .withQuery("SELECT * FROM TEST01.CUSTOMER")
        .withCoder(ListCoder.of(StringUtf8Coder.of()))
        .withRowMapper(new JdbcIO.RowMapper<List<String>>() {
            public List<String> mapRow(ResultSet resultSet) throws Exception {
                List<String> addRow = new ArrayList<String>();
                // Capture the schema for BigQuery from the first record
                if (schema == null) {
                    schema = getSchemaFromResultSet(resultSet);
                }
                // Build a List of Strings for each record returned by the JDBC driver
                for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                    addRow.add(i - 1, String.valueOf(resultSet.getObject(i)));
                }
                //LOG.info(String.join(",", addRow));
                return addRow;
            }
        }))
        .apply(ParDo.of(new DoFn<List<String>, List<String>>() {
            // Apply transformation: mask the email addresses by hashing the value
            @ProcessElement
            public void processElement(ProcessContext c) {
                List<String> record = c.element();
                List<String> recordCopy = new ArrayList<>(record);
                // Index 11 holds the email column of TEST01.CUSTOMER
                String hashedEmail = hashemail(record.get(11));
                recordCopy.set(11, hashedEmail);
                c.output(recordCopy);
            }
        }));

// Convert the rows to BigQuery TableRows
PCollection<TableRow> tableRows = rows.apply(ParDo.of(new DoFn<List<String>, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        TableRow tableRow = new TableRow();
        List<TableFieldSchema> columnNames = schema.getFields();
        List<String> rowValues = c.element();
        for (int i = 0; i < columnNames.size(); i++) {
            tableRow.put(columnNames.get(i).get("name").toString(), rowValues.get(i));
        }
        c.output(tableRow);
    }
}));

// Write the TableRows to BigQuery
tableRows.apply(BigQueryIO.writeTableRows()
        .withSchema(schema)
        .to("nodal-time-161120:Oracle.CUSTOMER")
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Execute the pipeline and block until it finishes
p.run().waitUntilFinish();

Note: Before you run the pipeline, go to the BigQuery Console and create the table with the same schema as your Oracle table.
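The listing references a class-level schema field and two helper methods, getSchemaFromResultSet and hashemail, whose definitions are not shown above. Here is a minimal sketch of what they might look like; the method bodies are assumptions (every column mapped to a BigQuery STRING field, and SHA-256 used for the email masking), not the original implementations:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import com.google.api.services.bigquery.model.TableSchema;

// Populated from the first record that comes back from the JDBC driver.
private static TableSchema schema = null;

// Sketch: build a BigQuery TableSchema from the JDBC ResultSet metadata.
// Assumption: every column is mapped to STRING, matching the List<String> rows above.
private static TableSchema getSchemaFromResultSet(ResultSet resultSet) throws SQLException {
    ResultSetMetaData metaData = resultSet.getMetaData();
    List<TableFieldSchema> fields = new ArrayList<>();
    for (int i = 1; i <= metaData.getColumnCount(); i++) {
        fields.add(new TableFieldSchema()
                .setName(metaData.getColumnName(i))
                .setType("STRING"));
    }
    return new TableSchema().setFields(fields);
}

// Sketch: mask an email address by hashing it.
// Assumption: SHA-256, hex-encoded; the original masking scheme is not shown.
private static String hashemail(String email) {
    try {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(email.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException("SHA-256 is not available", e);
    }
}

If you want typed BigQuery columns instead of all-STRING fields, or a different masking scheme, adjust the STRING mapping in getSchemaFromResultSet and the digest in hashemail accordingly; whatever schema you build here should line up with the table you created in the BigQuery Console.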



