Kafka Streams API: Advanced

Kafka Streams DSL

Event Stream: a continuous flow of events; an unbounded dataset of immutable data records.

There are two ways to provide SerDes (serializer/deserializer pairs) to a Streams application:

  • By setting default SerDes via the streams configuration. Default SerDes are configured as below.
Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
  • By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults, e.g.
to(IAppConfigs.loggingTopic, Produced.with(MessageSerdes.String(), MessageSerdes.LogRecord()))

Defining Custom Serdes

Suppose a project uses a custom class as an event and the developer needs to process that event. In this case the custom class requires a custom SerDes in order to publish messages to, and consume them from, a topic. The three pieces below are a generic JSON serializer, a generic JSON deserializer, and an AppSerdes factory that ties them together.

package com.demo.stream.serdes;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JsonSerializer() {
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // Nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }
}
package com.demo.stream.serdes;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Generic JSON deserializer. The target class is passed in through the
 * configure() properties so one implementation can serve any event type.
 */
public class JsonDeserializer<T> implements Deserializer<T> {
    public static final String KEY_CLASS_NAME_CONFIG = "key.class.name";
    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";

    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> className;

    public JsonDeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        if (isKey)
            className = (Class<T>) props.get(KEY_CLASS_NAME_CONFIG);
        else
            className = (Class<T>) props.get(VALUE_CLASS_NAME_CONFIG);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, className);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
package com.demo.stream.serdes;

import com.demo.stream.types.PosInvoice;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.util.HashMap;
import java.util.Map;

/**
 * Serde factory for the application's event types. Extending Serdes gives
 * access to the built-in serdes (String(), Long(), ...) and to WrapperSerde,
 * which pairs a serializer and a deserializer into a single Serde.
 */
public class AppSerdes extends Serdes {

    static final class PosInvoiceSerde extends WrapperSerde<PosInvoice> {
        PosInvoiceSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>());
        }
    }

    public static Serde<PosInvoice> PosInvoice() {
        PosInvoiceSerde serde = new PosInvoiceSerde();
        // Tell the wrapped JsonDeserializer which class to deserialize values into
        Map<String, Object> serdeConfigs = new HashMap<>();
        serdeConfigs.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, PosInvoice.class);
        serde.configure(serdeConfigs, false);
        return serde;
    }
}
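The processor shown later also calls AppSerdes.Notification() and AppSerdes.HadoopRecord(), which are not shown in the article. A minimal sketch, following exactly the same pattern as PosInvoice() and assuming the generated Notification and HadoopRecord types:

static final class NotificationSerde extends WrapperSerde<Notification> {
    NotificationSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>());
    }
}

public static Serde<Notification> Notification() {
    NotificationSerde serde = new NotificationSerde();
    Map<String, Object> serdeConfigs = new HashMap<>();
    serdeConfigs.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, Notification.class);
    serde.configure(serdeConfigs, false);
    return serde;
}

// HadoopRecord() is identical apart from the type: put HadoopRecord.class
// under VALUE_CLASS_NAME_CONFIG and wrap it in a HadoopRecordSerde.

With the SerDes in place, a stream over the POS topic can be created with explicit SerDes: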
KStream<String, PosInvoice> KS0 = builder.stream(IAppConfigs.posTopicName,
        Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));
The stream can then be branched: filter prime-customer invoices, map each to a notification record, and route them to the notification topic:
KS0.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(IAppConfigs.CUSTOMER_TYPE_PRIME))
        .mapValues(invoice -> RecordBuilder.getNotification(invoice))
        .to(IAppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

Kafka Streams

KStream

  • A KStream is an abstraction of a stream of Kafka message records.
  • Some KStream methods return a new KStream (filter(), map(), flatMap()), so operations can be chained.
  • Others are terminal and return void (foreach(), to()). The sketch below illustrates both kinds.
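A minimal sketch of chaining versus terminal operations, assuming String keys and values on a hypothetical "words" topic (the topic name and threshold are illustrative only):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class KStreamBasics {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> words = builder.stream("words",
                Consumed.with(Serdes.String(), Serdes.String()));

        // filter() and mapValues() each return a new KStream, so they chain
        KStream<String, String> longWords = words
                .filter((k, v) -> v.length() > 5)
                .mapValues(v -> v.toUpperCase());

        // foreach() returns void and terminates this branch of the topology
        longWords.foreach((k, v) -> System.out.println(k + " -> " + v));

        // Building and starting the KafkaStreams instance is omitted here;
        // see POSProcessor below for the full lifecycle.
    }
}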
Event Schemas

The application's event types are defined as JSON schemas. The jsonschema2pojo Maven plugin (configured in the pom.xml below) generates the corresponding Java classes, PosInvoice, Notification, and HadoopRecord, from these schemas.

PosInvoice event schema:
{
  "type": "object",
  "javaType": "com.demo.stream.types.PosInvoice",
  "properties": {
    "InvoiceNumber": { "type": "string" },
    "CreatedTime": { "javaType": "java.lang.Long", "type": "object" },
    "StoreID": { "type": "string" },
    "PosID": { "type": "string" },
    "CashierID": { "type": "string" },
    "CustomerType": { "type": "string" },
    "CustomerCardNo": { "type": "string" },
    "TotalAmount": { "type": "number" },
    "NumberOfItems": { "type": "integer" },
    "PaymentMethod": { "type": "string" },
    "TaxableAmount": { "type": "number" },
    "CGST": { "type": "number" },
    "SGST": { "type": "number" },
    "CESS": { "type": "number" },
    "DeliveryType": { "type": "string" },
    "DeliveryAddress": {
      "type": "object",
      "javaType": "com.demo.stream.types.DeliveryAddress",
      "properties": {
        "AddressLine": { "type": "string" },
        "City": { "type": "string" },
        "State": { "type": "string" },
        "PinCode": { "type": "string" },
        "ContactNumber": { "type": "string" }
      }
    },
    "InvoiceLineItems": {
      "type": "array",
      "items": {
        "type": "object",
        "javaType": "com.demo.stream.types.LineItem",
        "properties": {
          "ItemCode": { "type": "string" },
          "ItemDescription": { "type": "string" },
          "ItemPrice": { "type": "number" },
          "ItemQty": { "type": "integer" },
          "TotalValue": { "type": "number" }
        }
      }
    }
  }
}
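For reference, jsonschema2pojo turns the schema above into a plain POJO with Jackson annotations. An abbreviated sketch of the generated PosInvoice (the exact output depends on the plugin settings):

package com.demo.stream.types;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;

public class PosInvoice {
    @JsonProperty("InvoiceNumber")
    private String invoiceNumber;
    @JsonProperty("CustomerType")
    private String customerType;
    @JsonProperty("DeliveryType")
    private String deliveryType;
    @JsonProperty("DeliveryAddress")
    private DeliveryAddress deliveryAddress;
    @JsonProperty("InvoiceLineItems")
    private List<LineItem> invoiceLineItems;
    // ... the remaining scalar fields follow the same pattern ...

    public String getInvoiceNumber() { return invoiceNumber; }
    public void setInvoiceNumber(String invoiceNumber) { this.invoiceNumber = invoiceNumber; }
    // getters and setters for the other fields are generated the same way
}

Notification event schema: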
{
  "type": "object",
  "javaType": "com.demo.stream.types.Notification",
  "properties": {
    "InvoiceNumber": { "type": "string" },
    "CustomerCardNo": { "type": "string" },
    "TotalAmount": { "type": "number" },
    "EarnedLoyaltyPoints": { "type": "number" }
  }
}

HadoopRecord event schema:
{
  "type": "object",
  "javaType": "com.demo.stream.types.HadoopRecord",
  "properties": {
    "InvoiceNumber": { "type": "string" },
    "CreatedTime": { "javaType": "java.lang.Long", "type": "object" },
    "StoreID": { "type": "string" },
    "PosID": { "type": "string" },
    "CustomerType": { "type": "string" },
    "PaymentMethod": { "type": "string" },
    "DeliveryType": { "type": "string" },
    "City": { "type": "string" },
    "State": { "type": "string" },
    "PinCode": { "type": "string" },
    "ItemCode": { "type": "string" },
    "ItemDescription": { "type": "string" },
    "ItemPrice": { "type": "number" },
    "ItemQty": { "type": "integer" },
    "TotalValue": { "type": "number" }
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>KafkaStreamingApplication</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <java.version>1.8</java.version>
    <kafka.version>2.6.0</kafka.version>
    <jersey.version>2.27</jersey.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Apache Kafka clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <!-- Apache Kafka Streams (RocksDB excluded here and pinned explicitly below) -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>${kafka.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.rocksdb</groupId>
          <artifactId>rocksdbjni</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Apache Log4j 2 binding for SLF4J -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.11.0</version>
    </dependency>

    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
    <dependency>
      <groupId>org.rocksdb</groupId>
      <artifactId>rocksdbjni</artifactId>
      <version>6.11.4</version>
    </dependency>

    <!-- Spark Java micro framework for the embedded web server -->
    <dependency>
      <groupId>com.sparkjava</groupId>
      <artifactId>spark-core</artifactId>
      <version>2.5</version>
    </dependency>

    <!-- javax.ws.rs -->
    <dependency>
      <groupId>javax.ws.rs</groupId>
      <artifactId>javax.ws.rs-api</artifactId>
      <version>2.1</version>
    </dependency>

    <!-- Jersey dependencies -->
    <dependency>
      <groupId>org.glassfish.jersey.containers</groupId>
      <artifactId>jersey-container-servlet</artifactId>
      <version>${jersey.version}</version>
    </dependency>
    <dependency>
      <groupId>org.glassfish.jersey.inject</groupId>
      <artifactId>jersey-hk2</artifactId>
      <version>${jersey.version}</version>
    </dependency>
    <dependency>
      <groupId>org.glassfish.jersey.media</groupId>
      <artifactId>jersey-media-json-jackson</artifactId>
      <version>${jersey.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Maven Compiler Plugin -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <!-- JSON Schema to POJO plugin -->
      <plugin>
        <groupId>org.jsonschema2pojo</groupId>
        <artifactId>jsonschema2pojo-maven-plugin</artifactId>
        <version>0.5.1</version>
        <executions>
          <execution>
            <goals>
              <goal>generate</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
              <includeAdditionalProperties>false</includeAdditionalProperties>
              <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
              <outputEncoding>${project.build.sourceEncoding}</outputEncoding>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
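The jsonschema2pojo generate goal binds to Maven's generate-sources phase, so the POJOs can be (re)generated with a standard Maven run:

mvn generate-sources

The generated classes are written under src/main/java in the package named by each schema's javaType (com.demo.stream.types here).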
JSON to Java classes

Once the schemas have been converted to Java classes, the SerDes components defined earlier wire those classes into Kafka:
  • JsonSerializer
  • JsonDeserializer
  • AppSerdes

A sample producer publishes PosInvoice events to the POS topic:
package com.demo.stream.producer;

import com.demo.stream.serdes.JsonSerializer;
import com.demo.stream.types.DeliveryAddress;
import com.demo.stream.types.PosInvoice;
import com.demo.stream.util.IAppConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class SampleStreamProducer {
    private static final Logger logger = LogManager.getLogger(SampleStreamProducer.class);

    public static void main(String[] args) {
        KafkaProducer<String, PosInvoice> kafkaProducer = new KafkaProducer<>(getKafkaProducerConfig());
        // Publish ten sample invoices, keyed by invoice number
        for (int order = 10041; order <= 10050; order++) {
            PosInvoice orderInvoice = getOrderInvoice(order);
            ProducerRecord<String, PosInvoice> orderRecord = new ProducerRecord<>(IAppConfigs.posTopicName,
                    orderInvoice.getInvoiceNumber(), orderInvoice);
            kafkaProducer.send(orderRecord);
        }
        logger.info("Events published...");
        kafkaProducer.flush();
        kafkaProducer.close();
        logger.info("Producer closed...");
    }

    private static Properties getKafkaProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // The generic JsonSerializer defined above handles the PosInvoice value
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "fanout-app1");
        return props;
    }

    private static PosInvoice getOrderInvoice(int order) {
        PosInvoice orderInvoice = new PosInvoice();
        orderInvoice.setInvoiceNumber(Integer.toString(order));
        // Every second invoice is a home delivery
        if (order % 2 == 0)
            orderInvoice.setDeliveryType("Home");
        orderInvoice.setCustomerType("Premium");
        DeliveryAddress address = new DeliveryAddress();
        address.setAddressLine("Dubai");
        orderInvoice.setDeliveryAddress(address);
        return orderInvoice;
    }
}

The streams application below consumes these events and fans them out to multiple topics:
package com.demo.stream.consumer;

import com.demo.stream.serdes.AppSerdes;
import com.demo.stream.types.PosInvoice;
import com.demo.stream.util.IAppConfigs;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class POSProcessor {
    private static final Logger logger = LogManager.getLogger();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, IAppConfigs.applicationID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.bootstrapServers);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, PosInvoice> KS0 = builder.stream(IAppConfigs.posTopicName,
                Consumed.with(AppSerdes.String(), AppSerdes.PosInvoice()));

        // Branch 1: log every invoice's customer type
        KS0.foreach((k, v) -> System.out.println(v.getCustomerType()));

        // Branch 2: print home-delivery invoices to stdout
        KS0.filter((k, v) ->
                (v.getDeliveryType() != null) && v.getDeliveryType().equalsIgnoreCase("Home"))
                .print(Printed.toSysOut());

        // Branch 3: route home-delivery invoices to the order.delivered topic
        KS0.filter((k, v) ->
                (v.getDeliveryType() != null) && v.getDeliveryType().equalsIgnoreCase(IAppConfigs.DELIVERY_TYPE_HOME_DELIVERY))
                .to("order.delivered", Produced.with(AppSerdes.String(), AppSerdes.PosInvoice()));

        // Branch 4: map prime-customer invoices to loyalty notifications
        KS0.filter((k, v) ->
                v.getCustomerType().equalsIgnoreCase(IAppConfigs.CUSTOMER_TYPE_PRIME))
                .mapValues(invoice -> RecordBuilder.getNotification(invoice))
                .to(IAppConfigs.notificationTopic, Produced.with(AppSerdes.String(), AppSerdes.Notification()));

        // Branch 5: mask PII, then flatten line items into per-item Hadoop records
        KS0.mapValues(invoice -> RecordBuilder.getMaskedInvoice(invoice))
                .flatMapValues(invoice -> RecordBuilder.getHadoopRecords(invoice))
                .to(IAppConfigs.hadoopTopic, Produced.with(AppSerdes.String(), AppSerdes.HadoopRecord()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the topology cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Stopping Stream");
            streams.close();
        }));
    }
}
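POSProcessor relies on a RecordBuilder helper that the article does not show. Below is a minimal sketch of what it could look like, assuming it sits in the same package as POSProcessor and that loyalty points are a flat 2% of the invoice total; the point rate and all method bodies are illustrative assumptions, not the article's actual logic:

package com.demo.stream.consumer;

import com.demo.stream.types.HadoopRecord;
import com.demo.stream.types.LineItem;
import com.demo.stream.types.Notification;
import com.demo.stream.types.PosInvoice;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RecordBuilder {

    // Map a prime-customer invoice to a loyalty notification
    public static Notification getNotification(PosInvoice invoice) {
        Notification notification = new Notification();
        notification.setInvoiceNumber(invoice.getInvoiceNumber());
        notification.setCustomerCardNo(invoice.getCustomerCardNo());
        notification.setTotalAmount(invoice.getTotalAmount());
        if (invoice.getTotalAmount() != null) {
            // Assumed rule: 2% of the invoice total as loyalty points
            notification.setEarnedLoyaltyPoints(invoice.getTotalAmount() * 0.02);
        }
        return notification;
    }

    // Remove personally identifiable fields before the record leaves the pipeline.
    // Note: this mutates the record for brevity; a defensive copy is safer when
    // several branches of the topology share the same object.
    public static PosInvoice getMaskedInvoice(PosInvoice invoice) {
        invoice.setCustomerCardNo(null);
        if (invoice.getDeliveryAddress() != null) {
            invoice.getDeliveryAddress().setContactNumber(null);
        }
        return invoice;
    }

    // Flatten an invoice into one HadoopRecord per line item
    public static List<HadoopRecord> getHadoopRecords(PosInvoice invoice) {
        if (invoice.getInvoiceLineItems() == null) {
            return Collections.emptyList();
        }
        List<HadoopRecord> records = new ArrayList<>();
        for (LineItem item : invoice.getInvoiceLineItems()) {
            HadoopRecord record = new HadoopRecord();
            record.setInvoiceNumber(invoice.getInvoiceNumber());
            record.setStoreID(invoice.getStoreID());
            record.setCustomerType(invoice.getCustomerType());
            record.setItemCode(item.getItemCode());
            record.setItemDescription(item.getItemDescription());
            record.setItemPrice(item.getItemPrice());
            record.setItemQty(item.getItemQty());
            record.setTotalValue(item.getTotalValue());
            records.add(record);
        }
        return records;
    }
}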
