This story talks about the following advanced features of Kafka:

Interceptor

Transaction

Partitioning

Idempotent Producer

Compression

Batching

Interceptor

An interceptor changes the behaviour of a Kafka client application without changing its business code. It can be used for message enrichment, or for any other processing just before a message is sent or after the broker responds to it.

To implement an interceptor, we write a class that implements the ProducerInterceptor interface and register that class in the producer configuration wherever it is required.

The interface has two main methods, onSend and onAcknowledgement, in which we write the logic that changes or observes the message as the application requires.

Explanation:

onSend:

  • Invoked before the produced record is sent to Kafka, even before it is serialized.
  • It receives the record and can modify it.
  • The record this method returns is the one that is serialized and sent to Kafka.
  • Typical use: enriching the message with standard headers.

onAcknowledgement:

  • Invoked when the record sent to the server has been acknowledged, or when sending the record fails before it reaches the server.
  • It does not allow modifying the response from Kafka.
  • It generally executes on the producer's background I/O thread, so the implementation should be reasonably fast; otherwise, sending messages from other threads could be delayed.
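The contract of the two callbacks can be seen without a broker in the small model below. InvoiceRecord, RecordInterceptor, HeaderEnrichingInterceptor and the source-app header are hypothetical stand-ins, not Kafka types; in a real application the class implements org.apache.kafka.clients.producer.ProducerInterceptor<K, V>, whose onSend(ProducerRecord) and onAcknowledgement(RecordMetadata, Exception) play the same roles. It is a sketch of the call order only, not of how KafkaProducer is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InterceptorModel {
    // Mimics the order in which a producer drives the two callbacks for one record.
    static InvoiceRecord send(RecordInterceptor interceptor, InvoiceRecord record,
                              long offset, boolean brokerUp) {
        // 1. onSend runs first, before serialization and partitioning.
        InvoiceRecord intercepted = interceptor.onSend(record);
        // 2. The producer would now serialize `intercepted`, not the original record.
        // 3. When the (simulated) broker answers, onAcknowledgement observes the outcome.
        if (brokerUp) {
            interceptor.onAcknowledgement(offset, null);
        } else {
            interceptor.onAcknowledgement(-1L, new IllegalStateException("broker unavailable"));
        }
        return intercepted;
    }

    public static void main(String[] args) {
        HeaderEnrichingInterceptor interceptor = new HeaderEnrichingInterceptor();
        InvoiceRecord invoice = new InvoiceRecord("invoice", "INV-1", "{\"amount\":100}", Map.of());
        InvoiceRecord sent = send(interceptor, invoice, 42L, true);
        System.out.println(sent.headers() + " " + interceptor.audit);
    }
}

// Hypothetical stand-in for Kafka's ProducerRecord.
record InvoiceRecord(String topic, String key, String value, Map<String, String> headers) {}

// Hypothetical stand-in mirroring ProducerInterceptor's two main callbacks.
interface RecordInterceptor {
    // May return a modified record; the returned one is what gets serialized and sent.
    InvoiceRecord onSend(InvoiceRecord record);

    // Observes the outcome (offset or error); returns nothing, so it cannot alter it.
    void onAcknowledgement(long offset, Exception error);
}

class HeaderEnrichingInterceptor implements RecordInterceptor {
    final List<String> audit = new ArrayList<>();

    @Override
    public InvoiceRecord onSend(InvoiceRecord record) {
        // Copy the headers so the caller's record is left untouched.
        Map<String, String> headers = new LinkedHashMap<>(record.headers());
        headers.put("source-app", "invoice-service"); // hypothetical standard header
        return new InvoiceRecord(record.topic(), record.key(), record.value(), headers);
    }

    @Override
    public void onAcknowledgement(long offset, Exception error) {
        // Keep this fast: in Kafka it usually runs on the producer's I/O thread.
        audit.add(error == null ? "acked@" + offset : "failed:" + error.getMessage());
    }
}
```

Returning a new record instead of mutating the incoming one keeps the caller's object unchanged, which makes the interceptor easier to reason about when several interceptors are chained.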

Example:

It requires a Java project with the classes listed under Complete Code.

Explanation

The class InvoiceProducer.java prepares the business message and sends it to the topic invoice. This class also registers the name of the interceptor class in the producer configuration. The rest of the interceptor logic lives in the interceptor class itself.

Once the message is sent by the publisher class, the onSend method of the configured interceptor is called; it adds some extra headers to the message, and the producer then sends the enriched record to the invoice topic.

Once the acknowledgement for the sent message is received, the onAcknowledgement method of the interceptor is called, which sends an audit message to its audit topic, invoice-audit.
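A minimal sketch of how InvoiceProducer.java might register the interceptor, assuming plain string keys from the Kafka producer configuration and a hypothetical package name com.example.invoice. Kafka instantiates every class listed in interceptor.classes itself, calls them in the listed order, and passes the producer configuration to each interceptor's configure(...) method.

```java
import java.util.Properties;

class InvoiceProducerConfig {
    // Hypothetical fully qualified name of the interceptor from this story.
    static final String INVOICE_INTERCEPTOR = "com.example.invoice.InvoiceInterceptor";

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Comma-separated list of interceptor classes, invoked in this order.
        props.put("interceptor.classes", INVOICE_INTERCEPTOR);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        // With kafka-clients on the classpath: new KafkaProducer<String, String>(props)
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```

Because the interceptor receives the same configuration map, settings it needs of its own (for example, whatever AuditInterceptorConfig holds for the audit topic) could be read inside configure(...) rather than hard-coded.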

Complete Code:

Create the class InvoiceProducer.java

Create the class AuditInterceptorConfig.java

Create the class InvoiceInterceptor.java

Create the utility interface Time.java

Create the utility class Utils.java

Transaction

Transactions are one of the critical concerns when sending messages from a publisher. As the developer of the publisher code, you need to make sure that messages are published to the topic successfully.

Even when a message is posted successfully, the developer has to consider the right way to handle it, and that varies from case to case.

Let's consider a topic invoice created with the following properties, which set the minimum number of in-sync replicas (min.insync.replicas) to 2: with acks=all, a write succeeds only while at least 2 replicas are in sync.

If the producer fails to post the message to the broker, we can abort the transaction.

To implement the transaction, the producer needs a transactional.id and must call the transactional methods of the producer API: initTransactions(), beginTransaction(), commitTransaction() and abortTransaction().

Implementation
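The commit-or-abort flow described above can be sketched as follows. TxProducer, RecordingProducer, Invoice, the transactional ID invoice-tx-1 and the bootstrap address are hypothetical; TxProducer only mirrors the KafkaProducer methods beginTransaction(), send(...), commitTransaction() and abortTransaction() so the flow can run without a broker. On the real producer, initTransactions() is called once before the first transaction, and per the KafkaProducer documentation, fatal errors such as ProducerFencedException require closing the producer rather than aborting, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class InvoiceTransactions {
    // Hypothetical invoice message: key plus JSON payload.
    record Invoice(String id, String json) {}

    // Mirrors the KafkaProducer calls used inside a transaction, so the flow runs without a broker.
    interface TxProducer {
        void beginTransaction();
        void send(String topic, String key, String value);
        void commitTransaction();
        void abortTransaction();
    }

    static Properties transactionalProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("transactional.id", transactionalId); // stable, unique per producer instance
        props.put("enable.idempotence", "true");        // transactions require idempotence
        props.put("acks", "all");                       // wait for the in-sync replicas
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Either every invoice is committed, or the whole transaction is aborted.
    static boolean sendAll(TxProducer producer, String topic, List<Invoice> invoices) {
        producer.beginTransaction();
        try {
            for (Invoice invoice : invoices) {
                producer.send(topic, invoice.id(), invoice.json());
            }
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction();
            return false;
        }
    }

    // Test double that records calls and fails when it sees a given key.
    static class RecordingProducer implements TxProducer {
        final List<String> calls = new ArrayList<>();
        private final String failOnKey;

        RecordingProducer(String failOnKey) {
            this.failOnKey = failOnKey;
        }

        public void beginTransaction() { calls.add("begin"); }

        public void send(String topic, String key, String value) {
            if (key.equals(failOnKey)) {
                throw new IllegalStateException("send failed for " + key);
            }
            calls.add("send " + key);
        }

        public void commitTransaction() { calls.add("commit"); }

        public void abortTransaction() { calls.add("abort"); }
    }

    public static void main(String[] args) {
        List<Invoice> invoices = List.of(new Invoice("INV-1", "{\"amount\":100}"),
                                         new Invoice("INV-2", "{\"amount\":250}"));
        RecordingProducer failing = new RecordingProducer("INV-2");
        System.out.println(sendAll(failing, "invoice", invoices) + " " + failing.calls);
    }
}
```

Against a real broker, a small adapter's send(...) could call producer.send(new ProducerRecord<>(topic, key, value)); asynchronous send failures then surface from commitTransaction(), which lands in the same abort branch. Consumers skip the aborted invoices only when they read with isolation.level=read_committed.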

Partitioning

A topic is divided into a number of partitions, and each message is appended to one partition, where it is assigned the next offset.

Depending on the business case, one type of message may need to go to a particular partition and another type to a different partition of the topic.

The partitioning of a message is based on its key, so the message can be placed on the intended partition.

There are two ways of partitioning.

Default:

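  • If a key is provided, the partition is chosen by hashing the key.
  • If there is no key, messages are spread across the partitions: round-robin in older clients, while newer clients use a "sticky" strategy that fills a batch for one partition before moving to the next.
  • Kafka tries to distribute messages evenly across the partitions.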
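A sketch of the default rule, assuming a fixed number of partitions. Kafka's real default partitioner hashes the serialized key with murmur2; Arrays.hashCode is used here only as a simpler stand-in, so the partition numbers will not match Kafka's. The keyless branch shows classic round-robin, not the sticky behaviour of newer clients.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class DefaultPartitioningSketch {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    DefaultPartitioningSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int partitionFor(String key) {
        if (key == null) {
            // No key: rotate over the partitions (classic round-robin).
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Kafka uses murmur2 here; Arrays.hashCode is only a simpler stand-in.
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the result is a valid partition index.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        DefaultPartitioningSketch partitioner = new DefaultPartitioningSketch(3);
        for (String key : new String[] {"INV-1", "INV-2", "INV-1", null, null}) {
            System.out.println(key + " -> partition " + partitioner.partitionFor(key));
        }
    }
}
```

The same key always maps to the same partition only while the partition count stays the same; adding partitions changes the modulus and therefore the mapping.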

Custom:

  • The publisher provides the key, and our own partitioning logic decides the partition from it, for example from a hash of the key.
  • The partition key can be a primitive type or a custom class.
  • The producer needs to be configured with the custom partitioner.

Complete code for the partitioner class.

The producer is configured with the partitioner class. You can refer to the complete code of InvoiceProducer.java earlier in this story.
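A custom partitioner is mostly a business rule mapped to a partition number. The rule below (keys starting with PRIORITY- go to partition 0, all other invoices are spread over the remaining partitions) and the class name com.example.invoice.InvoicePartitioner are hypothetical. In Kafka, such logic would sit inside the partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) method of an org.apache.kafka.clients.producer.Partitioner implementation, with the partition count taken from cluster.partitionsForTopic(topic).size(), and the class would be registered through partitioner.class.

```java
import java.util.Properties;

class InvoicePartitionRule {
    // Hypothetical business rule: priority invoices get a dedicated partition 0.
    static final String PRIORITY_PREFIX = "PRIORITY-";

    static int choosePartition(String key, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // only one partition, nothing to separate
        }
        if (key == null) {
            throw new IllegalArgumentException("invoice key is required");
        }
        if (key.startsWith(PRIORITY_PREFIX)) {
            return 0;
        }
        // Spread all other invoices over partitions 1 .. numPartitions - 1.
        return 1 + (key.hashCode() & 0x7fffffff) % (numPartitions - 1);
    }

    static Properties withPartitioner(Properties props) {
        // Hypothetical Partitioner implementation that delegates to choosePartition.
        props.put("partitioner.class", "com.example.invoice.InvoicePartitioner");
        return props;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"PRIORITY-7", "INV-7", "INV-8"}) {
            System.out.println(key + " -> partition " + choosePartition(key, 4));
        }
    }
}
```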

Idempotent Producer Client

If you make the producer idempotent, it sends identifiers along with each batch of messages: a producer ID assigned by the broker and a sequence number per partition. Using these, the broker can tell whether a request is the first attempt or a duplicate retry.

It is generally used to implement exactly-once delivery semantics (as opposed to at-most-once or at-least-once). Suppose the broker writes the message but the acknowledgement is lost on the way back: the idempotent producer retries the same message with the same identifiers, and the broker recognises it as a duplicate and does not write it to the topic again.

Implementation

Complete code
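The duplicate check on the broker side can be modelled in a few lines. This is a deliberately simplified, hypothetical model: a real broker tracks sequence numbers per producer ID and partition for each batch and remembers only the most recent batches, but the idea that a retry with an already-seen sequence number is acknowledged without being written again is the same. On the client, idempotence is switched on with enable.idempotence=true, which requires acks=all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

class IdempotenceModel {
    static Properties idempotentProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.idempotence", "true");
        props.put("acks", "all"); // idempotence requires acks=all
        return props;
    }

    // Simplified model of one partition log that rejects duplicate retries.
    static class PartitionLog {
        final List<String> records = new ArrayList<>();
        private final Map<Long, Integer> lastSequence = new HashMap<>();

        // Returns true if appended, false if it was a duplicate of an earlier write.
        boolean append(long producerId, int sequence, String value) {
            int last = lastSequence.getOrDefault(producerId, -1);
            if (sequence <= last) {
                return false; // already written: acknowledge again, do not re-append
            }
            if (sequence != last + 1) {
                throw new IllegalStateException("out-of-order sequence " + sequence);
            }
            records.add(value);
            lastSequence.put(producerId, sequence);
            return true;
        }
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append(1000L, 0, "INV-1");
        boolean appended = log.append(1000L, 0, "INV-1"); // retry after a lost ack
        System.out.println("retry appended? " + appended + ", log = " + log.records);
    }
}
```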

Compression

  • Kafka keeps messages compressed end to end: the producer compresses each batch, the batch travels compressed over the network, and by default the broker stores it compressed in the log segment files.
  • The consumer does not require any code change. It detects the compression and decompresses the messages before use.
  • The supported compression types are gzip, snappy, lz4 and zstd; of these, snappy is recommended. The producer default is none (no compression).
  • Compression ratio of up to 4x!
  • Lower network usage and better message throughput.
  • The producer and consumer need some extra CPU time to compress or decompress the messages.

Implementation
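The producer side only needs compression.type; the rest of the example uses gzip from java.util.zip (one of the codecs Kafka supports) to show, on a hypothetical batch of 100 similar invoice messages, how repeated JSON structure shrinks and that decompression returns identical bytes. The ratio it prints depends on the data; it is not a Kafka benchmark, and the real producer compresses whole record batches internally rather than through code like this.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class CompressionDemo {
    static Properties compressedProducerProps(String bootstrapServers, String codec) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("compression.type", codec); // none, gzip, snappy, lz4 or zstd
        return props;
    }

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the stream writes the gzip trailer before we read the bytes.
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] gunzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A batch of similar invoice messages, like the ones a producer batch might hold.
    static byte[] sampleBatch(int count) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append("{\"invoiceId\":\"INV-").append(i)
              .append("\",\"currency\":\"EUR\",\"status\":\"ISSUED\"}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] batch = sampleBatch(100);
        byte[] compressed = gzip(batch);
        System.out.printf("%d bytes -> %d bytes (%.1fx)%n",
                batch.length, compressed.length, (double) batch.length / compressed.length);
    }
}
```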

Batching

  • Batching improves I/O performance and the throughput of the producer.
  • With linger.ms=0, the default in most client versions, the producer sends a message to the broker as soon as it can instead of waiting to fill a batch.
  • Batches get a higher compression ratio, so they are more efficient.
  • linger.ms and batch.size control batching: linger.ms is how many milliseconds the producer waits to fill a batch before sending it, and batch.size is the maximum size of a batch in bytes. Kafka sends a batch as soon as either condition is met.
  • The default batch.size is 16 KB (16384 bytes). A message larger than batch.size is not batched with others; it is sent in a batch of its own.

Implementation
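A sketch of the batching settings plus a simplified, hypothetical model of a single partition's batch: it becomes ready when it reaches batch.size or when linger.ms has passed since its first record, whichever comes first, and a record larger than batch.size gets a batch of its own. The values 20 ms and 32768 bytes are example choices, not recommendations, and the real producer's accumulator has more rules than this model.

```java
import java.util.Properties;

class BatchingSketch {
    static Properties batchingProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("linger.ms", "20");     // example: wait up to 20 ms to fill a batch
        props.put("batch.size", "32768"); // example: 32 KB per partition batch (default 16384)
        return props;
    }

    // Simplified model of one partition's open batch.
    static class PartitionBatch {
        private final int batchSize;
        private final long lingerMs;
        private int bytes;
        private long firstAppendMs;

        PartitionBatch(int batchSize, long lingerMs) {
            this.batchSize = batchSize;
            this.lingerMs = lingerMs;
        }

        // Returns false when the record does not fit; the caller then sends this batch
        // and starts a new one. An oversized record is accepted only into an empty batch.
        boolean tryAppend(int recordBytes, long nowMs) {
            if (bytes > 0 && bytes + recordBytes > batchSize) {
                return false;
            }
            if (bytes == 0) {
                firstAppendMs = nowMs;
            }
            bytes += recordBytes;
            return true;
        }

        // Ready when full (batch.size) or when linger.ms has passed, whichever comes first.
        boolean readyToSend(long nowMs) {
            return bytes > 0 && (bytes >= batchSize || nowMs - firstAppendMs >= lingerMs);
        }
    }

    public static void main(String[] args) {
        PartitionBatch batch = new PartitionBatch(16384, 20);
        batch.tryAppend(1000, 0);
        System.out.println("ready at 5 ms? " + batch.readyToSend(5));
        System.out.println("ready at 20 ms? " + batch.readyToSend(20));
    }
}
```

A larger linger.ms trades a little latency for fuller batches, which in turn compress better; batch.size caps how much memory each partition's open batch can take.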

Middleware Expert in Design and Development, Working on Kafka, Eager to investigate and learn — Software Engineer.