Kafka Streaming API: Introduction

  • Understand the event stream type and its NFRs
  • Identify and model the event
  • Transport the event
  • Process the event stream
  • Customer check-in is an event stream
  • The customer check-in system generates the events
  • Model the event
  • Transport the event through HTTP, JMS, a DB, etc.
  • Process and publish the event
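The "model the event" step above can be sketched as a simple immutable class. `CheckInEvent` and its fields are illustrative names for this article's check-in scenario, not part of any Kafka API:

```java
// Hypothetical model for a customer check-in event (illustrative names).
// Event records are immutable: all fields are final and set once at creation.
public final class CheckInEvent {
    private final String customerId;
    private final String location;
    private final long checkInTimeEpochMs;

    public CheckInEvent(String customerId, String location, long checkInTimeEpochMs) {
        this.customerId = customerId;
        this.location = location;
        this.checkInTimeEpochMs = checkInTimeEpochMs;
    }

    public String getCustomerId() { return customerId; }
    public String getLocation() { return location; }
    public long getCheckInTimeEpochMs() { return checkInTimeEpochMs; }
}
```

Immutability matters here because an event records a fact that already happened; downstream processors read it but never change it.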
  1. Create the stream configuration
public static Properties getStreamConfigProps() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, IAppConfigs.BOOTSTAP_SERVER);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, IAppConfigs.APPLICATION_ID_CONFIG);
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
    return props;
}
StreamsBuilder streamsBuilder = new StreamsBuilder();
// Key and value types must match the configured default serdes (String/String).
KStream<String, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));

Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
logger.info("Starting stream.");
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("Shutting down stream");
    streams.close();
}));
package com.finance.consumers;

import com.finance.util.connection.AppConfiguration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Properties;

public class HelloStreams {
    private static final Logger logger = LogManager.getLogger(HelloStreams.class);

    public static void main(String[] args) {

        Properties props = AppConfiguration.getStreamConfigProps();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> kStream = streamsBuilder.stream("test.stream");
        kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));

        Topology topology = streamsBuilder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        logger.info("Starting stream.");
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutting down stream");
            streams.close();
        }));
    }
}
C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows>C:\Tools\Kafka\kafka_2.12-2.6.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test.stream --property parse.key=true --property key.separator=":"
>name:narayan
>age:21
>address:UAE
[2020-10-24 10:25:41,165] (com.finance.consumers.HelloStreams) - INFO Starting stream. 
Key= name Value= narayan
Key= age Value= 21
Key= address Value= UAE

Kafka DSL-Streaming

Event Stream — Continuous flow of events, unbounded dataset and immutable data records.

Stream APIs
The Streams DSL offers both stateless operations (filter, map, flatMap) and stateful operations (aggregations, joins, windowing). The examples below walk through the stateless ones.
// filter: keep only records whose value ends with "yan" (lambda form)
final KStream<String, String> source = streamsBuilder.stream(IAppConfigs.wordCountTopicName);
KStream<String, String> ks0 = source.filter((k, v) -> v.toLowerCase().endsWith("yan"));

// Equivalent anonymous-class form
source.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        return v.toLowerCase().endsWith("yan");
    }
});
// filterNot: the inverse of filter — drop records whose value ends with "yan"
KStream<String, String> ks1 = source.filterNot((k, v) -> v.toLowerCase().endsWith("yan"));

// Equivalent anonymous-class form
source.filterNot(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        return v.toLowerCase().endsWith("yan");
    }
});
// map: re-key each invoice by customer card number and transform the value
KStream<String, CustomerMessage> KS1 = KS0.map((key, invoice) -> new KeyValue<>(
        invoice.getCustomerCardNo(),
        CreateMessages.getCustomerMessage(invoice)
));

// Anonymous-class form: lower-case the key, upper-case the value
stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toLowerCase(), v.toUpperCase());
    }
});
// mapValues: transform only the value; the key (and thus partitioning) is untouched
source.mapValues(value -> value.toLowerCase());
// flatMap: split one CSV record into multiple records, one per field
source.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.stream(values)
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
});
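The operations above are all stateless: each record is handled on its own. For contrast, here is a minimal sketch of a stateful operation, `groupByKey().count()`, which keeps a running count per key in a local state store. The class name and the topic name `word.input` are illustrative, and a running broker would be needed to actually start this topology:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountSketch {
    public static void main(String[] args) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Stateless: read the stream ("word.input" is an illustrative topic name).
        KStream<String, String> words = streamsBuilder.stream("word.input");

        // Stateful: group by key and keep a running count per key.
        // count() is backed by a local state store and emits a changelog (KTable).
        KTable<String, Long> counts = words.groupByKey().count();

        // Forward each updated count downstream as the table changes.
        counts.toStream()
              .foreach((word, count) -> System.out.println(word + " -> " + count));

        // Building and describing the topology needs no broker; starting it would.
        System.out.println(streamsBuilder.build().describe());
    }
}
```

The KStream/KTable split is the key idea: a KStream is the unbounded event flow, while the KTable produced by `count()` is the continuously updated state derived from it.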

Narayan Kumar