Kafka — Producer
Prerequisites
Producer Flow
An event is published as a ProducerRecord to a topic, which resides on and is managed by a broker. The producer builds the ProducerRecord and pushes it to the broker in the following steps:
- Create the configuration properties
- Create the producer
- Send the message
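The three steps above can be sketched as follows. This is a minimal sketch, not the article's final code: it assumes a broker at localhost:9092 and String keys and values, and the class name ProducerFlow and the topic my-topic are illustrative only.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class ProducerFlow {
    // Step 1: configuration properties (broker address and serializers are the minimum)
    static Properties config(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

    // Step 3: send the message; accepts any Producer implementation
    static void send(Producer<String, String> producer, String topic, String key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value));
    }

    public static void main(String[] args) {
        // Step 2: create the producer; try-with-resources closes it on exit,
        // and close() waits for records that are still being sent
        try (Producer<String, String> producer = new KafkaProducer<>(config("localhost:9092"))) {
            send(producer, "my-topic", "name", "messaging");
        }
    }
}
```

Because send() takes the Producer interface rather than KafkaProducer, the same method can be exercised against MockProducer from kafka-clients without a running broker.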
How does Kafka process the record and send the event to the broker?
The producer application creates the ProducerRecord and calls the send() method. The record then goes through these steps:
- Create the ProducerRecord
- Serialization
- Partitioning
- Handling the success and failure scenarios
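To make the serialization and partitioning steps concrete, the sketch below reproduces them by hand with kafka-clients classes: StringSerializer turns the key into bytes, and the partition for a keyed record is computed the way the 2.x default partitioner does it, by murmur2-hashing the key bytes. The class SendPipelineSketch is hypothetical; in a real producer these steps run inside send() and are never called directly.

```java
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

class SendPipelineSketch {
    // Serialization step: the configured key serializer turns the key into bytes (UTF-8 for StringSerializer)
    static byte[] serializeKey(String topic, String key) {
        return new StringSerializer().serialize(topic, key);
    }

    // Partitioning step for a record that has a key: murmur2 hash of the key bytes,
    // forced non-negative, modulo the number of partitions
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] keyBytes = serializeKey("org.cc.type.status", "1234-2234-3454-3234");
        System.out.println("Partition out of 5: " + partitionFor(keyBytes, 5));
    }
}
```

Because the hash depends only on the key bytes and the partition count, the same key always lands on the same partition as long as the number of partitions does not change.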
A complete example is given at the end of the story.
Kafka Producer Configuration
bootstrap.servers
key.serializer
value.serializer
client.id
acks
- acks=0: the producer does not wait for any acknowledgement from the broker.
- acks=1: the leader partition writes the message to its log and acknowledges without waiting for the followers.
- acks=all: the leader acknowledges only after all in-sync follower replicas have the message.
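The five settings above can be collected into a Properties object as sketched below. It uses the raw config keys so it needs only the JDK; the ProducerSettings class and its parameters are illustrative, and application code usually prefers the ProducerConfig constants from kafka-clients.

```java
import java.util.Properties;

class ProducerSettings {
    // Builds the five basic producer settings using their raw config keys
    static Properties basicConfig(String bootstrapServers, String clientId, String acks) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("client.id", clientId);
        // "0", "1" or "all": how many acknowledgements the producer waits for
        props.setProperty("acks", acks);
        return props;
    }
}
```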
Three primary methods of sending messages:
- Fire-and-forget
- Synchronous send
- Asynchronous send
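A sketch of the three sending styles follows, assuming String keys and values; SendModes is a hypothetical helper class. Fire-and-forget ignores the result, the synchronous send blocks on the Future returned by send(), and the asynchronous send passes a callback that Kafka invokes when the broker responds.

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class SendModes {
    // Fire-and-forget: send() returns immediately and nobody checks whether the write succeeded
    static void fireAndForget(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record);
    }

    // Synchronous: block on the Future until the broker acknowledges; a failure surfaces as ExecutionException
    static RecordMetadata sendSync(Producer<String, String> producer, ProducerRecord<String, String> record)
            throws InterruptedException, ExecutionException {
        return producer.send(record).get();
    }

    // Asynchronous: return immediately and react in the callback once the broker responds
    static void sendAsync(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception.getMessage());
            } else {
                System.out.println("Written to partition " + metadata.partition() + " at offset " + metadata.offset());
            }
        });
    }
}
```

The synchronous style is the simplest to reason about but waits a full round trip per record; the asynchronous style keeps throughput high while still surfacing errors.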
Idempotence
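Once a message is written to a partition with the configured acknowledgement level, the acknowledgement might not reach the producer (for example, because of a network error or a timeout). The producer then retries, and the same message can be written twice. Idempotence solves this: an idempotent producer tags each record with a producer ID and a sequence number, so the broker can detect a retried record and discard the duplicate.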
The following configuration enables idempotence. It also requires acks=all, retries greater than 0 and max.in.flight.requests.per.connection of at most 5.
enable.idempotence=true
Other producer settings that affect blocking, batching, retries and timeouts:
- max.block.ms
- buffer.memory
- linger.ms
- retry.backoff.ms
- request.timeout.ms
- max.in.flight.requests.per.connection
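The idea behind idempotence can be illustrated with a deliberately simplified model, which is not Kafka's implementation. The broker remembers the last sequence number it accepted from each producer ID and does not write a sequence number it has already accepted, so a retried record is stored only once. The model covers a single partition and skips details the real broker handles, such as producer epochs and rejecting gaps in the sequence; the class IdempotentLogModel is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of one partition's duplicate detection (not Kafka code)
class IdempotentLogModel {
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Returns true if the record was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // already written: the retry is acknowledged but not stored again
        }
        lastSequence.put(producerId, sequence);
        log.add(value);
        return true;
    }

    List<String> records() {
        return log;
    }
}
```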
Serialization
- Built-in serialization (for example StringSerializer)
- Custom serialization (implementing the Serializer interface)
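As a sketch of custom serialization, the serializer below turns a hypothetical CardStatus object into the same JSON that the CardStatusChanged producer at the end of this story sends. It implements the kafka-clients Serializer interface, where configure() and close() have default implementations in kafka-clients 2.1 and later, so only serialize() is needed. The string concatenation assumes the fields need no JSON escaping; a JSON library would normally do this work.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical domain type used only for this sketch
class CardStatus {
    final String cardNumber;
    final String status;

    CardStatus(String cardNumber, String status) {
        this.cardNumber = cardNumber;
        this.status = status;
    }
}

// Declare this class public (in its own file) when registering it through value.serializer,
// because Kafka creates it by reflection
class CardStatusSerializer implements Serializer<CardStatus> {
    @Override
    public byte[] serialize(String topic, CardStatus data) {
        if (data == null) {
            return null; // Kafka serializers conventionally map null to null
        }
        // Assumes cardNumber and status contain no characters that need JSON escaping
        String json = "{\"status\":{\"cardNumber\":\"" + data.cardNumber
                + "\",\"status\":\"" + data.status + "\"}}";
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```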
Callback: With an asynchronous send, the broker sends the record metadata (topic, partition and offset) back to the producer. The callback's onCompletion() method is invoked once the broker's acknowledgement, or an error, reaches the producer.
A producer application can use this API when it needs to take some action on the broker's acknowledgement.
package com.finance.common.callback;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
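public class CardTopicCallback implements Callback {
    static Logger logger = LogManager.getLogger(CardTopicCallback.class);
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // A non-null exception means the send failed, so the metadata carries no valid offset
        if (e != null) {
            logger.error("Failed to publish record", e);
            return;
        }
        // RecordMetadata has no key or value; it reports where the record was written
        logger.info("Topic " + recordMetadata.topic() +
                " Partition " + recordMetadata.partition() +
                " Offset " + recordMetadata.offset());
    }
}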
The producer application passes the callback to send():
producer.send(producerRecord, new CardTopicCallback());
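Partitions: A custom partitioner controls which partition of a topic a particular event is published to. By default, Kafka chooses the partition itself: a record with a key is assigned by hashing the key, and records without a key are spread across partitions (round robin in older clients, the sticky partitioner since Kafka 2.4).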
Kafka provides the Partitioner interface to implement the partitioning logic.
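import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class TopicPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Integer keys map to key mod partition count (floorMod avoids negative results); other keys go to partition 0
        return (key instanceof Integer) ? Math.floorMod((Integer) key, numPartitions) : 0;
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}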
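The choice between Java's % operator and Math.floorMod matters when a partitioner does arithmetic on keys: % keeps the sign of the key, so a negative key yields a negative and therefore invalid partition number, while floorMod always returns a value from 0 to the partition count minus one. The small comparison below shows the difference; the class name PartitionMath is hypothetical. A custom partitioner class is registered with the partitioner.class producer setting.

```java
class PartitionMath {
    // Java's % keeps the sign of the dividend, so a negative key gives a negative result
    static int withRemainder(int key, int numPartitions) {
        return key % numPartitions;
    }

    // Math.floorMod returns a value in [0, numPartitions) for a positive divisor
    static int withFloorMod(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }
}
```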
Interceptor
An interceptor changes the behaviour of a Kafka client application without changing the application's own code, because it is plugged in through configuration. It can be used to enrich messages before they are sent, or to act on the result after the broker responds.
ProducerInterceptor is the interface to implement. It declares two callbacks:
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
void onAcknowledgement(RecordMetadata metadata, Exception exception)
onSend():
- Invoked before the produced record is sent to Kafka, even before it is serialized.
- The message can be captured and modified, for example to enrich it with standard headers.
- The record that this method returns is serialized and sent to Kafka.
onAcknowledgement():
- Invoked when the record sent to the server has been acknowledged, or when sending the record fails before it gets sent to the server.
- The method does not allow modifying the response from Kafka.
- This method generally executes in the background I/O thread, so the implementation should be reasonably fast. Otherwise, sending messages from other threads could be delayed.
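A sketch of a header-enriching interceptor follows. The class name SourceHeaderInterceptor and the header name source-app are hypothetical, and it assumes String keys and values. onSend() adds a header before the record is serialized, and onAcknowledgement() only logs failures because it runs on the producer's I/O thread. It reads client.id from the configuration map, assuming the application sets it. The interceptor is registered through the interceptor.classes producer setting.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Declare this class public (in its own file) when registering it through interceptor.classes,
// because Kafka creates it by reflection
class SourceHeaderInterceptor implements ProducerInterceptor<String, String> {
    private String sourceApp = "unknown";

    @Override
    public void configure(Map<String, ?> configs) {
        // Use the producer's client.id as the header value when it is present
        Object clientId = configs.get("client.id");
        if (clientId != null) {
            sourceApp = clientId.toString();
        }
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before serialization; the returned record is what gets serialized and sent
        record.headers().add("source-app", sourceApp.getBytes(StandardCharsets.UTF_8));
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the producer's I/O thread, so keep it cheap: only report failures
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        }
    }

    @Override
    public void close() {}
}
```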
Quotas and Throttling
- Kafka brokers can limit the rate at which messages are produced and consumed.
- Quotas can limit producer bandwidth, consumer bandwidth and request rate, and can be applied per user, per client ID, or both.
Default quota
quota.producer.default=2M
Quota for each client
quota.producer.override="clientA:4M,clientB:10M"
Examples:
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024' --entity-name clientC --entity-type clients
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name user1 --entity-type users
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'consumer_byte_rate=2048' --entity-type users --entity-default
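The same kind of quota can be set from Java with the Admin client, whose alterClientQuotas() method was added in Kafka 2.6 (matching the kafka-clients 2.6.0 dependency used later). The sketch below mirrors the first kafka-configs command, capping client clientC at 1024 bytes per second of produce traffic; the class name QuotaAdmin is hypothetical and a broker at localhost:9092 is assumed.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

class QuotaAdmin {
    // Builds the alteration: entity = one client ID, op = set producer_byte_rate
    static ClientQuotaAlteration producerByteRate(String clientId, double bytesPerSecond) {
        ClientQuotaEntity entity = new ClientQuotaEntity(
                Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, clientId));
        ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op("producer_byte_rate", bytesPerSecond);
        return new ClientQuotaAlteration(entity, Collections.singletonList(op));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // all().get() blocks until the broker applies (or rejects) the change
            admin.alterClientQuotas(Collections.singletonList(producerByteRate("clientC", 1024)))
                 .all().get();
        }
    }
}
```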
Kafka ships with built-in console producer and consumer tools.
Creating the topic
kafka-topics --create --bootstrap-server localhost:9092 --topic invoiceTopic --partitions 5 --replication-factor 1 --config segment.bytes=100000
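The topic can also be created programmatically. The sketch below mirrors the kafka-topics command with the Admin client from kafka-clients: five partitions, replication factor 1 and segment.bytes=100000. The class name CreateInvoiceTopic is hypothetical, and a broker at localhost:9092 is assumed.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

class CreateInvoiceTopic {
    // Same topic definition as the kafka-topics command
    static NewTopic invoiceTopic() {
        return new NewTopic("invoiceTopic", 5, (short) 1)
                .configs(Collections.singletonMap("segment.bytes", "100000"));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // all().get() blocks until the broker confirms (or rejects) the creation
            admin.createTopics(Collections.singletonList(invoiceTopic())).all().get();
        }
    }
}
```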
Kafka Console Producer
kafka-console-producer --topic my-topic --broker-list localhost:9092 --property parse.key=true --property key.separator=":"
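Each line typed at the prompt is sent as one record, with the part before ":" used as the key, for example:
name:messaging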
value:kafka
Kafka Console Consumer
kafka-console-consumer --topic my-topic --bootstrap-server localhost:9092 --property print.key=true --property key.separator=":" --partition 1 --offset 4
A Kafka Producer Example
This example sends a message to a topic and then consumes it with kafka-console-consumer.
Create a Maven-based project and add the following dependencies.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency><dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
Create the AppConfiguration.java class, which builds a producer with the required configuration:
package com.finance.util.connection;
import com.finance.util.configuration.IAppConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Properties;
public class AppConfiguration {
    private static final Logger logger = LogManager.getLogger(AppConfiguration.class);
    public static Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IAppConfigs.CARD_PRODUCER_APP_CLIENT_ID);
        // create safe Producer: idempotence requires acks=all, retries > 0 and at most 5 in-flight requests
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, IAppConfigs.CARDAPP_ACK_TYPE);
        props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // high throughput producer (at the expense of a bit of latency and CPU usage)
        props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size
        return props;
    }
    public static KafkaProducer<String, String> getStringProducer() {
        Properties props = AppConfiguration.getProducerConfig();
        return new KafkaProducer<String, String>(props);
    }
}
Create the constant interface: IAppConfigs.java
package com.finance.util.configuration;
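public interface IAppConfigs {
    String BOOTSTAP_SERVER = "localhost:9092";
    String CARD_PRODUCER_APP_CLIENT_ID = "card_app_id";
    String CARD_STATUS_TOPIC = "org.cc.type.status";
    // acks setting read by AppConfiguration; idempotence requires "all"
    String CARDAPP_ACK_TYPE = "all";
}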
Create the callback class CardTopicCallback.java (package com.finance.common.callback), identical to the one shown in the Callback section above.
Create the Producer class: CardStatusChanged.java
package com.finance.producer;
import com.finance.common.callback.CardTopicCallback;
import com.finance.util.configuration.IAppConfigs;
import com.finance.util.connection.AppConfiguration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CardStatusChanged {
    static Logger logger = LogManager.getLogger(CardStatusChanged.class);
    public static void main(String[] args) {
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(IAppConfigs.CARD_STATUS_TOPIC,
                        "", getCardStatusMessage());
        logger.info("Preparing producer...");
        KafkaProducer<String, String> producer = AppConfiguration.getStringProducer();
        producer.send(producerRecord, new CardTopicCallback());
        logger.info("Event published...");
        producer.flush();
        producer.close();
        logger.info("Producer closed...");
    }
    public static String getCardStatusMessage() {
        String message = "{" +
                "\"status\":{\"cardNumber\":\"1234-2234-3454-3234\",\"status\":\"active\"}" +
                "}";
        return message;
    }
}
- Create the topic:
kafka-topics.bat --create --topic org.cc.type.status --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --config segment.bytes=1000000
- Start the producer class: CardStatusChanged.java
- Start the console consumer
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic org.cc.type.status --from-beginning