February 15, 2024SHOWCASE · DATA-PIPELINE

Kafka ETL: Processing event streams in Python

In this tutorial, you will learn how to write a Kafka ETL in Python using Pathway, an open-source event processing framework. You will use Pathway connectors and transformations to extract, transform, and load event streams across multiple Kafka topics.

Python's simplicity and ease of use make it a popular choice for data processing tasks. However, Kafka, one of the most widely used tools to build ETL pipelines, is implemented in Java and mainly used by Java and Scala users. Pathway, a Python stream processing framework with a Rust engine, makes building ETL pipeline over Kafka in pure Python simple without compromising on performance.

Imagine you've been hired by a fraud-detection company. The company monitors the logs from different servers and raises an alert whenever a pattern is suspicious. Your job is to manage your company's data and ensure the data received by the data science team is clean and ready to use. A new challenge emerges when the company expands its monitoring to servers in Paris. Suddenly, the times you're receiving are no longer uniform. From New York, they bear the Eastern Standard Time signature, while those from Paris are stamped with Central European Time.

Until now, the times you received were from servers in New York:

2024-02-05 10:01:52.884548 -0500

And this is what the new times from the new servers in Paris look like:

2024-02-05 16:02:34.934749 +0100

You must unify these disparate times into a single format to maintain data integrity.

Enter ETL, a three-step process used to clean and unify the data before sharing it for training a model or doing business analytics. First, you need to extract (E) the data, then transform (T) it, before finally loading (L) it. This task is crucial, especially when data comes from different data sources, to ensure the data used within the company follows a given data type and various requirements.

ETL

This article shows how to create a Kafka ETL pipeline in Python to extract data from multiple topics and transform it to load it into a combined Kafka topic. More precisely, you will learn how to do the different ETL steps with Pathway:

  • (E) extracting different data streams from Kafka using Pathway Kafka input connector,
  • (T) converting the times with varying time zones into timestamps using Pathway datetime module,
  • (T) concatenating the resulting data streams using the Pathway concatenation function,
  • (L) loading the final data stream back into Kafka.

The full Pathway script is here for those in a hurry.

ETL architecture with Kafka in, Kafka out

The logs are sent to two distinct Kafka topics, one per time zone. You want to use Pathway to do the Kafka ETL: connect to the topics, extract the data, do the time zone conversion, and concatenate the resulting data streams into one. Finally, you want to return the result to a third Kafka topic.

Kafka ETL with Pathway

Docker containers

The project uses several docker containers:

  • one for Kafka
  • one for Zookeeper
  • one for Pathway, for the ETL
  • one imitating company servers, creating the data. It will be called "producer".

Kafka and Zookeeper are directly managed in the docker-compose.yml file. Pathway and the producer are managed using dedicated docker files. Here is the final architecture of the project:

.
├── pathway-src/
│   ├── Dockerfile
│   ├── etl.py
│   └── read-results.py
├── producer-src/
│   ├── create-stream.py
│   └── Dockerfile
├── docker-compose.yml
└── Makefile

You can find the whole project on GitHub.

Data generation

The data is generated using a Python script. Every second, a new log is generated with the current datetime. The message is randomly associated with one of the two time zones and sent to the associated Kafka topic. To simplify the logs identification, the log also contains a field "message" with the log's number.

create-stream.py
timezone1 = ZoneInfo("America/New_York")
timezone2 = ZoneInfo("Europe/Paris")

producer1 = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)
producer2 = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)

def send_message(timezone: ZoneInfo, producer: KafkaProducer, i: int):
    timestamp = datetime.now(timezone)
    message_json = {"date": timestamp.strftime(str_repr), "message": str(i)}
    producer.send(topic1, (json.dumps(message_json)).encode("utf-8"))

for i in range(input_size):
    if random.choice([True, False]):
        send_message(timezone1, producer1, i)
    else:
        send_message(timezone2, producer2, i)
    time.sleep(1)

You can find the entire file here.

ETL with Pathway: Concatenating data streams

Now that the setup is ready, Pathway can manage all the rest!

Extract from Kafka

Pathway provides connectors to connect and extract data from different data sources. You can connect to Kafka using Pathway Kafka connectors.

In Pathway, data is represented using tables and you need to define the data types of the extracted data using schemas:

class InputStreamSchema(pw.Schema):
    date: str
    message: str

You need one connector per topic, but the connectors can use the same settings.

rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
    "auto.offset.reset": "earliest",
}

timestamps_timezone_1 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone1",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)

timestamps_timezone_2 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone2",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)

You can learn more about Pathway Kafka connectors in our dedicated tutorial.

Transform into a single data stream

Now that you have the logs, you need to do the conversion. Dealing with time can be very frustrating. Fortunately, Pathway provides all the datetime functions to make it easy.

Here is how to concatenate the datetime from different time zones into a single data stream with timestamps with Pathway:

def convert_to_timestamp(table):
    table = table.select(
        date=pw.this.date.dt.strptime(fmt=str_repr, contains_timezone=True),
        message=pw.this.message,
    )
    table_timestamp = table.select(
        timestamp=pw.this.date.dt.timestamp(unit="ms"),
        message=pw.this.message,
    )
    return table_timestamp


timestamps_timezone_1 = convert_to_timestamp(timestamps_timezone_1)
timestamps_timezone_2 = convert_to_timestamp(timestamps_timezone_2)

timestamps_unified = timestamps_timezone_1.concat_reindex(timestamps_timezone_2)

First, since both values (date and message) are strings, you need to convert the date into a datetime using the strptime function of the datetime module. The time zone is automatically determined in this step. Then, you can convert it to a timestamp (independent of time zone) using the timestamp function. Finally, you can concatenate the two tables using the concat_reindex function.

This is a very simple example of what Pathway can do. Pathway supports more complex operations such as stateful (groupby, windows, etc.) and temporal (e.g., ASOF join) operations.

Load to Kafka

Now that you have successfully transformed the data, you still need to send it back to Kafka. Using a Kafka output connector should do the trick:

pw.io.kafka.write(
    timestamps_unified, rdkafka_settings, topic_name="unified_timestamps", format="json"
)

The settings are the same as the input connectors since the data is sent to the same Kafka instance.

Run it

Congratulations, your Kafka ETL pipeline is ready! Until now, you were building the pipeline: defining the connectors and the different operators. No data is actually loaded into the system so if you were to run your Pathway code now, the pipeline would be built, but there would be no computation. To run the pipeline by ingesting data, you need to use the Pathway run function:

pw.run()

Now the input connectors will connect and load the data! Thanks to its powerful Rust engine, Pathway computations are extremely fast. You are not bound by the usual limits of Python, Pathway natively supports multithreading, multiprocessing and distributed computing.

If you are curious about how the pipeline works, don't hesitate to read our article about Pathway core concepts.

Read the output

The unified logs are now available on Kafka's unified_timestamps topic. You can access this topic using your favorite tool.

Nonetheless, you can easily use Pathway to check everything works well. Create a file read-results.py in pathway-src/ to access the data:

read-results.py
table = pw.io.kafka.read(
    rdkafka_settings,
    topic=topic_name,
    schema=InputStreamSchema,
    format="json",
    autocommit_duration_ms=100,
)
pw.io.csv.write(table, "./results.csv")
pw.run()

The entire script is available here. This script reads the data and outputs it as a CSV file:

results.csv
timestamp,message,time,diff
1707217879632.242,"11",1707217879944,1
1707217876629.236,"8",1707217879944,1
1707217872469.24,"4",1707217879944,1
1707217868355.006,"0",1707217879944,1
1707217870466.797,"2",1707217879944,1
1707217873626.241,"5",1707217879944,1
1707217869465.5308,"1",1707217879944,1
1707217871468.065,"3",1707217879944,1
1707217874627.24,"6",1707217879944,1
1707217877630.239,"9",1707217879944,1
1707217875628.488,"7",1707217879944,1
1707217878631.242,"10",1707217879944,1
1707217880633.24,"12",1707217880644,1
1707217881634.5,"13",1707217881644,1
1707217882635.752,"14",1707217882644,1

The times have been uniformized and are now timestamps. 🎉

You can learn more about the output here.

Full solution

The entire project is publicly available on GitHub. Here is the complete etl.py file:

etl.py
import time

import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
    "auto.offset.reset": "earliest",
}

str_repr = "%Y-%m-%d %H:%M:%S.%f %z"


class InputStreamSchema(pw.Schema):
    date: str
    message: str


timestamps_timezone_1 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone1",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)

timestamps_timezone_2 = pw.io.kafka.read(
    rdkafka_settings,
    topic="timezone2",
    format="json",
    schema=InputStreamSchema,
    autocommit_duration_ms=100,
)


def convert_to_timestamp(table):
    table = table.select(
        date=pw.this.date.dt.strptime(fmt=str_repr, contains_timezone=True),
        message=pw.this.message,
    )
    table_timestamp = table.select(
        timestamp=pw.this.date.dt.timestamp(unit="ms"),
        message=pw.this.message,
    )
    return table_timestamp


timestamps_timezone_1 = convert_to_timestamp(timestamps_timezone_1)
timestamps_timezone_2 = convert_to_timestamp(timestamps_timezone_2)

timestamps_unified = timestamps_timezone_1.concat_reindex(timestamps_timezone_2)

pw.io.kafka.write(
    timestamps_unified, rdkafka_settings, topic_name="unified_timestamps", format="json"
)

pw.run()

Going further with Pathway

Congratulations! You are now ready to do Kafka ETL with Pathway.

Your setup probably differs slightly, and your ETL pipeline may require different operators. Pathway offers many connectors for extracting and loading the data from and to various sources. In addition to standard table operations, Pathway also supports temporal operations such as ASOF joins and interval joins.

Don't hesitate to take a look at Pathway documentation and reach out to us on Discord if you don't find the operator you are looking for.

Olivier Ruas

Algorithm and Data Processing Magician

ETLKafkadatetimetime zonetimezoneconcat_reindex
Share this article
Share new articles with me each month

Comments