
Introduction To Flink Kafka Consumer in 2022 | Complete Tutorial

Flink's popularity keeps growing because it can process massive amounts of data in real time. In this blog, we will learn about the Flink Kafka consumer and how to write a Flink job in Java/Scala that reads data from a Kafka topic and saves it to a local file. So let's get started.

Flink Kafka consumer example

In this section, we will understand how to write a Flink Kafka consumer job that reads data from one or more Kafka topics and writes the result to a local file.

To work with Kafka in Flink, a user needs to add the below dependency to the pom.xml file:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
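
The ${scala.binary.version} and ${flink.version} placeholders assume that matching properties are defined in your pom.xml. If they are not, you can add something like the below snippet (the versions shown are only examples; use the ones that match your Flink cluster):

<properties>
   <flink.version>1.14.4</flink.version>
   <scala.binary.version>2.12</scala.binary.version>
</properties>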

You can follow the below links to install Flink, IntelliJ, and Maven.

Setup flink development environment

Flink Project Template for scala

You can follow this link to install Kafka on your local system. Once Flink and Kafka are set up, use the below Kafka commands to send data to the Kafka topic.

We will send some sample data to the Kafka topic flink-test.

  • List topics
bin/kafka-topics.sh --list --bootstrap-server "localhost:9092"
  • Create topic
bin/kafka-topics.sh --create --topic flink-test --bootstrap-server localhost:9092
  • Describe a topic
bin/kafka-topics.sh --describe --topic flink-test --bootstrap-server localhost:9092
  • Write data to Kafka topic
bin/kafka-console-producer.sh --topic flink-test --bootstrap-server localhost:9092
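
Optionally, to confirm that the messages actually reached the topic, you can read them back with the Kafka console consumer before wiring up the Flink job:

bin/kafka-console-consumer.sh --topic flink-test --from-beginning --bootstrap-server localhost:9092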

So far we have set up the Kafka topic and the Flink dependency needed to read data from it. In the next section, we will look at the Flink Kafka consumer properties.

Flink Kafka consumer properties

There are numerous Kafka consumer properties a user can set while reading data from a Kafka topic. Below is the basic configuration a user needs to set to fetch data from Kafka.

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")

If you are using an old version of Kafka (version 0.8), then you need to set the below properties as well:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

Flink Kafka consumer Scala

Below is the Scala code to read data from the Kafka topic and write the result to a local text file:

package streaming

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties


object KafkaRead {
  def main(args: Array[String]) {

    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read the topic name and Kafka brokers from the command-line arguments
    val topic = params.get("input-topic")
    val bootStrapServers = params.get("bootStrapServers")

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootStrapServers)

    // Kafka consumer that deserializes each record as a UTF-8 string
    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)

    // write every record received from Kafka to the output file
    env.addSource(consumer).writeAsText(params.get("output"))

    env.execute("Flink Kafka Read")
  }
}

Type the below command to submit the flink job

flink run -d \
<path to jar file>/flink-scala-kafka-1.0-SNAPSHOT.jar \
--input-topic "flink-test" \
--bootStrapServers "localhost:9092" \
--output "/home/flink_out.txt"

Flink Kafka consumer Java

Below is the Java code to read data from the Kafka topic and save the output to a local file

package com.cloudera.flink;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaRead {

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // topic and brokers are hardcoded here; pass them as arguments if needed
        String topic = "flink-test";
        String bootStrapServers = "localhost:9092";

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", bootStrapServers);

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                topic, new SimpleStringSchema(),
                kafkaProperties);

        env.addSource(consumer).writeAsText(params.get("output"));

        env.execute("flink-read");
    }
}

Type the below command to submit the flink job

flink run -d \
<path to jar file>/flink-java-kafka-1.0-SNAPSHOT.jar \
--output "/home/flink_out.txt"

Flink secured Kafka consumer

The above examples work when Kafka is running locally without any security. The same Flink job will not work against a secured Kafka cluster.

For secured Kafka, the user needs to pass additional parameters, such as the keytab file, the service principal, and the JAAS configuration file, while submitting the Flink job.

Below is the Scala code for secured Kafka:

package streaming

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties

object KafkaRead {
  def main(args: Array[String]) {

    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val topic = params.get("input-topic")

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "&lt;bootStrapServers>")
    properties.setProperty("group.id", "Test")
    properties.setProperty("zookeeper.connect", "&lt;zookeeperServer>")
    properties.setProperty("security.protocol", "SASL_SSL")
    properties.setProperty("sasl.kerberos.service.name", "kafka")

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)

    env.addSource(consumer).writeAsText(params.get("output"))

    env.execute("Socket Window WordCount")
  }
}

Submit the Flink job and pass the below Flink command-line parameters (-yD):

flink run -d \
-yD security.kerberos.login.keytab=<location/name.keytab> \
-yD security.kerberos.login.principal=<principal> \
-yD env.java.opts="-Djava.security.auth.login.config=<location/kafka_jaas.conf>" \
/<location of jar>/flink-scala-kafka-1.0-SNAPSHOT.jar \
--output "/home/flink_out.txt"

Flink Kafka consumer from the beginning

Flink also gives the flexibility to set the start position of the Kafka consumer. There are various options a user can set, such as the following:

  • setStartFromEarliest: start from the earliest record possible
  • setStartFromLatest: start from the latest record
  • setStartFromGroupOffsets: start from the offsets committed for the consumer group (the default behavior)

Below is the code snippet to set the start position (choose the one you need):

val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromGroupOffsets()  // the default behavior

val stream = env.addSource(myConsumer)
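
Beyond these three options, the consumer also exposes setStartFromSpecificOffsets and setStartFromTimestamp for finer control. Below is a small Scala sketch; the topic name, partition, offset, and timestamp are only illustrative values:

import java.lang.{Long => JLong}
import java.util.{HashMap => JHashMap}
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

// start partition 0 of flink-test from offset 23
val specificStartOffsets = new JHashMap[KafkaTopicPartition, JLong]()
specificStartOffsets.put(new KafkaTopicPartition("flink-test", 0), 23L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

// or start from the first record whose timestamp is greater than or equal to the given epoch milliseconds
myConsumer.setStartFromTimestamp(1640995200000L)

Note that these start position settings only apply when the job starts fresh; when a job is restored from a checkpoint or savepoint, Flink continues from the offsets stored in that state.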

Conclusion

I hope you liked this tutorial on the Flink Kafka consumer. In this blog, we learned how to write a Flink Kafka consumer to read data from Kafka using the Java and Scala languages. We also covered the various Flink Kafka consumer properties and how to connect to secured Kafka from Flink.

Please do let me know if you face any issues while following along.

More to explore on Flink

Flink wordcount using scala
