Stream and Apply Real-Time Prediction Models on High-Throughput Time-Series Data

Many stream processing libraries are not Python friendly, while nearly all machine learning and data mining libraries are Python based. Although the Faust library aims to bring Kafka Streams ideas into the Python ecosystem, it can pose challenges in terms of ease of use. This document serves as a tutorial and offers best practices for effectively utilizing Faust.
In the first section, I present an introductory overview of stream processing concepts, drawing extensively from the book Designing Data-Intensive Applications [1]. Following that, I explore the key functionalities of the Faust library, placing emphasis on Faust windows, which are often difficult to understand from the available documentation and to utilize efficiently. Consequently, I propose an alternative approach to using Faust windows that leverages the library's own functions. Lastly, I share my experience implementing a similar pipeline on the Google Cloud Platform.
A stream refers to unbounded data that is incrementally made available over time. An event is a small, self-contained object that contains the details of something that happened at some point in time, e.g. a user interaction. An event is generated by a producer (e.g. a temperature sensor) and may be consumed by several consumers (e.g. an online dashboard). Traditional databases are ill-suited for storing events in high-throughput event streams. This is due to the need for consumers to periodically poll the database to discover new events, which leads to significant overhead. Instead, it is better for consumers to be notified when new events appear, and messaging systems are designed for exactly this.
A message broker is a widely adopted system for messaging, in which producers write messages to the broker, and consumers are notified by the broker and receive these messages. AMQP-based message brokers, like RabbitMQ, are commonly employed for asynchronous message passing between services and for task queues. Unlike databases, they adopt a transient messaging mindset and delete a message only after it has been acknowledged by its consumers. When processing messages becomes resource-intensive, parallelization can be achieved by employing multiple consumers that read from the same topic in a load-balanced manner. In this approach, messages are randomly assigned to consumers for processing, potentially leading to a different processing order than the order in which they were received.
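As a rough illustration of this acknowledgement-based model, the sketch below uses the pika client to consume from a RabbitMQ queue; the broker address and queue name are placeholders, not part of the pipeline described in this document.

import pika

# Hypothetical local broker and queue name, for illustration only.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="tasks")

def handle(ch, method, properties, body):
    print("processing", body)
    # The broker removes the message only after this acknowledgement.
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="tasks", on_message_callback=handle)
channel.start_consuming()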
Alternatively, log-based message brokers such as Apache Kafka combine the durability of database storage with the low-latency notification capabilities of messaging systems. They utilize a partitioned-log structure, where each partition represents an append-only sequence of records stored on disk. This design enables re-reading of old messages. Load balancing in Kafka is achieved by assigning a consumer to each partition; in this way, the order of message processing aligns with the order of receiving, but the number of consumers is limited to the number of partitions available.
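To make the consumer-group idea concrete, here is a minimal sketch using the kafka-python client (not Faust); the topic name, broker address, and group id are assumptions. Consumers sharing the same group_id split the topic's partitions among themselves, and each record exposes its partition and offset in the log.

from kafka import KafkaConsumer

# Hypothetical topic, broker address, and consumer group.
consumer = KafkaConsumer(
    "sensor-events",
    bootstrap_servers="localhost:9092",
    group_id="dashboard-consumers",   # members of this group share the partitions
    auto_offset_reset="earliest",     # old records can be re-read from the log
)

for message in consumer:
    # Ordering is preserved within a partition; the offset is the position in the log.
    print(message.partition, message.offset, message.value)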
Stream processing involves performing actions on a stream, such as processing a stream and generating a new one, storing event data in a database, or visualizing data on a dashboard. Stream analytics is a common use case in which we aggregate information from a sequence of events within a defined time window. Tumbling windows (non-overlapping) and hopping windows (overlapping) are popular window types used in stream analytics. Examples of stream analytics use cases range from simply counting the number of events in the previous hour to applying a complex time-series prediction model on events.
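For the simple counting use case mentioned above, a minimal sketch in plain Python (independent of any streaming library) could bucket events into one-hour tumbling windows; the window size is an assumption for illustration.

from collections import defaultdict

WINDOW_SIZE = 3600  # one-hour tumbling windows, in seconds

counts = defaultdict(int)  # window start time -> number of events

def count_event(event_timestamp: float) -> None:
    # A tumbling window is identified by its start time;
    # every event falls into exactly one such window.
    window_start = event_timestamp - event_timestamp % WINDOW_SIZE
    counts[window_start] += 1

# A hopping window would instead assign the event to every window
# whose (start, start + size) range contains the timestamp.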
Stream analytics faces the challenge of distinguishing between event creation time (event time) and event processing time, because the processing of events may be delayed by queuing or network issues. Defining windows based on processing time is the simpler approach, especially when the processing delay is minimal. However, defining windows based on event time poses a greater challenge: it is uncertain whether all the data within a window has been received or whether events are still pending. Hence, it becomes necessary to handle straggler events that arrive after the window has been considered complete.
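As a small illustration of the difference, the sketch below (plain Python, with an assumed grace period) assigns an event to a window by its event time and flags it as a straggler when it arrives after that window has already been considered complete.

import time

WINDOW_SIZE = 10        # seconds
ALLOWED_LATENESS = 5    # hypothetical grace period before a window is finalized

def window_start(ts: float) -> float:
    return ts - ts % WINDOW_SIZE

def is_straggler(event_time: float, processing_time: float) -> bool:
    # The window an event belongs to is determined by when it was created,
    # not by when it happens to be processed.
    window_end = window_start(event_time) + WINDOW_SIZE
    return processing_time > window_end + ALLOWED_LATENESS

# An event created 30 seconds ago but only processed now is a straggler.
print(is_straggler(event_time=time.time() - 30, processing_time=time.time()))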
In applications involving complex stream analytics, such as time-series prediction, it is often necessary to process a sequence of ordered messages within a window as a cohesive unit. In this situation, the messages exhibit strong inter-dependencies, making it difficult to acknowledge and remove individual messages from the broker. Consequently, a log-based message broker presents itself as the preferable option. Moreover, parallel processing may not be feasible, or may be overly intricate to implement, in this context, as all the messages within a window have to be considered together. However, applying a complex ML model to the data can be computationally intensive, necessitating an alternative approach to parallel processing. This document aims to propose a solution for effectively employing a resource-intensive machine learning model in a high-throughput stream processing application.
There are several stream processing libraries available, such as Apache Kafka Streams, Flink, Samza, Storm, and Spark Streaming. Each of these libraries has its own strengths and weaknesses, but many of them are not particularly Python-friendly. However, Faust is a Python-based stream processing library that uses Kafka as the underlying messaging system and aims to bring the ideas of Kafka Streams to the Python ecosystem. Unfortunately, Faust's documentation can be confusing, and the source code can be difficult to follow. For example, understanding how windows work in Faust is hard without referring to the complex source code. Moreover, there are many open issues in the Faust (v1) and Faust-Streaming (v2) repositories, and resolving them is not a simple process. In the following, essential knowledge about Faust's underlying structure is provided, together with code snippets that help in effectively utilizing the Faust library.
To utilize Faust, the initial step involves creating an App and configuring the project by specifying the broker and other essential parameters. One of the useful parameters is table_cleanup_interval, which will be discussed later.
import faust

app = faust.App(
    app_name,
    broker=broker_address,
    store=rocksdb_address,
    table_cleanup_interval=table_cleanup_interval,
)
Then you can define a stream processor using the agent decorator to consume from a Kafka topic and do something for every event it receives.
schema = faust.Schema(value_serializer='json')
topic = app.topic(topic_name, schema=schema)

@app.agent(topic)
async def processor(stream):
    async for event in stream:
        print(event)
For keeping state in a stream processor, we can use a Faust Table. A table is a distributed in-memory dictionary, backed by a Kafka changelog topic. You can think of a table as a Python dictionary that can be set within a stream processor.
table = app.Table(table_name, default=int)

@app.agent(topic)
async def processor(stream):
    async for event in stream:
        table[key] += event
Faust Windows
Let's consider a time-series problem where, every second, we require samples from the previous 10 seconds to predict something. So we need overlapping 10-second windows that slide every 1 second. To achieve this functionality, we can utilize Faust windowed tables, which are inadequately explained in the Faust documentation and often lead to confusion.
Ideally, a stream processing library should automatically perform the following tasks:
- Maintain a state for each window (a list of events);
- Identify the relevant windows for a new event (the last 10 windows);
- Update the state of those windows (append the new event to the end of their respective lists);
- Apply a function when a window is closed, using the window's state as input.
In the code snippet below, you can see the approach suggested in the Faust documentation for constructing a window and using it in a stream processor (refer to this example from the Faust library):
# Based on the Faust example
# Don't use this
window_wrapper = app.Table(
    table_name, default=list, on_window_close=window_close
).hopping(
    10, 1, expires=expire_time
)
@app.agent(topic)
async def processor(stream):
    async for event in stream:
        window_set = window_wrapper[key]
        prev = window_set.value()
        prev.append(event)
        window_wrapper[key] = prev
In the provided code, the object window_wrapper is an instance of the WindowWrapper class, which provides some of the required functionality. The expires parameter determines the duration of a window's lifespan, starting from its creation. Once this specified time has elapsed, the window is considered closed. Faust performs periodic checks, every table_cleanup_interval, to discover closed windows. It then applies the window_close function, using the window state as its input.
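For reference, a minimal sketch of such a callback is shown below; it only logs the closed window's state, and the (key, events) signature follows the usage shown later in this document.

# Minimal sketch of a window-close callback: `events` is the window's final
# state (here, the list of buffered events for that key).
async def window_close(key, events):
    print(f'window closed for key={key} with {len(events)} events')
    # This is where a prediction model could be applied to `events`.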
When you call window_wrapper[key], it returns an object of type WindowSet, which internally contains all the relevant windows. By calling window_set.value(), you can access the state of the latest window, and you can also access previous window states by calling window_set.delta(30), which gives the state as it was 30 seconds ago. Moreover, you can update the state of the latest window by assigning a new value to window_wrapper[key]. This approach works fine for tumbling windows. However, it does not work for hopping windows, where we need to update the state of multiple windows.
[Faust documentation:] At this point, when accessing data from a hopping table, we always access the latest window for a given timestamp, and we have no way of modifying this behavior.
While Faust provides support for maintaining the state of windows, identifying the relevant windows, and applying a function on closed windows, it does not fully address the third functionality, which involves updating the state of all relevant windows. In the following, I propose a new approach for utilizing Faust windows that covers this functionality as well.
Windows Reinvented
Comprehending the functionality and operation of Faust windows proved difficult for me until I delved into the source code. Faust windows are built upon an underlying Faust table, which I will refer to as the inner table from now on. Surprisingly, the Faust documentation does not emphasize the inner table or provide a clear explanation of its role in implementing windows, even though it is the most crucial component of the window implementation. Therefore, in the following section, I will begin by defining the inner table and then proceed to discuss the window wrappers.
from datetime import timedelta

inner_table = app.Table(
    table_name, default=list, partitions=1, on_window_close=window_close
)

# for tumbling window:
window_wrapper = inner_table.tumbling(
    window_size, key_index=True, expires=timedelta(seconds=window_size)
)

# for hopping window:
window_wrapper = inner_table.hopping(
    window_size, slide, key_index=True, expires=timedelta(seconds=window_size)
)
Let's now examine how Faust handles the first and second functionalities (keeping state and identifying relevant windows). Faust utilizes the concept of a window range, represented by a simple (start, end) tuple, to determine which windows are relevant to a given timestamp. If the timestamp falls within the start and end times of a window, that window is considered relevant. Faust creates a record in the inner table using a key composed of the pair (key, window range) and updates it accordingly.
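The following is not Faust's actual implementation, but a simplified sketch of the idea: for a hopping window of a given size and step, it enumerates the (start, end) ranges that contain a timestamp, which is what makes an event relevant to several windows at once.

def hopping_ranges(timestamp: float, size: float, step: float):
    """Yield the (start, end) ranges of hopping windows containing `timestamp`."""
    # Window starts are multiples of `step`; the earliest window still
    # containing the timestamp starts at most `size - step` earlier.
    earliest = (timestamp // step) * step - size + step
    start = max(earliest, 0.0)
    while start <= timestamp:
        yield (start, start + size)
        start += step

# For 10-second windows hopping every 1 second, a timestamp falls in 10 windows.
print(list(hopping_ranges(12.0, size=10.0, step=1.0)))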
However, when invoking window_wrapper[key], it merely retrieves the current window range based on the current timestamp, and subsequently returns inner_table[(key, current_window_range)]. This poses a problem, since using the window wrapper only affects the most recent window, even when the event pertains to multiple windows. Therefore, in the following function, I opted to employ the inner_table instead. This allows me to obtain all the relevant window ranges and directly update each associated window through the inner table:
async def update_table(events, key, window_wrapper, inner_table):
    t = window_wrapper.get_timestamp()
    for window_range in inner_table._window_ranges(t):
        prev = inner_table[(key, window_range)]
        prev.extend(events)
        inner_table[(key, window_range)] = prev
Within this function, the first line obtains the current timestamp, while inner_table._window_ranges(t) retrieves all window ranges relevant to that timestamp. We then update each relevant window inside a for loop. This approach allows us to use the update_table function effectively for both tumbling and hopping windows.
It is worth noting that update_table accepts a list of events instead of a single one, and employs the extend method instead of append. This choice is motivated by the fact that, when attempting to update a table incrementally within a high-throughput pipeline, you frequently encounter the warning "producer buffer full size", which significantly hampers efficiency. Consequently, it is advisable to update tables in mini-batches, as demonstrated in the following:
@app.agent(topic)
async def processor(stream):
    batch = []
    async for event in stream:
        batch.append(event)
        if len(batch) >= 200:
            await update_table(batch, key, window_wrapper, inner_table)
            batch = []
Multiprocessing
In Faust, each worker operates with a single process. Consequently, if the processing of a window is computationally intensive, it can result in a delay, which is unacceptable for real-time applications. To address this issue, I propose leveraging the Python multiprocessing library within the window_close function. By doing so, we can distribute the processing load across multiple processes and mitigate the delay caused by heavy window processing, ensuring better real-time performance.
from multiprocessing import Pool

async def window_close(key, events):
    pool.apply_async(compute, (events,), callback=produce)

def compute(events):
    # implement the logic here
    return result

def produce(result):
    if isinstance(result, Exception):
        print(f'EXCEPTION {result}')
        return
    # producer is a KafkaProducer
    producer.send(topic_name, value=result, key='result'.encode())

pool = Pool(processes=num_process)
In the provided code, a pool of processes is created. Within the window_close function, pool.apply_async is used to delegate the job to a new worker process and retrieve the result. A callback function is invoked when the result is ready.
In this specific code, the result is sent to a new Kafka topic using a Kafka producer. This setup enables the creation of a chain of Kafka topics, where each topic serves as the input for another stream processor. This allows a sequential flow of data between Kafka topics, facilitating efficient data processing and enabling the chaining of multiple stream processors.
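As a sketch of such chaining, a second, hypothetical Faust agent could subscribe to the result topic produced above and act as the next stage of the pipeline (topic_name here refers to that result topic):

result_topic = app.topic(topic_name, schema=faust.Schema(value_serializer='json'))

@app.agent(result_topic)
async def downstream_processor(stream):
    async for result in stream:
        # Next stage of the pipeline: persist the prediction, feed a dashboard,
        # or publish to yet another topic.
        print('received result:', result)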
I would like to briefly discuss my negative experience with the Google Cloud Platform (GCP). GCP recommends using Google Pub/Sub as the message broker, Apache Beam as the stream processing library, Google Dataflow for execution, and Google BigQuery as the database. However, when I attempted to use this stack, I encountered numerous issues that made it quite difficult.
Working with Google Pub/Sub in Python proved to be slow (check this and this), leading me to abandon it in favor of Kafka. Apache Beam is a well-documented library; however, using it with Kafka presented its own set of problems. The direct runner was buggy, requiring the use of Dataflow and resulting in significant time delays as I waited for machine provisioning. Moreover, I experienced issues with delayed triggering of windows, despite my attempts to resolve the problem (check this GitHub issue and this Stack Overflow post). Debugging the entire system was also a serious challenge due to the complex integration of multiple components, leaving me with limited control over the logs and making it difficult to pinpoint the root cause of issues within Pub/Sub, Beam, Dataflow, or BigQuery. In summary, my experience with the Google Cloud Platform was marred by the slow performance of Google Pub/Sub in Python, the bugs encountered when using Apache Beam with Kafka, and the overall difficulty of debugging the interconnected systems.