Improving a real-time streaming project with ksqlDB
In a previous article, the components of a real-time streaming project that consumes and processes smartphone sensor data with FastAPI, Kafka, QuestDB, and Docker were explored. That project was a first pass at implementing an architecture to move streaming data from smartphones, through a Kafka log, and into a time series database, where it could be readily queried and processed. The end product was a dashboard that polled the database and displayed sensor readings in near real-time:
One criticism of that project was the unnecessary latency introduced by writing data from Kafka to the database and then querying the database to display the most recent sensor readings. When the primary objective is to analyze data in near real-time, writing to and reading from a database is inefficient.
This is one of the problems that ksqlDB was created to solve. Instead of writing data to a database and querying it for analysis, ksqlDB enables direct processing and analysis of data streams, eliminating the need to persist data in a database before accessing it.
This article expands on the previous one by introducing ksqlDB for querying and processing streaming data. Unlike traditional database polling, push queries in ksqlDB significantly reduce latency in the dashboard and simplify the backend infrastructure. All code used to build this project is available on GitHub:
The objective of this project is the same as before: to develop a real-time dashboard that visualizes sensor data. In this iteration, however, the focus is on minimizing the perceptible latency between the phone and the dashboard by harnessing the power of ksqlDB. Here's what the new dashboard looks like:
Tri-axial accelerometer data from the smartphone is sent to a FastAPI app, written to Kafka, queried with ksqlDB, and displayed in the dashboard. Notice how quickly the plot responds to the phone's movement; the delay is almost undetectable.
This project also supports streaming from multiple smartphones:
The architecture for this project is simpler than before because QuestDB and its consumer are no longer required to get data to the dashboard.
Each smartphone sends sensor readings (accelerometer, gyroscope, and magnetometer) via POST request to a FastAPI application (the producer). The producer reformats the request body into a ksqlDB-compatible JSON format and asynchronously writes the sensor readings to a Kafka topic. Once sensor data arrives in the Kafka topic, it can be readily queried with ksqlDB.
To obtain a continuous stream of sensor data, the client establishes a server-sent events (SSE) connection with the backend (a FastAPI application). The backend initiates a push query through the ksqlDB API that continuously sends sensor data to the frontend.
Here's the directory structure for the project:
├── dashboard_backend
│ ├── Dockerfile
│ ├── app
│ │ ├── core
│ │ │ ├── config.py
│ │ │ └── utils.py
│ │ ├── db
│ │ │ └── data_api.py
│ │ ├── main.py
│ │ └── models
│ │ └── sensors.py
│ ├── entrypoint.sh
│ └── requirements.txt
├── dashboard_frontend
│ ├── Dockerfile
│ ├── app
│ │ ├── main.py
│ │ ├── static
│ │ │ └── js
│ │ │ └── main.js
│ │ └── templates
│ │ └── index.html
│ ├── entrypoint.sh
│ └── requirements.txt
├── producer
│ ├── Dockerfile
│ ├── app
│ │ ├── __init__.py
│ │ ├── core
│ │ │ ├── config.py
│ │ │ └── utils.py
│ │ ├── main.py
│ │ └── schemas
│ │ └── sensors.py
│ ├── entrypoint.sh
│ └── requirements.txt
├── docker-compose.yml
Three FastAPI applications are written to facilitate data flow and visualization: the producer, the dashboard frontend, and the dashboard backend. These apps, together with Kafka and ksqlDB, are orchestrated via docker-compose:
version: '3.8'

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_STREAMS_COMMIT_INTERVAL_MS: 100
      KSQL_KSQL_IDLE_CONNECTION_TIMEOUT_SECONDS: 600
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
  producer:
    build:
      context: ./producer
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000
    ports:
      - 8000:8000
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper
  dashboard_backend:
    build:
      context: ./dashboard_backend
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000
    ports:
      - 5000:5000
    env_file:
      - .env
    depends_on:
      - ksqldb-server
  dashboard_frontend:
    build:
      context: ./dashboard_frontend
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 4200
    ports:
      - 4200:4200
    env_file:
      - .env
    depends_on:
      - dashboard_backend
Notice the four services we didn't have to write ourselves (thankfully): Zookeeper, Kafka, the ksqlDB server, and the ksqlDB CLI. These services work together with the producer and dashboard apps to make up the project. Let's explore these components in more detail.
As before, the producer is a FastAPI app that accepts data sent from smartphones (via POST request) and writes it to a Kafka log. Here's the directory structure:
producer
├── Dockerfile
├── app
│ ├── __init__.py
│ ├── core
│ │ ├── config.py
│ │ └── utils.py
│ ├── main.py
│ └── schemas
│ └── sensors.py
├── entrypoint.sh
└── requirements.txt
We won't go through every file in the producer directory since everything is available on GitHub. Instead, let's take a look at main.py (the driving script of the producer API):
# producer/app/main.py

import json
import asyncio

from fastapi import FastAPI
from aiokafka import AIOKafkaProducer
from loguru import logger

from schemas.sensors import SensorReading, SensorResponse
from core.config import app_config
from core.utils import flatten_dict

# Instantiate the FastAPI app
app = FastAPI(title=app_config.PROJECT_NAME)

# Create the event loop to use async programming
loop = asyncio.get_event_loop()

# Instantiate the Kafka producer object
producer = AIOKafkaProducer(
    loop=loop,
    client_id=app_config.PROJECT_NAME,
    bootstrap_servers=app_config.KAFKA_URL
)


@app.on_event("startup")
async def startup_event():
    await producer.start()
    await producer.send(app_config.TOPIC_NAME,
                        json.dumps({'status': 'ready'}).encode("ascii"))


@app.on_event("shutdown")
async def shutdown_event():
    await producer.stop()


@app.post("/phone-producer/")
async def kafka_produce(data: SensorReading):
    """
    Produce a message containing readings from a smartphone sensor to Kafka.

    Parameters
    ----------
    data : SensorReading
        The request body containing sensor readings and metadata.

    Returns
    -------
    response : SensorResponse
        The response body corresponding to the processed sensor readings
        from the request.
    """
    # Extract the messageId, deviceId, and sessionId
    message_info = data.dict().copy()
    message_info.pop('payload')

    # Write each sensor reading in the payload to Kafka
    for sensor_reading in data.dict()['payload']:
        kafka_message = {**flatten_dict(sensor_reading), **message_info}
        await producer.send(app_config.TOPIC_NAME,
                            json.dumps(kafka_message).encode("ascii"))

    response = SensorResponse(
        messageId=data.messageId,
        sessionId=data.sessionId,
        deviceId=data.deviceId
    )
    logger.info(response)

    return response
The explanation of this code is largely the same as in the previous article. The main difference is that sensor readings in the request payload must be reformatted before they are written to Kafka. This new format allows the sensor data to be queried in a SQL-like fashion by ksqlDB. Each POST request from the phone sends JSON data that looks similar to this:
{"messageId": 20,
"sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976",
"deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b",
"payload": [{"name": "accelerometeruncalibrated",
"time": "1671406719721160400",
"values": {"z": -0.9372100830078125,
"y": -0.3241424560546875,
"x": 0.0323486328125}},
{"name": "magnetometeruncalibrated",
"time": "1671406719726579500",
"values": {"z": -5061.64599609375,
"y": 591.083251953125,
"x": 3500.541015625}},
{"name": "gyroscopeuncalibrated",
"time": "1671406719726173400",
"values": {"z": -0.004710599314421415,
"y": -0.013125921599566936,
"x": 0.009486978873610497}},
...
]}
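Incidentally, the SensorReading and SensorResponse models imported by the producer validate this request body. Here's a plausible sketch of producer/app/schemas/sensors.py, with field types inferred from the payload above (the exact definitions live on GitHub):
# A hypothetical sketch of producer/app/schemas/sensors.py; field types
# are inferred from the request payload shown above, not copied from GitHub.
from typing import List
from pydantic import BaseModel

class SensorValues(BaseModel):
    x: float
    y: float
    z: float

class SensorPayloadItem(BaseModel):
    name: str    # e.g. "accelerometeruncalibrated"
    time: str    # nanosecond epoch timestamp, sent as a string
    values: SensorValues

class SensorReading(BaseModel):
    messageId: int
    sessionId: str
    deviceId: str
    payload: List[SensorPayloadItem]

class SensorResponse(BaseModel):
    messageId: int
    sessionId: str
    deviceId: str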
Individual sensor readings are located under "payload" and are written to Kafka in the kafka_produce() route:
# Extract the messageId, deviceId, and sessionId
message_info = data.dict().copy()
message_info.pop('payload')

# Write each sensor reading in the payload to Kafka
for sensor_reading in data.dict()['payload']:
    kafka_message = {**flatten_dict(sensor_reading), **message_info}
    await producer.send(app_config.TOPIC_NAME,
                        json.dumps(kafka_message).encode("ascii"))
The flatten_dict() function, located in producer/app/core/utils.py, takes a raw sensor message from the payload, for instance:
{
"name": "accelerometeruncalibrated",
"time": "1683555956851304200",
"values": {
"z": -1.0012664794921875,
"y": -0.467315673828125,
"x": -0.00494384765625
}
}
And reformats the message to be compatible with a ksqlDB schema. This is what's written to Kafka:
{
"name": "accelerometeruncalibrated",
"time": "1683555956851304200",
"values_z": -1.0012664794921875,
"values_y": -0.467315673828125,
"values_x": -0.00494384765625,
"messageId": 35,
"sessionId": "c931f349-faf5-4e45-b09f-c623a76ef93a",
"deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b"
}
Each entry in the reformatted sensor reading can be thought of as a column that can be queried by ksqlDB. More on this in the next section.
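The actual flatten_dict() implementation is available on GitHub; here's a minimal sketch of such a helper, assuming nested keys are joined with an underscore:
# A minimal sketch of a flattening helper like the one in
# producer/app/core/utils.py (assumes nested keys are joined with "_").
from typing import Any, Dict

def flatten_dict(d: Dict[str, Any], parent_key: str = "", sep: str = "_") -> Dict[str, Any]:
    """Recursively flatten nested dictionaries by joining keys with `sep`."""
    items: Dict[str, Any] = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(flatten_dict(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

# Example: flatten_dict({"values": {"z": -1.0}}) -> {"values_z": -1.0}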
At this point, sensor readings can flow from smartphones to the FastAPI producer, where they are written to Kafka in a ksqlDB-compatible format. ksqlDB can then query both new and historical data in Kafka.
ksqlDB is an open-source streaming engine designed to process, analyze, and transform real-time data streams from Kafka using a SQL-like syntax. Put simply, ksqlDB enables interaction with data in Kafka topics using familiar relational database concepts like tables, queries, materialized views, joins, and aggregations. The capabilities of ksqlDB are extensive and won't be fully covered in this article, but the documentation provides good definitions of the core concepts.
For this project, a stream is created over the topic that stores smartphone sensor readings (the data written by the producer):
CREATE STREAM smartphone_sensor_stream (
name VARCHAR,
time BIGINT,
values_x DOUBLE,
values_y DOUBLE,
values_z DOUBLE,
messageId BIGINT,
sessionId VARCHAR,
deviceId VARCHAR
) WITH (
KAFKA_TOPIC = 'smartphone-sensor-data',
VALUE_FORMAT = 'JSON'
);
The above ksqlDB statement creates a stream, smartphone_sensor_stream, that can be used to query sensor readings written to the smartphone-sensor-data Kafka topic. Libraries like ksql-python can be leveraged to interface with the ksqlDB REST API and programmatically execute queries:
from ksql import KSQLAPI  # pip install ksql

# Where ksqlDB is running
KSQL_URL = "http://localhost:8088"

# Instantiate the ksqlDB API object
client = KSQLAPI(KSQL_URL)

# Create the "smartphone_sensor_stream" stream over the specified topic
client.create_stream(table_name="smartphone_sensor_stream",
                     columns_type=["name varchar",
                                   "time bigint",
                                   "values_x double",
                                   "values_y double",
                                   "values_z double",
                                   "messageId bigint",
                                   "sessionId varchar",
                                   "deviceId varchar"
                                   ],
                     topic="smartphone-sensor-data",
                     value_format="JSON")
A push query is executed to retrieve sensor readings as they are written to the topic. In essence, a push query opens a long-lived connection that sends updates to the client any time new data arrives in the topic. This makes push queries a great choice for streaming smartphone data.
select deviceId,
time,
values_x,
values_y,
values_z
from smartphone_sensor_stream
where name = 'accelerometeruncalibrated'
emit changes
The above query "pushes" the device ID, time, and accelerometer values from smartphone_sensor_stream every time the stream is updated with data. This query can be executed with ksql-python:
from typing import Generator

from ksql import KSQLAPI

# Where ksqlDB is running
KSQL_URL = "http://localhost:8088"

# Instantiate the ksqlDB API object
client = KSQLAPI(KSQL_URL)

# Write a push query
push_query = '''select deviceId,
                       time,
                       values_x,
                       values_y,
                       values_z
                from smartphone_sensor_stream
                where name = 'accelerometeruncalibrated'
                emit changes
             '''

# Get the KSQL stream generator
sensor_push_stream: Generator = client.query(push_query, use_http2=True)

# Loop through messages in the generator and print them as they are received
for raw_message in sensor_push_stream:
    print(raw_message)
The ksql-python client returns a generator object that yields messages as they are written to Kafka and read from the stream. Unlike a typical for loop that iterates over a fixed-size array, this loop continues executing as long as data is received from the stream.
The messages yielded from the query look similar to the following:
[
'86a5b0e3-6e06-40e2-b226-5a72bd39b65b', # Device ID
1684615020438850600, # Timestamp of the sensor recording
0.993927001953125, # x value
-0.5736083984375, # y value
-0.1787261962890625 # z value
]
One message is returned at a time, each of which can be thought of as a row in the smartphone_sensor_stream ksqlDB stream. Keep in mind that ksqlDB can perform more complex queries, such as aggregates and joins, but for this project only a basic select is required.
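To give a sense of what's possible, here's a hypothetical windowed aggregate (not used in this project) that averages the accelerometer readings per device over five-second tumbling windows:
select deviceId,
       avg(values_x) as avg_x,
       avg(values_y) as avg_y,
       avg(values_z) as avg_z
from smartphone_sensor_stream
window tumbling (size 5 seconds)
where name = 'accelerometeruncalibrated'
group by deviceId
emit changes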
The dashboard backend is a FastAPI app that accepts SSE requests for sensor data streams. Once an SSE connection is requested, a ksqlDB push query is opened and continuously sends messages to the frontend as they arrive in Kafka. The directory structure for the dashboard backend looks like this:
dashboard_backend
├── Dockerfile
├── app
│ ├── core
│ │ ├── config.py
│ │ └── utils.py
│ ├── db
│ │ └── data_api.py
│ ├── main.py
│ └── models
│ └── sensors.py
├── entrypoint.sh
└── requirements.txt
Let's first take a look at data_api.py, the interface between ksqlDB and the dashboard backend:
# data_api.py

from typing import Generator

from retry import retry
from ksql import KSQLAPI

from models.sensors import SensorName


@retry()
def create_ksql_connection(url: str) -> KSQLAPI:
    """
    Create a connection to a KSQL server using the provided URL.

    Parameters
    ----------
    url : str
        The URL of the KSQL server to connect to.

    Returns
    -------
    KSQLAPI
        An instance of the `KSQLAPI` class representing the connection
        to the KSQL server.
    """
    return KSQLAPI(url)


def create_ksql_device_stream(client: KSQLAPI,
                              stream_name: str,
                              topic_name: str) -> None:
    """
    Create a new device stream in the KSQL server if it does not already exist.

    Parameters
    ----------
    client : KSQLAPI
        A client instance of the KSQLAPI class to connect with the KSQL server.
    stream_name : str
        The name of the device stream to create.
    topic_name : str
        The name of the Kafka topic to associate with the device stream.

    Returns
    -------
    None

    Raises
    ------
    KSQLServerError
        If there is an error while creating the stream in the KSQL server.
    """
    # Get the current streams
    curr_streams = client.ksql('show streams')
    curr_stream_names = [stream['name'].lower()
                         for stream in curr_streams[0]['streams']]

    # If the device stream doesn't exist, create it
    if stream_name.lower() not in curr_stream_names:
        client.create_stream(table_name=stream_name,
                             columns_type=['name varchar',
                                           'time bigint',
                                           'values_x double',
                                           'values_y double',
                                           'values_z double',
                                           'messageId bigint',
                                           'sessionId varchar',
                                           'deviceId varchar'
                                           ],
                             topic=topic_name,
                             value_format='JSON')


def ksql_sensor_push(client: KSQLAPI,
                     stream_name: str,
                     sensor_name: SensorName) -> Generator:
    """
    Generator function that continuously pushes sensor data
    for a given sensor name from a KSQL server using the KSQL API client.

    Parameters
    ----------
    client : KSQLAPI
        The KSQL API client instance used to query the KSQL server.
    stream_name : str
        The name of the KSQL stream to query data from.
    sensor_name : SensorName
        An enum value representing the name of the sensor to stream data for.

    Returns
    -------
    Generator
        A generator object that yields the sensor data as it is streamed in real-time.
    """
    push_query = f'''
        select deviceId,
               time,
               values_x,
               values_y,
               values_z
        from {stream_name}
        where name = '{sensor_name.value}'
        emit changes
    '''

    sensor_push_stream: Generator = client.query(push_query, use_http2=True)

    return sensor_push_stream
In this script, create_ksql_device_stream() creates the ksqlDB stream defined in the previous section, and ksql_sensor_push() returns a generator that yields the results of the sensor data push query.
Let's break down the components of main.py, the driving script for the dashboard backend. Here are the dependencies:
# main.py

import json

import pandas as pd
from fastapi import FastAPI
from fastapi.requests import Request
from sse_starlette.sse import EventSourceResponse
from starlette.middleware.cors import CORSMiddleware

from core.config import app_config
from core.utils import maybe_load_json
from models.sensors import SensorName
from db.data_api import (create_ksql_connection,
                         create_ksql_device_stream,
                         ksql_sensor_push)
The main imports to note are EventSourceResponse (the class that implements SSE) and the functions from data_api.py. Next, a KSQLAPI object and a FastAPI app are instantiated:
# main.py

...

# Instantiate the KSQLAPI object
KSQL_CLIENT = create_ksql_connection(app_config.KSQL_URL)

# Create the KSQL device stream if it doesn't exist
create_ksql_device_stream(
    KSQL_CLIENT, app_config.STREAM_NAME, app_config.TOPIC_NAME
)

# Instantiate the FastAPI app
app = FastAPI()

# Configure middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
Lastly, the SSE endpoint that sends sensor data from ksqlDB to the frontend is defined:
# main.py

...

# An SSE endpoint that pushes sensor data from ksqlDB to the client
@app.get("/chart-data/{client_id}")
async def message_stream(request: Request, client_id: str):

    async def event_generator():
        while True:
            # If the client closes the connection, stop sending events
            if await request.is_disconnected():
                break
            try:
                # Get the KSQL stream generator
                sensor_push_stream = ksql_sensor_push(
                    KSQL_CLIENT, app_config.STREAM_NAME, SensorName.ACC)

                for raw_message in sensor_push_stream:
                    # If the client closes the connection, stop sending events
                    if await request.is_disconnected():
                        break

                    # Check if the raw message is in the correct format
                    message = maybe_load_json(raw_message)

                    # If the message is in the correct format (i.e. a list),
                    # send it to the client
                    if isinstance(message, list):
                        # Format the sensor timestamp
                        message[1] = str(pd.to_datetime(message[1]))

                        # Yield the message as JSON
                        yield {
                            "event": "new_message",
                            "id": "message_id",
                            "retry": 1500000,
                            "data": json.dumps(message)
                        }
            except Exception:
                if await request.is_disconnected():
                    break
                continue

    return EventSourceResponse(event_generator())
The message_stream() endpoint accepts GET requests that open long-lived connections through which the backend can continuously send sensor data. Within message_stream(), a coroutine called event_generator() is defined and returned. The purpose of event_generator() is to create a while loop that yields sensor data messages as they are processed by ksqlDB. The only time this loop terminates is when the client closes the connection.
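To sanity-check this endpoint without the frontend, a small SSE client can be pointed at the backend. Here's a minimal sketch, assuming the backend is running locally on port 5000 and using httpx; the "test-client" path segment is an arbitrary placeholder:
# A hypothetical test client for the SSE endpoint; httpx and the
# "test-client" client ID are assumptions for illustration.
import json

import httpx  # pip install httpx

url = "http://localhost:5000/chart-data/test-client"

# Open a long-lived streaming GET request and read SSE lines as they arrive
with httpx.stream("GET", url, timeout=None) as response:
    for line in response.iter_lines():
        # SSE data lines are prefixed with "data:"
        if line.startswith("data:"):
            message = json.loads(line[len("data:"):])
            print(message)  # [deviceId, timestamp, x, y, z]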
The dashboard frontend is an HTML page hosted by a FastAPI application. Its purpose is to demonstrate that data is flowing through the components of this project; it is by no means a comprehensive frontend. The interface looks like this:
When the user clicks "Start Streaming", an SSE connection is opened with the backend, and data from the push query is sent continuously. All code used to create the frontend, along with instructions on how to get everything running, is available on GitHub.
This article introduced ksqlDB as a way to improve the real-time streaming project previously built with FastAPI, Kafka, QuestDB, and Docker. The project aimed to create a dashboard that visualizes sensor data in near real-time. One of the challenges faced was the unnecessary latency caused by writing data from Kafka to a database and then querying the database for analysis.
ksqlDB, a database purpose-built for stream processing, was adopted to address this issue. Instead of persisting data in a database before accessing it, ksqlDB enables direct processing and analysis of data streams in Kafka. By implementing push queries in ksqlDB, the latency in the dashboard was significantly reduced, resulting in an almost undetectable delay in displaying sensor readings.
Using ksqlDB simplified the backend infrastructure and allowed for efficient analysis of sensor readings in near real-time. Future articles will continue to improve this project and enhance its functionality with new tools and features. As always, feedback is greatly appreciated. Thanks for reading!
Apache Kafka: https://kafka.apache.org/
Event-Driven Architectures — The Queue vs The Log: https://jack-vanlightly.com/blog/2018/5/20/event-driven-architectures-the-queue-vs-the-log
Lucidchart: https://www.lucidchart.com/
Kafka Poc using FastApi: https://github.com/GavriloviciEduard/fastapi-kafka
geo-stream-kafka: https://github.com/iwpnd/geo-stream-kafka
18 Most Popular IoT Devices in 2022: https://www.softwaretestinghelp.com/iot-devices/
FastAPI: https://fastapi.tiangolo.com/
QuestDB: https://questdb.io/docs/
Row vs Column Oriented Databases: https://dataschool.com/data-modeling-101/row-vs-column-oriented-databases/
ksqlDB: https://docs.ksqldb.io/en/latest/