본문 바로가기
Infrastructure/Apache Kafka

Apache Kafka 가볍게 알아보기(작성중)

by p4cho 2024. 1. 19.
728x90

Apache Kafka란?

Event Streaming Platform

그래서 어떤 작업을 하는지 단계별로 보면,

  1. Publish & Subscribe - 이벤트 스트림을 안전하게 전송
  2. Write to Disk - 이벤트 스트림을 디스트에 저장
  3. Processing & Analysis - 이벤트 스트림을 분석 및 처리

Kafka는 링크드인 내에서 비즈니스에서 발생하는 다양한 이벤트 스트림(하루 4.5조 개 이상)을 처리할때 기존의 RabitMQ만으로는 한계가 있어서 대량의 이벤트 스트림을 “잘 쓰기” 위해 개발. 2011년에 Apache Software Foundation에 기부되어 오픈소스화.

출처: https://www.confluent.io/blog/kafka-fastest-messaging-system

 

Benchmarking RabbitMQ vs Kafka vs Pulsar Performance

A complete benchmark of RabbitMQ, Kafka, and Pulsar to determine performance, throughput, and latency at scale. View the comparison results!

www.confluent.io

Kafka는 쓰기(write)작업에 최적화된 시스템, 창시자인 Jay Kreps(Confluent CEO)가 좋아했던 작가(Franz Kafka)에서 따온 이름. 오픈소스 프로젝트로서 듣기에 좋은 이름이었다함.

What is the relation between Kafka, the writer, and Apache Kafka, the distributed messaging system?

 

What is the relation between Kafka, the writer, and Apache Kafka, the distributed messaging system?

Answer by Jay Kreps, I am a committer on Apache Kafka

www.quora.com

참고:

Apache Kafka

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

Apache Kafka는 어떻게 동작하는가?

Kafka는 서버와 클라이언트로 구성된 분산 시스템으로 베어메탈 하드웨어나 온프레미스 또는 클라우드 환경의 VM, 컨테이너 등 어디에든 구성할 수 있다.

서버는 하나 이상의 서버(Broker)들로 구성된 클러스터(Cluster) 형태로 실행되고, 다양한 클라이언트에서 필요한 데이터를 읽어갈 수 있게 잘쓰는 역할을 한다.

클라이언트는 크게 Producer와 Consumer 로 나눌 수 있는데, 이름대로 Producer는 이벤트(event, record, message, data, …)를 만들어 서버에 전송하고, Consumer는 이벤트를 처리(사용, 분석)하기위해 서버에서 읽어간다. Consumer는 관련있는 Consumer끼리 모여 하나의 Consumer Group에 속한다.

Topic, Partition, Segment

출처:

Apache Kafka

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

Topic은 이벤트가 저장되는 장소로서 이벤트를 종류별로 저장하는 논리적인 개념이다.

Topic을 생성할때 병렬처리를 위해 여러 Partition을 구성할 수 있으며 Partition은 commit log 자료구조로 메세지를 저장한다. commit log는 저장 순서에 따라 추가만 가능한 자료구조이다. Partition Key를 지정해서 여러 Partition에 저장될 이벤트를 구별할 수 있고, Partition Key를 따로 지정하지 않으면 랜덤하게 분배된다.

Partition에 저장되는 이벤트들이 실제로 저장되는 파일을 Segment라고 한다. 설정에따라 Segement의 용량이나 Segment가 활성화되고 경과한 시간을 기준으로 새로운 활성화된 Segment를 생성한다. 활성화된 Segment에도 이벤트를 추가하는 작업만 가능하고, 변경할 수 없다.(immutable)

Broker, Zookeeper

위에서 말한 클러스터를 구성하는 서버들이 바로 Broker이다. Broker가 Partition을 읽고 쓰는 작업을 수행한다. 최소 3대 이상의 Broker를 하나의 Cluster로 구성해야한다.

Topic 생성 시 설정한 수만큼 Partition이 생성되고, Broker들이 골고루 나누어 가진다.

Broker는 Bootstrap 서버라고도 부르는데, 하나의 Broker에만 연결해도 Cluster 전체에 연결된다. 클라이언트(Producer, Consuemr)가 어느 Broker에게 요청을 하더라도 모든 Broker, Topic, Partition에 대한 Metadata를 얻을 수 있다.

Zookeeper는 Broker들의 목록, 설정을 관리하는 소프트웨어로서 여러대의 Zookeeper 서버(Zookeeper Ensemble; 주키퍼 앙상블)가 각각 한 대 이상의 Broker를 관리한다. Zookeeper 는 쿼럼(Qourum; 정족수)으로 구성하며 최소 3대 이상, 권장 5대로 구성한다.

Kafka 2.8 버전 이전까지는 Zookeeper를 반드시 함께 구성해야했지만 2.8버전 부터 Zookeeper 없이 Kraft(Kafka Raft) 프로토콜을 사용할 수 있게되어 Kraft 모드로 구성하면 각 Broker의 Quorum Controller가 메타데이터들을 저장하고 관리한다. 이는 kafka 3.3.1버전부터 Production에서 구성 가능하며 4버전 부터는 Zookeeper를 완전히 사용하지 않게된다.

Producer

Producer는 Broker에게 메세지를 전달하는 클라이언트다. Producer Application에는 Record(message)를 만들고 Serializer에의해 byte array로 변환, Partione가 어느 Partition에 쓰여져야 하는지 정하고 Record Accumulator에 Batch 작업단위로 저장해두었다가 Kafka에 전송한다. Kafka에 전송이 성공하면 metadata를 받고, 실패 시 조건에 따라 전송을 재시도 할 수 있다.

  • Producer Record: Kafka에 이벤트를 보내는 구조
Producer Record
├── Headers
│   ├── Partition
│   ├── Timestamp
│   └── Topic*
├── Key
└── Value*

*: Required
  • Serializer: Kafka는 Record를 byte array로 저장한다. Serializer가 다양한 타입(JSON, String, Avro, Protobuf .. )의 이벤트를 byte array로 변환한다.
  • Partitioner: Record를 Topic의 어떤 Partition으로 보낼지 결정한다. Key가 있다면 Key의 해시값을 Partition들의 수로 나눈 나머지 값이 타겟 Partition의 index가 된다. Key가 null이면 Round-Robin(2.4버전 이전) 또는 Sticky(2.4버전 이후) 정책으로 보내진다. Partitioner는 원하는 방식으로 커스텀 가능하다.
  • Record Accumulator: accumulator(누산기)는 보통 CPU에 있는 레지스터의 일종으로 CPU가 처리하는 계산의 중간 결과를 저장하는 용도이다. 여기서는 Record를 Kafka로 전송하기전에 Topic, partition별로 잠시 저장해두는 용도이다. Topic, Partition기준으로 Kafka에 전송하는 Batch 작업 단위를 만들어 두었다가 Kafka로 전송한다.

Consumer

Consumer는 Partition의 특정 위치(segment)에 저장 되어있는 데이터를 읽어온다. 이때 Consumer는 데이터를 읽은 Partition마다 데이터를 읽은 위치(consumer-offset)을 따로 저장하고 있다.

같은 group.id를 가진 Consumer들은 하나의 Consumer Group으로 묶인다. 하나의 Topic에서 데이터를 읽어오는 Consumer Group을 구성하면 Topic의 Partition의 수에따라 Consumer Group에 속한 Consumer 들은 0개 이상의 Partition에서 데이터를 읽도록 밸런스를 맞춘다. Consumer Group 내의 Consumer 구성이 변경되었을때 설정한 전략에 따라 리밸런싱을 할 수 있다.

Message Ordering

메세지가 저장되는 Partition이 2개 이상인 경우 모든 메세지에 대한 순서를 보장할 수 없다. Partition을 1개만 구성한다면 순서를 보장할 수 있는 대신 처리량이 떨어진다. Producer Record에 Key를 지정하여, 같은 Key값의 메세지는 같은 Partition에 저장 되도록 구성할 수 있다. 하나의 Partition에 저장된 메세지들은 하나의 Consumer Group 내에서 하나의 Consumer만 접근하기 때문에 Key 기준으로 순서를 보장할 수 있다.

하지만 Key Cardinality(같은 Key 값의 메세지들중 유니크한 값의 개수)가 고르지 못할 수록 Consumer들 간의 처리량이 차이가 날 수 있다. 따라서 Partition 전체에 고르게 분배될 수 있는 Key를 만드는 것이 중요하다.

Replication

Partition을 복제(Replication)하여 다른 Broker상에서 복제물(Replicas)을 만들어서 장애를 미리 대비한다. Replica는 Leader Partition과 Follower Partition들로 구성된다. Producer는 Leader에만 Write하고 Consumer는 Leader로부터만 Read한다. (kafka 2.4버전 부터는 Consumer가 Follower로부터 Read 할 수 있는 옵션이 있다. Follower는 Leader의 CommitLog에서 데이터를 가져오기 요청(FetchRequest)으로 메세지를 복제한다.

Rack Awareness

Replica들(Leader, Followers)은 최대한 Rack간에 균형을 유지하여 Rack장애 대비하는 Rack Awareness 기능이 있다. kafka Broker가 여러 서버 Rack에 분산되어 있는 경우 Rack자체가 다운된다면 Rack내의 모든 Broker의 운영에 문제가 생기기 때문에 Rack을 지정해두면 최대한 Leader, Follower들이 여러 Rack에 걸쳐서 분산 된다.

In-Sync Replicas(ISR)

ISR은 Leader Partition의 High Water Mark라고 하는 지점(Committed)까지 동일하게 복제된 Replicas(Leader, Followers)의 목록을 말한다. Leader에 장에가 발생하면 ISR 중에서 새로운 Leader를 선출한다.

ISR 목록의 어떤 Follower의 가장 마지막으로 복제된 커밋이 High Water Mark보다 이전 지점인 경우 OSR(Out of Sync Replica)로 판단할 수 있는데, 이 조건에대한 두가지 설정 옵션이 있다.

  • replica.lag.max.messages : Leader가 저장(commit)한 메세지 수와 어떤 Follower가 복제한 메세지 수의 차이가 설정값 이상으로 나는 경우 이 Follower는 OSR로 판단한다. 순간적인 메세지 유입량의 증가로 잠깐의 지연이 발생할 수도 있어서 Broker의 상태는 정상이나 불필요한 에러와 ISR 업데이트가 일어날 수 있다.
  • replica.lag.time.max.ms: Follower가 Leader로 Fetch 요청을 보내는 주기를 확인. 설정값보다 오랜시간동안 Fetch요청하지 않는 Follower는 OSR로 판단한다.

ISR은 Leader가 관리하는데, ISR목록에 변경이 있는 경우 이를 Zookeeper에 새 목록을 전달하고 Zookeeper는 Controller에 변경된 내용(Partition Metadata)을 전달한다.

Committed

ISR 목록의 모든 Replicas가 메세지를 받으면 Committed라고 한다.(→ High Water Mark)

High Water Mark

가장 최근의 Committed 메세지의 Offset. replication-offset-checkpoint 파일에 기록.

Controller

Kafka Cluster내의 Broker중 하나. Controller는 Zookeeper를 통해 Broker가 동작중임(liveness)을 모니터링한다. Zookeeper로부터 받은 Replicas 정보를 다른 Broker들에게 전달한다.

Leader 장애시 Controller가 새 Leader를 선출한다. Controller에 장애가 발생하면 다른 Active Broker들 중에서 Controller를 재선출한다.

Leader Epoch

새 Leader가 선출된 시점의 Offset. leader-epoch-checkpoint 파일에 기록.

복구 중인 Broker는 Leader Epoch를 참조하여 이전 Leader로부터 메세지를 복제하여 동기화한다. Leader Epoch가 일치하면 다시 정상적으로 쓰기, 읽기 가능한 상태가 된다.

Producer Acks

acks=0

→ Producer는 메세지가 잘 저장되었는지 확인하지 않는다.

acks=1

→ Broker(Leader Partition)는 Produce로부터 받은 메세지를 Commit한 후 Producer에게 ack를 보내 응답한다.

acks=-1, acks=all

→ Broker(Leader Partition)는 메세지를 Commit한 후 다른 모든 Broker(Follower Partition)에서 복제 Commit에 대한 ack 응답을 받고나서 Producer에게 ack를 보내 응답한다.

Producer Retry

retries (default: MAX_INT)

→ 메세지 전송(send)을 재시도하는 횟수

retry.backoff.ms (default: 100)

→ 재시도하기 전 대기 시간

request.timeout.ms (default: 30,000)

→ Producer가 응답(ack)를 기다리는 최대 시간

delivery.timeout.ms (default: 120,000)

→ 메시지 전송(send) 후 성공 또는 실패를 응답 받는 전체 시간의 최대 시간

Producer Batch

RPC(Remote Procedure Call)수를 줄여서 Broker가 처리하는 작업을 줄이기 위해 메세지를 모아서 한 번에 전송(Batch)한다.

linger.ms (default: 0, 즉시 전송)

→ 메세지가 Batch 처리될 때까지 대기 시간. 일반적인 설정은 100

batch.size (default: 16 (KB))

→ Batch 메세지 최대 크기. 일반적인 설정은 1,000,000 (1GB)

Producer Delivery Timeout

send() 호출 후 결과 응답에 대한 timeout.

delivery.timeout.ms

= linger.ms + await send + retry.backoff.ms(retires) + request.timeout.ms(in-flight)

 

Message Send Order

max.in.flight.requests.per.connection (default: 5)

→ 동시에 전송될 수 있는 Batch의 수. 한 번에 여러 Batch를 전송하면 Broker에서 처리 결과에 따라 Commit Log에 다른 순서로 추가될 수 있다.

enable.idempotence

→ 위처럼 순서가 뒤바뀔 수 있는 상황을 피하려면 true로 설정하여 앞선 Batch가 실패 시 후속 Batch에 대해 OutOfOrderSequenceException을 발생시킨다.

Page Cache

메세지는 Partition이 Segment(file)에 쓴다고 했는데, 성능을 위해 Segment는 OS Page Cache에 기록한다. 이후에 Disk에 옮겨적는 행위를 Flush라고 한다.

Broker는 이 메세지에 대해 아무런 연산을 하지 않기 때문에 CPU가 개입하지 않고 Page Cache에 기록되었다가 Disk에 바로 쓰여지기(Zero-Copy) 때문에 Broker의 Heap 메모리를 절약할 수 있다.

Flush

Page Cache에 있는 Log Segement(메세지)를 Disk에 쓰는 행위

Flush 되기 전에 Broker에 장애가 발생한 경우 데이터가 손실될 수 있지만, Partition Replication을 설정하여 이를 방지할 수 있다.

log.flush.interval.messages

→ 마지막 Flush 이후의 메세지 수(가 되면 Flush(fsync))

log.flush.interval.ms

→ 마지막 Flush 이후의 시간(이 되면 Flush(fsync))

하지만 Kafka는 OS의 background Flush 기능(pdflush)을 더 효율적으로 허용하는 것을 선호한다. (fsync 비활성화)

*.log 파일에서 Flush 전/후의 모든 메세지를 확인할 수 있다.

Replica Failure

Broker의 장애는 Zookeeper가 관제한다.

Follower Failure:

  • Leader에 의해 ISR 리스트에서 해당 Follower를 삭제.

Leader Failure:

  • Controller가 새로운 Leader를 선출하여 새로운 ISR 리스트를 작성하고 전파.

Partition Leader가 없으면 새 Leader가 선출될 때까지 해당 Partition을 사용할 수 없음. Producer에서는 해당 Partition에 send() 요청 시 실패 응답을 받으므로 재시도 또는 NetworkException 발생.

Replica Recovery

Leader가 장애가 발생하면 새로운 Leader가 선출되고, Leader Epoch가 변경된다.

이때 acks 설정 값에 따라 producer는 마지막 메세지를 재전송(retry)하거나 하지 않는데, acks=all 이라면 새로운 Leader를 포함한 나머지 Follower들이 전 Leader로부터 메세지를 다 받지 못해서 ack를 보내지 못한 경우 Producer는 재시도 하여 새로운 Leader Partition에는 중복으로 commit되는 메세지가 생길 수도 있다. 하지만 메세지가 누락되는 것은 막을 수 있다.

acks=1이라면 이전 Leader가 commit후 Producer에게 ack를 보낸 뒤 Follower가 미처 복제해가기 전에 Leader에 장애가 발생했다면 Producer는 이전 Leader에게 마지막으로 보낸 메세지 다음 메세지부터 보내게 되고 메세지 누락이 발생한다.

Availability와 Durability

Topic Parameter:

  • unclean.leader.election.enable (default: false) → Leader 선출 시 ISR 리스트에 없는 Replica를 선출 대상에 포함할 것인지. → false 라면, ISR에 Replica가 하나도 없으면 서비스 중단. → true 라면, OSR Replica를 Leader로 선출, 데이터 유실이 발생할 수 있다.
  • min.insync.replicas (default: 1) → 최소 요구되는 ISR의 개수에 대한 옵션 → ISR이 설정값보다 적은 경우 Producer는 NotEnoughReplicas Exception을 수신한다. 2이상으로 설정하고, acks=all 옵션과 함께 사용한다면 좀 더 안정적이다.
  • replication.factor → Partition의 복제본을 몇 개나 생성할지.

Consumer Heartbeats

Consumer의 장애를 인지하기 위함

poll()과 별도로 Thread에 서 Heartbeats를 보냄

  • heartbeat.interval.ms (default: 3000)
  • session.timeout.ms (default: 10000) 설정값 안에 Heartbeats가 수신되지 않으면 Consumer는 Consumer Group에서 삭제
  • max.poll.interval.ms (default: 300000) poll()이 설정값 안에 다시 호출 되지 않으면 Rebalancing 할 수 있다.

Consumer Rebalance

Rebalancing Trigger

  • Consumer가 Consumer Group에서 탈퇴
  • 신규 Consumer가 Consumer Group에 합류
  • Consumer가 Topic 구독을 변경
  • Consumer Group은 Topic 메타데이터의 변경 사항을 인지 (ex: Partition 증가)

Rebalancing Process

  1. Group Coordinator(Broker)는 heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보냄
  2. Consumer가 일시 중지하고 Offset을 Commit
  3. Consumer는 Consumer Group의 새로운 Generation에 다시 합류
  4. Partition 재할당
  5. Consumer는 새 Partition에서 다시 Consume 시작

과도한 Rebalancing은 성능을 떨어뜨릴 수 있다.

  • Consumer Group의 각 Consumer에게 고유한 group.instance.id를 부여, 기존 Consumer가 Rejoin 시에는 Rebalance를 trigger 하지 않음
  • session.timeout.ms, heartbeat.interval.ms 값 조정
  • max.poll.interval.ms 값 조정

Partition Assignment Strategy

Assignors

  • Range (default)
  • Round Robin
  • Sticky
  • Cooperative Sticky (Basic-v2.4^, Incremental -v2.5^)
  • Custom

Kafka Log File

Data File

각 Broker의 server.properties 파일 안에 log.dirs 파라미터로 정의 (, seperate)

ex) topic_a Topic의 Partition 0은 /data/kafka/kafka-log-a/topic_a-0/

directory로 생성된다.

Partition directory 안에는 .index, .timeindex, .log 파일이 offset 범위별로 생긴다. offset 번호가 파일명이 된다.

  • Log Segment File: 메세지와 metadata 저장 (.log)
  • Index File: 각 메세지의 Offset을 Log Segment File상의 위치를 인덱싱
  • Time-based Index File: 각 메세지의 timestamp를 기반으로 인덱싱
  • Leader Epoch Checkpoint File: Leader Epoch과 관련 Offset 정보를 저장
  • Idempotent Producer 사용: .snapshot 파일 생성
  • Transactional Producer 사용: .txnindex

Log Segment File Rolling

  • log.segment.bytes (default: 1GB)
  • log.roll.ms (default: 168시간)
  • log.index.size.max.bytes (default: 10 MB)
  • offsets.topic.segment.bytes (default: 100 MB) __consumer_offset (Offset Topic)

Checkpoint File

  • replication-offset-checkpoint High Water Mark 관리
  • recovery-point-offset-checkpoint 마지막 Flush 지점 관리 - 복구 시 메세지 손실 확인 시점

Delivery Semantics

At-Most-Once

  • 메세지가 중복되지 않지만 누락될 수 있음

At-Least-Once

  • 메세지가 누락되지 않지만 중복될 수 있음

Exactly-Once

 

KIP-98 - Exactly Once Delivery and Transactional Messaging - Apache Kafka - Apache Software Foundation

[This KIP proposal is a joint work between Jason Gustafson, Flavio Paiva Junqueira,  Apurva Mehta, Sriram, and Guozhang Wang] Status Current state: Adopted Discussion thread: http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+DISCUSS+KIP+98+Exact

cwiki.apache.org

 

728x90