Kafka Streams — Working with Time

SPOUD
4 min read · Jan 29, 2024


Time-based aggregations are frequently required in streaming applications. We may want to count daily visitors, create monthly revenue reports, calculate moving averages, or collect all user actions of a browsing session. Kafka Streams and the Streams DSL support these use cases with time-based windowing operators.

Window Operations

With stream.groupByKey() we tell Kafka Streams to create buckets for each key so that we can use stateful operations (e.g. aggregations) on the grouped data. With windowing we can further sub-group our data by time into so-called windows.
The Streams DSL provides three different windowing operations:

Tumbling: A new window starts when the previous one closes. Windows do not overlap.

Hopping: Windows have a fixed length but overlap if the advance interval ("advance by") is smaller than the window size.

Session: Windows have a variable size. They end when a session times out, i.e. no new events occur for a configured time.

Source: ksqlDB
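To make the three window types more concrete, here is a minimal sketch in plain Java (no Kafka dependency) of how an event timestamp could be mapped to windows. The class and method names are hypothetical and only illustrate the arithmetic; in the Streams DSL the corresponding building blocks are TimeWindows and SessionWindows. The sketch assumes windows are aligned to the epoch and that the window end is exclusive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
final class WindowAssignment {

    // Tumbling: every timestamp belongs to exactly one window [start, start + size).
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Hopping: a timestamp can belong to several overlapping windows.
    // Returns the start of every window [start, start + size) containing the timestamp.
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that can still contain the timestamp.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Session: sorted timestamps are grouped together until the gap between
    // two consecutive events exceeds the inactivity gap.
    static List<List<Long>> sessions(List<Long> sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // session timed out, start a new one
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

With a window size of 5000 ms and an advance of 2000 ms, for example, an event at 7000 falls into the hopping windows starting at 4000 and 6000, while a tumbling window of the same size places it only in the window starting at 5000.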

Event-Time vs. Wallclock-Time

In a streaming application, time can either be taken from timestamps in events (event-time) or from the clock (processing or wallclock-time).
These times can be completely different for various reasons:

  • a sensor network might record data every 10 s but only send it once an hour
  • a new streams app starts and processes historical data

Switching from the default event-time processing to wallclock time (or even a custom implementation) is possible with a configuration property (see the Kafka Streams developer documentation).
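As a rough sketch, the switch to wallclock time could look like the configuration below. To the best of our knowledge the relevant property is default.timestamp.extractor and the built-in wallclock implementation is WallclockTimestampExtractor; the application id and bootstrap server are hypothetical placeholders, and the class name WallclockConfig exists only for this example. Plain string keys are used so the snippet has no Kafka dependency.

```java
import java.util.Properties;

// Hypothetical holder for an example Streams configuration.
final class WallclockConfig {

    static Properties wallclockProperties() {
        Properties props = new Properties();
        // Placeholder values, adjust to your environment.
        props.put("application.id", "wallclock-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // Use the processing (wallclock) time instead of the timestamp embedded in each record.
        props.put("default.timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return props;
    }
}
```

A custom implementation would be configured the same way, by pointing the property to a class that implements the TimestampExtractor interface.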

Windowing with Event-Time

Assume your window has a size of 5 seconds and covers the timestamps [0, 5000) in milliseconds (the upper bound is exclusive), and one event arrives with a timestamp of 4000.
After 10 (wallclock) seconds, that window is still considered open because no other event has arrived and stream time is still 4000.
After 12 (wallclock) seconds a new event with a timestamp of 20100 arrives. It advances stream time past the end of the window and causes the window to be closed.
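The scenario can be replayed with a small model of stream time. This is a simplified sketch, assuming no grace period and a hypothetical StreamTime class; it is not how Kafka Streams is implemented internally, but it captures the rule that only incoming events move time forward.

```java
// Hypothetical model of stream time for a single partition.
final class StreamTime {
    private long streamTimeMs = -1; // no event observed yet

    // Stream time is the highest event timestamp seen so far.
    // It never moves backwards and never advances without a new event.
    void observe(long eventTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, eventTimestampMs);
    }

    // Simplified rule (no grace period): a window [start, end) is closed
    // once stream time has reached its exclusive end.
    boolean isClosed(long windowEndMs) {
        return streamTimeMs >= windowEndMs;
    }

    long current() {
        return streamTimeMs;
    }
}
```

After observe(4000) the window ending at 5000 stays open no matter how much wallclock time passes; only observe(20100) closes it.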

Windowed Count Example

In this example all events have the same key (for simplicity) but arrive at different times.

Windowed Count
  • Timestamps are denoted as t1, t2, t3, …
  • The windowing operator puts events from t1-t3 into window w1, and the event with t4 into a 2nd window w2
  • The stateful count operator counts how many events have occurred in a window and emits updated counts immediately (does not wait until the window closes)
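The behaviour described in the list above can be sketched without Kafka as follows. The class name, the window size and the concrete timestamps in the usage note are assumptions for illustration, since the diagram does not give exact values. The model also ignores record caching, which in a real application can collapse several consecutive updates into one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a tumbling-window count for a single key.
final class EagerWindowedCount {
    private final long windowSizeMs;
    private final Map<Long, Long> countsByWindowStart = new HashMap<>();

    EagerWindowedCount(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Counts the event in its window and immediately returns the updated
    // result, formatted as "windowStart=count".
    String process(long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long count = countsByWindowStart.merge(windowStart, 1L, Long::sum);
        return windowStart + "=" + count;
    }
}
```

With a window size of 5000 ms, events at 1000, 2000 and 3000 produce the updates 0=1, 0=2 and 0=3, and an event at 6000 opens the second window with 5000=1.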

Suppressing results

If you are only interested in the last count at the end of a window, you can use the suppress method of the Streams DSL.

Window Count Suppress

However, note that the final result is only emitted when the 2nd window starts! An event with a new timestamp is required to advance the stream-time.
This may not be desirable when certain keys are updated very infrequently, and it has caused confusion in the past.
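The following sketch models this effect in plain Java. It is an approximation under stated assumptions (tumbling windows, no grace period, one key, hypothetical class name), not the actual implementation behind suppress. In the DSL the equivalent operator is configured with Suppressed.untilWindowCloses(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a windowed count whose results are held back until the window closes.
final class SuppressedWindowedCount {
    private final long windowSizeMs;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // windowStart -> count
    private long streamTimeMs = -1;

    SuppressedWindowedCount(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Processes one event and returns the final results ("windowStart=count")
    // that this event releases by advancing stream time.
    List<String> process(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (windowStart + windowSizeMs > streamTimeMs) {
            openWindows.merge(windowStart, 1L, Long::sum);
        } // otherwise the event is late for an already closed window and is dropped

        List<String> finals = new ArrayList<>();
        // Emit every buffered window whose end is at or before the current stream time.
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowSizeMs <= streamTimeMs) {
            Map.Entry<Long, Long> closed = openWindows.pollFirstEntry();
            finals.add(closed.getKey() + "=" + closed.getValue());
        }
        return finals;
    }
}
```

With a 5000 ms window, the events at 1000, 2000 and 3000 return nothing. The final count 0=3 is only returned once an event at 6000 (or later) arrives, which is exactly the delay discussed above.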

Buffer results and forward when no more updates are received

We will change this behavior into a mix of event-time windowing and wallclock-time suppression. This should result in a behavior as in the diagram below.

Windowed Count Buffered

We still have a tumbling window which works on event-time and a count operator to count events in each time window.
A custom transformation buffers the emitted count events. They are forwarded downstream when no update has been received for a configured period, e.g. 1 minute (based on wallclock time).

Implement Buffering with a custom transformer

  • punctuations are used to regularly check if the last update time for a key is older than a configured period, e.g. 1 minute
  • something to consider: What happens when the topic is re-read so that all events are never more than 1 minute apart?
  • processors: custom sink operators that do not forward events downstream; avoid using them to interact with external systems (sink connectors are better suited for these tasks)
  • transformer: an operator which forwards zero or more events, potentially with a different structure, downstream
  • we apply the transformer after the count aggregation, so it will receive windowed keys as input
  • to buffer events we need a state store
  • to regularly check if events can be forwarded we need to schedule a task; this is what punctuations are for, and they can run based on wallclock time
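The core of the buffering logic from the list above can be sketched independently of Kafka. This is not the code from the linked repository; WallclockBuffer and its methods are hypothetical, the in-memory map stands in for a state store, and the string key stands in for a windowed key. In a real transformer, punctuate would be registered through the processor context's schedule method with PunctuationType.WALL_CLOCK_TIME.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical buffer that forwards a value once it has not been updated for a quiet period.
final class WallclockBuffer {

    private static final class Buffered {
        final long count;
        final long lastUpdateMs; // wallclock time of the last update
        Buffered(long count, long lastUpdateMs) {
            this.count = count;
            this.lastUpdateMs = lastUpdateMs;
        }
    }

    private final long quietPeriodMs;
    private final Map<String, Buffered> store = new LinkedHashMap<>(); // stand-in for a state store

    WallclockBuffer(long quietPeriodMs) {
        this.quietPeriodMs = quietPeriodMs;
    }

    // Called for every updated count: keep only the latest value per windowed key.
    void update(String windowedKey, long count, long nowMs) {
        store.put(windowedKey, new Buffered(count, nowMs));
    }

    // Called periodically based on wallclock time: forwards and removes every
    // entry that has not been updated for at least the quiet period.
    List<String> punctuate(long nowMs) {
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> e = it.next();
            if (nowMs - e.getValue().lastUpdateMs >= quietPeriodMs) {
                forwarded.add(e.getKey() + "=" + e.getValue().count);
                it.remove();
            }
        }
        return forwarded;
    }
}
```

Passing the current time as a parameter keeps the sketch deterministic and easy to test. It also makes the open question from the list visible: if updates for a key keep arriving less than one quiet period apart, as can happen when a topic is re-read, the entry is never forwarded.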

Link to the repo with sample code: https://github.com/spoud/kafka-streams-time-based-processor

Want to talk to the author about it? Reach out to us: https://www.linkedin.com/company/spoud.io
