Kafka Streaming API: Introduction

Narayan Kumar
4 min read · Oct 24, 2020


A sample Streaming Application

Concept: When developing a Kafka Streams client, take a step-wise approach. The steps are:

  • Understand the event stream type and NFRs (non-functional requirements)
  • Identify and model the event
  • Transport the event
  • Process the event stream

Example:

  • Customer check-in is an event stream
  • The customer check-in system generates the event
  • Model the event (a sketch follows this list)
  • Transport the event over HTTP, JMS, a database, etc.
  • Process and publish the event
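
As a sketch of the "model the event" step, a check-in event can be captured as a plain Java class. The class and field names below are illustrative assumptions, not taken from a real check-in system:

// Illustrative event model for a customer check-in (names are assumptions).
public class CustomerCheckInEvent {
    private final String customerId;
    private final String location;
    private final long checkInTimeEpochMs;

    public CustomerCheckInEvent(String customerId, String location, long checkInTimeEpochMs) {
        this.customerId = customerId;
        this.location = location;
        this.checkInTimeEpochMs = checkInTimeEpochMs;
    }

    public String getCustomerId() { return customerId; }
    public String getLocation() { return location; }
    public long getCheckInTimeEpochMs() { return checkInTimeEpochMs; }
}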

A sample stream client application

Follow the steps below to build the stream client application.

  1. Create the stream configuration
public static Properties getStreamConfigProps() {
    Properties props = new Properties();
    // Kafka cluster to connect to
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTRAP_SERVER);
    // Default serde (serializer/deserializer) classes for record keys and values
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // Unique id of this Streams application within the Kafka cluster
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, IAppConfigs.APPLICATION_ID_CONFIG);
    // Number of stream processing threads
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
    return props;
}
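
IAppConfigs above is not a Kafka class; it is assumed here to be a project-level constants interface. A minimal sketch, with illustrative values, could be:

// Assumed constants holder; names and values are illustrative, not from the original project.
public interface IAppConfigs {
    String BOOTSTRAP_SERVER = "localhost:9092";
    String APPLICATION_ID_CONFIG = "hello-streams-app";
    String topicName = "test.stream";
}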

2. Create the StreamsBuilder

StreamsBuilder streamsBuilder = new StreamsBuilder();

3. Open a stream for the topic

KStream<String, String> kStream = streamsBuilder.stream(IAppConfigs.topicName);

4. Process the stream

kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
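
Note that foreach() is a terminal operation: it consumes the stream and returns nothing. If you want to log records and keep processing downstream, the DSL also provides peek(), which returns the stream unchanged (a sketch):

// peek() inspects each record and passes the stream through for further processing.
KStream<String, String> logged = kStream.peek((k, v) -> System.out.println("Key= " + k + " Value= " + v));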

5. Create the topology

Topology topology = streamsBuilder.build();       

6. Create the KafkaStreams instance with the topology and properties.

KafkaStreams streams = new KafkaStreams(topology, props);
logger.info("Starting stream.");
streams.start();

7. Add a shutdown hook

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("Shutting down stream");
    streams.close();
}));

The Complete code:

package com.finance.consumers;

import com.finance.util.connection.AppConfiguration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class HelloStreams {
    private static final Logger logger = LogManager.getLogger(HelloStreams.class);

    public static void main(String[] args) {

        // Build the stream configuration (step 1)
        Properties props = AppConfiguration.getStreamConfigProps();
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Open a stream over the input topic and print every record
        KStream<String, String> kStream = streamsBuilder.stream("test.stream");
        kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));

        // Build the topology and start the Streams runtime
        Topology topology = streamsBuilder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        logger.info("Starting stream.");
        streams.start();

        // Close the Streams client cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutting down stream");
            streams.close();
        }));
    }
}

Test the code:

Start the producer

C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows>C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test.stream --property parse.key=true --property key.separator=":"
>name:narayan
>age:21
>address:UAE

Start the Stream application

[2020-10-24 10:25:41,165] (com.finance.consumers.HelloStreams) - INFO Starting stream. 
Key= name Value= narayan
Key= age Value= 21
Key= address Value= UAE

Kafka Streams DSL

Event Stream — A continuous flow of events: an unbounded dataset of immutable data records.

Streaming Operations — Stateless, stateful, and window-based. They are used to transform, aggregate, filter, and enrich the stream. A stateful, window-based example is sketched below.
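
The sketch below is illustrative, not from the original article: the topic name and the streamsBuilder variable are assumptions, and it needs imports for java.time.Duration and the kstream classes KTable, TimeWindows, and Windowed. It counts records per key over five-minute windows:

// Count records per key in 5-minute tumbling windows (a stateful, windowed operation).
KStream<String, String> events = streamsBuilder.stream("events.topic");
KTable<Windowed<String>, Long> counts = events
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
        .count();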

Stream APIs
Stateful and Stateless APIs

filter() — Filters the KStream and returns a new KStream containing only the records for which the predicate returns true.

Example-1: In the snippet below, ks0 contains only the records whose value ends with “yan”.

final KStream<String, String> source = streamsBuilder.stream(IAppConfigs.wordCountTopicName);
KStream<String, String> ks0 = source.filter((k, v) -> v.toLowerCase().endsWith("yan"));

Example-2:

Without lambdas:

final KStream<String, String> source = streamsBuilder.stream(IAppConfigs.wordCountTopicName);
source.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        return v.toLowerCase().endsWith("yan");
    }
});

filterNot(): Works as the exact inverse of filter(). It returns a new KStream containing only the records for which the predicate returns false.

Example-1: In the snippet below, ks0 contains only the records whose value does not end with “yan”.

final KStream<String, String> source = streamsBuilder.stream(IAppConfigs.wordCountTopicName);
KStream<String, String> ks0 = source.filterNot((k, v) -> v.toLowerCase().endsWith("yan"));

Example-2:

Without lambdas:

source.filterNot(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        return v.toLowerCase().endsWith("yan");
    }
});

map() — Returns a new KStream in which both the key and the value of each record can be transformed into new ones. Because the key may change, key-based operations downstream will trigger a repartition. The mapper function is applied to each record.

KStream<String, CustomerMessage> KS1 = KS0.map((key, invoice) -> new KeyValue<>(
        invoice.getCustomerCardNo(),
        CreateMessages.getCustomerMessage(invoice)
));

Without lambdas:

stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toLowerCase(), v.toUpperCase());
    }
});

mapValues(): Transforms the value of each record without touching the key, so it never triggers a repartition. A no-lambda version follows the example below.

source.mapValues(value -> value.toLowerCase());
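
For symmetry with the other examples, the same operation without a lambda uses the ValueMapper interface (a sketch):

source.mapValues(new ValueMapper<String, String>() {
    @Override
    public String apply(String value) {
        // Only the value is transformed; the key is untouched.
        return value.toLowerCase();
    }
});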

flatMap(): Works like map(), but transforms each record of the input stream into zero or more records in the output stream. Both the keys and the values can be changed in the output records.

The example below fans one record out into several records with the same key. For example, a record with key “names” and value “John,Murthi,Utaam” produces (names, John), (names, Murthi), and (names, Utaam).

source.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.stream(values)
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
});
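
The same flatMap() logic reads more compactly as a lambda; this sketch is equivalent to the anonymous-class version above:

// Split a comma-separated value into one output record per element, keeping the key.
source.flatMap((k, csv) -> Arrays.stream(csv.split(","))
        .map(value -> new KeyValue<>(k, value))
        .collect(Collectors.toList()));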


Narayan Kumar

Middleware Expert in Design and Development, Working on Kafka, Eager to investigate and learn — Software Engineer.