Apache Flink keeps gaining popularity because it lets us process terabytes of data in real time. This tutorial shows how to write a Flink socket streaming word count job in Scala. So let's get started.
Flink socket wordcount scala
Flink can connect to a server socket and read data from it. The Flink job consumes the data from the socket in real time. For this tutorial, we will use the Netcat tool to open the socket and send text through it.
You can follow the tutorial below to understand how to set up the Flink development environment.
Also, please make sure that the Netcat utility is installed on your system. On macOS, type the command below to install Netcat.
brew install netcat
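If Netcat is not available on your machine, a small text server can play the same role. The sketch below is only an illustration under a few assumptions: the object name TextServerSketch and the helper serveLines are made up for this example, it serves a single client, and it uses port 9999 to match the rest of this tutorial. It accepts one connection and sends every line typed on standard input, terminated by '\n', which is the delimiter the Flink job expects.

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.StdIn

// Hypothetical stand-in for `nc -l 9999`; not part of Flink.
object TextServerSketch {

  // Waits for one client on the given server socket and sends it every line,
  // each terminated by '\n'.
  def serveLines(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept() // blocks until a client (e.g. the Flink job) connects
    val out = new PrintWriter(client.getOutputStream)
    try {
      lines.foreach { line =>
        out.print(line + "\n")
        out.flush() // send each line right away so the job sees it in real time
      }
    } finally {
      out.close()
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999) // assumed port, same as in this tutorial
    try {
      // Read lines from the terminal until end of input (Ctrl+D).
      serveLines(server, Iterator.continually(StdIn.readLine()).takeWhile(_ != null))
    } finally {
      server.close()
    }
  }
}
```

Run it in place of Netcat, start the Flink job, and then type your messages into the terminal where the server is running.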
Let's proceed and write a Flink socket word count application in Scala. Open IntelliJ IDEA and create a new Scala class.
Provide the class name as SocketWordCount, select Object as the kind, and click the OK button. A hyphenated name such as socket-wordCount is not a valid Scala identifier, so the object uses camel case; the project and the jar can keep the socket-wordCount name.
Paste the code below into the SocketWordCount file
package streaming

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWordCount {

  def main(args: Array[String]): Unit = {
    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port") // throws if --port is missing
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWordCount " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l <port>' " +
          "and type the input text into the command line")
        return
      }
    }

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap(w => w.split("\\s"))   // split each line into words
      .map(w => WordWithCount(w, 1))  // pair every word with an initial count of 1
      .keyBy(_.word)                  // group by the word itself
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum("count")                   // add up the counts per word and window

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}
Modify the main class name in the pom.xml file
<mainClass>streaming.SocketWordCount</mainClass>
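To get a feel for what the job computes, here is a minimal sketch of the same logic in plain Scala collections, without Flink. It is a simplification under stated assumptions: the object WindowedWordCountSketch and the helpers windowStart and countPerWindow are made up for this example, timestamps are passed in by hand instead of coming from the processing-time clock, and window offsets are ignored. Each line is split on whitespace, every word is assigned to the 5-second tumbling window its timestamp falls into, and the words are counted per window.

```scala
// Plain-Scala illustration of a tumbling-window word count; not Flink code.
object WindowedWordCountSketch {

  final case class WordWithCount(word: String, count: Long)

  // Window size in milliseconds, matching Time.seconds(5) in the job.
  val windowSizeMs: Long = 5000L

  // Start of the tumbling window that a non-negative timestamp falls into.
  def windowStart(timestampMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  // Input: (arrival time in ms, text line). Output: word counts per window start.
  def countPerWindow(lines: Seq[(Long, String)]): Map[Long, Set[WordWithCount]] =
    lines
      .flatMap { case (ts, line) =>
        line.split("\\s").toSeq.map(word => (windowStart(ts), word))
      }
      .groupBy { case (start, _) => start }
      .map { case (start, words) =>
        val counts = words
          .groupBy { case (_, word) => word }
          .map { case (word, occurrences) => WordWithCount(word, occurrences.size.toLong) }
          .toSet
        start -> counts
      }

  def main(args: Array[String]): Unit = {
    // Two lines arrive within the first 5 seconds, one in the next window.
    val input = Seq((1000L, "hello flink"), (4000L, "hello world"), (6000L, "hello again"))
    countPerWindow(input).toSeq.sortBy(_._1).foreach { case (start, counts) =>
      println(s"window starting at $start ms: ${counts.toSeq.sortBy(_.word).mkString(", ")}")
    }
  }
}
```

In this sample input, "hello" is counted twice in the first window and once in the second, because counts are not carried over from one tumbling window to the next.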
Flink Socket wordcount jar
Now it's time to compile and package your code into a jar file.
Go to the socket-wordCount project path
cd ~/flink/socket-wordCount
Now package the application using Maven
mvn clean package
A jar file will be generated under the target directory with the name
socket-wordCount-1.0-SNAPSHOT.jar
Target directory structure
➜ target git:(main) ✗ ll
total 72
drwxr-xr-x 4 XXXX XXXX 128B Jul 16 22:44 classes
-rw-r--r-- 1 XXXX XXXX 1B Jul 16 22:44 classes.697043722.timestamp
-rw-r--r-- 1 XXXX XXXX 14K Jul 16 22:44 socket-wordCount-1.0-SNAPSHOT.jar
drwxr-xr-x 3 XXXX XXXX 96B Jul 16 22:44 maven-archiver
drwxr-xr-x 3 XXXX XXXX 96B Jul 16 22:44 maven-status
-rw-r--r-- 1 XXXX XXXX 14K Jul 16 22:44 original-socket-wordCount-1.0-SNAPSHOT.jar
Flink Socket wordcount submit job
Finally, it's time to submit the Flink job. Before submitting the job, open a terminal and start the Netcat utility by typing the command below
nc -l 9999
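Now submit the Flink job by typing the command below from the Flink installation directory. Adjust the jar path if your jar is still under the project's target directory.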
./bin/flink run --detached ~/socket-wordCount-1.0-SNAPSHOT.jar --hostname 127.0.0.1 --port 9999
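Everything after the jar path is handed to the job's main method as program arguments, which is how --hostname and --port reach the code. The sketch below only illustrates that idea and is not how Flink's ParameterTool is implemented: the object ArgsSketch and the helper parseArgs are hypothetical names, and the parser handles nothing beyond simple "--key value" pairs.

```scala
// Hypothetical illustration of reading "--key value" program arguments.
object ArgsSketch {

  // Collects "--key value" pairs into a Map, e.g. "--port 9999" becomes "port" -> "9999".
  def parseArgs(args: Array[String]): Map[String, String] =
    args.grouped(2).collect {
      case Array(key, value) if key.startsWith("--") => key.drop(2) -> value
    }.toMap

  def main(args: Array[String]): Unit = {
    // The same arguments as in the submit command of this tutorial.
    val params = parseArgs(Array("--hostname", "127.0.0.1", "--port", "9999"))
    val hostname = params.getOrElse("hostname", "localhost") // localhost by default
    val port = params("port").toInt                          // fails if --port is missing
    println(s"connecting to $hostname:$port")
  }
}
```

As in the job itself, the hostname falls back to localhost, while a missing port is treated as an error.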
Start sending some messages by typing lines of text into the Netcat terminal
Go to the Flink task manager and verify the output under the Stdout section.
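The printed records are the string form of the WordWithCount case class, so it helps to know what that looks like when scanning the task manager output. The small sketch below uses a made-up object name, OutputFormatSketch, and sample values chosen only for illustration; the words and counts you see depend on what you typed, and Flink may put a subtask prefix such as "1> " in front of each line when the print sink runs with a parallelism greater than one.

```scala
// Shows the string form of the case class that the job prints.
object OutputFormatSketch {

  final case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    // Sample record; prints WordWithCount(hello,2)
    println(WordWithCount("hello", 2))
  }
}
```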
More information about the socket wordcount streaming job can be found here
Conclusion
I hope you liked this tutorial on how to write a streaming socket word count in Flink using Scala. In this blog, we learned how to write the job, build a jar, and submit a socket word count Flink job.
Please let me know if you face any issues while following along.