티스토리 뷰
🍇 아파치 카프카 (Apache Kafka)?
아파치 카프카는 분산 데이터 스트리밍 플랫폼으로, 대용량의 데이터를 안정적으로 실시간 전송하고 저장하기 위한 오픈 소스 시스템입니다.
아파치 카프카는 메시지를 파일 시스템에 저장함으로써 기존의 AMQP 기반 메시징 큐와 달리 데이터를 재사용할 수 있다는 특징을 가지고 있습니다.
이외에도 많은 특징이 있지만, 이번 게시글에서는 아파치 카프카에 대한 소개를 보다는, 간단한 예제를 통해 카프카를 메시징 큐로써 사용하는 방법을 연습해 보겠습니다.
🎁 패키지 선택
Go에서 사용할 수 있는 아파치 카프카 관련 패키지는 대표적으로 3개를 꼽을 수 있습니다.
- sarama: 가장 많은 스타를 보유하고 있습니다. 예제가 있긴 하지만 위키랑 README에 글로만 설명을 해놔서 접근성은 떨어지는 것 같습니다.
- kafka-go: sarama 다음으로 많은 스타를 보유하고 있습니다. 설치해서 사용을 해봤는데 개인적으로는 Producer, Consumer를 Writer, Reader라고 명명해 놓은 것이 메시징 큐를 사용하는 데 있어서 직관적이지 않은 것 같습니다. 컨텍스트를 사용해서 흐름을 제어할 수 있기 때문에 다른 두 패키지와의 차별점이 있습니다.
- confluent-kafka-go: cgo로 C 라이브러리를 래핑 했다는 점에서 순수 Go로 작성된 다른 두 패키지와 차이가 있습니다. confluent사에서 공식으로 지원하고 있는 패키지이므로 상업적인 지원을 받을 수 있고 공식 문서와 튜토리얼도 잘 정리되어 있습니다.
저는 아파치 카프카를 처음 다뤄보는 입장이라 3번 confluent-kafka-go를 선택했습니다. 예제와 튜토리얼이 잘 되어있고 로컬에서 도커를 사용하여 카프카를 실행하는 내용까지 친절하게 설명해 놓았으니까요. 예제 관련 링크는 아래 참고 자료에 추가해 놓았습니다!
🛕 서비스 아키텍쳐
시나리오
- 손님(client)이 카운터에서 커피를 주문합니다(counter 서비스의 '/order' 경로로 post 요청).
- 카운터 점원은 손님의 주문을 받아 주문 목록에 올려놓습니다(이벤트 토픽을 'order.received'로 지정하고 주문 정보와 함께 주문이 접수되었다는 이벤트를 발행).
- 그리고 손님에게 영수증과 진동벨을 건네줍니다(client의 요청에 대한 응답으로 202 Accepted 코드와 주문 ID를 반환).
- 바리스타는 주문 목록을 확인하고 커피를 내립니다(토픽이 'order.received'인 이벤트를 구독).
- 커피가 준비되면 바리스타는 손님의 진동벨을 울립니다(토픽을 'order.processed'로 지정하고 주문 정보와 함께 주문이 처리되었다는 이벤트를 발행).
- 손님은 진동벨이 울리는 것을 확인하고 주문한 커피를 픽업합니다(토픽이 'order.processed'인 이벤트를 구독).
🐱👤 전체 코드
📑 코드 살펴보기
1. event 패키지
event 패키지에는 다른 서비스들에서 공통으로 사용할 토픽 변수와 Order 구조체를 정의해 놓았습니다.
package event
import (
"encoding/json"
"github.com/google/uuid"
)
var (
OrderReceivedTopic string = "order.received"
OrderProcessedTopic string = "order.processed"
)
type OrderStatus int
const (
OrderReceived OrderStatus = iota
OrderProcessed
)
func (os OrderStatus) String() string {
return [...]string{"OrderReceived", "OrderProcessed"}[os]
}
type Order struct {
OrderID string `json:"order_id"`
Amount int `json:"amount"`
Status OrderStatus `json:"status"`
}
func NewOrder(amount int) Order {
return Order{
OrderID: uuid.New().String(),
Amount: amount,
Status: OrderReceived,
}
}
func (o *Order) MarshalBinary() ([]byte, error) {
return json.Marshal(o)
}
func (o *Order) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, o)
}
2. counter 서비스
counter 서비스는 localhost의 8080번 포트에서 실행되는 http 서버와 order.received 이벤트를 발행하는 카프카 Producer로 구성되어 있습니다.
type Counter struct {
*http.Server
producer *kafka.Producer
deliveryChan chan kafka.Event
}
메시지 동기 발행
deliveryChan는 다음과 같이 메시지를 발행할 때 Produce 메서드의 두 번째 인수로 넘겨주고 메시지 발행 결과를 받아와서 동기 처리하기 위해 사용됩니다.
err = c.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &event.OrderReceivedTopic,
Partition: kafka.PartitionAny,
},
Key: []byte(order.OrderID),
Value: val,
}, c.deliveryChan)
if err != nil {
http.Error(w, "Failed to produce message", http.StatusInternalServerError)
return
}
e := <-c.deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
http.Error(w, "Failed to produce message", http.StatusInternalServerError)
} else {
log.Printf("Produced message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
w.WriteHeader(http.StatusAccepted)
w.Header().Add("Content-Type", "application/json")
w.Write(val)
}
3. barista 서비스
barista 서비스는 새로 생성된 주문을 받는 Consumer와 주문받은 커피가 준비되었음을 알리는 Producer가 각각 필요합니다.
func newConsumer() *kafka.Consumer {
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "barista_group",
"auto.offset.reset": "earliest",
"broker.address.family": "v4",
})
if err != nil {
panic(err)
}
return consumer
}
func newProducer() *kafka.Producer {
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
})
if err != nil {
panic(err)
}
return producer
}
메시지 구독 및 비동기 발행
barista 서비스는 Consumer로 order.received 토픽을 구독하여 메시지를 가져오며, 가져온 메시지를 goroutine 안에서 비동기 처리한 뒤 Producer를 통해 order.processed 토픽을 지정하여 커피가 준비되었음을 알립니다.
err := consumer.SubscribeTopics([]string{event.OrderReceivedTopic}, nil)
if err != nil {
panic(err)
}
go func() {
// for async writes
for e := range producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
} else {
log.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
run := true
for run {
select {
case <-sigChan:
run = false
default:
msg, err := consumer.ReadMessage(100 * time.Millisecond)
if err != nil {
continue
}
order := event.Order{}
err = order.UnmarshalBinary(msg.Value)
if err != nil {
log.Printf("Error unmarshalling order: %s\n", err)
continue
}
log.Printf("Order received: %s\n", order.OrderID)
go func() {
time.Sleep(time.Duration(order.Amount) * time.Second)
order.Status = event.OrderProcessed
val, err := order.MarshalBinary()
if err != nil {
log.Printf("Error marshalling order: %s\n", err)
return
}
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &event.OrderProcessedTopic,
Partition: kafka.PartitionAny,
},
Key: []byte(order.OrderID),
Value: val,
}, nil)
if err != nil {
log.Printf("Error producing order: %s\n", err)
return
}
log.Printf("Order processed: %s\n", order.OrderID)
producer.Flush(15 * 1000)
}()
}
}
4. client 서비스
client 서비스는 counter 서비스로 http 요청을 보낸 뒤, order.processed 토픽을 지정한 Consumer를 통해 커피가 준비되었다는 메시지를 받을 수 있습니다.
func newConsumer() *kafka.Consumer {
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "client_group",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
return consumer
}
func main() {
c := flag.Int("c", 1, "number of clients")
flag.Parse()
log.Printf("Starting %d clients\n", *c)
consumer := newConsumer()
defer consumer.Close()
err := consumer.SubscribeTopics([]string{event.OrderProcessedTopic}, nil)
if err != nil {
panic(err)
}
stopChan := make(chan struct{})
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
for {
select {
case <-sigChan:
log.Println("Shutting down client...")
close(stopChan)
return
default:
msg, err := consumer.ReadMessage(100 * time.Millisecond)
if err != nil {
continue
}
order := event.Order{}
err = order.UnmarshalBinary(msg.Value)
if err != nil {
log.Printf("Error unmarshalling order: %s\n", err)
continue
}
log.Printf("Order ID %s is ready for pickup. Enjoy your coffee!\n", order.OrderID)
}
}
}()
client := http.DefaultClient
for i := 0; i < *c; i++ {
go func() {
amount := rand.Intn(5) + 1
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:8080/order?amount=%d", amount), nil)
if err != nil {
panic(err)
}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
panic("order not accepted")
}
var order event.Order
err = json.NewDecoder(resp.Body).Decode(&order)
if err != nil {
panic(err)
}
log.Printf("Order accepted: %s\n", order.OrderID)
}()
time.Sleep(1 * time.Second)
}
<-stopChan
}
💻 코드 실행
1. 카프카 컨테이너 실행
$ docker compose up -d
2. counter와 barista 실행
각자 다른 터미널에서 실행!
$ go run ./cmd/counter
$ go run ./cmd/barista
3. client 실행
$ go run ./cmd/client/ --c 10
🙏 마치며
아파치 카프카를 사용해 간단한 이벤트 기반 서비스를 구현해 보았습니다. 개인적으로는 산문으로된 문서를 읽어내려가는 것보다는 코드를 보고 따라치면서 이해하는 쪽을 선호하는데, confluent사에서 제공해주는 예제와 튜토리얼이 너무 잘 되어있어서 큰 어려움없이 카프카의 세계에 입문할 수 있었던 것 같습니다. confulent사에 커다란 감사!
📖 참고자료
글에서 수정이 필요한 부분이나 설명이 부족한 부분이 있다면 댓글로 남겨주세요!
'Go > 코딩 하기' 카테고리의 다른 글
[Go] Polygon ID와 Websocket을 사용한 신원 인증 - 1. Polygon ID (0) | 2023.12.20 |
---|---|
[Go] 우아하게 종료하기 (Graceful shutdown) (0) | 2023.10.24 |
[Go] gRPC 파헤치기 - 프로토콜 버퍼 (Protocol Buffers) (0) | 2023.10.12 |
[Go] gRPC 파헤치기 - gRPC란? (0) | 2023.10.11 |
[Go] 문자열과 바이트 슬라이스를 상호 변환하는 여러가지 방법 (0) | 2023.10.01 |