Kafka Streams API: Advanced

Narayan Kumar
Oct 26, 2020 · 7 min read


About Kafka Streams

Kafka Streams DSL

Event Stream — A continuous flow of events: an unbounded dataset of immutable data records.

Streaming Operations — Stateless, stateful, and window-based. Used to transform, aggregate, filter, and enrich the stream.
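
For instance, a minimal sketch (the topic name and types here are hypothetical) contrasting a stateless filter with a stateful, window-based count:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events");      // hypothetical topic
events.filter((k, v) -> v != null);                             // stateless: needs no state store
events.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))      // window-based
        .count();                                               // stateful: backed by a local state store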

Stream APIs

Stateful and Stateless APIs

filter() — Filters the KStream and returns a new KStream.

map() — Returns a new KStream with a transformed key and value. Because the key can change, the stream may be re-partitioned based on the new key and value.
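
A minimal sketch of both operations (hypothetical topic, String keys and values):

KStream<String, String> source = builder.stream("input-topic");   // hypothetical topic
KStream<String, String> filtered = source.filter((k, v) -> v.length() > 5);
// map() can replace the key, so Kafka Streams marks the stream for re-partitioning
KStream<String, String> rekeyed = filtered.map((k, v) -> KeyValue.pair(v.substring(0, 1), v));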

A Kafka Streams producer or consumer application must use SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary.

Developers can provide SerDes in either of two ways:

  • By setting default SerDes via a StreamsConfig instance, as below:
Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
  • By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults, e.g.:
to(IAppConfigs.loggingTopic, Produced.with(MessageSerdes.String(), MessageSerdes.LogRecord()))

Defining Custom Serdes

Suppose the project requires a custom class as its event type and the developer needs to process those events. The custom class then requires a custom SerDes in order to publish messages to, or consume messages from, a topic.

Custom serializer class — JsonSerializer.java

package com.demo.stream.serdes;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JsonSerializer() {
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // Nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }
}

Custom deserializer class — JsonDeserializer.java

package com.demo.stream.serdes;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {
    public static final String KEY_CLASS_NAME_CONFIG = "key.class.name";
    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";

    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> className;

    public JsonDeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        if (isKey)
            className = (Class<T>) props.get(KEY_CLASS_NAME_CONFIG);
        else
            className = (Class<T>) props.get(VALUE_CLASS_NAME_CONFIG);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, className);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // Nothing to close
    }
}

Define an AppSerdes class that wraps the serializer and deserializer defined above.

package com.demo.stream.serdes;

import com.demo.stream.types.PosInvoice;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.util.HashMap;
import java.util.Map;

public class AppSerdes extends Serdes {

    static final class PosInvoiceSerde extends WrapperSerde<PosInvoice> {
        PosInvoiceSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>());
        }
    }

    public static Serde<PosInvoice> PosInvoice() {
        PosInvoiceSerde serde = new PosInvoiceSerde();

        Map<String, Object> serdeConfigs = new HashMap<>();
        serdeConfigs.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, PosInvoice.class);
        serde.configure(serdeConfigs, false);

        return serde;
    }
}

Use the AppSerdes class to publish messages to and consume messages from topics. (The processor code below also calls AppSerdes.Notification() and AppSerdes.HadoopRecord(); define those factory methods the same way as PosInvoice().)

KStream<String, PosInvoice> KS0 = builder.stream(IAppConfigs.posTopicName,
        Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));

KS0.filter((k, v) ->
        v.getCustomerType().equalsIgnoreCase(IAppConfigs.CUSTOMER_TYPE_PRIME))
        .mapValues(invoice -> RecordBuilder.getNotification(invoice))
        .to(IAppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

Kafka Streams

KStream

  • A KStream can be defined as an abstraction over a stream of Kafka message records.
  • Some KStream operations return a new KStream — filter(), map(), flatMap().
  • Some operations are terminal and return void — foreach(), to(). See the sketch below.
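
A minimal sketch of the two kinds of operations (topic names here are hypothetical):

KStream<String, PosInvoice> invoices = builder.stream("pos-invoices");        // hypothetical topic
KStream<String, PosInvoice> nonNull = invoices.filter((k, v) -> v != null);   // returns a new KStream
nonNull.foreach((k, v) -> System.out.println(v.getInvoiceNumber()));          // terminal, returns void
nonNull.to("valid-invoices");                                                 // terminal sink, returns void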

Use Case:

PosInvoice events are generated at multiple sources in the business. The Kafka Streams application receives each event in a defined JSON format and processes it as shown in the image below.

Kafka Stream APIs

All KStreams are independent of each other; one stream does not impact or change another.

Flow-1: KS0's filter() returns KS1 after filtering the stream for HOME DELIVERY orders. KS1 uses the to() sink processor to send the messages to the SHIPMENT department.

Flow-2: KS0's filter() returns KS3 after filtering the stream for PRIME CUSTOMER orders. KS3 uses mapValues() to enrich or reshape the event and finally uses the to() sink processor to send the messages to the LOYALTY department.

Flow-3: KS0 uses mapValues() to mask sensitive fields, returning KS6. KS6 uses flatMapValues() to flatten each invoice into per-line-item records and finally uses the to() sink processor to send the messages to the HADOOP SINK.

JSON Structure:

invoice.json

{
  "type": "object",
  "javaType": "com.demo.stream.types.PosInvoice",
  "properties": {
    "InvoiceNumber": { "type": "string" },
    "CreatedTime": { "javaType": "java.lang.Long", "type": "object" },
    "StoreID": { "type": "string" },
    "PosID": { "type": "string" },
    "CashierID": { "type": "string" },
    "CustomerType": { "type": "string" },
    "CustomerCardNo": { "type": "string" },
    "TotalAmount": { "type": "number" },
    "NumberOfItems": { "type": "integer" },
    "PaymentMethod": { "type": "string" },
    "TaxableAmount": { "type": "number" },
    "CGST": { "type": "number" },
    "SGST": { "type": "number" },
    "CESS": { "type": "number" },
    "DeliveryType": { "type": "string" },
    "DeliveryAddress": {
      "type": "object",
      "javaType": "com.demo.stream.types.DeliveryAddress",
      "properties": {
        "AddressLine": { "type": "string" },
        "City": { "type": "string" },
        "State": { "type": "string" },
        "PinCode": { "type": "string" },
        "ContactNumber": { "type": "string" }
      }
    },
    "InvoiceLineItems": {
      "type": "array",
      "items": {
        "type": "object",
        "javaType": "com.demo.stream.types.LineItem",
        "properties": {
          "ItemCode": { "type": "string" },
          "ItemDescription": { "type": "string" },
          "ItemPrice": { "type": "number" },
          "ItemQty": { "type": "integer" },
          "TotalValue": { "type": "number" }
        }
      }
    }
  }
}

Notification.json

{
  "type": "object",
  "javaType": "com.demo.stream.types.Notification",
  "properties": {
    "InvoiceNumber": { "type": "string" },
    "CustomerCardNo": { "type": "string" },
    "TotalAmount": { "type": "number" },
    "EarnedLoyaltyPoints": { "type": "number" }
  }
}

Logging to Hadoop — HadoopRecord.json

{
  "type": "object",
  "javaType": "com.demo.stream.types.HadoopRecord",
  "properties": {
    "InvoiceNumber": { "type": "string" },
    "CreatedTime": { "javaType": "java.lang.Long", "type": "object" },
    "StoreID": { "type": "string" },
    "PosID": { "type": "string" },
    "CustomerType": { "type": "string" },
    "PaymentMethod": { "type": "string" },
    "DeliveryType": { "type": "string" },
    "City": { "type": "string" },
    "State": { "type": "string" },
    "PinCode": { "type": "string" },
    "ItemCode": { "type": "string" },
    "ItemDescription": { "type": "string" },
    "ItemPrice": { "type": "number" },
    "ItemQty": { "type": "integer" },
    "TotalValue": { "type": "number" }
  }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>KafkaStreamingApplication</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <kafka.version>2.6.0</kafka.version>
        <jersey.version>2.27</jersey.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Apache Kafka Clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- Apache Kafka Streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.rocksdb</groupId>
                    <artifactId>rocksdbjni</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Apache Log4J2 binding for SLF4J -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>6.11.4</version>
        </dependency>

        <!-- Spark Java framework for the embedded web server -->
        <dependency>
            <groupId>com.sparkjava</groupId>
            <artifactId>spark-core</artifactId>
            <version>2.5</version>
        </dependency>

        <!-- javax.ws.rs -->
        <dependency>
            <groupId>javax.ws.rs</groupId>
            <artifactId>javax.ws.rs-api</artifactId>
            <version>2.1</version>
        </dependency>

        <!-- Jersey dependencies -->
        <dependency>
            <groupId>org.glassfish.jersey.containers</groupId>
            <artifactId>jersey-container-servlet</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.inject</groupId>
            <artifactId>jersey-hk2</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-json-jackson</artifactId>
            <version>${jersey.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- JSON Schema to POJO plugin -->
            <plugin>
                <groupId>org.jsonschema2pojo</groupId>
                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
                <version>0.5.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            <includeAdditionalProperties>false</includeAdditionalProperties>
                            <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                            <outputEncoding>${project.build.sourceEncoding}</outputEncoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Compile the project to generate the Java classes from the JSON schemas; the jsonschema2pojo plugin runs during Maven's generate-sources phase (e.g. as part of mvn compile).

JSON to Java classes

Define the below classes as shown in the SerDes explanation above.

  • JsonSerializer
  • JsonDeserializer
  • AppSerdes

Create the SampleStreamProducer class

package com.demo.stream.producer;

import com.demo.stream.serdes.JsonSerializer;
import com.demo.stream.types.DeliveryAddress;
import com.demo.stream.types.PosInvoice;
import com.demo.stream.util.IAppConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class SampleStreamProducer {
    private static final Logger logger = LogManager.getLogger(SampleStreamProducer.class);

    public static void main(String[] args) {
        KafkaProducer<String, PosInvoice> kafkaProducer = new KafkaProducer<>(getKafkaProducerConfig());
        for (int order = 10041; order <= 10050; order++) {
            PosInvoice orderInvoice = getOrderInvoice(order);
            ProducerRecord<String, PosInvoice> orderRecord = new ProducerRecord<>(IAppConfigs.posTopicName,
                    orderInvoice.getInvoiceNumber(), orderInvoice);
            kafkaProducer.send(orderRecord);
        }
        logger.info("Event published...");
        kafkaProducer.flush();
        kafkaProducer.close();
        logger.info("Producer closed...");
    }

    private static Properties getKafkaProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "fanout-app1");
        return props;
    }

    private static PosInvoice getOrderInvoice(int order) {
        PosInvoice orderInvoice = new PosInvoice();
        orderInvoice.setInvoiceNumber(Integer.toString(order));
        // Mark every second order as a home delivery
        if (order % 2 == 0)
            orderInvoice.setDeliveryType("Home");
        orderInvoice.setCustomerType("Premium");
        DeliveryAddress address = new DeliveryAddress();
        address.setAddressLine("Dubai");
        orderInvoice.setDeliveryAddress(address);
        return orderInvoice;
    }
}
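
The producer and the processor below both reference an IAppConfigs holder that the article does not list. A minimal sketch with assumed values (every constant here is a guess, chosen to be consistent with the producer above):

package com.demo.stream.util;

// Hypothetical configuration holder; all values are illustrative assumptions.
public interface IAppConfigs {
    String applicationID = "pos-fanout-app";
    String bootstrapServers = "localhost:9092";
    String posTopicName = "pos-invoices";
    String notificationTopic = "loyalty-notifications";
    String hadoopTopic = "hadoop-sink-topic";
    String loggingTopic = "application-logs";
    String CUSTOMER_TYPE_PRIME = "Premium";      // matches setCustomerType("Premium") in the producer
    String DELIVERY_TYPE_HOME_DELIVERY = "Home"; // matches setDeliveryType("Home") in the producer
}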

Define the POSProcessor class

package com.demo.stream.consumer;

import com.demo.stream.serdes.AppSerdes;
import com.demo.stream.types.PosInvoice;
import com.demo.stream.util.IAppConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class POSProcessor {
    private static final Logger logger = LogManager.getLogger();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, IAppConfigs.applicationID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.bootstrapServers);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, PosInvoice> KS0 = builder.stream(IAppConfigs.posTopicName,
                Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));

        // Debug: print the customer type of every incoming invoice
        KS0.foreach((k, v) -> System.out.println(v.getCustomerType()));

        // Debug: print home-delivery invoices to stdout
        KS0.filter((k, v) ->
                (v.getDeliveryType() != null) && v.getDeliveryType().equalsIgnoreCase("Home"))
                .print(Printed.toSysOut());

        // Flow-1: route home-delivery invoices to the shipment topic
        KS0.filter((k, v) ->
                (v.getDeliveryType() != null) && v.getDeliveryType().equalsIgnoreCase(IAppConfigs.DELIVERY_TYPE_HOME_DELIVERY))
                .to("order.delivered", Produced.with(AppSerdes.String(), AppSerdes.PosInvoice()));

        // Flow-2: build loyalty notifications for prime customers
        KS0.filter((k, v) ->
                v.getCustomerType().equalsIgnoreCase(IAppConfigs.CUSTOMER_TYPE_PRIME))
                .mapValues(invoice -> RecordBuilder.getNotification(invoice))
                .to(IAppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

        // Flow-3: mask the invoice and flatten it into per-line-item Hadoop records
        KS0.mapValues(invoice -> RecordBuilder.getMaskedInvoice(invoice))
                .flatMapValues(invoice -> RecordBuilder.getHadoopRecords(invoice))
                .to(IAppConfigs.hadoopTopic, Produced.with(AppSerdes.String(), AppSerdes.HadoopRecord()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping Stream");
            streams.close();
        }));
    }
}
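
The POSProcessor references a RecordBuilder helper that the article does not show. A minimal sketch, assuming the POJOs generated from the JSON schemas above and a class in the same package as POSProcessor (the loyalty-points formula and the masked fields are assumptions):

package com.demo.stream.consumer;

import com.demo.stream.types.HadoopRecord;
import com.demo.stream.types.LineItem;
import com.demo.stream.types.Notification;
import com.demo.stream.types.PosInvoice;

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; the original RecordBuilder is not shown in the article.
class RecordBuilder {

    // Build a loyalty Notification from an invoice; the 2% formula is an assumption.
    static Notification getNotification(PosInvoice invoice) {
        Notification notification = new Notification();
        notification.setInvoiceNumber(invoice.getInvoiceNumber());
        notification.setCustomerCardNo(invoice.getCustomerCardNo());
        notification.setTotalAmount(invoice.getTotalAmount());
        double total = invoice.getTotalAmount() == null ? 0.0 : invoice.getTotalAmount();
        notification.setEarnedLoyaltyPoints(total * 0.02);
        return notification;
    }

    // Remove personally identifiable fields before the record leaves for Hadoop.
    static PosInvoice getMaskedInvoice(PosInvoice invoice) {
        invoice.setCustomerCardNo(null);
        if (invoice.getDeliveryAddress() != null) {
            invoice.getDeliveryAddress().setAddressLine(null);
            invoice.getDeliveryAddress().setContactNumber(null);
        }
        return invoice;
    }

    // Flatten one invoice into one HadoopRecord per line item.
    static List<HadoopRecord> getHadoopRecords(PosInvoice invoice) {
        List<HadoopRecord> records = new ArrayList<>();
        if (invoice.getInvoiceLineItems() == null) {
            return records;
        }
        for (LineItem item : invoice.getInvoiceLineItems()) {
            HadoopRecord record = new HadoopRecord();
            record.setInvoiceNumber(invoice.getInvoiceNumber());
            record.setCreatedTime(invoice.getCreatedTime());
            record.setStoreID(invoice.getStoreID());
            record.setPosID(invoice.getPosID());
            record.setCustomerType(invoice.getCustomerType());
            record.setItemCode(item.getItemCode());
            record.setItemQty(item.getItemQty());
            record.setTotalValue(item.getTotalValue());
            records.add(record);
        }
        return records;
    }
}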
