Kafka cơ bản: Cách sử dụng Kafka với Confluent & Go

congdonglinux123

Administrator
Staff member
Cập nhật kiến thức Kafka cơ bản cho các bạn mới bắt đầu. Bài viết này sẽ hướng dẫn bạn cách sử dụng Kafka với Confluent & Go. Lưu lại ngay nhé!
1717829291337.png

Cập nhật kiến thức Kafka cơ bản cho các bạn mới bắt đầu. Bài viết này sẽ hướng dẫn bạn cách sử dụng Kafka với Confluent & Go. Lưu lại ngay nhé!

Apache Kafka
(gọi tắt là Kafka) là một nền tảng phân phối sự kiện phân tán mã nguồn mở được phát triển bởi Apache Software Foundation và được viết bằng JavaScala.

Kafka được tạo ra để giải quyết những thách thức trong việc xử lý lượng dữ liệu khổng lồ trong thời gian thực (real-time), cho phép các ứng dụng xuất bản (publish), đăng ký (subscribe), lưu trữ (store) và xử lý (process) các luồng bản ghi (streaming event) một cách hiệu quả.




Với sự ra đời của Confluent Cloud, các nhà phát triển có thể tận dụng Kafka mà không phải đối mặt với sự phức tạp của việc quản lý cơ sở hạ tầng. Confluent Cloud cung cấp dịch vụ Kafka được tự chủ quản lý với các tính năng như tự động mở rộng quy mô, lưu giữ dữ liệu và tích hợp liền mạch với các thành phần khác của Confluent Platform.

Trong bài này, chúng ta sẽ khám phá cách sử dụng Kafka với Confluent Cloud và Go. Cộng Đồng Linux sẽ hướng dẫn từ việc tạo tài khoản Confluent Cloud, thiết lập Kafka Cluster đến khởi tạo clients (producers và consumers) trong Go để gửi và đọc tin nhắn từ Kafka.

Ngoài giải pháp có sẵn của Confluent, bạn có thể tự host Kafka Cluster bằng Docker.

Toàn bộ source code của bài nằm ở repo Github kafka-demo.


Yêu cầu kỹ thuật khi sử dụng Kafka với Confluent và Go

Sau đây là những công nghệ bạn cần phải cài đặt nếu muốn sử dụng Kafka với Confluent và Go!

1. Confluent

1.1. Confluent Cloud (CCloud)​

Confluent Cloud (gọi tắt là CCloud) là một dịch vụ truyền dữ liệu linh hoạt, có thể mở rộng, dựa trên Apache Kafka và được phân phối dưới dạng dịch vụ tự chủ quản lý.

Bạn cần phải đăng ký tài khoản Confluent Cloud thông qua giao diện web gọi là Confluent Cloud Console. Sau khi đăng ký thành công, bạn sẽ được tặng $400 đô dùng thử trong vòng 1 tháng.

Confluent cung cấp 3 cách để tương tác với Confluent Cloud:

  1. Confluet CLI (giao diện dòng lệnh)
  2. Confluent Cloud Console (giao diện web)
  3. Confluent Cloud APIs (sử dụng APIs)

1.2. Confluent CLI (không bắt buộc)​

Confluent CLI (Command-line Interface) là giao diện dòng lệnh của Confluent, cho phép các nhà phát triển quản lý Confluent Cloud và Confluent Platform.

Để cài đặt và sử dụng các lệnh cơ bản trong Confluent CLI, xin bạn vui lòng đọc bài Hướng dẫn cài đặt và sử dụng Confluent CLI.

2. Go

Chúng ta sẽ dùng ngôn ngữ Go phiên bản mới nhất (1.20) để tạo producers và subscribers (gọi chung là clients) cho Kafka cluster. Bạn có thể cài đặt Go dễ dàng thông qua hướng dẫn cài đặt trên trang chủ của Go.




Kafka cơ bản: Cách sử dụng Kafka với Confluent & Go

Bước 1: Tạo topic bên trong Kafka Cluster

Ở mục viết về Confluent phía trên, Cộng Đồng Linux đã liệt kê ra 3 cách để tương tác với Confluent Cloud. Trong phạm vi bài viết, Cộng Đồng Linux chỉ hướng dẫn 2 cách phổ biến là:

  1. Dùng Confluent CLI (giao diện dòng lệnh)
  2. Dùng Confluent Cloud Console (giao diện web)

Cách 1: Dùng Confluent CLI

1. Tạo cluster​

Bỏ qua bước này nếu bạn đã có sẵn một cluster.

Bash:
confluent kafka cluster create <name> --cloud <cloud provider> --region <cloud region>

Câu lệnh tạo cluster trong Confluent CLI
Ví dụ
Bash:
confluent kafka cluster create dev0 --cloud aws --region us-east-1

Ví dụ về câu lệnh tạo cluster trong Confluent CLI

2. Tạo topic bên trong cluster vừa tạo ở bước 1​

cluster ID lấy từ output sau khi tạo cluster của bước 2.

Bash:
confluent kafka topic create <name> --cluster <cluster ID>

Câu lệnh tạo topic trong Confluent CLI
Ví dụ
Bash:
confluent kafka topic create test_topic --cluster <ID of the dev0 cluster>

Ví dụ về câu lệnh tạo topic trong Confluent CLI
Nếu bạn đã có sẵn cluster, chạy câu lệnh sau để lấy thông tin (bao gồm ID) của tất cả cluster.

Bash:
confluent kafka cluster list

Câu lệnh lấy thông tin của tất cả các clusters

3. Tạo API key cho cluster​

API key dùng để xác thực kết nối giữa clients với cluster.

Bash:
confluent api-key create --resource <cluster ID>

tạo API key cho cluster

Cách 2: Dùng Cloud Console

1. Tạo cluster​

Có nhiều lựa chọn để tạo cluster tương ứng với các cấu hình khác nhau. Ở phạm vi demo của bài này, chúng ta sẽ chọn cluster Basic.

kafka-co-ban
Giao diện tạo Kafka cluster trên Confluent Cloud

2. Tạo topic bên trong cluster vừa tạo ở bước 1​

Từ menu điều hướng, click vào Topics, sau đó click vào nút Add Topic.

kafka-co-ban

Hiện tại, Confluent Cloud không giới hạn số partitions của topic, giá trị mặc định là 6.

kafka-co-ban
Giao diện tạo Kafka topic trên Confluent Cloud
Việc lựa chọn số partition phụ thuộc vào nhiều yếu tố như:

  • Resource overhead (Chi phí tài nguyên): Mỗi partitions yêu cầu một số tài nguyên hệ thống về bộ nhớ và xử lý tệp. Số lượng partitions rất cao có thể chiếm dụng tài nguyên của brokers.
  • Latency (Độ trễ): Số lượng partitions cao có thể dẫn tới tăng thời gian xử lý gây ra bởi chi phí quản lý và điều phối số lượng lớn partitions.
  • ZooKeeper load (Mức tải của ZooKeeper): ZooKeeper dùng để quản lý metadata của Kafka. Nếu partitions tăng số lượng lớn thì ZooKeeper phải quan lý nhiều metadata hơn dẫn tới mức tải của ZooKeeper sẽ tăng theo.
  • Throughput (Thông lượng): Tăng partitions giúp tăng thông lượng do nhiều consumers cùng đọc đồng thời từ nhiều partitions khác nhau.
  • Scalability (Độ mở rộng): Số parttions tăng sẽ giúp việc xử lý events tốt hơn giữa các brokers.

3. Tạo API key cho cluster

Từ menu điều hướng, click vào API Keys, sau đó click vào nút Add Key. Sau đó, điền mô tả cho key (nếu cần) và tải xuống file chứa key và secret.

kafka-co-ban
Giao diện tạo API key cho cluster

Bước 2: Setup producer

Confluent cung cấp sẵn một thư viện confluent-kafka-go để giúp clients kết nối với Confluent Cloud thông qua ngôn ngữ Go. Ngoài ra, bạn có thể xem code mẫu cho nhiều trường hợp khác nhau ở folder /sample.

Trong bài, Cộng ĐỒng Linux sẽ khởi tạo một project Go có cấu trúc như sau:


Bash:
|-- kafka-using-confluent/
|-- consumer.go
|-- producer.go
|-- README.md
|-- go.mod
|-- go.sum

Cấu trúc của project chứa client code
Mở terminal, khởi tạo dự án Go chứa clients code.

Bash:
# Tạo folder để chứa source code
mkdir kafka-using-confluent

# Khởi tạo Go project
go mod init kafka-demo
[/CODE

Khởi tạo project Go chứa client code
Tạo file producer.go trong folder kafka-using-confluent để gửi message tới Kafka.

[CODE=bash]
touch producer.go

Tạo file `producer.go`
Message của chúng ta có dạng JSON như sau:
Bash:
{
  "id": 5950,
  "quantity": 3,
  "item_type": "guitar",
  "price_per_unit": "BMg="
}

JSON Message
Nội dung file producer.go như sau:

Bash:
# producer.go
package main

import (
    "fmt"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/confluentinc/confluent-kafka-go/schemaregistry"
    "github.com/confluentinc/confluent-kafka-go/schemaregistry/serde"
    "github.com/confluentinc/confluent-kafka-go/schemaregistry/serde/jsonschema"
)

// Purchase is a simple record example
type Purchase struct {
    Id           int    `json:"id"`
    Quantity     int    `json:"quantity"`
    ItemType     string `json:"item_type"`
    PricePerUnit string `json:"price_per_unit"`
}

func main() {
    // Get env
    if len(os.Args) < 6 {
        fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n", os.Args[0])
        os.Exit(1)
    }

    bootstrapServers := os.Args[1]
    clusterApiKey := os.Args[2]
    clusterApiSecret := os.Args[3]
    topic := os.Args[4]
    schemaRegistryUrl := os.Args[5]
    schemaRegistryApiKey := os.Args[6]
    schemaRegistryApiSecret := os.Args[7]

    hostname, err := os.Hostname()
    if err != nil {
        fmt.Printf("Failed to get hostname: %s", err)
        os.Exit(1)
    }

    // Create producer
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": bootstrapServers,
        "sasl.mechanisms":   "PLAIN",
        "security.protocol": "SASL_SSL",
        "client.id":         hostname,
        "sasl.username":     clusterApiKey,
        "sasl.password":     clusterApiSecret,
        "acks":              "all",
    })
    if err != nil {
        fmt.Printf("Failed to create producer: %s", err)
        os.Exit(1)
    }

    // Create schema register client
    client, err := schemaregistry.NewClient(schemaregistry.NewConfigWithAuthentication(
        schemaRegistryUrl,
        schemaRegistryApiKey,
        schemaRegistryApiSecret,
    ))
    if err != nil {
        fmt.Printf("Failed to create schema registry client: %s\n", err)
        os.Exit(1)
    }

    ser, err := jsonschema.NewSerializer(client, serde.ValueSerde, jsonschema.NewSerializerConfig())
    if err != nil {
        fmt.Printf("Failed to create serializer: %s\n", err)
        os.Exit(1)
    }

    // Serialize message
    msg := Purchase{
        Id:           5950,
        Quantity:     3,
        ItemType:     "guitar",
        PricePerUnit: "BMg=",
    }
    msgBytes, err := ser.Serialize(topic, &msg)
    if err != nil {
        fmt.Printf("Failed to serialize payload: %s\n", err)
        os.Exit(1)
    }

    // Asynchronous writes
    deliveryChan := make(chan kafka.Event, 10000)
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          msgBytes,
        Key:            []byte(time.Now().UTC().Format(time.RFC1123)),
        Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
    }, deliveryChan)
    if err != nil {
        fmt.Printf("Failed to deliver message: %s", err)
        os.Exit(1)
    }

    // Go-routine to handle message delivery reports and possibly other event types (errors, stats, etc)
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver: %v\n", ev.TopicPartition.Error)
                } else {
                    fmt.Printf("Delivered message to topic %s partition [%d] @ offset %v\n",
                        *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
                }
            case kafka.Error:
                // Generic client instance-level errors, such as
                // broker connection failures, authentication issues, etc.
                //
                // These errors should generally be considered informational
                // as the underlying client will automatically try to
                // recover from any errors encountered, the application
                // does not need to take action on them.
                fmt.Printf("Error: %v\n", ev)
            default:
                fmt.Printf("Ignored event: %s\n", ev)
            }
        }
    }()

    // Wait for all messages to be delivered
    // Flush and close the producer and the events channel
    for p.Flush(1000) > 0 {
        fmt.Print("Still waiting to flush outstanding messages\n")
    }
    p.Close()
}

Source code cho file producer.go
Chúng ta sẽ cần chạy 2 terminal. Terminal 1 cho producer tạo message tới Kafka. Terminal 2 cho consumer đọc message từ Kafka.

Chạy dòng lệnh sau trên terminal 1, truyền vào các tham số cần thiết để tạo producer và gửi message tới Kafka.

Bash:
go run producer.go \
<bootstrap-servers> \
<cluster-api-key> \
<cluster-api-secret> \
<topic> \
<schema-registry-url> \
<schema-registry-api-key> \
<schema-registry-api-secret>


Script tạo producer
Trong câu lệnh trên, ta có các tham số cần thiết là:

  1. bootstrap-servers: danh sách các brokers mà clients sẽ tương tác với. Ví dụ, host1:port1,host2:port2.
  2. cluster-api-key: API key của cluster.
  3. cluster-api-secret: API secret của cluster.
  4. topic: topic mà producer sẽ gửi message tới.
  5. schema-registry-url: API endpoint để các ứng dựng tương tác với Schema Registry.
Một event mới tạo ra sẽ trông như sau ở Confluent Console:

kafka-co-ban
Event mới tạo trên Cluster
kafka-co-ban
Value của event mới tạo trên Cluster

Bước 3: Setup consumer

  • Tạo file consumer.go trong folder kafka-using-confluent. File này dùng để tạo consumer để đọc message từ Kafka và hiển thị ra console.
Bash:
# consumer.go
package main

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
if len(os.Args) < 5 {
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n", os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
clusterApiKey := os.Args[2]
clusterApiSecret := os.Args[3]
group := os.Args[4]
topics := os.Args[5:]

sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
// Avoid connecting to IPv6 brokers:
// This is needed for the ErrAllBrokersDown show-case below
// when using localhost brokers on OSX, since the OSX resolver
// will return the IPv6 addresses first.
// You typically don't need to specify this configuration property.
"broker.address.family": "v4",
"sasl.mechanisms":       "PLAIN",
"security.protocol":     "SASL_SSL",
"sasl.username":         clusterApiKey,
"sasl.password":         clusterApiSecret,
"group.id":              group,
"session.timeout.ms":    6000,
// Start reading from the first message of each assigned
// partition if there are no previously committed offsets
// for this group.
"auto.offset.reset": "earliest",
// Whether we store offsets automatically.
"enable.auto.offset.store": false,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}

fmt.Printf("Created Consumer %v\n", c)

if err := c.SubscribeTopics(topics, nil); err != nil {
fmt.Fprintf(os.Stderr, "Failed to subcribe topics %s %s\n", topics, err)
os.Exit(1)
}

run := true

for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
// Retrieve records one-by-one
ev := c.Poll(100)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
// Process the message received.
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}

// We can store the offsets of the messages manually or let
// the library do it automatically based on the setting
// enable.auto.offset.store. Once an offset is stored, the
// library takes care of periodically committing it to the broker
// if enable.auto.commit isn't set to false (the default is true).
// By storing the offsets manually after completely processing
// each message, we can ensure atleast once processing.
if _, err := c.StoreMessage(e); err != nil {
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n", e.TopicPartition)
}
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to automatically recover.
// But in this example we choose to terminate the application if all brokers are down.
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}

fmt.Printf("Closing consumer\n")
c.Close()
}


Source code cho file consumer.go
Mở terminal 2, chạy dòng lệnh sau để tạo consumer đọc message từ Kafka.

Bash:
# Chạy consumer, truyền vào các tham số cần thiết
go run consumer.go \
<bootstrap-servers> \
<cluster-api-key> \
<cluster-api-secret> \
<consumer-group-id> \
<topic-1> \
<topic-2> \
...
<topic-N>



Câu lệnh tạo consumer
Trong câu lệnh trên, ta có các tham số cần thiết là:

  1. bootstrap-servers: danh sách các brokers mà clients sẽ tương tác với. Ví dụ, host1:port1,host2:port2.
  2. cluster-api-key: API key của cluster
  3. cluster-api-secret: API secret của cluster
  4. consumer-group-id: ID của consumer group
  5. topic-1, topic-2, topic-N: consumers subscribe tới nhiều topic 1, 2, ..., N.

Tổng kết​

Trong bài viết, chúng ta đã cùng nhau thiết lập Kafka trên Confluent Cloud, khởi tạo clients bằng ngôn ngữ Go để gửi và nhận messages. Hi vọng bài viết sẽ giúp bạn trang bị kiến thức cơ bản về cách sử dụng Kafka trong Confluent Cloud và Go.
 
Last edited:
Back
Top