Kafka Streaming — Advanced

Narayan Kumar
5 min read · Oct 30, 2020


Store, KTable, GlobalKTable

If an application requires aggregation, Kafka Streams uses a state store to aggregate the stream for further processing. Use cases such as word count, the live trend of an event, and live voting are good candidates for aggregation.

Stream processing solutions generally need to consider the following points during development.

  • Time sensitivity
  • Decoupling
  • Data format evolution
  • Reliability/Fault Tolerant
  • Scalability
  • Distributive

Why Aggregation?

A stream is a flow of events with the characteristics below, which is why it requires aggregation.

  • Unbounded
  • Continuous flow
  • Endless flow

Use case discussion:

PIN validation:

  • Customer check-in is an event stream
  • The customer check-in system generates the events
  • Model the events
  • Process and publish the events (see the sketch below)
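
As a sketch of the last two steps, here is a hypothetical producer for such check-in events. The class name, the topic customer.checkin, and the payload format are all illustrative, not from the original:

package com.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CheckinProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Model the event: key = customer ID, value = check-in details.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("customer.checkin", "C001", "checked-in 2020-10-30T10:15"));
        }
    }
}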

Let’s start with a very popular example of word count:

The producer publishes messages on a topic; the consumer consumes the messages and counts the occurrences of the words the publisher sent.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Transactions-Solution</artifactId>
        <groupId>org.kafka</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.demo.store</groupId>
    <artifactId>store-based-app</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <kafka.version>2.6.0</kafka.version>
        <jersey.version>2.27</jersey.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Apache Kafka Streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- Apache Log4j 2 binding for SLF4J -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Consumer class: WordCount.java

package com.demo;

import com.demo.connections.IAppConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Properties;

public class WordCount {

    static Logger logger = LogManager.getLogger(WordCount.class);

    public static void main(String[] args) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Read the source topic; key/value Serdes come from the default config below.
        KStream<String, String> ks0 = streamsBuilder.stream(IAppConfigs.ORDER_RECEIVED_TOPIC);
        /* Equivalent with explicit Serdes:
           KStream<String, String> ks0 = streamsBuilder.stream(IAppConfigs.ORDER_RECEIVED_TOPIC,
               Consumed.with(Serdes.String(), Serdes.String())); */

        ks0.foreach((k, v) -> System.out.println("Key " + k + " Value " + v));

        // Split each value into lower-case words.
        KStream<String, String> ks1 = ks0.flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));

        // Re-key the stream by the word itself, then count per word.
        KGroupedStream<String, String> ks2 = ks1.groupBy((k, v) -> v);
        final KTable<String, Long> kt0 = ks2.count();

        kt0.toStream().print(Printed.toSysOut());
        // kt0.toStream().to("WORDCOUNT.TOPIC", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = streamsBuilder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, getStreamConfig());
        kafkaStreams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutdown hook invoked... Application shutting down");
            kafkaStreams.close();
        }));
    }

    public static Properties getStreamConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, IAppConfigs.APPLICATION_ID_CONFIG);
        // props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        return props;
    }
}

Explanation

flatMapValues() — takes a lambda that receives each value of the stream, converts it to lower case, splits it into words on spaces, and returns the resulting list, producing the KStream<String, String> ks1.

flatMapValues(v->Arrays.asList(v.toLowerCase().split(" ")))

groupBy() — takes a key-value lambda that re-keys the stream; here it groups the elements by their value (the word) and returns a KGroupedStream.

KGroupedStream<String, String> ks2=ks1.groupBy((k, v)->v);

groupByKey() — groups the stream by its existing key. It is not applicable here, because the messages pushed for the word count carry no meaningful key.

KGroupedStream<String, String> ks2=ks1.groupByKey();

count() — counts the elements in the KGroupedStream object and returns a KTable. This table can be sent as a stream to any sink.

final KTable<String, Long> kt0=ks2.count();
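
The commented-out line in WordCount.java already hints at how to forward the result; enabling it sends every count update to a sink topic:

// Requires org.apache.kafka.streams.kstream.Produced.
// Values are Longs after count(), so override the default String value Serde.
kt0.toStream().to("WORDCOUNT.TOPIC", Produced.with(Serdes.String(), Serdes.Long()));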

Run the example

Create the topic:

C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows\kafka-topics.bat --create  --bootstrap-server localhost:9093 --topic order.received --partitions 5 --replication-factor 3 --config segment.bytes=1000000

Start the publisher (publishing to the order.received topic created above):

C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic order.received --property parse.key=true --property key.separator=":"

Start the consumer (WordCount):

Key name Value narayan kumar
Key name Value manish kumar
Key name Value sumit kumer
Key name Value sumit kumar
[KTABLE-TOSTREAM-0000000009]: manish, 3
[KTABLE-TOSTREAM-0000000009]: kumar, 3
[KTABLE-TOSTREAM-0000000009]: kumer, 1
[KTABLE-TOSTREAM-0000000009]: sumit, 2
[KTABLE-TOSTREAM-0000000009]: narayan, 1

Example 2: Group count based on age.

package com.demo;

import com.demo.connections.IAppConfigs;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;

import java.util.Arrays;
import java.util.Properties;

public class AgeCountAggregation {

    public static void main(String[] args) {
        StreamsBuilder sb = new StreamsBuilder();
        KStream<String, String> source = sb.stream(IAppConfigs.ORDER_RECEIVED_TOPIC);

        // Each message value is an age; group by the age value and count occurrences.
        KStream<String, String> ks0 = source.flatMapValues(v -> Arrays.asList(v.toLowerCase().split(" ")));
        KGroupedStream<String, String> kGroupedStream = ks0.groupBy((k, v) -> v);

        KTable<String, Long> kt0 = kGroupedStream.count();
        kt0.toStream().print(Printed.<String, Long>toSysOut().withLabel("Age count"));

        Topology tp = sb.build();
        KafkaStreams kafkaStreams = new KafkaStreams(tp, getStreamConfig());
        kafkaStreams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    public static Properties getStreamConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, IAppConfigs.APPLICATION_ID_CONFIG);
        // props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        return props;
    }
}

Run the example

C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic order.received --property parse.key=true --property key.separator=":"
>ram:30
>mohan:40
>Ram:30
>Ram:30
>Ram:30
>Ram:30
>mohan:40
>mohan:40
>ram:30
Output
[Age count]: 40, 1
[Age count]: 30, 2
[Age count]: 40, 3
[Age count]: 30, 6

State of Event

Banking organisations offer cashback accumulation on credit card spending for each transaction. The customer can redeem the cashback points in various ways, after which the cashback accumulation starts again.

Accumulate the Cashback

Kafka Streams accumulates the state of events in a state store. The following types of stores can be used (a cashback sketch follows the list).

  • In-memory
  • Local file Store
  • Remote Database
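
As an illustration (not from the original), cashback accumulation could be modeled with aggregate() over a grouped stream, assuming a hypothetical cashback.events topic keyed by customer ID, with point amounts as string values:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Assumption: cashback.events is keyed by customer ID; each value is a points amount.
KStream<String, String> cashback = builder.stream("cashback.events");

KTable<String, Long> totals = cashback
        .groupByKey()
        .aggregate(
                () -> 0L,                                                       // initializer: every customer starts at zero
                (customerId, points, total) -> total + Long.parseLong(points),  // add each transaction
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cashback-store")
                        .withValueSerde(Serdes.Long()));                        // running total lives in the state store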

The developer needs to weigh the store in two respects.

  • Performance
  • Fault tolerance

In-memory stores perform well, but they are not persistent; a store is therefore needed that can be trusted in both respects.

Kafka provides store implementation classes that are strong in both performance and fault tolerance.
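
As a sketch of selecting a store implementation explicitly, plugging into the word-count example's ks2 (the store name counts-store is illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// Persistent store (RocksDB-backed local file store, survives restarts on the same disk):
KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("counts-store");
// In-memory alternative (fast, but the state is gone when the instance stops):
// KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("counts-store");

KTable<String, Long> counts = ks2.count(
        Materialized.<String, Long>as(supplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));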

Store implementation classes in Kafka

To handle fault tolerance, the developer needs to think about preserving the state of events. A KTable keeps its state local to the Kafka Streams instance, so if one of the instances goes down, the data persisted to that local instance is lost: a replacement instance is started on a new machine, and that machine does not have the state of the instance that went down.

Fault Tolerance

Suppose the computer (Computer-3) running Task-03 goes down and a new Computer-4 takes its place, running Task-03 with a blank state. This can be managed by writing the state to a separate topic that acts as a backup of the state.
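
Kafka Streams automates exactly this with changelog topics: every update to a materialized store is also written to an internal, compacted topic, and a task restarted on another machine restores its state from it. Logging is on by default for materialized stores; a sketch of passing explicit changelog configs (the config value shown is illustrative):

import java.util.HashMap;
import java.util.Map;

// Changelog settings are ordinary topic configs; this value is illustrative.
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("min.insync.replicas", "2");

Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cashback-store")
        .withLoggingEnabled(changelogConfig);  // state updates are mirrored to the changelog topic
// .withLoggingDisabled() would turn the backup off and lose the state on failure.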

KTable

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key.

A record with a null value represents a “DELETE”, or tombstone, for the record’s key.

KTable Operations
  • A record for customer ID C002 with a null value deletes that record from the KTable (see the sketch below).
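
A minimal sketch, assuming builder is a StreamsBuilder, producer is a KafkaProducer<String, String>, and customer.profile is a hypothetical compacted topic:

// Build a KTable from the topic; each record upserts the value for its key.
KTable<String, String> customers = builder.table("customer.profile");

// On the producer side, sending key "C002" with a null value is a tombstone;
// it removes C002 from the KTable's state store.
producer.send(new ProducerRecord<>("customer.profile", "C002", null));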

GlobalKTable

With a KTable, every instance maintains the state of events locally. Sometimes the solution requires a global dataset shared across all the tasks, usable by every task; this is where GlobalKTable comes into the picture.

  • Used for small datasets (see the sketch below)
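
A hedged sketch of the typical use, a stream-to-GlobalKTable join; the topic product.catalog and the join logic are illustrative, while order.received comes from the earlier examples:

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

// The global table is fully replicated to every instance, hence "small dataset".
GlobalKTable<String, String> products = builder.globalTable("product.catalog");
KStream<String, String> orders = builder.stream("order.received");

// Join each order to the catalog: the second argument maps a stream record
// to the global table's key (here the order value is assumed to be a product ID).
KStream<String, String> enriched = orders.join(
        products,
        (orderId, productId) -> productId,
        (productId, productName) -> productId + " -> " + productName);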
