With each passing day, Flink's popularity keeps growing. Flink is used to process massive amounts of data in real time. In this blog, we will learn about the Flink Kafka consumer and how to write a Flink job in Java/Scala that reads data from a Kafka topic and saves it to a local file. So let's get started.
Flink Kafka consumer example
In this section, we will learn how to write a Flink Kafka consumer job that reads data from one or more Kafka topics and writes the result to a local file.
To work with Kafka on Flink, add the below dependency to the pom.xml file:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
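The ${scala.binary.version} and ${flink.version} placeholders are Maven properties. If your pom.xml does not already define them, add something like the below (the version numbers shown are only examples; use the versions matching your cluster):

```xml
<properties>
  <!-- example values: align these with your Flink installation -->
  <scala.binary.version>2.12</scala.binary.version>
  <flink.version>1.14.6</flink.version>
</properties>
```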
You can follow the below links to install Flink, IntelliJ, and Maven:
Setup flink development environment
Flink Project Template for scala
You can follow this link to install Kafka on your local system. Once Flink and Kafka are set up, use the below Kafka commands to send data to a Kafka topic.
We will send some sample data to the Kafka topic flink-test.
- List topics
bin/kafka-topics.sh --list --bootstrap-server "localhost:9092"
- Create topic
bin/kafka-topics.sh --create --topic flink-test --bootstrap-server localhost:9092
- Describe a topic
bin/kafka-topics.sh --describe --topic flink-test --bootstrap-server localhost:9092
- Write data to Kafka topic
bin/kafka-console-producer.sh --topic flink-test --bootstrap-server localhost:9092
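Once the console producer prompt opens, type a few sample messages, one per line (the lines below are arbitrary test values; any text will do):

```
hello flink
kafka to flink streaming
flink consumer test
```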
So far we have set up the Kafka topic and the Flink dependencies needed to read data from Kafka. In the next section, we will look at the Flink Kafka consumer properties.
Flink Kafka consumer properties
There are numerous Kafka consumer properties a user can set while reading data from a Kafka topic. Below is the required property a user needs to set to fetch data from Kafka:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
If you are using an old version of Kafka (version 0.8), then you need to set the below properties as well:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
Flink Kafka consumer in Scala
Below is the Scala code to read data from the Kafka topic and write the result to a local text file:
package streaming

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object KafkaRead {
  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val topic = params.get("input-topic")
    val bootStrapServers = params.get("bootStrapServers")

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootStrapServers)

    // read from Kafka and write each record to the output file
    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
    env.addSource(consumer).writeAsText(params.get("output"))

    env.execute("Flink Kafka Read")
  }
}
Type the below command to submit the Flink job:
flink run -d \
  <path to jar file>/flink-scala-kafka-1.0-SNAPSHOT.jar \
  --input-topic "flink-test" \
  --bootStrapServers "localhost:9092" \
  --output "/home/flink_out.txt"
Flink Kafka consumer in Java
Below is the Java code to read data from the Kafka topic and save the output to a local file:
package com.cloudera.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaRead {
    public static void main(String[] args) throws Exception {
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String topic = "flink-test";
        String bootStrapServers = "localhost:9092";

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", bootStrapServers);

        // read from Kafka and write each record to the output file
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties);
        env.addSource(consumer).writeAsText(params.get("output"));

        env.execute("flink-read");
    }
}
Type the below command to submit the Flink job:
flink run -d \
  <path to jar file>/flink-java-kafka-1.0-SNAPSHOT.jar \
  --output "/home/flink_out.txt"
Flink secured Kafka consumer
The above examples work when Kafka is running locally without any security. The same Flink jobs will not work against a secured Kafka cluster.
For secured Kafka, the user needs to pass additional parameters such as the keytab file name, the service principal, and the jaas.conf file while submitting the Flink job.
Below is the Scala code for secured Kafka:
package streaming

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object KafkaRead {
  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val topic = params.get("input-topic")

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "<bootStrapServers>")
    properties.setProperty("group.id", "Test")
    properties.setProperty("zookeeper.connect", "<zookeeperServer>")
    // security settings for a Kerberized (SASL_SSL) Kafka cluster
    properties.setProperty("security.protocol", "SASL_SSL")
    properties.setProperty("sasl.kerberos.service.name", "kafka")

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
    env.addSource(consumer).writeAsText(params.get("output"))

    env.execute("Flink Secured Kafka Read")
  }
}
Submit the Flink job and pass the below Flink command line parameters (-yD):
flink run -d \
  -yD security.kerberos.login.keytab=<location/name.keytab> \
  -yD security.kerberos.login.principal=<principal> \
  -yD env.java.opts="-Djava.security.auth.login.config=<location/kafka_jaas.conf>" \
  /<location of jar>/flink-scala-kafka-1.0-SNAPSHOT.jar \
  --input-topic "flink-test" \
  --output "/home/flink_out.txt"
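The kafka_jaas.conf file referenced above tells the Kafka client how to authenticate with Kerberos. A minimal sketch, assuming keytab-based login (the keytab path and principal below are placeholders you must replace with your own values):

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/name.keytab"
  principal="user@EXAMPLE.COM";
};
```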
Flink Kafka consumer from the beginning
Flink also gives the flexibility to set the start position for the Kafka consumer. A user can choose among several options:
- setStartFromEarliest: start from the earliest record possible
- setStartFromLatest: start from the latest record
- setStartFromGroupOffsets: the default behavior
Below is the code snippet to set the start position (use only one of these at a time; the last call wins):
val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)

// choose one of the following start positions
myConsumer.setStartFromEarliest()
myConsumer.setStartFromLatest()
myConsumer.setStartFromGroupOffsets()

val stream = env.addSource(myConsumer)
Conclusion
I hope you liked this tutorial on Flink Kafka consumers. In this blog, we learned how to write a Flink Kafka consumer to read data from Kafka using the Java and Scala languages. We also looked at the various Flink Kafka properties and learned how to connect to a secured Kafka cluster from Flink.
Please let me know if you face any issues while following along.