Kafka Producer: Advanced Features

Narayan Kumar
9 min read · Oct 23, 2020

This story talks about advanced features of the Kafka producer, such as:

Interceptor

Transaction

Partitioning

Idempotent Producer

Compression

Batching

Interceptor

An interceptor helps to change the behaviour of a Kafka client application without changing its code. It can be used for message enrichment, or to do anything to a message before it is sent to the broker or after an acknowledgement is received from the broker.

In order to implement interceptor logic in an application, we need to implement the ProducerInterceptor interface on a class and configure that class wherever it is required.

This interface has two methods to be implemented, in which we can write the logic to change the message as per the requirements of the application.

1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
2. void onAcknowledgement(RecordMetadata metadata, Exception exception)

Explanation

ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
  • Invoked before the produced record is sent to Kafka, even before it is serialized.
  • It captures the message and can modify it, for example by enriching it with standard headers.
  • The record that this method returns is what gets serialized and sent to Kafka.
void onAcknowledgement(RecordMetadata metadata, Exception exception)
  • Invoked if and when Kafka responds with an acknowledgement for a send.
  • The method does not allow modifying the response from Kafka.
  • It is called when the record sent to the server has been acknowledged, or when sending the record fails before it reaches the server.
  • It generally executes in the background I/O thread, so the implementation should be reasonably fast; otherwise, sending of messages from other threads could be delayed.
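A minimal skeleton of the interface looks like the sketch below (the class name NoOpInterceptor is illustrative and the methods are no-ops); the full, working example follows.

package com.finance.common.interceptor;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class NoOpInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before serialization; whatever is returned here is what gets sent.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the producer I/O thread when an ack (or send failure) arrives; keep it fast.
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}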

Example:

The example requires a Java project with the classes below.

Explanation

The class InvoiceProducer.java prepares and sends the business message to the topic invoice. This class also sets the name of the interceptor; the rest of the logic that makes the interceptor work lives in its own class.

Once the message is sent by the publisher class, the onSend method of the configured interceptor class is called, which adds some extra headers to the message before it is sent to the invoice topic.

Once the acknowledgement for the sent message is received, the onAcknowledgement method of the interceptor is called, which sends an audit message to the topic invoice-audit.

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, InvoiceInterceptor.class.getName());
props.put(AuditInterceptorConfig.AUDIT_TOPIC_CONFIG, "invoice-audit");
props.put(AuditInterceptorConfig.AUDIT_APPLICATION_ID_CONFIG,
"invoice-interceptor-id");

Complete Code:

Create the class: InvoiceProducer.java

package com.finance.common.interceptor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InvoiceProducer {

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(newProducerConfig());

        // Add shutdown hook to respond to SIGTERM and gracefully stop the application.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Closing application gracefully (SIGTERM)");
            latch.countDown();
            producer.close();
            System.out.println("Closed");
        }));

        while (latch.getCount() > 0) {
            producer.send(new ProducerRecord<>("invoice", "invoice-number", "invoice-1"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("Successfully sent record to topic=%s, partition=%s with offset=%d\n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
            if (latch.await(1, TimeUnit.SECONDS)) {
                break;
            }
        }
        System.out.println("Stopped producing records");
    }

    private static Properties newProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        // Configure the interceptor and its attached configuration.
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, InvoiceInterceptor.class.getName());
        props.put(AuditInterceptorConfig.AUDIT_TOPIC_CONFIG, "invoice-audit");
        props.put(AuditInterceptorConfig.AUDIT_APPLICATION_ID_CONFIG, "invoice-interceptor-id");
        return props;
    }
}

Create the AuditInterceptorConfig.java

package com.finance.common.interceptor;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class AuditInterceptorConfig extends AbstractConfig {

    public static final String AUDIT_APPLICATION_ID_CONFIG = "audit.application.id";
    public static final String AUDIT_APPLICATION_ID_DOC = "The application id used to identify the producer";

    public static final String AUDIT_TOPIC_CONFIG = "audit.topic";
    public static final String AUDIT_TOPIC_DOC = "The topic name";

    private static final ConfigDef CONFIG;
    public static final String AUDIT_PRODUCER_PREFIX = "audit.producer.";

    static {
        CONFIG = new ConfigDef()
                .define(AUDIT_APPLICATION_ID_CONFIG, ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH, AUDIT_APPLICATION_ID_DOC)
                .define(AUDIT_TOPIC_CONFIG, ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH, AUDIT_TOPIC_DOC);
    }

    /**
     * Creates a new {@link AuditInterceptorConfig} instance.
     *
     * @param originals the interceptor configuration.
     */
    AuditInterceptorConfig(final Map<String, ?> originals) {
        super(CONFIG, originals);
    }

    /**
     * Creates a new {@link AuditInterceptorConfig} instance.
     *
     * @param definition the {@link ConfigDef} instance.
     * @param originals  the interceptor configuration.
     */
    private AuditInterceptorConfig(final ConfigDef definition,
                                   final Map<String, ?> originals) {
        super(definition, originals);
    }

    public Map<String, Object> getOverrideProducerConfigs() {
        return originalsWithPrefix(AUDIT_PRODUCER_PREFIX);
    }

    public String getAuditTopic() {
        return this.getString(AUDIT_TOPIC_CONFIG);
    }

    public String getAuditApplicationId() {
        return this.getString(AUDIT_APPLICATION_ID_CONFIG);
    }
}

Create the InvoiceInterceptor.java

package com.finance.common.interceptor;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

public class InvoiceInterceptor implements ProducerInterceptor<String, String> {

    private static final Charset CHARSET = Charset.forName("UTF-8");

    private static final String TRACKING_CORRELATION_ID = "trackingCorrelationId";
    private static final String TRACKING_ON_SEND_EPOCH_TIME = "trackingOnSendEpochTime";
    private static final String TRACKING_APPLICATION_ID = "trackingApplicationId";

    // Record metadata keys
    private static final String TRACKING_PARTITION = "partition";
    private static final String TRACKING_OFFSET = "offset";
    private static final String TRACKING_TIMESTAMP = "timestamp";
    private static final String TRACKING_TOPIC = "topic";
    private static final String TRACKING_SERIALIZED_KEY_SIZE = "serializedKeySize";
    private static final String TRACKING_SERIALIZED_VALUE_SIZE = "serializedValueSize";

    private static final String JSON_OPEN_BRACKET = "{";
    private static final String JSON_CLOSE_BRACKET = "}";

    private String originalsClientId;

    private AuditInterceptorConfig configs;

    private Producer<String, String> producer;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        producerRecord.headers()
                .add(TRACKING_CORRELATION_ID, CorrelationIdGenerator.getId())
                .add(TRACKING_APPLICATION_ID, configs.getAuditApplicationId().getBytes(CHARSET))
                .add(TRACKING_ON_SEND_EPOCH_TIME, Utils.longToBytes(Time.SYSTEM.milliseconds()));
        return producerRecord;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (metadata != null) {
            final String value = getJsonTrackingMessage(metadata);
            producer.send(new ProducerRecord<>(configs.getAuditTopic(), value));
        }
    }

    private String getJsonTrackingMessage(RecordMetadata metadata) {
        return JSON_OPEN_BRACKET +
                "\"timestamp\":\"" + Time.SYSTEM.milliseconds() + "\"" +
                ",\"client\":" +
                JSON_OPEN_BRACKET +
                "\"clientId\":\"" + originalsClientId + "\"" +
                ",\"applicationId\":\"" + configs.getAuditApplicationId() + "\"" +
                ",\"type\":\"producer\"" +
                JSON_CLOSE_BRACKET +
                ",\"record\":" +
                JSON_OPEN_BRACKET +
                "\"" + TRACKING_PARTITION + "\":\"" + metadata.partition() + "\"" +
                ",\"" + TRACKING_TOPIC + "\":\"" + metadata.topic() + "\"" +
                ",\"" + TRACKING_SERIALIZED_KEY_SIZE + "\":\"" + metadata.serializedKeySize() + "\"" +
                ",\"" + TRACKING_SERIALIZED_VALUE_SIZE + "\":\"" + metadata.serializedValueSize() + "\"" +
                ",\"" + TRACKING_OFFSET + "\":\"" + (metadata.hasOffset() ? metadata.offset() : -1) + "\"" +
                ",\"" + TRACKING_TIMESTAMP + "\":\"" + (metadata.hasTimestamp() ? metadata.timestamp() : -1) + "\"" +
                JSON_CLOSE_BRACKET +
                JSON_CLOSE_BRACKET;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void configure(final Map<String, ?> configs) {

        final Map<String, Object> copyConfigs = new HashMap<>(configs);
        // Drop the interceptor classes configuration so the internal audit producer
        // does not run the interceptor again (avoids an infinite loop).
        copyConfigs.remove(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);

        this.originalsClientId = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);

        String interceptorClientId = (originalsClientId == null) ?
                "interceptor-producer-" + ClientIdGenerator.nextClientId() :
                "interceptor-" + originalsClientId;

        copyConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, interceptorClientId);

        this.configs = new AuditInterceptorConfig(copyConfigs);

        copyConfigs.putAll(this.configs.getOverrideProducerConfigs());

        // Enforce some properties to get a non-blocking producer.
        copyConfigs.put(ProducerConfig.RETRIES_CONFIG, "0");
        copyConfigs.put(ProducerConfig.ACKS_CONFIG, "1");
        copyConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");
        copyConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        copyConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        this.producer = new KafkaProducer<>(copyConfigs);
    }

    private static class ClientIdGenerator {

        private static final AtomicInteger IDS = new AtomicInteger(0);

        static int nextClientId() {
            return IDS.getAndIncrement();
        }
    }

    private static class CorrelationIdGenerator {

        private static final Charset CHARSET = Charset.forName("UTF-8");

        static byte[] getId() {
            return UUID.randomUUID().toString().getBytes(CHARSET);
        }
    }
}

Create the utility interface: Time.java

package com.finance.common.interceptor;
public interface Time {

    Time SYSTEM = new SystemTime();

    long milliseconds();

    void sleep(long ms);

    class SystemTime implements Time {

        @Override
        public long milliseconds() {
            return System.currentTimeMillis();
        }

        @Override
        public void sleep(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                // just wake up early
                Thread.currentThread().interrupt();
            }
        }
    }
}

Create the utility class: Utils.java

package com.finance.common.interceptor;

import java.nio.ByteBuffer;

public class Utils {

    public static byte[] longToBytes(long l) {
        return ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(l).array();
    }
}
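To verify that the interceptor actually attaches the tracking headers, a small consumer can print them. This is a sketch, not part of the original example; the group id is arbitrary and the topic name matches the producer above.

package com.finance.common.interceptor;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InvoiceHeaderConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "invoice-header-checker");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("invoice"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.printf("value=%s%n", record.value());
                    for (Header header : record.headers()) {
                        // trackingCorrelationId and trackingApplicationId are UTF-8 strings;
                        // trackingOnSendEpochTime is a raw long, so it prints less readably here.
                        System.out.printf("  header %s=%s%n", header.key(), new String(header.value()));
                    }
                }
            }
        }
    }
}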

Transaction

Transactions are one of the key critical items when sending messages from a publisher. As the developer of the publisher code, you need to take care that a message is published to the topic successfully.

When a message is posted successfully, the developer has to consider the right way of handling it, which depends on the case at hand. In case it fails to reach the broker, we can abort the transaction.

Let's consider a topic invoice-topic created with the properties below, which require at least two in-sync replicas (min.insync.replicas=2) before a write is acknowledged.

C:\>kafka-topics.bat --create --bootstrap-server localhost:9092 --topic invoice-topic --partitions 5 --replication-factor 3 --config min.insync.replicas=2

We need to add the producer API calls below in order to implement transactions.

producer.initTransactions();
producer.beginTransaction();
producer.commitTransaction();
producer.abortTransaction(); // called instead of commit when the send fails
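The commit call is normally wrapped in a try/catch so a failed transaction can be aborted, along the lines of the pattern documented for KafkaProducer. This is a sketch, assuming producer is a transactional KafkaProducer<String, String> on which initTransactions() has already been called; the exception types live in org.apache.kafka.common.errors (KafkaException in org.apache.kafka.common).

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("invoice-topic", "invoice-number", "invoice-1"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer cannot recover, so close it.
    producer.close();
} catch (KafkaException e) {
    // Transient error: abort, then the send can be retried in a new transaction.
    producer.abortTransaction();
}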

Implementation

private static CountDownLatch latch = new CountDownLatch(1);

public static void main(String[] args) throws InterruptedException, ExecutionException {

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(newProducerConfig());

    // Add shutdown hook to respond to SIGTERM and gracefully stop the application.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        latch.countDown();
        producer.close();
        System.out.println("Closed");
    }));

    // Initialize transactions (requires transactional.id to be configured).
    producer.initTransactions();

    int i = 0;
    while (latch.getCount() > 0) {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("test.topic", i++, "invoice-" + i), new CardTopicCallback());
        System.out.println("Message Sent - Invoice - " + i);
        producer.commitTransaction();

        if (latch.await(1, TimeUnit.SECONDS)) {
            break;
        }
    }
}

private static Properties newProducerConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Transaction implementation: the transactional id must be unique per producer instance.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "uniq_txn_id-4");
    return props;
}
Create the CardTopicCallback.java
package com.finance.common.callback;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CardTopicCallback implements Callback {

    static Logger logger = LogManager.getLogger(CardTopicCallback.class);

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            logger.error("Failed to send record", e);
            return;
        }
        logger.info("Partition " + recordMetadata.partition() +
                " Topic " + recordMetadata.topic() +
                " Offset " + recordMetadata.offset());
    }
}

Partitioning

As per the topic anatomy, a topic is divided into a number of partitions, and a message is published at a particular offset of a partition.

Depending on the business case, one type of message may need to be placed on a particular partition and another type on a different partition of the topic.

The partitioning of a message is based on the key of the message, so that the message can be placed on the intended partition.

There are two ways of partitioning:

Default:

  • The partition for a message is based on a hash of the key, if a key is provided (see the sketch after this list).
  • Messages are placed among the partitions in a round-robin fashion if no key is provided.
  • Kafka tries to distribute messages evenly across the partitions.
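For reference, when a key is present, the built-in default partitioner picks the partition roughly as in this simplified sketch: it murmur2-hashes the serialized key, forces the result positive, and takes it modulo the partition count (Utils here is Kafka's org.apache.kafka.common.utils.Utils).

import org.apache.kafka.common.utils.Utils;

// Simplified sketch of the default key-based partition assignment.
static int partitionForKey(byte[] keyBytes, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}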

Custom

  • The publisher provides the key, and the partitioning logic is applied to that key.
  • The partition key can be of a primitive type or based on a custom class.
  • The producer needs to be configured with the custom partitioner:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CardTopicPartition.class);

Complete code for the partitioner class, CardTopicPartition (the name referenced in the configuration above):

package com.finance.common.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.errors.InvalidTopicException;

import java.util.Map;

public class CardTopicPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if ((keyBytes == null) || (!(key instanceof Integer))) {
            System.out.println("Partition .size() " + cluster.partitionsForTopic(topic).size());
            throw new InvalidTopicException("Topic Key must have a valid Integer value.");
        }
        return (Integer) key % cluster.partitionsForTopic(topic).size();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> map) { }
}

The producer is configured with the partitioner class via PARTITIONER_CLASS_CONFIG, as shown below. You can refer to the complete code of InvoiceProducer.java earlier in this story.

private static Properties newProducerConfig() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CardTopicPartition.class);

    // Configure interceptor and attached configuration.
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, InvoiceInterceptor.class.getName());
    props.put(AuditInterceptorConfig.AUDIT_TOPIC_CONFIG, "invoice-audit");
    props.put(AuditInterceptorConfig.AUDIT_APPLICATION_ID_CONFIG, "invoice-interceptor-id");
    return props;
}

Idempotent Producer Client

If you make the producer idempotent, it sends a unique id along with each message to the broker. On the basis of that id, the broker can identify whether a request is the first one or a duplicate.

This is generally used to implement exactly-once delivery semantics (of the three delivery semantics: at most once, at least once, exactly once). Suppose the broker fails while sending the acknowledgement: the idempotent producer will then send the same message again with the same id, and the broker will identify it as a duplicate and will not write it to the topic again.

Implementation

props.put(ProducerConfig.ACKS_CONFIG, "all"); // acks=all is required for an idempotent client
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // at most 1 in-flight request at a time,
// preventing message re-ordering in case of retries

Complete code:

package com.finance.common.interceptor;

import com.finance.common.callback.CardTopicCallback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class InvoiceProducer {

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(newProducerConfig());

        // Add shutdown hook to respond to SIGTERM and gracefully stop the application.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Closing application gracefully (SIGTERM)");
            latch.countDown();
            producer.close();
            System.out.println("Closed");
        }));

        int i = 0;
        while (latch.getCount() > 0) {
            // Plain sends: idempotence alone does not require transactional calls.
            producer.send(new ProducerRecord<>("test.topic", i++, "invoice-" + i), new CardTopicCallback());
            System.out.println("Message Sent - Invoice - " + i);

            if (latch.await(1, TimeUnit.SECONDS)) {
                break;
            }
        }
    }

    private static Properties newProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Idempotence
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // acks=all is required for an idempotent client
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // at most 1 in-flight request,
        // preventing message re-ordering in case of retries

        return props;
    }
}

Compression

  • Kafka supports compression of messages throughout the transmission channel, and it also saves messages in the log segment files in compressed format.
  • The consumer does not require any code change; it detects the compression and decompresses the message before use.
  • The main compression types are gzip, snappy, lz4 and zstd; snappy is a commonly recommended choice. The default is none (no compression).
  • Compression ratios of up to 4x are achievable.
  • It gives better transmission and throughput of messages.
  • The producer and consumer need some extra CPU time to compress and decompress the messages.

Implementation

props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); //Enable compression.

Batching

  • Batching enhances IO performance and the throughput of the producer.
  • By default, Kafka sends a message to the broker as soon as it receives it.
  • Batches have a higher compression ratio and are therefore more efficient.
  • linger.ms and batch.size control batching: linger.ms makes the producer wait the given number of milliseconds to fill a batch before sending, while batch.size caps the size of a batch in bytes. Whichever condition is met first triggers the send.
  • The default batch size is 16 KB. A message bigger than the configured batch size is not batched.

Implementation

props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));//32 KB size
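Putting compression and batching together, a producer configuration might look like the sketch below (the values are illustrative, not tuned recommendations):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // whole batches are compressed together
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // wait up to 5 ms to fill a batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);      // 32 KB maximum batch size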
