Stream Aggregation In Kafka

Narayan Kumar
Oct 31, 2020

This article introduces aggregation in Kafka Streams and explains, in a simple way, how to implement aggregations on real-time streams.

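Aggregating a stream takes two steps.

  1. Group the stream: groupByKey() if the stream already has the key you want to group on, or groupBy((k, v) -> ...) to select a new key. Data must be partitioned by the grouping key.

groupBy(), and groupByKey() when it follows a key-changing operation such as map(), redistribute the records through an internal repartition topic (much like an explicit through() call), so that all records with the same key are processed by the same task.

  2. Aggregate: count(), reduce(), aggregate()

count() is the simplest of the three: it counts the records in each group.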

Streaming APIs:

  • Key-preserving APIs: mapValues(), flatMapValues(), groupByKey(). Use these when only the values of the stream change and the key stays the same.
  • Key-changing APIs: map(), flatMap(), groupBy(). Use these when the key of the stream needs to change.

reduce() method

count() only counts the items in a group. When the values have to be combined rather than counted, for example when the stream is keyed by X but the totals are needed per key Y, change the key before grouping the stream and then use reduce().

  • It initializes the state store automatically
  • It knows the input and output types

Limitation of the reduce() method: it cannot change the type of the stream. The value type of the result is always the value type of the input.

Input for reduce method — KGroupedStream<String, Notification>

Output of reduce method — KTable<String, Notification> KT0

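// Re-key each invoice by customer card number and convert it to a Notification.
KStream<String, Notification> KS1 = KS0.map((key, inv) -> new KeyValue<>(
        inv.getCustomerCardNo(),
        Notifications.getNotificationFrom(inv)
));

// Group by the new key; the serdes are passed explicitly because map() changed the key and value types.
KGroupedStream<String, Notification> KGS0 = KS1.groupByKey(
        Grouped.with(AppSerdes.String(), AppSerdes.Notification()));

// Keep a running total of loyalty points per customer.
KTable<String, Notification> KT0 = KGS0.reduce((aggValue, newValue) -> {
    newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
    return newValue;
});

// Print every update of the table to standard output.
KT0.toStream().print(Printed.<String, Notification>toSysOut().withLabel("[Total Earning]"));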

Explanation:

KStream<String, Notification> KS1=KS0.map((key, inv) -> new KeyValue<>(
inv.getCustomerCardNo(),
Notifications.getNotificationFrom(inv)
));

map() changes both the key and the value, because the grouping has to be done on the customer card number. The value becomes a Notification object, and the result is a new stream, KS1.

KGroupedStream<String, Notification> KGS0=KS1.groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.Notification()));

The above code groups the stream by its key, the customer card number set in the previous step.

KTable<String, Notification> KT0 = KGS0.reduce((aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
});

The above code applies reduce() to the grouped stream. The reducer takes the current aggregate (aggValue) and the newly arrived value (newValue) for the same key, adds the newly earned loyalty points to the existing total, and returns the result. The output of reduce() is a KTable.
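To see the reduce() semantics without a running cluster, the sketch below imitates them with plain JDK collections. The class ReduceSketch and the method reduceByKey are hypothetical names used only for illustration and are not part of Kafka Streams. The sketch models just two properties described above: the first value for a key becomes the aggregate as-is, and the input and output share one type.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical, JDK-only model of reduce() semantics; not the Kafka Streams API.
final class ReduceSketch {

    // Folds (key, value) records into one aggregate per key.
    // The first value seen for a key is stored unchanged; the reducer only
    // runs from the second value onward, so input and output share type V.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> records,
                                        BinaryOperator<V> reducer) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // merge() calls reducer.apply(currentAggregate, newValue) for existing keys.
            table.merge(record.getKey(), record.getValue(), reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        // Earned loyalty points keyed by customer card number (made-up data).
        List<Map.Entry<String, Integer>> earned = List.of(
                Map.entry("card-1", 10),
                Map.entry("card-2", 5),
                Map.entry("card-1", 20));
        Map<String, Integer> totals = reduceByKey(earned, Integer::sum);
        System.out.println(totals); // prints {card-1=30, card-2=5}
    }
}
```

Because the reducer has the shape (V, V) -> V, there is no place to introduce a different result type, which is exactly the limitation noted earlier.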

aggregate() method:

The aggregate() method overcomes the limitation of reduce(): its input and output types do not have to be the same. The price is that you must supply the initial value yourself, which reduce() does for you.

Aggregation with the aggregate() method works in three steps.

  1. Initialization
  2. Aggregation
  3. Serialization

Use case:

Suppose the requirement is to return the average and total salary of each department.

Input:

101:{"id":101,"name":"Emp1","department":"IT","salary":5000}

102:{"id":102,"name":"Emp2","department":"Finance","salary":5000}

103:{"id":103,"name":"Emp3","department":"IT","salary":5000}

104:{"id":104,"name":"Emp4","department":"Finance","salary":5000}

Step-1: Consume the topic

streamsBuilder.table(IAppConfigs.empTopicName,
Consumed.with(AppSerdes.String(), AppSerdes.Employee()))

Step-2: Group by department

.groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v), Grouped.with(AppSerdes.String(), AppSerdes.Employee()))

Step-3: Aggregate

.aggregate(
        // Initializer: empty aggregate for a department seen for the first time
        () -> new DepartmentAggregate()
                .withEmployeeCount(0)
                .withTotalSalary(0)
                .withAvgSalary(0D),
        // Adder: called when an employee record is added to a department
        (k, v, aggV) -> new DepartmentAggregate()
                .withEmployeeCount(aggV.getEmployeeCount() + 1)
                .withTotalSalary(aggV.getTotalSalary() + v.getSalary())
                .withAvgSalary((aggV.getTotalSalary() + v.getSalary()) / (aggV.getEmployeeCount() + 1D)),
        // Subtractor: called when an employee record leaves a department (update or delete)
        (k, v, aggV) -> new DepartmentAggregate()
                .withEmployeeCount(aggV.getEmployeeCount() - 1)
                .withTotalSalary(aggV.getTotalSalary() - v.getSalary())
                .withAvgSalary((aggV.getTotalSalary() - v.getSalary()) / (aggV.getEmployeeCount() - 1D)),
        // Serializer: state store name and value serde
        Materialized.<String, DepartmentAggregate, KeyValueStore<Bytes, byte[]>>as(
                IAppConfigs.empDeptstateStoreName).withValueSerde(AppSerdes.DepartmentAggregate())
).toStream().print(Printed.<String, DepartmentAggregate>toSysOut().withLabel("Department Aggregate"));
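Because the topic is consumed with table(), an update to an employee's row replaces the old row. The aggregate on a grouped table then calls the subtractor with the old value for the old department and the adder with the new value for the new department. The following JDK-only sketch walks through that for an employee who moves from IT to Finance. DepartmentAggregateSketch is a hypothetical stand-in for the article's DepartmentAggregate class, and the guard against an empty department is an addition of this sketch, since dividing 0 by 0.0 would otherwise produce NaN.

```java
// Hypothetical, JDK-only model of the initializer/adder/subtractor trio;
// not the Kafka Streams API and not the article's DepartmentAggregate class.
final class DepartmentAggregateSketch {
    final int employeeCount;
    final long totalSalary;
    final double avgSalary;

    DepartmentAggregateSketch(int employeeCount, long totalSalary) {
        this.employeeCount = employeeCount;
        this.totalSalary = totalSalary;
        // Guard the empty department: 0 / 0.0 would otherwise yield NaN.
        this.avgSalary = employeeCount == 0 ? 0D : (double) totalSalary / employeeCount;
    }

    // Initializer: the aggregate for a department with no employees yet.
    static DepartmentAggregateSketch init() {
        return new DepartmentAggregateSketch(0, 0L);
    }

    // Adder: an employee record joins this department.
    DepartmentAggregateSketch add(long salary) {
        return new DepartmentAggregateSketch(employeeCount + 1, totalSalary + salary);
    }

    // Subtractor: an employee record leaves this department.
    DepartmentAggregateSketch subtract(long salary) {
        return new DepartmentAggregateSketch(employeeCount - 1, totalSalary - salary);
    }

    public static void main(String[] args) {
        DepartmentAggregateSketch it = init().add(5000).add(5000); // employees 101 and 103
        DepartmentAggregateSketch finance = init().add(5000);       // employee 102
        // Employee 103 moves from IT to Finance: subtract from the old group, add to the new one.
        it = it.subtract(5000);
        finance = finance.add(5000);
        System.out.println("IT: " + it.employeeCount + " " + it.totalSalary + " " + it.avgSalary);
        System.out.println("Finance: " + finance.employeeCount + " " + finance.totalSalary + " " + finance.avgSalary);
    }
}
```

A stream-based aggregate() needs no subtractor, because records in a stream are never replaced; the subtractor exists only for table semantics.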

Windowed Aggregation

Windowed aggregation divides an unbounded stream into fixed time windows and aggregates each window separately, for example sales totals per day, hour, or minute. Each window is a time frame defined by a start time and an end time, and an event belongs to a window according to its timestamp.

Before going deeper, let's look at the timestamps associated with an event.

Event Time Stamp

Event Time: the time at which the event was created by the source system

Ingestion Time: the time at which the event was received by Kafka

Processing Time: the time at which the event is processed by the Kafka Streams API

Several types of time window can be created.

  1. Tumbling Window: fixed-size window.
  2. Tumbling Window with Grace Period: fixed-size window with a small grace period.
  3. Session-based Window: window based on a user session.
  4. Suppressing Window: suppresses the intermediate results.
  5. Hopping Window: overlapping window.

1. Tumbling Window:

  • Fixed-size window
  • Defined as below
KTable<Windowed<String>, Long> KT0 = KS0.groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.SimpleInvoice()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
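  • Kafka Streams aligns window boundaries to the epoch, so a 5-minute window always starts at a multiple of 5 minutes (for example 12:20:00). The timestamp of a message decides which window it falls into, not where the window starts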
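Kafka Streams documents its time windows as aligned to the epoch, with an inclusive start and an exclusive end. Under that rule the window for any event can be computed from its timestamp alone, as in this small JDK-only sketch. TumblingWindowSketch and windowStart are hypothetical names and not part of the Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper; models epoch-aligned tumbling windows with JDK types only.
final class TumblingWindowSketch {

    // Start of the window containing the timestamp: the largest multiple of
    // the window size that is <= the timestamp (start inclusive, end exclusive).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long ts = Instant.parse("2020-11-03T12:23:16Z").toEpochMilli();
        long start = windowStart(ts, size);
        // prints 2020-11-03T12:20:00Z .. 2020-11-03T12:25:00Z
        System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(start + size));
    }
}
```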

2. Tumbling Window with Grace Period:

  • Fixed-size window with a small grace period
  • Used when events arrive out of order: an event was created earlier but delivered to Kafka late, after newer events had already arrived. A grace period keeps the window open a little longer so that such late events are still counted.
  • Defined as below
KTable<Windowed<String>, Long> KT0 = KS0.groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.SimpleInvoice()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(2)))
.count();
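The effect of the grace period can be pictured with a simplified model: a window keeps accepting records until stream time, the highest event timestamp seen so far, reaches the window end plus the grace period. The sketch below encodes only that rule. GracePeriodSketch and isAccepted are hypothetical names, and the exact boundary handling inside Kafka Streams is an assumption here rather than something taken from the article.

```java
// Hypothetical, simplified model of window expiry with a grace period;
// not the Kafka Streams API.
final class GracePeriodSketch {

    // A late record for a window is still accepted while stream time has not
    // yet reached windowEnd + grace (boundary handling is an assumption).
    static boolean isAccepted(long windowEndMs, long graceMs, long streamTimeMs) {
        return streamTimeMs < windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        long windowEnd = 5 * 60_000L; // window [0 min, 5 min)
        long grace = 2 * 60_000L;     // 2-minute grace period, as in the definition above
        System.out.println(isAccepted(windowEnd, grace, 6 * 60_000L)); // stream time 6 min: true
        System.out.println(isAccepted(windowEnd, grace, 8 * 60_000L)); // stream time 8 min: false
    }
}
```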

3. Session-based Window:

  • Window based on a user session
  • Aggregates the events of a session and produces a result for the complete session
  • Can be used to aggregate the actions of a user while the user is logged in
  • In the code below, SessionWindows defines the user session
  • The session is defined by the session timeout of the logged-in user
  • The configured duration is the inactivity gap: if no event arrives for a key within this duration, the session is considered terminated and the Streams API aggregates the events of the whole session
KTable<Windowed<String>, Long> KT01 =  KS0.groupByKey(Grouped.with(AppSerdes.String(),AppSerdes.UserClicks()))
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
.count();
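How an inactivity gap carves a series of events into sessions can be sketched with plain Java. This is a simplified model for the in-order events of a single key; SessionWindowSketch and sessions are hypothetical names, and treating a gap exactly equal to the timeout as part of the same session is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of session windows for ONE key with in-order events.
final class SessionWindowSketch {

    // Splits sorted event timestamps into sessions, each returned as {start, end}.
    static List<long[]> sessions(long[] sortedTimestampsMs, long inactivityGapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts - last[1] <= inactivityGapMs) {
                last[1] = ts;                      // within the gap: extend the current session
            } else {
                result.add(new long[] {ts, ts});   // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Clicks at minutes 0, 2, 4, 12 and 13 with a 5-minute inactivity gap.
        long[] clicks = {0, 2 * minute, 4 * minute, 12 * minute, 13 * minute};
        for (long[] s : sessions(clicks, 5 * minute)) {
            System.out.println((s[0] / minute) + " min .. " + (s[1] / minute) + " min");
        }
    }
}
```

Unlike tumbling windows, session boundaries come from the data itself, so two keys can have sessions of entirely different lengths.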

4. Suppressing Window:

  • Suppresses the intermediate results of a window and emits only the final result once the window closes. This matters when a result triggers an action outside Kafka, such as sending an alert.
  • It can be useful for a use case like this: transaction events arrive continuously from customers, and if a customer makes more than 10 transactions within one 5-minute window, a fraud alert is sent to that customer.
  • In this use case the window is 5 minutes, so the alert is not triggered until the 5-minute window closes.
KStream<String, HeartBeat> KS0 = streamsBuilder.stream("transaction.topic",
        Consumed.with(AppSerdes.String(), AppSerdes.HeartBeat())
                .withTimestampExtractor(new ApplicationTimestampExtractor())
);
KTable<Windowed<String>, Long> KT01 = KS0.groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.HeartBeat()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
KT01.toStream().foreach(
        (wKey, value) -> logger.info(
                "App ID: " + wKey.key() + " Window ID: " + wKey.window().hashCode() +
                " Window start: " + Instant.ofEpochMilli(wKey.window().start()).atOffset(ZoneOffset.UTC) +
                " Window end: " + Instant.ofEpochMilli(wKey.window().end()).atOffset(ZoneOffset.UTC) +
                " Count: " + value +
                (value > 2 ? " Application is Alive" : " Application Failed - Sending Alert Email...")
        )
);
  • In the above code the Streams API waits until the 60-second window, plus its 10-second grace period, has closed, and only then evaluates the condition and sends the notification

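5. Hopping Window:

  • Also called an overlapping window. It is sometimes loosely called a sliding window, although Kafka Streams offers a separate SlidingWindows type
  • It is a fixed-size window that moves forward by a fixed advance interval, so consecutive windows overlap and one event can belong to several windows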
Hopping Windows

Example of events

key1:{"transactionId":"Txn-1", "CreatedTime":"2020-11-03T12:20:10.00z", "amount":4000}

key2:{"transactionId":"Txn-2", "CreatedTime":"2020-11-03T12:21:12.00z", "amount":4000}

key3:{"transactionId":"Txn-3", "CreatedTime":"2020-11-03T12:21:13.00z", "amount":4000}

key1:{"transactionId":"Txn-4", "CreatedTime":"2020-11-03T12:23:16.00z", "amount":4000}

key2:{"transactionId":"Txn-5", "CreatedTime":"2020-11-03T12:24:18.00z", "amount":4000}

KTable<Windowed<String>, Long> KT0 = KS0.groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.SimpleInvoice()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(2))).count();
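With a 5-minute size and a 2-minute advance, every event lands in two or three overlapping windows. The sketch below lists the window start times for a given timestamp, assuming epoch-aligned windows with an inclusive start and an exclusive end, and non-negative timestamps. HoppingWindowSketch and windowStartsFor are hypothetical names and not part of the Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of hopping-window assignment; assumes
// epoch-aligned windows [start, start + size) and non-negative timestamps.
final class HoppingWindowSketch {

    // Start times of every window that contains the given timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long first = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long advance = Duration.ofMinutes(2).toMillis();
        long ts = Instant.parse("2020-11-03T12:20:10Z").toEpochMilli(); // Txn-1 above
        for (long start : windowStartsFor(ts, size, advance)) {
            System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(start + size));
        }
    }
}
```

For Txn-1 this yields the windows starting at 12:16, 12:18 and 12:20, so the count() above is updated once per window for that single event.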

Narayan Kumar

Middleware Expert in Design and Development, Working on Kafka, Eager to investigate and learn — Software Engineer.