Building a Real-Time Data Streaming Pipeline using Kafka, Flink and Postgres | Stream 100K records

Kavit (zenwraight)
8 min read · Jan 31, 2024

In the era of big data, organizations are increasingly turning to real-time data processing to gain instant insights and make data-driven decisions. In this comprehensive guide, we’ll walk through the process of building a robust real-time data streaming pipeline using three powerful technologies: Apache Kafka, Apache Flink, and Postgres.

Why Real-Time Data Streaming?

Real-time data streaming enables organizations to process and analyze data as it is generated, providing instantaneous insights and facilitating quicker decision-making. Whether you’re in finance, e-commerce, or IoT, a well-designed real-time data pipeline is essential for staying competitive in today’s fast-paced business landscape.

What is our Use Case?

We have 100K rows of orders placed on an e-commerce platform, and we will store this data in a Postgres DB. We will then set up a Kafka producer that reads each row from the DB one by one and sends it to an order topic. Finally, a Flink consumer will consume the messages from the order topic and, using a 5-minute tumbling window, aggregate the total cost of the orders per category.

Prerequisites

Before we dive into the implementation, make sure you have a basic understanding of the following technologies:

  • Apache Kafka: An open-source stream processing platform that facilitates the building of real-time data pipelines and streaming applications.
  • Apache Flink: A distributed stream processing framework that enables powerful analytics and event-driven applications.
  • Postgres: A powerful, open-source relational database management system.

Step 1: Designing Postgres Schema

We will start by creating an SQL file for the orders table, so that when the container comes up, it executes this file and creates the table inside the Postgres container.

-- Create the 'orders' table
CREATE TABLE orders (
    id INT,
    customer_id INT,
    category VARCHAR(255),
    cost DOUBLE PRECISION,
    item_name VARCHAR(255)
);

Step 2: Create Dockerfile for Postgres container

The path to this Dockerfile will be referenced in the docker-compose.yml file.

FROM postgres:latest

COPY create_table.sql /docker-entrypoint-initdb.d/

Step 3: Setting up Python Data Generation

This service will load 100K order records into the Postgres DB.

Here is the folder structure of the service:

- Dockerfile
- insert_data.py
- requirements.txt

Here is the code for insert_data.py

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import declarative_base, Session
from random import randint
import time

# Replace the user, password, host and database below with your actual PostgreSQL credentials
DATABASE_URL = "postgresql+psycopg2://postgres:postgres@postgres:5432/postgres"

try:
    # Give the Postgres container a few seconds to come up
    time.sleep(5)

    # SQLAlchemy engine
    engine = create_engine(DATABASE_URL)

    # Declare a base
    Base = declarative_base()

    # Define the 'orders' table
    class Order(Base):
        __tablename__ = "orders"
        customer_id = Column(Integer, primary_key=True)
        category = Column(String(255))
        cost = Column(Float)
        item_name = Column(String(255))

    # Create the table if it does not already exist (the SQL init script also creates it)
    Base.metadata.create_all(engine)

    # Create a session
    session = Session(engine)

    # Insert 100,000 rows of dummy data
    for i in range(1, 100001):
        order = Order(
            customer_id=i,
            category=f"Category {randint(1, 10)}",
            cost=i * 10.5,
            item_name=f"Item {i}"
        )
        session.add(order)

    # Commit the transaction
    session.commit()

    # Query and print the first 5 rows
    orders = session.query(Order).limit(5).all()
    for order in orders:
        print(order.customer_id, order.category, order.cost, order.item_name)

    # Close the session
    session.close()
except Exception as e:
    print(e)

Step 4: Setting up Python Kafka Producer service

This service reads all the rows from the Postgres DB one by one and pushes each row to the order topic, to be consumed by the Flink service.

Here is the folder structure of the service:

- Dockerfile
- python-producer.py
- requirements.txt
- wait-for-it.sh

We run wait-for-it.sh before starting our service; it makes sure that ZooKeeper and the Kafka cluster are up before our service comes online.

Code for python-producer.py:

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import declarative_base, Session
import time
from json import dumps

from kafka import KafkaProducer

# Replace the user, password, host and database below with your actual PostgreSQL credentials
DATABASE_URL = "postgresql+psycopg2://postgres:postgres@postgres:5432/postgres"

kafka_nodes = "kafka:9092"
myTopic = "order"

def gen_data(prod, order):
    # Publish only the fields the Flink job needs: category and cost
    my_data = {
        'category': order.category,
        'cost': order.cost
    }
    prod.send(topic=myTopic, value=my_data)
    print(order.customer_id, order.category, order.cost, order.item_name)

try:
    # Give Postgres, ZooKeeper and Kafka some extra time to come up
    time.sleep(20)

    # SQLAlchemy engine
    engine = create_engine(DATABASE_URL)

    # Declare a base
    Base = declarative_base()

    # Define the 'orders' table
    class Order(Base):
        __tablename__ = "orders"
        customer_id = Column(Integer, primary_key=True)
        category = Column(String(255))
        cost = Column(Float)
        item_name = Column(String(255))

    # Make sure the table exists (it is normally created by the SQL init script)
    Base.metadata.create_all(engine)

    # Create a session
    session = Session(engine)

    # Create the Kafka producer once and reuse it for all 100K messages
    prod = KafkaProducer(bootstrap_servers=kafka_nodes,
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

    # Read every order from Postgres and push it to the 'order' topic
    orders = session.query(Order).all()
    for order in orders:
        gen_data(prod, order)

    # Make sure all buffered messages are actually delivered before exiting
    prod.flush()

    # Close the session
    session.close()
except Exception as e:
    print(e)

Step 5: Setting up Flink Kafka consumer

Now that our Python Kafka producer is set up and publishing a (category, cost) message for every order row, we will set up the Flink consumer, which consumes these messages and aggregates the total cost per category over 5-minute tumbling windows.

We will need four main files for our use case:

  1. pom.xml — contains all the dependencies required by our Flink job.
  2. Main.java — contains the main logic: consuming messages, deserializing each message into an Order object, aggregating the total cost per category over a 5-minute tumbling window, and sinking the result to the console.
  3. Order.java — the Order class we use to represent each consumed message (sketched after Main.java below).
  4. OrderDeserializationSchema.java — contains the logic to deserialize the raw Kafka message bytes into an Order instance (also sketched below).

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.kavit</groupId>
<artifactId>flink-kafka2postgres</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.java.version>8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<flink.version>1.16.0</flink.version>
<jackson.version>2.13.4</jackson.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<kafka.version>3.2.0</kafka.version>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
<type>pom</type>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Adding project-specific Apache Flink dependencies in the provided scope. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.0</version>
</dependency>



<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- Adding jackson dependencies. They must be in the default scope (compile). -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- jackson needs jackson-datatype-jsr310 for Java 8 java.time.Instant support -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- This enables the WebUI during tests. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<argLine>--illegal-access=permit</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<argLine>--illegal-access=permit</argLine>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder. -->
<!-- Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Code formatter -->
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.23.0</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<java>
<googleJavaFormat>
<style>AOSP</style>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins>

<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

Main.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Main {
    static final String BROKERS = "kafka:9092";

    public static void main(String[] args) throws Exception {
        // Sleep for the first 60 seconds to give Kafka and the producer time to come up
        Thread.sleep(60000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<Order> source = KafkaSource.<Order>builder()
                .setBootstrapServers(BROKERS)
                .setProperty("partition.discovery.interval.ms", "1000")
                .setTopics("order")
                .setGroupId("groupId-919292")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new OrderDeserializationSchema())
                .build();

        DataStreamSource<Order> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");

        // Key by category and aggregate the cost over 5-minute (300-second) tumbling windows
        DataStream<Tuple2<String, Double>> sumCostAggregatorStream = kafka.keyBy(myEvent -> myEvent.category)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(300)))
                .aggregate(new CostAggregator());

        sumCostAggregatorStream.print();

        env.execute("Kafka-flink-postgres");
    }

    public static class CostAggregator implements AggregateFunction<Order, Tuple2<String, Double>, Tuple2<String, Double>> {

        @Override
        public Tuple2<String, Double> createAccumulator() {
            return new Tuple2<>("", 0.0);
        }

        @Override
        public Tuple2<String, Double> add(Order event, Tuple2<String, Double> accumulator) {
            return new Tuple2<>(event.category, accumulator.f1 + event.cost);
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {
            return accumulator;
        }

        @Override
        public Tuple2<String, Double> merge(Tuple2<String, Double> a, Tuple2<String, Double> b) {
            return new Tuple2<>(a.f0, a.f1 + b.f1);
        }
    }
}
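
Order.java

Order.java is a simple POJO for the consumed messages. Below is a minimal sketch of what it can look like, assuming it only needs the category and cost fields that the producer publishes and that Main.java reads as public fields:

// Order.java - a plain POJO matching the JSON payload sent by python-producer.py.
// This is a minimal sketch; your actual class may carry additional fields.
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Order {
    public String category;
    public double cost;

    // Flink's POJO serializer and Jackson both need a no-argument constructor
    public Order() {}

    public Order(String category, double cost) {
        this.category = category;
        this.cost = cost;
    }

    @Override
    public String toString() {
        return "Order{category='" + category + "', cost=" + cost + "}";
    }
}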

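OrderDeserializationSchema.java

Below is a minimal sketch of the deserialization schema, assuming the messages on the order topic are the JSON documents produced by python-producer.py. It uses Jackson (already available through the jackson-databind dependency) to map the raw bytes into an Order instance:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderDeserializationSchema implements DeserializationSchema<Order> {

    private static final long serialVersionUID = 1L;

    // Created lazily in open() so the schema stays lightweight when Flink serializes it
    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public Order deserialize(byte[] message) throws IOException {
        // Map the raw JSON bytes ({"category": ..., "cost": ...}) to an Order object
        return objectMapper.readValue(message, Order.class);
    }

    @Override
    public boolean isEndOfStream(Order nextElement) {
        // The order topic is unbounded, so we never signal end of stream
        return false;
    }

    @Override
    public TypeInformation<Order> getProducedType() {
        return TypeInformation.of(Order.class);
    }
}

Creating the ObjectMapper inside open() rather than in the constructor is a common Flink pattern: the schema object is serialized and shipped to the task managers, so it is best to initialize non-essential state once the schema is actually running.
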
Conclusion

In this post, we looked at how to build a real-time data streaming pipeline that streams 100K records using Apache Kafka, Apache Flink, and Postgres.

Follow me for updates like this.

Connect with me on:-

Twitter 👦🏻:- https://twitter.com/kmmtmm92

Youtube 📹:- https://www.youtube.com/channel/UCpmw7QtwoHXV05D3NUWZ3oQ

Github 💭:- https://github.com/Kavit900

Instagram 📸:- https://www.instagram.com/code_with_kavit/
