Kafka — Custom Serializer and Deserializer

Narayan Kumar
5 min readOct 26, 2020

Kafka sends events through the network, which requires messages to be serialized before they are sent. The Producer API provides built-in serializers such as IntegerSerializer and StringSerializer, with corresponding deserializers on the consumer side.

Serializer is used by publisher while Deserializer by consumer.

The default serializers may not fulfill the need, especially when a custom object — for example an Employee or Customer object — must be sent over the network. In that case we can build a custom serializer and configure it as the serializer.

Example:

Let's consider a JSON-based message that needs to be sent to a Kafka topic; follow the steps below.

  • Create Json
  • Generate java class using the Maven plugin
  • Create the Custom serializer and deserializer
  • Configure these in producer and consumer side respectively
  • The producer can then create the Java object and send it.

Create Json

{
"type": "object",
"javaType": "com.demo.message.types.OrderInvoice",
"properties": {
"InvoiceNumber": {
"type": "string"
},
"CreatedTime": {
"javaType": "java.lang.Long",
"type": "object"
},
"StoreID": {
"type": "string"
},
"PosID": {
"type": "string"
},
"CashierID": {
"type": "string"
},
"CustomerType": {
"type": "string"
},
"CustomerCardNo": {
"type": "string"
},
"TotalAmount": {
"type": "number"
},
"NumberOfItems": {
"type": "integer"
},
"PaymentMethod": {
"type": "string"
},
"TaxableAmount": {
"type": "number"
},
"CGST": {
"type": "number"
},
"SGST": {
"type": "number"
},
"CESS": {
"type": "number"
},
"DeliveryType": {
"type": "string"
},
"DeliveryAddress": {
"type": "object",
"javaType": "com.demo.message.types.DeliveryAddress",
"properties": {
"AddressLine": {
"type": "string"
},
"City": {
"type": "string"
},
"State": {
"type": "string"
},
"PinCode": {
"type": "string"
},
"ContactNumber": {
"type": "string"
}
}
},
"InvoiceLineItems": {
"type": "array",
"items": {
"type": "object",
"javaType": "com.demo.message.types.Items",
"properties": {
"ItemCode": {
"type": "string"
},
"ItemDescription": {
"type": "string"
},
"ItemPrice": {
"type": "number"
},
"ItemQty": {
"type": "integer"
},
"TotalValue": {
"type": "number"
}
}
}
}
}
}

Add the dependencies and maven plugin in pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Transactions-Solution</artifactId>
        <groupId>org.kafka</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.online.order</groupId>
    <artifactId>online-order</artifactId>
    <properties>
        <java.version>1.8</java.version>
        <kafka.version>2.6.0</kafka.version>
        <jersey.version>2.27</jersey.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>

        <!-- Apache Kafka Streams (rocksdbjni excluded; a newer version is pinned below) -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.rocksdb</groupId>
                    <artifactId>rocksdbjni</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Apache Log4J2 binding for SLF4J -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- commons-lang3 provides org.apache.commons.lang3.SerializationUtils,
             which the custom (de)serializers import. The old commons-lang 2.6
             artifact only ships org.apache.commons.lang.* and would not compile. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.2</version>
        </dependency>
        <!-- NOTE: jsonschema2pojo is a BUILD plugin, not a runtime dependency;
             it is declared once in the <build><plugins> section below. -->

        <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>6.11.4</version>
        </dependency>
        <!-- spark java framework for embedded web server -->
        <dependency>
            <groupId>com.sparkjava</groupId>
            <artifactId>spark-core</artifactId>
            <version>2.5</version>
        </dependency>
        <!-- javax.ws.rs -->
        <dependency>
            <groupId>javax.ws.rs</groupId>
            <artifactId>javax.ws.rs-api</artifactId>
            <version>2.1</version>
        </dependency>
        <!-- Jersey dependencies -->
        <dependency>
            <groupId>org.glassfish.jersey.containers</groupId>
            <artifactId>jersey-container-servlet</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.inject</groupId>
            <artifactId>jersey-hk2</artifactId>
            <version>${jersey.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- JSON Schema to POJO plugin: generates the message classes from
                 src/main/resources/schema into src/main/java during the build. -->
            <plugin>
                <groupId>org.jsonschema2pojo</groupId>
                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
                <version>0.5.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            <includeAdditionalProperties>false</includeAdditionalProperties>
                            <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                            <outputEncoding>${project.build.sourceEncoding}</outputEncoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Generate the Java classes from the defined JSON schema. Select the JSON file and compile it as shown below.

Generate JSON to Java

The above step generates the Java files shown below; add `implements Serializable` to all three generated classes.

Generated Java Classes

Create the CustomSerializer class

package com.demo.serialization;

import java.io.Serializable;
import java.util.Map;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Generic Kafka serializer that converts any {@link Serializable} value into
 * bytes using standard Java object serialization
 * ({@link SerializationUtils#serialize}).
 *
 * <p>Configure via {@code ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}.
 * Stateless and therefore safe to share across producer threads.
 *
 * @param <T> the record value type; must implement {@link Serializable}
 */
public class CustomSerializer<T extends Serializable> implements Serializer<T> {

    /** Kafka instantiates serializers reflectively; a no-arg constructor is required. */
    public CustomSerializer() {}

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for plain Java serialization.
    }

    /**
     * Serializes {@code data} to a byte array.
     *
     * @param topic topic the record is headed to (unused)
     * @param data  the value to serialize; may be null
     * @return the serialized bytes, or null when {@code data} is null
     *         (Kafka convention: null value -> null payload, mirroring
     *         CustomDeserializer's null handling)
     */
    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        return SerializationUtils.serialize(data);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}

Create the CustomDeserializer class

package com.demo.serialization;

import java.io.Serializable;
import java.util.Map;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Generic Kafka deserializer that reverses {@code CustomSerializer}: it turns
 * a byte payload back into an object via standard Java object serialization.
 *
 * <p>SECURITY NOTE (review): Java-native deserialization of untrusted bytes can
 * trigger arbitrary gadget chains. Only use this deserializer against topics
 * whose producers are fully trusted; prefer JSON for untrusted input.
 *
 * @param <T> the expected record value type; must implement {@link Serializable}
 */
public class CustomDeserializer<T extends Serializable> implements Deserializer<T> {

    /**
     * Config key retained for forward compatibility (e.g. a Jackson-based
     * implementation); the Java-serialization path below does not read it.
     */
    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for plain Java deserialization.
    }

    /**
     * Deserializes {@code objectData} into a value of type {@code T}.
     *
     * @param topic      topic the record came from (unused)
     * @param objectData serialized bytes; may be null (tombstone record)
     * @return the deserialized object, or null when the payload is null
     */
    @SuppressWarnings("unchecked") // cast is unavoidable with type erasure; payload type is producer-controlled
    @Override
    public T deserialize(String topic, byte[] objectData) {
        return (objectData == null) ? null : (T) SerializationUtils.deserialize(objectData);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}

Producer Code

package com.demo.producer;

import com.demo.connections.AppConfiguration;
import com.demo.connections.IAppConfigs;
import com.demo.message.types.DeliveryAddress;
import com.demo.message.types.OrderInvoice;
import com.demo.serialization.CustomSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

/**
 * Publishes sample {@link OrderInvoice} events to the order-received topic,
 * using {@code CustomSerializer} (Java serialization) for the record value.
 *
 * <p>Fixes vs. the original listing: {@code getKafkaProducerConfig()} and
 * {@code getKafkaProducer()} were declared AFTER the class's closing brace
 * (a compile error) — they now live inside the class; the missing
 * {@code Properties}/{@code ProducerConfig}/{@code StringSerializer}/
 * {@code CustomSerializer} imports are added; the producer is closed in a
 * {@code finally} block; the pointless 2 ms sleep after close is removed.
 */
public class OrderProducer {

    static Logger logger = LogManager.getLogger(OrderProducer.class.getName());

    public static void main(String[] args) {
        KafkaProducer<String, OrderInvoice> kafkaProducer = getKafkaProducer();
        try {
            // Publish six sample invoices, keyed by invoice number so records
            // for the same invoice land on the same partition.
            for (int order = 10015; order <= 10020; order++) {
                OrderInvoice orderInvoice = getOrderInvoice(order);
                ProducerRecord<String, OrderInvoice> orderRecord =
                        new ProducerRecord<>(IAppConfigs.ORDER_RECEIVED_TOPIC,
                                orderInvoice.getInvoiceNumber(), orderInvoice);
                kafkaProducer.send(orderRecord);
            }
            logger.info("Event published...");
            // flush() blocks until all buffered records are actually sent.
            kafkaProducer.flush();
        } finally {
            kafkaProducer.close();
            logger.info("Producer closed...");
        }
    }

    /** Builds a sample invoice; only a subset of fields is populated for the demo. */
    private static OrderInvoice getOrderInvoice(int order) {
        OrderInvoice orderInvoice = new OrderInvoice();
        orderInvoice.setInvoiceNumber(Integer.toString(order));
        orderInvoice.setDeliveryType("Home");
        orderInvoice.setCustomerType("Premium");
        DeliveryAddress address = new DeliveryAddress();
        address.setAddressLine("Dubai");
        orderInvoice.setDeliveryAddress(address);
        return orderInvoice;
    }

    /** Producer config: String keys, CustomSerializer (Java serialization) values. */
    private static Properties getKafkaProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IAppConfigs.APPLICATION_ID_CONFIG);
        return props;
    }

    public static KafkaProducer<String, OrderInvoice> getKafkaProducer() {
        return new KafkaProducer<>(getKafkaProducerConfig());
    }
}

Constant class

package com.demo.connections;

/**
 * Application-wide Kafka configuration constants.
 *
 * <p>Converted from a "constant interface" (an anti-pattern — interfaces
 * define types, not constant holders) to a final utility class. Constant
 * access ({@code IAppConfigs.X}) is source-compatible with the old form.
 */
public final class IAppConfigs {

    /** Kafka broker address. Name keeps the original "BOOTSTAP" spelling so existing callers compile. */
    public static final String BOOTSTAP_SERVER = "localhost:9092";

    /** Used as the producer/consumer client id. */
    public static final String APPLICATION_ID_CONFIG = "order-id-config";

    /** Topic that carries newly received order invoices. */
    public static final String ORDER_RECEIVED_TOPIC = "order.received.v10";

    private IAppConfigs() {
        // Utility holder; never instantiated.
    }
}

Consumer Class

package com.demo.producer;

import com.demo.connections.AppConfiguration;
import com.demo.connections.IAppConfigs;
import com.demo.message.types.OrderInvoice;
import com.demo.serialization.CustomDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Polls {@link OrderInvoice} records from the order-received topic and prints
 * selected fields.
 *
 * <p>Fixes vs. the original listing: the missing {@code Properties}/
 * {@code ConsumerConfig}/{@code StringDeserializer}/{@code CustomDeserializer}
 * imports are added, and the {@code catch (NullPointerException)} used as
 * control flow is replaced with explicit null checks on the record value and
 * its optional delivery address.
 */
public class OrderConsumer {

    public static void main(String[] args) {
        startConsumer();
    }

    /** Subscribes and polls forever; intended to be stopped by killing the process. */
    public static void startConsumer() {
        KafkaConsumer<String, OrderInvoice> kafkaConsumer = getKafkaConsumer();
        kafkaConsumer.subscribe(Collections.singletonList(IAppConfigs.ORDER_RECEIVED_TOPIC));

        while (true) {
            ConsumerRecords<String, OrderInvoice> orderRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            orderRecords.forEach(record -> {
                OrderInvoice invoice = record.value();
                if (invoice == null) {
                    // Tombstone or undeserializable payload — skip it.
                    return;
                }
                System.out.println(invoice.getInvoiceNumber());
                System.out.println(invoice.getCustomerType());
                System.out.println(invoice.getDeliveryType());
                if (invoice.getDeliveryAddress() != null) {
                    System.out.println(invoice.getDeliveryAddress().getAddressLine());
                }
            });
        }
    }

    public static KafkaConsumer<String, OrderInvoice> getKafkaConsumer() {
        return new KafkaConsumer<>(getKafkaConsumerConfig());
    }

    /** Consumer config: String keys, CustomDeserializer (Java serialization) values. */
    private static Properties getKafkaConsumerConfig() {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, IAppConfigs.APPLICATION_ID_CONFIG);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
        // Not needed for the Java-serialization deserializer; kept for reference:
        // consumerProps.put(CustomDeserializer.VALUE_CLASS_NAME_CONFIG, OrderInvoice.class);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "Sample-grp_id");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return consumerProps;
    }
}

Start Producer

Producer published the events

Start Consumer

Message Consumer

--

--

Narayan Kumar

Middleware Expert in Design and Development, Working on Kafka, Eager to investigate and learn — Software Engineer.