Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<List<String>> rows = p.apply(JdbcIO.<List<String>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.ddtek.jdbc.ddhybrid.DDHybridDriver",
                "jdbc:datadirect:ddhybrid://<Hybrid-Data-Pipeline-Server-Address>;hybridDataPipelineDataSource=<Data Source Name>")
                .withUsername("Username")
                .withPassword("Password"))
        .withQuery("SELECT * FROM TEST01.CUSTOMER")
        .withCoder(ListCoder.of(StringUtf8Coder.of()))
        .withRowMapper(new JdbcIO.RowMapper<List<String>>() {
            public List<String> mapRow(ResultSet resultSet) throws Exception {
                List<String> addRow = new ArrayList<String>();
                // Capture the schema for BigQuery from the first record.
                if (schema == null) {
                    schema = getSchemaFromResultSet(resultSet);
                }
                // Build a List of Strings for each record returned by the JDBC driver.
                for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                    addRow.add(i - 1, String.valueOf(resultSet.getObject(i)));
                }
                //LOG.info(String.join(",", addRow));
                return addRow;
            }
        }))
        .apply(ParDo.of(new DoFn<List<String>, List<String>>() {
            // Apply transformation - mask the email address (column index 11)
            // by hashing its value.
            @ProcessElement
            public void processElement(ProcessContext c) {
                List<String> record = c.element();
                List<String> recordCopy = new ArrayList<>(record);
                String hashedEmail = hashemail(record.get(11));
                recordCopy.set(11, hashedEmail);
                c.output(recordCopy);
            }
        }));
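
The snippet above leans on a shared schema field and two helpers, getSchemaFromResultSet and hashemail, whose definitions are not shown. A minimal sketch of what they could look like, assuming every column is mapped to the BigQuery STRING type (the pipeline already stringifies every value with String.valueOf) and SHA-256 as the masking hash (the import list after the write step covers the classes used here):

// Shared state: populated by the RowMapper on the first record it sees.
static TableSchema schema = null;

// Sketch: build a BigQuery TableSchema from the JDBC ResultSet metadata,
// typing every column as STRING to match the List<String> rows above.
static TableSchema getSchemaFromResultSet(ResultSet resultSet) throws SQLException {
    ResultSetMetaData metaData = resultSet.getMetaData();
    List<TableFieldSchema> fields = new ArrayList<>();
    for (int i = 1; i <= metaData.getColumnCount(); i++) {
        fields.add(new TableFieldSchema()
                .setName(metaData.getColumnName(i))
                .setType("STRING"));
    }
    return new TableSchema().setFields(fields);
}

// Sketch: mask an email address with a one-way hash. SHA-256 is an
// assumption here; any one-way digest works for this kind of masking.
static String hashemail(String email) {
    try {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(email.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException(e); // SHA-256 ships with every JRE
    }
}

Note that sharing schema through a static field works with the direct runner, where the RowMapper and the DoFns execute in the same JVM; on a distributed runner each worker would need its own way to obtain the schema.
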
PCollection<TableRow> tableRows = rows.apply(ParDo.of(new DoFn<List<String>, TableRow>() {
    // Convert each row into a BigQuery TableRow.
    @ProcessElement
    public void processElement(ProcessContext c) {
        TableRow tableRow = new TableRow();
        List<TableFieldSchema> columnNames = schema.getFields();
        List<String> rowValues = c.element();
        for (int i = 0; i < columnNames.size(); i++) {
            tableRow.put(columnNames.get(i).getName(), rowValues.get(i));
        }
        c.output(tableRow);
    }
}));
// Write the TableRows to BigQuery.
tableRows.apply(BigQueryIO.writeTableRows()
        .withSchema(schema)
        .to("nodal-time-161120:Oracle.CUSTOMER")
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
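
For reference, the code above (including the helper sketch) assumes the Apache Beam Java SDK with the JdbcIO and BigQueryIO modules on the classpath, and the following imports:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
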
Note: Before you run the pipeline, go to the BigQuery console and create the table with the same schema as your Oracle table.