Kafka 란?

http://kafka.apache.org/
http://kafka.apache.org/intro
http://kafka.apache.org/documentation/


설치

spark 와 마찬가지로 다운받아 별도 설치 없이 데몬을 띄움.

별다른 설정 없이 카프카 서버의 실행, 메세지 테스트를 진행합니다.

서버에 Java 1.7.x 이상이 설치되어 있어야 합니다.



구조


구조

Kafka는 발행-구독(publish-subscribe) 모델을 기반으로 동작하며 크게 producer, consumer, broker로 구성된다.

확장성 / 고가용성을 위해 브로커들을 클러스터로 구성 동작.


기본 동작

producer 특정 topic 메시지를 생성 전달 -> Broker 분산처리 (Zookepper) Topic 별로 적재(Topic 은 파티션별로 쪼개져 각서버에 분산) -> 해당 Topic 을 구독하는 Consumer 가 메시지를 가져가서 처리


설명

Producers

    - 메시지 송신 API

    - 특정 Topic에 해당하는 메시지를 생성하는 프로세스, 메시지를 Broker에 전달(발행/Publish)

    - Producers는 데이터를 그들이 선택한 Topic으로 publish한다.

    - Producer가 메시지를 실제로 어떤 partition으로 전송할지는 사용자가 구현한 partition 분배 알고리즘에 의해 결정된다. 예를 들어 라운드-로빈 방식의 partition 분배 알고리즘을 구현하여 각 partition에 메시지를 균등하게 분배하도록 하거나, 메시지의 키를 활용하여 알파벳 A로 시작하는 키를 가진 메시지는 P0에만 전송하고, B로 시작하는 키를 가진 메시지는 P1에만 전송하는 형태의 구성도 가능하다.

Consumer

    - 메시지 수신 API

    - Broker에게서 구독(Subscribe)하는 Topic의 메시지를 가져와 사용(처리)하는 프로세스

    - Topic 당 할당된 스레드 개수만큼 스레드가 만들어지면 Partition 으로부터 메세지를 읽음.

    - 하나의 스레드는 1개 이상 partition 으로 부터 메시지를 읽을 수 있다.

    - public void run 메소드 내 while(is.hasNext())에서 블록킹돼 있다가 파티션으로 메시지가 들어오면 이 곳에서 메시지를 읽는다. 따라서 다른 타겟으로 메시지를 처리하는 데 적합한 장소라고 할 수 있다. 즉 메시지를 파일로 저장하던지 대용량 입력이 가능한 하둡이나 NoSQL로 저장하기에 유용하다.

Broker

    - 토픽을 기준으로 메시지 관리

    - broker 는 클러스터로 구성 (이에 대한 분산 처리는 zookeeper 가 처리)

    - Producers와 Consumers가 만날 수 있도록 메시지를 관리하는 서버 클러스터로 Producer에게서 전달받은 메시지를 Topic별로 분류한다. 

    - 여러대의 Broker Cluster로 구성 가능하며, Zookeeper에 의해 각 노드가 모니터링 된다.



그외

Topic

    - 발행(Publish)된 메시지들의 category

    - 유사한 메시지들의 집합이다. 프로듀서는 메시지를 전달할 토픽을 반드시 지정해야 한다.

    - partition 단위로 클러스터 각 서버들에 분산 저장

    - 각 partition 은 0부터 1씩 증가하는 offset 값을 메시지에 부여 (partition 내 메시지 식별)

    - 클러스터내 메시지들은 설정된 기간동안 유지 후 삭제됨 

Log 

    Producers가 생성한 메시지

Consumer API

    - 컨슈머 구현에 두가지 API 제공 Simple Consumer API, High-Level Consumer API  

    - 세부적인 것들은 모두 추상화되어 있어 몇 번의 간단한 함수 호출로 consumer를 구현할 수 있는 High Level Consumer API

    - offset과 같은 세부적인 부분까지 다룰 수 있지만 이 때문에 구현하기가 상당히 까다로운 Simple Consumer API가 제공된다(이름은 simple이지만 전혀 simple하지 않다).

리플리케이션(Replication) 

    - Fault Tolerance 위해 파티션 단위로 복제한다.

Partition

    - 로드밸런싱을 목적으로 토픽을 논리적으로 분할하는 것을 의미한다.



기존의 메시징과 차이점

1 기존 broker -> consumer 로 push 카프카 반대로 pull 로 땡겨옴.

2 이로 인해 필요한 메시지만 broker 에서 가져오므로 최적의 성능. 그리고 pull 방식이라 batch 처리 구현 가능.

3 카프카는 파일로 저장 연속성 상승 (대용량에 적합) - HDD 의 순차읽기는 SSD에 7배 정도만 느리다.

4 AMQP 프로토콜이나 JMS API를 사용하지 않고 단순한 메시지 헤더를 지닌 TCP기반의 프로토콜을 사용하여 오버헤드 감소


기본 클러스터 구조도





기본 예제

# 토픽생성
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".


# log stash 에서 들어올 토픽도 하나 생성해 둔다. 
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash_logs
logstash_logs

참고로 토픽 삭제시 --delete 옵션을 주면 된다.

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 토픽이름

# 토픽리스트
# bin/kafka-topics.sh --list --zookeeper localhost:2181
logstash_logs
test
# 프로듀서 메시지 생성
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
message.
line enter.
hello world.
# 컨슈머 메시지 받기 
# 이제 새창에서 위 프로듀서로 메세지를 전송하면 실시간으로 받는다.
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
message.
line enter.
hello world.

hello
hi
asdf
asdf
as
df
as
df


hhahaha
======================================



실행 오류시

실행시 오류가 나온다면 JDK 버젼 및 64bit 여부 확인 

또는 JAVA_HOME 설정 (.profile) 이 되어 있는지 확인한다.




다음은 데이터 생산 스크립트를 실행하는 예다.


kafka-producer-perf-test.sh --topic test_topic \  

--throughtput 700 \

--record-size 2000 \

--num-recored 200000 \

--producer-props \

bootstrap.servers=kafka01.server:9092,kafka02.server:9092  



다음은 데이터 소비 스크립트를 실행하는 예다.

kafka-consumer-perf-test.sh \  

--zookeeper kafka01.server:2181,kafka02.server:2181 \

--messages 200000 \

--topic test_topic \

--threads 2 \

--show-detailed-stats \

--broker-list kafka01.server:9092,kafka02.server:9092 \

--group test_group



참고 

kafka-producer-perf-test.sh 파일과 kafka-consumer-perf-test.sh 파일을 옵션 없이 실행하면 스크립트 사용 방법을 확인할 수 있다.



토픽명 alter 도 가능.

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count


+ Recent posts