Get started with Kafka

Introduction

This article shows how to deploy a Kafka server with containers, use Python to send messages (text or images), and use Go & Python to parse them. It also shows how to use Go to receive the data and write it into a PostgreSQL database.

As for how Kafka works under the hood, plenty of articles already cover that, and the official documentation explains it well. While researching, I noticed that most tutorials not only start the server inside a container, but also run everything else inside the container as well, including the producer & consumer. What is covered far less often is how to send and receive from outside (the host), so this article also walks through that configuration.

There are currently two common base images:

  1. wurstmeister/kafka
  2. bitnami/kafka

Their environment variables differ slightly, but the overall concepts are the same.

There are also two Python libraries:

  1. kafka-python
  2. confluent-kafka-python

In short, the second one has better performance and offers some newer features, but for general use the first one is enough. Their configuration parameters also differ slightly.
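
To get a feel for the configuration differences, here is a rough sketch of sending the same message with both libraries (the broker address and topic name here are just placeholders):

# kafka-python: configuration via keyword arguments
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('demo_topic', b'hello from kafka-python')
producer.flush()

# confluent-kafka-python: configuration via a dict of librdkafka-style settings
from confluent_kafka import Producer

conf_producer = Producer({'bootstrap.servers': 'localhost:9092'})
conf_producer.produce('demo_topic', value=b'hello from confluent-kafka-python')
conf_producer.flush()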

The repository used in this article.

Requirements

  • Go 1.15+
  • Python 3.6+
  • Kafka broker running on localhost:9092 or as specified in the code.
  • PostgreSQL database accessible with the provided credentials.

Install the Python package:

pip3 install kafka-python

Docker

Dockerfile
FROM python:3.11

RUN apt update && apt install -y bsdmainutils curl git mercurial make binutils bison gcc build-essential
RUN pip3 install kafka-python Pillow
RUN bash -c 'curl -s -S -L https://raw.githubusercontent.com/moovweb/gvm/master/binscripts/gvm-installer | bash'

ENV GVM_ROOT /root/.gvm
RUN ["/bin/bash", "-c", "source $GVM_ROOT/scripts/gvm && gvm install go1.18 -B && gvm use go1.18 --default"]

Build the image:

docker build -t chiehpower/kafka_practice:v0.1 .    

Access the WebServer container. If you want to run the Go files, you can use go mod tidy to set up the relevant dependencies.

important

Note that if you want to send and receive from outside the containers, you need to expose an extra port (i.e., 9093), and in the OUTSIDE part of KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENERS, use the IP assigned to the container (173.38.0.2).

"docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
kafka:
ipv4_address: 173.38.0.10

kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://173.38.0.2:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://kafka:9092,OUTSIDE://173.38.0.2:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# KAFKA_BOOTSTRAP.SERVERS: 'kafka:29091'
KAFKA_BROKER_ID: 1
networks:
kafka:
ipv4_address: 173.38.0.2

WebServer:
image: chiehpower/kafka_practice:v0.1
build: .
tty: true
volumes:
- ${PWD}:/kafka:rw
container_name: WebServer
command: bash -c "bash"
networks:
kafka:
ipv4_address: 173.38.0.3

networks:
kafka:
driver: bridge
ipam:
config:
- subnet: 173.38.0.0/16

Start all services:

docker-compose up -d

Usage

The quickest way to verify that the network is reachable and that the Kafka server is running properly is to test directly inside the container.

  1. Start a consumer.

    docker-compose exec kafka kafka-console-consumer.sh --topic baeldung_linux --from-beginning --bootstrap-server kafka:9092
  2. Start a producer.

    docker-compose exec kafka kafka-console-producer.sh --topic baeldung_linux --broker-list kafka:9092

    You can start typing messages, and you should see them appear on the consumer side.
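
Alternatively, you can do a quick check from the host with kafka-python. A minimal sketch, assuming port 9093 has been exposed as described above (replace (host IP) with your machine's address):

from kafka import KafkaConsumer

# Connecting from the host goes through the OUTSIDE listener on port 9093.
consumer = KafkaConsumer(bootstrap_servers='(host IP):9093')
print(consumer.topics())  # set of topic names, e.g. {'baeldung_linux'}
consumer.close()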

Producer Part

Run the Python file.

If you run it from outside (on the host), change the port to 9093.

server = '(host IP):9093' # From outside

The producer does two things:

The first part simply sends a dictionary.

from kafka import KafkaProducer
import time
from datetime import datetime
import json
import base64
import io
from PIL import Image

# server = '0.0.0.0:9092' # From inside
server = '(host IP):9093' # From outside
producer = KafkaProducer(bootstrap_servers=[server])

"""
寫入的內容
1. created: 2023/12/01 16:30:50
2. user_name: test
3. project_name: db
"""

params = {
"created": "2023-12-01 16:30:50",
"user_name": "test",
"project_name": "db",
}

# Convert JSON to string
message = json.dumps(params)
print(f"Message: {message}")
print(f"Convert to json type taking {time.time() - start}")

future = producer.send('baeldung_linux', key=b'my_key', value=bytes(message, 'utf-8'), partition=0)
result = future.get(timeout=10)
print(result)
time.sleep(0.5)

The second part sends an image.

print("Start to send an image to Kafka...")
image_path = 'assets/test.jpeg'

with Image.open(image_path) as img:
    # Resize the image before sending to keep the Kafka message small.
    target_width = 400
    target_height = 400
    resized_img = img.resize((target_width, target_height))

resized_img.save('resized_image.jpg')
start = time.time()

# Serialize the resized image into JPEG bytes.
buffered = io.BytesIO()
resized_img.save(buffered, format="JPEG")
resized_image_data = buffered.getvalue()

print("Original Size:", img.size)
print("Resized Size:", resized_img.size)

future = producer.send('image_topic', value=resized_image_data)
result = future.get(timeout=10)
print(result)
print(f"Sending an image takes {time.time() - start}")

producer.close()

Run the file:

cd /kafka
python3 producer.py

Consumer Part

cd /kafka
go run client.go
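
The introduction mentions that both Go & Python can be used to parse the messages. Before moving on to the Go consumers from the repository, here is a minimal kafka-python consumer sketch for the baeldung_linux topic (the broker address assumes it runs inside the container network; use (host IP):9093 from the host):

import json
from kafka import KafkaConsumer

# auto_offset_reset='latest' roughly mirrors reader.SetOffset(kafka.LastOffset) in the Go code below.
consumer = KafkaConsumer(
    'baeldung_linux',
    bootstrap_servers='kafka:9092',
    auto_offset_reset='latest',
)

for msg in consumer:
    params = json.loads(msg.value)
    print(msg.topic, msg.partition, msg.offset, params)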

If you want to store the parsed data in PostgreSQL, first create a .env file.

DB_USER=test
DB_PASSWORD=test
DB_NAME=test
DB_HOST=192.168.0.1
DB_PORT=5432

(Please start the PostgreSQL server yourself first.)
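
The client_db.go consumer below inserts into a results table, so that table has to exist first. A minimal sketch for creating it, assuming simple text columns matching the INSERT statement (psycopg2 and the values from the .env file are used here just for illustration):

import psycopg2

# Connection values mirror the .env file above.
conn = psycopg2.connect(
    user="test", password="test", dbname="test",
    host="192.168.0.1", port=5432,
)

with conn, conn.cursor() as cur:
    # Assumed schema matching the INSERT in client_db.go.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS results (
            created      TEXT,
            user_name    TEXT,
            project_name TEXT
        )
    """)

conn.close()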

client_db.go
package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/joho/godotenv"
    _ "github.com/lib/pq" // PostgreSQL driver, registered with database/sql
    "github.com/segmentio/kafka-go"
)

type Message struct {
    Created     string `json:"created"`
    UserName    string `json:"user_name"`
    ProjectName string `json:"project_name"`
}

func getKafkaReader(kafkaURL, topic string) *kafka.Reader {
    brokers := strings.Split(kafkaURL, ",")
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers: brokers,
        Topic:   topic,
        // MinBytes: 10e3, // 10KB
        // MaxBytes: 10e6, // 10MB
    })
}

func main() {
    kafkaURL := "kafka:9092"
    topic := "baeldung_linux"

    reader := getKafkaReader(kafkaURL, topic)
    defer reader.Close()

    err := godotenv.Load()
    if err != nil {
        log.Fatal("Error loading .env file")
    }

    // Get environment variables
    user := os.Getenv("DB_USER")
    password := os.Getenv("DB_PASSWORD")
    dbname := os.Getenv("DB_NAME")
    host := os.Getenv("DB_HOST")
    port := os.Getenv("DB_PORT")

    // Connect to PostgreSQL DB
    connStr := fmt.Sprintf("user=%s password=%s dbname=%s host=%s port=%s sslmode=disable", user, password, dbname, host, port)
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // With this set, consumption starts from the latest offset instead of from the beginning.
    reader.SetOffset(kafka.LastOffset)

    fmt.Println("start consuming ... !!")
    for {
        m, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatalln(err)
        }
        fmt.Printf("message at topic: %v partition: %v offset: %v value: %s\n", m.Topic, m.Partition, m.Offset, string(m.Value))

        // Parse the info
        var message Message
        err = json.Unmarshal(m.Value, &message)
        if err != nil {
            log.Println("Error decoding message:", err)
            continue
        }

        // Write the info into the PostgreSQL table.
        query := `INSERT INTO results (created, user_name, project_name)
                  VALUES ($1, $2, $3)`
        _, err = db.Exec(query, message.Created, message.UserName, message.ProjectName)
        if err != nil {
            log.Println("Error inserting message into PostgreSQL:", err)
        }
    }
}

Run the Go file.

go run client_db.go

Take a look at the DB contents; the write speed is actually quite fast.
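
To eyeball the inserted rows, a quick sketch with psycopg2 (same assumed connection values as the .env file):

import psycopg2

conn = psycopg2.connect(
    user="test", password="test", dbname="test",
    host="192.168.0.1", port=5432,
)

with conn.cursor() as cur:
    cur.execute("SELECT created, user_name, project_name FROM results LIMIT 10")
    for row in cur.fetchall():
        print(row)

conn.close()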

info

Without the line reader.SetOffset(kafka.LastOffset), consumption starts from the earliest offset, so if a lot of messages have already accumulated, it is recommended to add this line.

If you want to parse an image and save it instead, see this part:

client.go
package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "strings"

    "github.com/segmentio/kafka-go"
)

func getKafkaReader(kafkaURL, topic string) *kafka.Reader {
    brokers := strings.Split(kafkaURL, ",")
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers: brokers,
        Topic:   topic,
        // MinBytes: 10e3, // 10KB
        // MaxBytes: 10e6, // 10MB
    })
}

func main() {
    kafkaURL := "kafka:9092"
    // topic := "baeldung_linux"
    topic := "image_topic"

    reader := getKafkaReader(kafkaURL, topic)
    defer reader.Close()

    // With this set, consumption starts from the latest offset instead of from the beginning.
    reader.SetOffset(kafka.LastOffset)

    fmt.Println("start consuming ... !!")
    for {
        m, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatalln(err)
        }

        fmt.Printf("message at topic: %v partition: %v offset: %v value: %s\n", m.Topic, m.Partition, m.Offset, string(m.Value))

        // Save the received image bytes to a file.
        err = ioutil.WriteFile("received_image_client.jpg", m.Value, os.ModePerm)
        if err != nil {
            log.Println("Error saving image:", err)
            continue
        }

        fmt.Println("Image received and saved successfully.")
    }
}

Run the Go file.

go run client.go

Note: the image size has a big impact on transfer speed, so it is recommended to resize images to something smaller first.