Kafka — Producer

Narayan Kumar
5 min read · Oct 18, 2020

Prerequisites

Producer Flow

An event is published as a ProducerRecord to a topic, which resides in and is managed by a broker. The Producer creates the ProducerRecord and pushes it to the broker in the following steps.

  • Create the Configuration Properties
  • Create the Producer
  • Send the message

How does Kafka process the record and send the event to broker?

The producer application creates the ProducerRecord and calls the send() method.

The record then goes through these steps:

  • Create the ProducerRecord
  • Serialization
  • Partitioning
  • Success and failure handling

A complete example is written at the end of the story.

Kafka Producer Configuration

bootstrap.servers
key.serializer
value.serializer
client.id
acks

  1. acks=0: the producer does not wait for any acknowledgement from the broker
  2. acks=1: the leader partition writes the message and acknowledges it without waiting for the followers
  3. acks=all: the leader acknowledges only after all in-sync follower replicas have received the message
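As a rough illustration, the settings listed above can be collected in a java.util.Properties object before a producer is created. The sketch below uses only the JDK; the class name, method name, and the client id "demo-client" are hypothetical, and the String serializers are an assumed choice.

```java
import java.util.Properties;

final class ProducerPropertiesSketch {
    // Builds the producer settings listed above as plain key/value pairs.
    static Properties producerProperties(String bootstrapServers, String clientId, String acks) {
        // "0", "1" and "all" are the values described above; "-1" is an alias for "all".
        if (!acks.equals("0") && !acks.equals("1") && !acks.equals("all") && !acks.equals("-1")) {
            throw new IllegalArgumentException("Unsupported acks value: " + acks);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: both key and value are plain strings.
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("client.id", clientId);
        props.setProperty("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092", "demo-client", "all");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object is what would be handed to the KafkaProducer constructor, as the complete example at the end of the story does.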

Three primary methods of sending messages:

  1. Fire-and-forget
  2. Synchronous send
  3. Asynchronous send
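The difference between the three styles comes down to what the caller does with the result of send(). The sketch below is not the Kafka client: FakeLog is a hypothetical in-memory stand-in whose send() only mirrors the general shape of a producer send (a returned Future plus an optional callback), so that the three calling patterns can be run without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

final class SendStylesSketch {
    /** In-memory stand-in for a broker partition; NOT a Kafka class. */
    static final class FakeLog {
        private final List<String> records = new ArrayList<>();

        // Appends the value, invokes the optional callback, and returns a Future
        // holding the offset. Here the callback runs immediately; a real client
        // would invoke it later, once the broker has responded.
        synchronized Future<Long> send(String value, BiConsumer<Long, Exception> callback) {
            records.add(value);
            long offset = records.size() - 1;
            if (callback != null) {
                callback.accept(offset, null);
            }
            return CompletableFuture.completedFuture(offset);
        }

        synchronized int size() {
            return records.size();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeLog log = new FakeLog();

        // 1. Fire-and-forget: ignore the returned Future; failures go unnoticed.
        log.send("first", null);

        // 2. Synchronous send: block on the Future until the outcome is known.
        long offset = log.send("second", null).get();
        System.out.println("Synchronous send stored at offset " + offset);

        // 3. Asynchronous send: carry on immediately; the callback reports the outcome.
        log.send("third", (ackOffset, error) -> {
            if (error != null) {
                System.err.println("Send failed: " + error);
            } else {
                System.out.println("Asynchronous send stored at offset " + ackOffset);
            }
        });
    }
}
```

Fire-and-forget gives the highest throughput but no delivery feedback, the synchronous style is the simplest to reason about but waits for every record, and the asynchronous style keeps throughput while still reporting failures.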

Idempotence

Once a message has been written to a partition according to the configured acks level, the acknowledgement may still fail to reach the producer. The producer then retries and may publish the same message twice. Idempotence prevents this: the broker recognises a retried message and does not write it a second time.

The following setting enables idempotence.

enable.idempotence=true
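One way to picture what the broker does with an idempotent producer is the simplified model below: every message carries a producer id and a sequence number, and a message whose sequence number has already been seen is dropped. This is only a sketch of the idea with hypothetical class and method names; the real broker tracks sequences per topic partition and handles more cases.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IdempotentLogSketch {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number accepted so far for each producer id.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    /** Returns true if the value was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // already written; the earlier acknowledgement was lost
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    int size() {
        return log.size();
    }

    public static void main(String[] args) {
        IdempotentLogSketch broker = new IdempotentLogSketch();
        broker.append(42L, 0, "card activated");
        // The acknowledgement is lost, so the producer retries with the same sequence number.
        boolean appendedAgain = broker.append(42L, 0, "card activated");
        System.out.println("Retry appended: " + appendedAgain + ", log size: " + broker.size());
    }
}
```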

  • max.block.ms
  • Max.block.bytes
  • linger.ms
  • retry.backoff.ms
  • request.timeout.ms
  • max.in.flight.requests.per.connection

Serialization

  • Built in Serialization
  • Custom Serialization
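A custom serializer turns a domain object into the byte array that is sent to the broker. The sketch below shows that conversion with the JDK only, for a hypothetical CardStatus type and an assumed length-prefixed layout. In a real project the serialize method would live in a class implementing Kafka's Serializer interface, whose serialize(String topic, T data) method has the same shape; the interface is left out here to keep the example dependency-free.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CardStatusSerializerSketch {
    /** Hypothetical value type used only for this illustration. */
    static final class CardStatus {
        final String cardNumber;
        final String status;

        CardStatus(String cardNumber, String status) {
            this.cardNumber = cardNumber;
            this.status = status;
        }
    }

    // Layout (an assumption): [int length][cardNumber bytes][int length][status bytes].
    static byte[] serialize(String topic, CardStatus data) {
        if (data == null) {
            return null;
        }
        byte[] number = data.cardNumber.getBytes(StandardCharsets.UTF_8);
        byte[] status = data.status.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + number.length + 4 + status.length);
        buffer.putInt(number.length).put(number).putInt(status.length).put(status);
        return buffer.array();
    }

    // Reverses serialize(); a consumer would need the matching deserializer.
    static CardStatus deserialize(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] number = new byte[buffer.getInt()];
        buffer.get(number);
        byte[] status = new byte[buffer.getInt()];
        buffer.get(status);
        return new CardStatus(new String(number, StandardCharsets.UTF_8),
                new String(status, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("org.cc.type.status", new CardStatus("1234-2234-3454-3234", "active"));
        CardStatus copy = deserialize(bytes);
        System.out.println(copy.cardNumber + " is " + copy.status);
    }
}
```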

Callback: With an asynchronous send, the broker sends the record metadata back to the producer. The callback API is invoked once the broker's acknowledgement reaches the producer.

A producer application can use this API when it needs to act on the broker's acknowledgement.

package com.finance.common.callback;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CardTopicCallback implements Callback {
    static Logger logger = LogManager.getLogger(CardTopicCallback.class);

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // A non-null exception means the send failed, so there is no useful metadata to log.
        if (e != null) {
            logger.error("Send failed", e);
            return;
        }
        logger.info("Topic " + recordMetadata.topic() +
                " Partition " + recordMetadata.partition() +
                " Offset " + recordMetadata.offset());
    }
}

A producer application can register the callback as shown below.

producer.send(producerRecord, new CardTopicCallback());

Partitions: A custom partitioner lets you control which partition of a topic a particular event is published to. By default, Kafka chooses the partition from a hash of the record key. Records without a key are spread across the partitions (round robin in older clients, sticky batching since Kafka 2.4).

Kafka provides the Partitioner interface to implement the partitioning logic.

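import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class TopicPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Assumes Integer keys; floorMod keeps the result non-negative for negative keys.
        int intKey = (Integer) key;
        return Math.floorMod(intKey, cluster.partitionsForTopic(topic).size());
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}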
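The arithmetic behind key-based partitioning can be tried out without a cluster. The sketch below uses only the JDK and hypothetical method names. For string keys it hashes the key bytes with Arrays.hashCode, which is a stand-in chosen for this illustration; Kafka's default partitioner uses a murmur2 hash of the serialized key instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitioningSketch {
    // Maps an integer key onto [0, partitionCount), even when the key is negative.
    static int partitionForIntKey(int key, int partitionCount) {
        return Math.floorMod(key, partitionCount);
    }

    // Hash-based choice for string keys: the same key always lands on the same partition.
    // Arrays.hashCode is a stand-in for the hash function, not what Kafka uses.
    static int partitionForStringKey(String key, int partitionCount) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 5);                      // prints -2, not a valid partition
        System.out.println(partitionForIntKey(-7, 5));   // prints 3
        System.out.println(partitionForIntKey(12, 5));   // prints 2
        System.out.println(partitionForStringKey("card-1234", 5)
                == partitionForStringKey("card-1234", 5)); // prints true
    }
}
```

The first line of output shows why a plain % is risky in a partitioner: Java's remainder keeps the sign of the key, and a negative number is not a valid partition.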

Interceptor

An interceptor helps to change the behaviour of a Kafka client application without changing its code. It can be used to enrich a message, or to do anything else before the message is sent or after the response is received from the broker.

ProducerInterceptor: can be implemented to intercept records on the producer side. It declares these methods:

ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
void onAcknowledgement(RecordMetadata metadata, Exception exception)

onSend():

  • Invoked before the produced record is sent to Kafka, even before it is serialized.
  • The message can be captured and modified, for example to enrich it with standard headers.
  • The record that this method returns will be serialized and sent to Kafka.

onAcknowledgement():

  • Invoked if and when Kafka responds with an acknowledgement for a send, or when sending the record fails before it reaches the server.
  • The method does not allow modifying the response from Kafka.
  • This method will generally execute in the background I/O thread, so the implementation should be reasonably fast. Otherwise, sending of messages from other threads could be delayed.

Quotas and Throttling

  • Kafka brokers have the ability to limit the rate at which messages are produced and consumed.
  • The quota can be set at the producer, consumer and request level.

Default quota

quota.producer.default=2M

Quota for each client

quota.producer.override="clientA:4M,clientB:10M"

Example:
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024' --entity-name clientC --entity-type clients

bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name user1 --entity-type users

bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'consumer_byte_rate=2048' --entity-type users --entity-default

Kafka provides a built-in console producer and consumer

Creating the topic

kafka-topics --create --bootstrap-server localhost:9092 --topic invoiceTopic --partitions 5 --replication-factor 1 --config segment.bytes=100000

Kafka Console Producer

kafka-console-producer --topic my-topic --broker-list localhost:9092 --property parse.key=true --property key.separator=":"
name:messaging
value:kafka

Kafka Console consumer

kafka-console-consumer --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=":" --partition 1 --offset 4

A Kafka Producer Example

This example sends a message to a topic and then consumes the topic with kafka-console-consumer.

Create a maven based project and add below dependencies.

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.0</version>
</dependency>

Create the AppConfiguration class, which builds the Producer with the required configuration: AppConfiguration.java

package com.finance.util.connection;

import com.finance.util.configuration.IAppConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class AppConfiguration {
static Logger logger= LogManager.getLogger(AppConfiguration.class);

public static Properties getProducerConfig(){
Properties props=new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.CLIENT_ID_CONFIG, IAppConfigs.CARD_PRODUCER_APP_CLIENT_ID);

// create safe Producer
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, IAppConfigs.CARDAPP_ACK_TYPE);
props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,5);

// high throughput producer (at the expense of a bit of latency and CPU usage)
props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
return props;
}

public static KafkaProducer<String, String> getStringProducer(){
Properties pros= AppConfiguration.getProducerConfig();
KafkaProducer<String, String> producer=new KafkaProducer<String, String>(pros);
return producer;
}
}

Create the constant interface: IAppConfigs.java

package com.finance.util.configuration;

public interface IAppConfigs {
String BOOTSTAP_SERVER="localhost:9092";
String CARD_PRODUCER_APP_CLIENT_ID="card_app_id";
String CARD_STATUS_TOPIC="org.cc.type.status";
// Used by AppConfiguration; idempotence requires acks=all.
String CARDAPP_ACK_TYPE="all";
}

Create the callback class: CardTopicCallback.java

package com.finance.common.callback;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CardTopicCallback implements Callback {
    static Logger logger = LogManager.getLogger(CardTopicCallback.class);

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // A non-null exception means the send failed, so there is no useful metadata to log.
        if (e != null) {
            logger.error("Send failed", e);
            return;
        }
        logger.info("Topic " + recordMetadata.topic() +
                " Partition " + recordMetadata.partition() +
                " Offset " + recordMetadata.offset());
    }
}

Create the Producer class: CardStatusChanged.java

package com.finance.producer;

import com.finance.common.callback.CardTopicCallback;
import com.finance.util.configuration.IAppConfigs;
import com.finance.util.connection.AppConfiguration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CardStatusChanged {
static Logger logger= LogManager.getLogger(CardStatusChanged.class);
public static void main(String[] args) {
ProducerRecord<String, String> producerRecord=
new ProducerRecord<String, String>(IAppConfigs.CARD_STATUS_TOPIC,
"",getCardStatusMessage());
logger.info("Preparing producer...");
KafkaProducer<String, String> producer= AppConfiguration.getStringProducer();
producer.send(producerRecord, new CardTopicCallback());
logger.info("Event published...");
producer.flush();
producer.close();
logger.info("Producer closed...");
}

public static String getCardStatusMessage(){
String message="{" +
"\"status\":{\"cardNumber\":\"1234-2234-3454-3234\",\"status\":\"active\"}" +
"}";
return message;
}
}
  • Create the topic:
kafka-topics.bat --create --bootstrap-server localhost:9092 --topic org.cc.type.status --partitions 1 --replication-factor 1 --config segment.bytes=1000000
  • Start the producer class: CardStatusChanged.java
  • Start the console consumer:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic org.cc.type.status --from-beginning

