Import data from any REST API to Kafka incrementally using JDBC

Introduction

Importing data from REST APIs into Kafka topics generally involves writing a custom Kafka producer to read the data from the REST API and writing it in to topics. If you are dealing with multiple REST endpoints, responses, and authentications this can get complex quickly. In this article, we’ll look at how you can accelerate your data integration from REST APIs using Kafka.

Kafka offers several different types of connectors out of the box - including the very popular JDBC connector. By themselves, we know that JDBC connectors can't connect to REST APIs, but with Progress DataDirect Autonomous REST Connector, you can connect to and query any REST API using SQL, without writing single line of code.

In this tutorial, we will show you how you can use Autonomous REST Connector to import data from a stock market research API called Alpha Vantage. The data from the API is in a time series format and gets updated every 60 seconds. We will use Autonomous REST Connector to import the data from this API every 60 seconds for Progress (PRGS) stock into Kafka topics. Feel free, however, to use any stock symbol you’d prefer.

Prerequisites

  1. Download the Confluent community edition of Kafka.
  2. Extract the package to your desired location using the below command.
    tar -xvf confluent-community-5.1.1-2.11.tar.gz
  3. Add the location /<Path-To>/Confluent-5.1.1/bin to environment variable PATH.

Download and Install Autonomous REST Connector

  1. Download Autonomous REST Connector for JDBC from our website.
  2. Install the connector by running the setup executable file on your machine and following the instructions on the installer.

  3. After you have finished installation, you should find the Autonomous REST Connector at the below default path, unless you have chosen to install it at a different location.
    /home/<user>/Progress/DataDirect/JDBC_60/lib/autorest.jar
  4. Copy the autorest.jar from above location to: 
    /<path-to>/confluent-5.1.1/share/java/kafka-connect-jdbc/autorest.jar

Configure Autonomous REST Connector

  1. To connect to the Alpha Vantage API using Autonomous REST Connector, point it at the endpoint using the default values.
  2. For the purpose of the tutorial, we will connect to Stock Time Series endpoints offered by Alpha Vantage. To get all the time series intraday values for a stock with an interval of 1 minute, use the following request:
    https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=PRGS&interval=1min&apikey=<apikey>
  3. To configure the driver to connect to the above endpoint, use the following JDBC URL:
    jdbc:datadirect:autorest:sample=https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=PRGS&interval=1min&apikey=<apikey>


  4. When you connect to a REST API using Autonomous REST Connector, it will automatically sample the API and create a configuration, which you can access by querying the _CONFIGURATION table. You can get this configuration by using the JDBC Autonomous REST Connector in any SQL querying tool like Dbeaver, Squirrel SQL etc. 

  5. For the tutorial, we are providing the configuration below.  You can save this file as alphavantage.rest.
    {
        "query":{
            "#path":[
                "https://www.alphavantage.co/query"
            ],
            "Meta Data":{
                "1. Information":"VarChar(84)",
                "2. Symbol":"VarChar(64),#key",
                "3. Last Refreshed":"Timestamp(0)",
                "4. Interval":"VarChar(64)",
                "5. Output Size":"VarChar(64)",
                "6. Time Zone":"VarChar(64)"
            },
            "Time Series (1min){Timestamp(0)}":{
                "1. open":"Double",
                "2. high":"Double",
                "3. low":"Double",
                "4. close":"Double",
                "5. volume":"Integer"
            },
            "function":{
                "#type":"VarChar(64),#key",
                "#virtual":true,
                "#default":"TIME_SERIES_INTRADAY",
                "#eq":"function"
            },
            "symbol":{
                "#type":"VarChar(64),#key",
                "#virtual":true,
                "#default":"MSFT",
                "#eq":"symbol"
            },
            "interval":{
                "#type":"VarChar(64),#key",
                "#virtual":true,
                "#default":"1min",
                "#eq":"interval"
            },
            "apikey":{
                "#type":"VarChar(64),#key",
                "#virtual":true,
                "#default":"<api-key>",
                "#eq":"apikey"
            }
        }
    }
  6. Looking at this configuration, you'll notice that Autonomous REST Connector has detected all the objects and their data types.
    Note: Please use your api key in the #default property under apikey json object.

  7. You can run the following queries in the SQL editor:
    SELECT * FROM TIMESERIES1MIN
    SELECT * FROM TIMESERIES1MIN WHERE QUERY_SYMBOL='AAPL'

     

  8. To learn more about Autonomous REST Connector and how you can configure it for your needs - and how to connect to multiple endpoints - we recommend you go through these tutorials after you have finished this one.

     

    1. Getting Started
    2. Connecting to Multiple Endpoints
    3. Editing Auto generated Schema

       

  9. Copy the alphavantage.rest file to:
    /<path-to>/Confluent-5.1.1/alphavantage.rest

Create Kafka JDBC Source configuration

  1. Create a new file called source_autorest.json in this location: 
    /<path-to>/Confluent-5.1.1/source_autorest.json

  2. This file will have the configuration on how Kafka connects and queries the REST API via DataDirect Autonomous REST Connector: 
    {
            "name": "source_autorest",
            "config": {
     
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
     
                    "connection.url": "jdbc:datadirect:autorest:config=/home/progress/confluent-5.1.1/alphavantage.rest",
     
                    "query": "SELECT * FROM TIMESERIES1MIN",
     
                    "mode": "timestamp",
     
                    "timestamp.column.name": "KEY",
     
                    "topic.prefix": "PRGS1MIN"
            }
    }
  3. We set the mode to timestamp and timestamp.column.name to KEY. Kafka uses this column to keep track of the data coming in from the REST API. By default, the poll interval is set to 5 seconds, but you can set it to 1 second if you prefer using the poll.interval.ms configuration option.

  4. In the above configuration, change the config file path for alphavantage.rest to its location on your machine in the connection.url.

  5. Save the file.

Import the data into Kafka topic

  1. Start Kafka using the following command:
    confluent start

     

  2. Load the JDBC source configuration you have created in the previous step. You should see the response as shown below after you have run the command: 
    confluent load source_autorest -d source_autorest.json
    load connector
  3. Check the status of the connector by running the following command. You should see a response as shown below if it is running properly.

    confluent status connectors


    List status connectors

  4. Next, check the list of topics. You should find a new topic named PRGS1MIN.

    kafka-topics --list --zookeeper localhost:2181
  5. You can check the data in the topic by running the following command: 
    kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic PRGS1MIN --from-beginning --property schema.registry.url=http://localhost:8081

     

    data in topic

  6. Kafka runs the JDBC source connectors and continues polling the REST API via DataDirect Autonomous REST Connector every 5 seconds to fetch the new data to the Kafka Topic.

 

With Progress DataDirect Autonomous REST Connector, you don’t need to worry about interfacing with different REST endpoints. All you have to do is point the endpoint to Autonomous REST Connector and it provides you the interfacing using JDBC. You can now use this connectivity not only with Kafka but with any application that has ODBC/JDBC support. Feel free to contact us if you have any questions about the Progress DataDirect Autonomous REST Connector.

JDBC TUTORIAL

Import data from any REST API to Kafka incrementally using JDBC

View all Tutorials

Connect any application to any data source anywhere

A product specialist will be glad to get in
touch with you