Flink socket wordcount | Complete tutorial in [2022]

Flink keeps growing in popularity because it lets us process terabytes of data in real time. This tutorial shows how to write a Flink socket streaming word count job. So let’s get started.

Flink socket wordcount scala

Flink can connect to a server socket and read data from it, so the Flink job consumes whatever is written to the socket in real time. For this tutorial, we will use the Netcat tool to open the socket and send text to the job.

To set up the Flink development environment, follow the tutorial below:

Flink scala wordcount

Also, make sure that the Netcat utility is installed on your system. On macOS, type the command below to install Netcat.

brew install netcat

Let’s now write the Flink socket word count application in Scala. Open IntelliJ IDEA and create a new Scala class.

Provide the class name as SocketWordCount (a Scala identifier cannot contain a hyphen, so a name like socket-wordCount would not compile), select Object as the kind, and click OK.

Paste the code below into the SocketWordCount file:

package streaming

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWordCount {
  def main(args: Array[String]): Unit = {

    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      // getInt throws if --port is missing or is not a number
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWordCount " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println(
          "To start a simple text server, run 'netcat -l <port>' " +
            "and type the input text into the command line")
        return
      }
    }

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket; lines are delimited by '\n'
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap(w => w.split("\\s"))                               // split each line into words
      .map(w => WordWithCount(w, 1))                              // one record per word occurrence
      .keyBy(_.word)                                              // group records by word
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5-second tumbling windows
      .sum("count")                                               // add up the counts per word and window

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}

Set the main class in the pom.xml file to match the object name:

<mainClass>streaming.SocketWordCount</mainClass>
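To get a feel for what the windowed pipeline in the job computes, here is a minimal sketch in plain Scala that runs without Flink. It is only an illustration: the object TumblingCountSketch, its helper functions, and the explicit arrival timestamps are assumptions made for this example. The real job assigns lines to windows by the processing time at which Flink receives them.

```scala
// Illustration only: mimics the split / keyBy / 5-second window / sum steps
// of the job on an ordinary collection. All names here are hypothetical.
object TumblingCountSketch {
  /** Same shape as the case class used in the job. */
  final case class WordWithCount(word: String, count: Long)

  /** Start of the tumbling window that contains a non-negative timestamp (no offset). */
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  /** Groups (arrivalTimeMs, line) pairs into windows and counts the words in each window. */
  def countPerWindow(lines: Seq[(Long, String)], sizeMs: Long): Seq[(Long, Seq[WordWithCount])] =
    lines
      .groupBy { case (ts, _) => windowStart(ts, sizeMs) } // assign each line to its window
      .toSeq
      .sortBy(_._1)                                         // order windows by start time
      .map { case (start, inWindow) =>
        val counts = inWindow
          .flatMap { case (_, line) => line.split("\\s").toSeq } // same split as the job
          .groupBy(identity)                                      // stands in for keyBy(_.word)
          .map { case (word, occurrences) => WordWithCount(word, occurrences.size.toLong) }
          .toSeq
          .sortBy(_.word)                                         // sorted only for stable output
        (start, counts)
      }

  def main(args: Array[String]): Unit = {
    // assumed arrival times in milliseconds, paired with the text that was typed
    val input = Seq((1000L, "hello flink"), (4000L, "hello socket"), (6000L, "hello again"))
    countPerWindow(input, 5000L).foreach { case (start, counts) =>
      println(s"window starting at $start ms: ${counts.mkString(", ")}")
    }
  }
}
```

With the sample input, the first two lines fall into the window starting at 0 ms and the third into the window starting at 5000 ms, so the sketch prints WordWithCount(flink,1), WordWithCount(hello,2), WordWithCount(socket,1) for the first window and WordWithCount(again,1), WordWithCount(hello,1) for the second. The word hello is counted separately in each window, which is also how the Flink job behaves.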

Flink Socket wordcount jar

Now it’s time to compile the code and package it into a jar file.

Go to the socket-wordCount project directory

cd ~/flink/socket-wordCount

Now package the application using Maven

mvn clean package

A jar file with the following name will be generated in the target directory

socket-wordCount-1.0-SNAPSHOT.jar

Target directory structure

➜  target git:(main) ✗ ll     
total 72
drwxr-xr-x  4 XXXX  XXXX   128B Jul 16 22:44 classes
-rw-r--r--  1 XXXX  XXXX     1B Jul 16 22:44 classes.697043722.timestamp
-rw-r--r--  1 XXXX  XXXX    14K Jul 16 22:44 socket-wordCount-1.0-SNAPSHOT.jar
drwxr-xr-x  3 XXXX  XXXX    96B Jul 16 22:44 maven-archiver
drwxr-xr-x  3 XXXX  XXXX    96B Jul 16 22:44 maven-status
-rw-r--r--  1 XXXX  XXXX    14K Jul 16 22:44 original-socket-wordCount-1.0-SNAPSHOT.jar

Flink Socket wordcount submit job

Finally, it’s time to submit the Flink job. Before submitting the job, open a terminal and start the Netcat utility by typing the command below

nc -l 9999
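If Netcat is not available on your machine, a small text server can play the same role. The sketch below is only an assumption of how such a stand-in could look: the object TextServerSketch, its serve function, and the default port 9999 are made up for this example and are not part of Flink or Netcat. It waits for one client, such as the Flink job, and sends it every line typed on standard input, each terminated by '\n'.

```scala
import java.io.OutputStreamWriter
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for `nc -l <port>`; not part of Flink.
object TextServerSketch {
  /** Accepts one client on `server` and sends it each line followed by '\n'. */
  def serve(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept() // blocks until a client (e.g. the Flink job) connects
    try {
      val out = new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8)
      lines.foreach { line =>
        out.write(line + "\n") // '\n' is the delimiter the job passes to socketTextStream
        out.flush()            // send each line right away instead of buffering
      }
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    // first argument is the port; 9999 is an assumed default matching the nc command
    val port = if (args.nonEmpty) args(0).toInt else 9999
    val server = new ServerSocket(port)
    try serve(server, scala.io.Source.stdin.getLines())
    finally server.close()
  }
}
```

Start it in place of the nc command, submit the job, and then type lines into the terminal where the server is running. The server closes the connection once standard input ends.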

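Now submit the Flink job from the Flink installation directory by typing the command below. The jar path points to the target directory of the project built above

./bin/flink run --detached ~/flink/socket-wordCount/target/socket-wordCount-1.0-SNAPSHOT.jar --hostname 127.0.0.1 --port 9999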

Now type some messages into the Netcat terminal

Go to the Flink task manager in the web UI and verify the output under the Stdout section.

More information about the socket wordcount streaming job can be found here

Conclusion

I hope you liked this tutorial on writing a streaming socket word count in Flink using Scala. In this blog, we learned how to write the job, package it into a jar, and submit it to Flink.

Please do let me know if you are facing any issues while following along.
