Create Data Pipelines to move your data using Apache Flink and JDBC

Updated: 26 Feb 2021


Apache Flink is a framework and distributed processing engine for stateful computations over batch and streaming data. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale. One of the use cases for Apache Flink is data pipeline applications where data is transformed, enriched, and moved from one storage system to another. Flink provides many connectors to various systems such as JDBC, Kafka, Elasticsearch, and Kinesis.

A common source or destination is a storage system with a JDBC interface, such as SQL Server, Oracle, Salesforce, Hive, Eloqua, or Google BigQuery. With Progress DataDirect JDBC Connectors you can connect to relational, big data, and SaaS sources such as those listed above. This tutorial walks you through reading data from SQL Server, transforming it, and writing it to a Hive destination.

Install DataDirect SQL Server JDBC Connector

  1. Download the DataDirect SQL Server JDBC driver. We will use SQL Server as our source.
  2. Install the SQL Server JDBC driver by running the installer you downloaded.
  3. Follow the installer prompts and install the driver in the default path or a custom path.

Install DataDirect Hive JDBC Connector

  1. Download the DataDirect Hive JDBC driver. We will use Hive as our destination system.
  2. Install the Hive JDBC driver by running the installer you downloaded.
  3. Follow the installer prompts and install the driver in the default path or a custom path.

Create the Data Pipeline with Apache Flink

  1. If this is your first time working with Flink, follow this guide to set up your project.
  2. To begin, initialize the Flink execution environment by adding the following code to your main method:
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  3. Add the Progress DataDirect SQL Server JDBC driver to your classpath. You can find the sqlserver.jar file in the location where you installed the driver in the previous section.

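  4. Next, configure JDBCInputFormat with the driver class, your SQL Server connection URL, username, password, and query. Also provide a RowTypeInfo that describes the data type of each column the query returns, in order. The driver class name, URL, and credentials below are placeholders; check them against your driver documentation.
    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,  // AlbumId, Title
        BasicTypeInfo.INT_TYPE_INFO };                                // ArtistId
    RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
    JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("com.ddtek.jdbc.sqlserver.SQLServerDriver")
            .setDBUrl("jdbc:datadirect:sqlserver://<host>:<port>;databaseName=Chinook")
            .setUsername("<username>").setPassword("<password>")
            .setQuery("SELECT [AlbumId], [Title] ,[ArtistId] FROM [Chinook].[dbo].[Album]")
            .setRowTypeInfo(rowTypeInfo).finish();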

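  5. The code above reads data from a table called Album. Next, we want to read a second table, Artist, join the two, and write the result to Hive.
  6. To read the Artist table, add the following code to the program. As before, the driver class name, URL, and credentials are placeholders.
    TypeInformation<?>[] fieldTypes2 = new TypeInformation<?>[] {
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO };  // ArtistId, Name
    RowTypeInfo rowTypeInfo2 = new RowTypeInfo(fieldTypes2);
    JDBCInputFormat jdbcInputFormat2 = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("com.ddtek.jdbc.sqlserver.SQLServerDriver")
            .setDBUrl("jdbc:datadirect:sqlserver://<host>:<port>;databaseName=Chinook")
            .setUsername("<username>").setPassword("<password>")
            .setQuery("SELECT [ArtistId], [Name]  FROM [Chinook].[dbo].[Artist]")
            .setRowTypeInfo(rowTypeInfo2).finish();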

  7. Now create a dataset for each of the two tables by adding the following code:

    DataSet<Row> albumData = env.createInput(jdbcInputFormat);
    DataSet<Row> artistData = env.createInput(jdbcInputFormat2);
  8. We want to join the data in Album with Artist. This can be done on ArtistId, which the Album table uses to reference the Artist table. ArtistId is the third field (f2) of the album rows and the first field (f0) of the artist rows. To join the two datasets, add the following code:
    DataSet<Tuple2<Row, Row>> joinedData = albumData.join(artistData).where("f2").equalTo("f0");

  9. After the join, you need to flat map the dataset from Tuple2<Row, Row> to a single Row object. To do that, create a FlatMap function by adding the following code to your main class:

    public static class RowFlat implements FlatMapFunction<Tuple2<Row, Row>, Row> {
        @Override
        public void flatMap(Tuple2<Row, Row> value, Collector<Row> out) {
            Row row = new Row(3);
            row.setField(0, value.f0.getField(0)); // AlbumId
            row.setField(1, value.f0.getField(1)); // Title
            row.setField(2, value.f1.getField(1)); // artist Name
            out.collect(row);                      // emit the flattened row
        }
    }


  10. Then add the following code to your main method, after the join in Step 8:
    DataSet<Row> rowDataSet = joinedData.flatMap(new RowFlat());
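  11. Now you are ready to write this data to Hive. Before you go ahead, add the Progress DataDirect Hive JDBC driver to your classpath. Then add the following code to your main method to configure the Hive output; the driver class name, URL, and credentials are placeholders to check against your driver documentation. Finally, attach the format with rowDataSet.output(jdbcOutput) and call env.execute() to run the job.
    JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("com.ddtek.jdbc.hive.HiveDriver").setDBUrl("jdbc:datadirect:hive://<host>:<port>")
            .setUsername("<username>").setPassword("<password>")
            .setQuery("INSERT INTO albums (albumid, name, artistname) VALUES (?, ?, ?)").finish();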


  12. You can access the full project on GitHub for your reference.
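
To make the join and flat map in Steps 8 through 10 easier to picture, here is a minimal plain-Java sketch of the same logic with no Flink dependency. It is an illustration only, not how Flink executes the join internally. The class name AlbumArtistJoinSketch, the method joinAndFlatten, and the sample rows are all hypothetical; rows are modeled as Object[] with the same field order as the queries above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; the tutorial itself uses Flink's DataSet API.
class AlbumArtistJoinSketch {

    // Inner-joins album rows {AlbumId, Title, ArtistId} with artist rows
    // {ArtistId, Name} on ArtistId, then flattens each matching pair into
    // a three-field row {AlbumId, Title, ArtistName}.
    static List<Object[]> joinAndFlatten(List<Object[]> albums, List<Object[]> artists) {
        // Index artists by ArtistId (field 0, the "f0" key in the Flink join).
        Map<Object, List<Object[]>> artistsById = new HashMap<>();
        for (Object[] artist : artists) {
            artistsById.computeIfAbsent(artist[0], k -> new ArrayList<>()).add(artist);
        }
        // Look up each album's ArtistId (field 2, the "f2" key in the Flink join).
        List<Object[]> result = new ArrayList<>();
        for (Object[] album : albums) {
            List<Object[]> matches =
                artistsById.getOrDefault(album[2], Collections.<Object[]>emptyList());
            for (Object[] artist : matches) {
                // Same field mapping as the RowFlat function in Step 9.
                result.add(new Object[] { album[0], album[1], artist[1] });
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from the Chinook database.
        List<Object[]> albums = Arrays.<Object[]>asList(
            new Object[] { 1, "First Album", 10 },
            new Object[] { 2, "Second Album", 20 },
            new Object[] { 3, "Unmatched Album", 99 });
        List<Object[]> artists = Arrays.<Object[]>asList(
            new Object[] { 10, "Artist A" },
            new Object[] { 20, "Artist B" });
        for (Object[] row : joinAndFlatten(albums, artists)) {
            System.out.println(row[0] + ", " + row[1] + ", " + row[2]);
        }
    }
}
```

Running main prints one line per matched album; the album whose ArtistId has no matching artist is dropped, which mirrors the inner-join behavior of the join in Step 8.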

With Progress DataDirect JDBC Connectors for SQL Server, DB2, Oracle, Salesforce, Hive, Google BigQuery, and others, you can build a data pipeline between the sources and sinks of your choice without having to worry about connectivity, because we do the heavy lifting for you. Feel free to try any of our JDBC Connectors and let us know if you have any questions.
