https://community.hortonworks.com/articles/42027/rdd-vs-dataframe-vs-sparksql.html



특정 Spark 데이터 처리 작업 부하에 대해 RDD 대 DataFrame 대 SparkSQL의 성능 비교


결론 :  데이터 프레임이 짱. 데이터셋은 진보중.


RDD가 DataFrames 및 SparkSQL보다 우수한 성능을 발휘했다고 나오는데 DataFrame 및 SparkSQL 테스트를 위해 RDD 에서 데이터 프레임으로 변환하는 과정때문이라고 Comment 로 지적. databricks 에서 SparkSQL csv reader 를 사용했다면 비교가 다를 것이라고 지적. 



아파치 문서마다 DataFrame에 RDD 보다 뛰어난 메모리와 쿼리 옵티마이져가 있기 때문에 데이터 프레임이 빠르다.

파일이 json 이라면 바로 데이터 프레임으로 읽을 수 있으면 RDD 보다 높은 성능을 보일 것...



http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/



RDD 주된 단점은 자바 직렬화 사용. (대부분 kryo 를 빠른 대안으로 사용)


데이터 프레임은 Spark 1.3 에서 Spark 성능과 확장성을 개선하고자 project Tungsten 이니셔티브 의 일부로 새로운 데이터프레임 API 를 도입했다.

데이터 프레임은 스키마 개념을 도입하여 자바 직렬화를 사용하는 것보다 spark의 스키마를 관리하고 노드간에 데이터를 전달하는 방식을 훨씬 더 효율적으로 허용한다.

Spark 가 바이너리 형식의 오프힙저장소로 데이터를 직렬화 한다음 오프 힙메모리에서 직접 많은 변환을 수행하여 개별 개체를 구성하는 것과 관련된 가비지 수집 비용을 피할 수 있으므로 단일 프로세스에서 계산을 수행할 때 장점이 있다.(데이터를 인코딩하기 위해 자바 직렬화를 사용할 필요가 없음)


자바로 DataFrame 또는 Dataset API 를 채택하기전에 스칼라로 전환하는 것을 고려해야 한다.

자바로 작성을 해야 하면 bean 을 완벽히 준수 해야 한다.

스칼라 + Spark 1.6.0 을 사용하여 코드를 제작하는 경우 DataFrame API 가 가장 안정적인 옵션이며 현재 가장 우수한 성능 제공

데이터를 직렬화 할때 Dataset API 는 JVM 표현(객체)와 Spark 의 내부 바이너리 형식 사이를 변환하는 인코더 개념을 가지고 있다. Spark 에는 바이트 코드를 생성하여 오프 힙 데이터와 상호 작용하고 전체 개체를 비 직렬화 할 필요없이 개별 특성에 대한 필요시 액세스를 제공한다는 점에서 매우 진보된 기본 제공 인코더가 있다.




databrics 사에서 DataFrame 소개

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html


오늘, 우리는 더 많은 사람들이 큰 데이터 처리를 더욱 쉽게 할 수 있도록 고안된 새로운 DataFrame API를 발표하게 된 것을 기쁘게 생각합니다.




위의 차트는 실행중인 그룹 단위의 런타임 성능을 단일 시스템 ( 소스 코드 https://gist.github.com/rxin/c1592c133e4bccf515dd ) 에서 1,000 만 개의 정수 쌍으로 비교합니다 . Scala와 Python DataFrame 연산은 모두 JVM 바이트 코드로 컴파일되어 실행되기 때문에 두 언어간에 차이가 거의 없으며 바닐라 Python RDD 변형은 5 배, Scala RDD 변형은 2 배의 성능을 보입니다.


Spark 1.3 의 일부로 출시 

Spark 1.6 에서 안정화?

https://spark.apache.org/releases/spark-release-1-6-0.html

http://www.itworld.co.kr/news/97513

Spark 2.0 에서 많은 발전...

'오픈소스 > Spark' 카테고리의 다른 글

[Spark] SQL and DataFrame  (0) 2017.04.10
[Spark] Overview  (0) 2017.01.25

스파크 SQL


소개


각 레코드의 필드 구성에 대해 모두 알고 있을 경우 스파크 SQL 을 더 쉽고 효과적으로 불러오고 쿼리를 쓸 수 있다.


스파크 SQL 주요 기능 3가지

1. DataFrame 추상화 클래스 제공

2. 다양한 구조적 포맷의 데이터를 읽고 쓸 수 있다 (JSON, Hive, 파케이(parquet)

3. 내부 JDBC/ODBC 나 외부 Tableau (태블루) 같은 BI 툴 등을 써서 스파크 SQL 을 통해 SQL 로 데이터를 질의 할 수 있다.


SchemaRDD

Row 객체의 RDD 

각 아이템은 Record 를 의미

RDD 와 유사해 보이지만 내부적으로는 좀더 효율적인 방법으로 저장하고 Scheme 이점 활용한다

RDD 에서 제공되지 않는 Operation 도 제공

DataFrame은 이전버젼(1.0~1.2)에서 쓰이는 SchemaRDD가 진화한 것

SchemaRDD 로 코딩되어 있어도 1.3이상에서는 DataFrame으로 쓰는것과 같다.

Dataset 은 SQL 의 최적화된 실행 엔진의 이점을 이용하여 RDD (강력한 타이핑, 람다기능)의 이점을 제공하는 Spark 1.6 에서 추가된 새로운 인터페이스.


그외

1. 스파크 SQL 은 하둡의SQL 엔진인 apache Hive 를 포함하거나 또는 포함하지 않고서 빌드가 가능하다.

2. 하이브 의존성 충돌은 주의 해야한다. spark-hive_2.10, spark-sql_2.10

-> SQLContext 는 스파크 SQL의 기능을 제공하지만 하이브에 의존하지는 않는다.

3. HiveQL 은 스파크 SQL 을 쓰기위해 추천하는 질의 언어




구조


그림1

아래는 스파크의 전체 구조입니다.

우리는 본 문서에서 SQL 에 대해서 살펴볼 예정입니다. 

아래 그림2에서 Spark SQL 구조에 대해서 알아봅시다.


그림2

스파크 SQL 의 적용

Hive, JSON, Parquet 등으로 수집된 Spark SQL 을 태블루 등이 JDBC 등을 이용하여 질의 하는 내용이다.



그림3

다음 그림은 스파크SQL의 다른 세부 구조를 보여줍니다.



즉 API, RDD, DATA Source 가 아키텍쳐에 포함되어 있다.



DataFrame

1 큰 클러스터 단일 노드 클러스터에서 페타 바이트에 킬로바이트 크기의 데이터를 처리 할 수 있는 능력.

2 다른 데이터 형식 (아 브로, CSV, 탄성 검색, 카산드라) 및 스토리지 시스템 (HDFS, HIVE 테이블, MySQL은, 등)을 지원합니다.

3 스파크의 SQL 촉매 최적화 (트리 변환 프레임 워크)을 통해 기술 최적화 및 코드 생성의 주.

4 쉽게 스파크 코어를 통해 모든 빅 데이터 툴과 프레임 워크와 통합 할 수 있습니다.

5 파이썬, 자바, 스칼라, 및 R 프로그래밍을위한 API를 제공합니다.



Databricks 의 데이터 프레임 소개

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html



 DataFrame은 JSON 파일, Parquet 파일, 하이브 테이블을 비롯하여 가장 많이 사용되는 형식의 데이터를 읽는 것을 지원합니다. JDBC를 통해 로컬 파일 시스템, 분산 파일 시스템 (HDFS), 클라우드 스토리지 (S3) 및 외부 관계형 데이터베이스 시스템에서 읽을 수 있습니다. 또한 Spark SQL의 외부 데이터 소스 API 를 통해 DataFrames를 확장하여 타사 데이터 형식이나 소스를 지원할 수 있습니다. 기존의 타사 확장 프로그램에는 Avro, CSV, ElasticSearch 및 Cassandra가 이미 포함되어 있습니다.

대규모 데이터 과학을위한 Spark의 DataFrames 소개

데이터 소스에 대한 DataFrames의 지원으로 응용 프로그램은 서로 다른 소스의 데이터 (데이터베이스 시스템에서 페더 레이 티드 쿼리 처리라고 함)를 쉽게 결합 할 수 있습니다.예를 들어, 다음 코드 스 니펫은 S3에 저장된 사이트의 텍스트 트래픽 로그를 PostgreSQL 데이터베이스와 조인하여 각 사용자가 사이트를 방문한 횟수를 계산합니다.


users = context.jdbc(“jdbc:postgresql:production”, users”)
logs = context.load(“/path/to/traffic.log”)
logs.join(users, logs.userId == users.userId, left_outer”) \
.groupBy(“userId”).agg({“*”: count”})

DataFrame에 대한 계산이 시작되기 전에 Catalyst Optimizer 는 DataFrame을 구축하는 데 사용 된 작업을 실제 계획으로 컴파일하여 실행합니다. 옵티마이 저는 조작의 의미 및 데이터 구조를 이해하므로 계산 속도를 높이기 위해 지능적인 결정을 내릴 수 있습니다.



DataFrame 성능





예제>

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

scala> val dfs = sqlContext.read.json("employee.json")

scala> dfs.show()

scala> dfs.select("name").show()

scala> dfs.groupBy("age").count().show()



참조

1 http://spark.apache.org/docs/latest/sql-programming-guide.html

2 Lerning Spark 서적

3 http://www.w3ii.com/ko/spark_sql/spark_sql_quick_guide.html



'오픈소스 > Spark' 카테고리의 다른 글

Spark RDDs vs DataFrames vs SparkSQL  (0) 2017.06.18
[Spark] Overview  (0) 2017.01.25


공식 문서 : http://spark.apache.org/docs/latest/index.html


Apache Spark는 빠르고 일반적인 범용 클러스터 컴퓨팅 시스템입니다. 

Java, Scala, Python 및 R의 고급 API와 일반 실행 그래프를 지원하는 최적화 된 엔진을 제공합니다. 또한 SQL 및 구조화 된 데이터 처리를위한 Spark SQL, 기계 학습을위한 MLlib, 그래프 처리를위한 GraphX 및 Spark Streaming을 비롯한 다양한 고급 도구 세트를 지원합니다. 

아래 그림에서 자세히 알아봅니다.


특징

- 인메모리

- 단일 시스템에서 배치/스트림 처리, SQL, ML Graph 프로세싱 지원

- 자바 스칼라 파이션 등 인터페이스 제공

- Strandalone 은 물론 YARN, mesos 등의 클러스터 리소스 관리 패키지를 통해 다양한 환경에서 구동가능

- 8000 개 이상의 노드 추가 가능 확장성 확보

- HDFS, 카산드라, 흐브, S3 등 다양한 데이터의 활용 가능



Spark SQL - SQL / HiveQL 같은 쿼리 작업이 가능하게 해주는 라이브러리

RDD - 본 문서에서 다룸

DataFrame / DataSet - 테이블과 비슷한 형태로 Rows objects 들의 집합

- DataFrame/DataSet 은 다양한 input source 를 지원

- 존재하던 RDD

- JSON

- HiveQL

- ODBC / JCBC 서버 연동

 


RDD, DataFrame 뭐가 좋나?

https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html


Spark RDDs vs DataFrames vs SparkSQL

https://community.hortonworks.com/articles/42027/rdd-vs-dataframe-vs-sparksql.html 


이 내용은 별도 페이지로 정리하도록 한다.

http://dadk.tistory.com/74



Spark 구성요소




아파치 스파크 코어

스파크 코어 다른 모든 기능이에 구축 스파크 플랫폼의 기본 일반 실행 엔진입니다. 이는 인 메모리 컴퓨팅 및 외부 스토리지 시스템에서 참조하는 데이터 세트를 제공합니다.


스파크 SQL

스파크 SQL 구조화 및 반 구조화 된 데이터에 대한 지원을 제공 SchemaRDD라는 새로운 데이터 추상화를 소개 스파크 코어의 상단에 구성 요소입니다.


스파크 스트리밍

스파크 스트리밍 스트리밍 분석을 수행하는 코어의 빠른 스케줄링 기능을 스파크 활용합니다. 이 미니 일괄 적으로 데이터를 섭취 및 데이터의 그 미니 배치에 RDD (탄력 분산 데이터 집합) 변환을 수행한다.


MLlib (기계 학습 라이브러리)

MLlib는 스파크 위 때문에 분산 메모리 기반 스파크 아키텍처의 분산 기계 학습 프레임 워크입니다. 그것은, 벤치 마크에 따르면, 교류 최소 제곱 (ALS) 구현에 대한 MLlib 개발자에 의해 수행된다. 스파크 MLlib은 (두싯는 스파크 인터페이스를 얻은 전) 구 배 빨리 아파치 머하 웃의 하둡 디스크 기반 버전입니다.


GraphX

GraphX??는 스파크의 상단에 분산 그래프 처리 프레임 워크입니다. 이 프레 겔 추상화 API를 사용하여 사용자 정의 그래프를 모델링 할 수 그래프 연산을 표현하기위한 API를 제공한다. 또한이 추상화에 최적화 된 런타임을 제공합니다.


Spark Application 개발은 아래의 3가지에 대한 이해를 바탕으로 시작

1. RDD

2. RDD 변환 API (Scala / Python / java / R interface)

3. RDD 변환 API 로 만들어진 SQL / ML / graph / stream Library (Scala / python.. 상동)



1 성능

저장방식 : 메모리 Read-only

성능이 좋다. 기존 Hadoop, HadoopBM 과 비교해도 빠르다.

머신이 늘어나도 성능이 유지된다

기존에는

HDFS + MapReduce 하둡, SQL 질의를 위한 Hive, Oozie, 하둡->대량데이터 전송을 위한 Sqoop 등을 써야 했다.

하둡 API 자체도 어려웠고 접근이 더 어려움.


2 RDD (Resilient Distributed Datasets)

핵심 : RAM 으로 적재하면서 Read-only 로 처리

- 막말로 Spark 는 RDD + Interface 라 할 수 있다.

- 두가지 논문이 근간을 이룸

GFS

스탠포드 = 정석

사골 (읽어볼 수록 새로움)

RDD

버클리 = 개념원리

라면 (후루룩 읽힘)

- 배열 다루듯이 하면 된다. RDD 를 다룬다는건 컬렉션을 다룬다는 뜻.

- RDD 를 데이터를 생성하는 두가지 방법

* Parallelized 

sc.parallelize(Array(1,2,3,4,5))

* External Datasets 

sc.textFile("data.txt")

2-1 RDD 에 대해서 좀더 알아보자

RDD는 병렬로 작동 할 수있는 클러스터 노드간에 분할 된 요소 모음입니다. 

RDD는 Hadoop 파일 시스템 (또는 다른 Hadoop 지원 파일 시스템)이나 드라이버 프로그램의 기존 Scala 컬렉션에서 시작하여 파일을 변환하여 만들어집니다. 

사용자는 Spark에 메모리에 RDD를 유지하도록 요청하여 병렬 작업에서 효율적으로 재사용 할 수 있습니다. 

마지막으로, RDD는 노드 장애로부터 자동으로 복구됩니다. 

다음문서에서 Spark Shell 기반으로 RDD 핸들링 하는 방법과 관련 사항을 알아보도록 하겠다.

http://dadk.tistory.com/67

2-2 Dataset / DataFrames

RDD 보다 Dataset (dataframes) 이 더 빠르다

https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

(RDD 와 Dataset 은 별도 포스팅으로 알아보도록 한다.)


3. DStream (Discretized Streams)

시간 흐름에 따른 순차적 데이터를 의미 내부적으로 DStream 은 각 시점에 RDD 스퀸스이다.

Flume, Kafka 또는 HDFS 와 같은 많은 input 유형으로 부터 생성될 수 있다.

Dtream 두가지 Operation

Transformation

    DStream 을 생성(파생)

    operation 은 두가지로 구분 할 수 있다.

        Stateless

            이전 Batch 결과 데이터에 영향을 받지 않는 데이터를 처리

            map(), filter(), reduceByKey() 같은 Operation 들

        Stateful

            이전 Batch 결과 데이터를 현재 Batch 결과를 만들기 위해 사용

            Sliding Window 나 기간내 상태 주적 등이 있다.

Output

    외부 데이터 저장소에 결과 저장


4 RDD 두가지 액션

- transformations 

데이터를 개발자가 원하는 모양으로 변형시키는 것

(map, reduce, join) 데이터의 이합집산, 지지고 볶는 흐름 단순히 map, reduce 만 있던 MR 보다 명령어 풍부

Trasfomation 의 결과는 항상 RDD 이다.

- Action 

모든 trasformations 결과를 내 높아라

trans->trans->....trans->action

액션을 실행하는 순간 데이터 변형(Transfomation)을 얻음(?) 결과는 value

Action 을 실행해야 Job 이 나온다.


Operation 의 순서를 기록해 DAG 로 표현한 것을 Lineage 라 부른다.

하나의 RDD 는 여러개의 파티션으로 나뉜다.


5. Spark Streaming

실시간 데이터 스트림의 확장 성, 높은 처리량, 내결함성 스트림 처리를 가능하게하는 핵심 Spark API의 확장입니다. 

http://spark.apache.org/docs/latest/streaming-programming-guide.html


Kafka, Flume, Kinesis 또는 TCP 소켓과 같은 많은 소스에서 데이터를 수집 할 수 있으며 map, reduce, join 및 window와 같은 고급 함수로 표현 된 복잡한 알고리즘을 사용하여 데이터를 처리 할 수 있습니다. 마지막으로 처리 된 데이터를 파일 시스템, 데이터베이스 및 라이브 대시 보드로 푸시 할 수 있습니다. 실제로 Spark의 기계 학습 및 그래프 처리 알고리즘을 데이터 스트림에 적용 할 수 있습니다.



6. Spark Application 처리 구조

Driver Program - main() 함수를 가지고 있는 프로세스 (SparkContext 객체를 생성하고 RDD 를 전달)

Application을 task 라고 불리는 실제 수행 단위로 변환을 task를 묶어서 Worker Node의 Excutor 로 전달

Worker Node - 실제 작업을 수행하는 노드 

Cluster Manager - 클러스터에서 필요한 자원을 찾아줌

Excutor 
- Task 를 수행하는 프로세스 Executor 가 오류가 나면 대체 Executor 에게 Job 할당 (멀티 스레드에서 tasks 를 수행하고 결과를 Driver Program 에게 전송) 
- Cache 하는 RDD 를 저장하기 위한 메모리 공간 제공
Task - 익스큐터에 할당되는 작업의 단위
Job - 사용자 입장에서의 작업의 단위 (Task 의 조합)


7. Spark Application 실행동작 순서 흐름
1 사용자 Spark-submit 을 사용해 작성 어플 실행
2 Submit 은 Driver Program 을 실행하여 main() 호출
3 Driver 에서  생성된 SparkContext 는 Cluster Manager 로 부터 Executor 실행을 위한 리소스 요청 
4 Cluster Manager 는 Excutor 를 실행
5 Driver Program 은 Application을 Task 단위로 나누어 Excutor 에게 전송
6 Executor 는 Task 를 실행
7 Executor 는 Application 이 종료되면 결과를 Driver Program 에게 전달하고 클러스터 매니저에게 리소스 반납.



참조할만한 문서

Spark SQL for SQL

    Hadoop의 Hive가 아닌 Spark SQL을 통해 SQL을 MapReduce없이 빠르게 처리가 가능합니다. 

    http://spark.apache.org/docs/latest/sql-programming-guide.html


Structed data processing

    Json, Parquet 등의 다양한 struced data processing을 지원합니다.

    

MLlib for machine learning

    Classification, Regression, Abnormal Detection, Clustering 등의 다양한 machine learning algorithm을 제공합니다. 

    http://spark.apache.org/docs/latest/ml-guide.html


GraphX for graph processing

    graph processing을 지원하는 GraphX를 제공합니다. 

    http://spark.apache.org/docs/latest/graphx-programming-guide.html


Spark Streaming.

    streaming처리가 가능한 storm처럼 spark에서도 batch processing 외에도 streaming처리가 가능합니다.

    http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

    http://spark.apache.org/docs/latest/streaming-programming-guide.html


Launching on a Cluster

    Sparks를 클러스터에서 동작하게 하기 위해서는 cluster manager가 필요합니다.


Cluster Manager 종류 

    Amazon EC2

    Standalone Deploy Mode

    Apache Mesos

    Hadoop Yarn


Download

  스파크는 binary, build 버전을 받을 수 있습니다. [다운로드] 만약 binary를 다운로드 받으신다면, Hadoop버전에 맞는 package type을 선택하셔야 합니다. 저 같은 경우에는 hadoop2.6의 hdfs와 yarn위에서 동작시키기 위해 Hadoop 2.6 prebuilt된 버전을 다운로드 했습니다. 

wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.1-bin-hadoop2.6.tgz




그외


1. Spark 실행

스파크는 Windows, UNIX (e.g. Linux, Mac OS)에서 동작이 가능합니다. Spark를 사용하기 위해서는 사전 설치해야하는 항목들이 있습니다. Spark는 JVM위에서 동작하기 때문에 JAVA는 필수입니다. Java 7+, Python 2.6+, R 3.1+, Scala를 사용하기 위해서는 2.10을 설치해야 합니다.


2 Running the Examples and Shell

Spark를 다운로드 받으면 $SPARK_HOME/examples/src/main의 경로에 언어별로 간단한 예제를 제공하고 있습니다. 


3 Interactive Shell 지원

$SPARK_HOME/bin/pyspark --master local[2]

python interative shell도 제공을 합니다.

$SPARK_HOME/bin/spark-submit examples/src/main/python/pi.py 10

실행 결과는 Pi is roughly 3.140176


당연히 Scala 도 가능

$SPARK_HOME/bin/run-example SparkPi 10

interactive scala shell을 실행할 수 있습니다.  


3-1 master local[2] 옵션

$SPARK_HOME/bin/spark-shell --master local[2]

--master local[2] 옵션은 spark를 2개의 worker threads로 locally 실행한다의 의미 입니다. (--help로 확인 가능)더 자세한 내용은 아래 참고하세요. 

https://spark.apache.org/docs/latest/submitting-applications.html#master-urls



4. R 언어지원

Spark 1.4부터 R을 제공합니다.

$SPARK_HOME/bin/sparkR --master local[2]

$SPARK_HOME/bin/spark-submit examples/src/main/r/dataframe.R



참조문서

https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html






'오픈소스 > Spark' 카테고리의 다른 글

Spark RDDs vs DataFrames vs SparkSQL  (0) 2017.06.18
[Spark] SQL and DataFrame  (0) 2017.04.10



Kafka 란?

http://kafka.apache.org/
http://kafka.apache.org/intro
http://kafka.apache.org/documentation/


설치

spark 와 마찬가지로 다운받아 별도 설치 없이 데몬을 띄움.

별다른 설정 없이 카프카 서버의 실행, 메세지 테스트를 진행합니다.

서버에 Java 1.7.x 이상이 설치되어 있어야 합니다.



구조


구조

Kafka는 발행-구독(publish-subscribe) 모델을 기반으로 동작하며 크게 producer, consumer, broker로 구성된다.

확장성 / 고가용성을 위해 브로커들을 클러스터로 구성 동작.


기본 동작

producer 특정 topic 메시지를 생성 전달 -> Broker 분산처리 (Zookepper) Topic 별로 적재(Topic 은 파티션별로 쪼개져 각서버에 분산) -> 해당 Topic 을 구독하는 Consumer 가 메시지를 가져가서 처리


설명

Producers

    - 메시지 송신 API

    - 특정 Topic에 해당하는 메시지를 생성하는 프로세스, 메시지를 Broker에 전달(발행/Publish)

    - Producers는 데이터를 그들이 선택한 Topic으로 publish한다.

    - Producer가 메시지를 실제로 어떤 partition으로 전송할지는 사용자가 구현한 partition 분배 알고리즘에 의해 결정된다. 예를 들어 라운드-로빈 방식의 partition 분배 알고리즘을 구현하여 각 partition에 메시지를 균등하게 분배하도록 하거나, 메시지의 키를 활용하여 알파벳 A로 시작하는 키를 가진 메시지는 P0에만 전송하고, B로 시작하는 키를 가진 메시지는 P1에만 전송하는 형태의 구성도 가능하다.

Consumer

    - 메시지 수신 API

    - Broker에게서 구독(Subscribe)하는 Topic의 메시지를 가져와 사용(처리)하는 프로세스

    - Topic 당 할당된 스레드 개수만큼 스레드가 만들어지면 Partition 으로부터 메세지를 읽음.

    - 하나의 스레드는 1개 이상 partition 으로 부터 메시지를 읽을 수 있다.

    - public void run 메소드 내 while(is.hasNext())에서 블록킹돼 있다가 파티션으로 메시지가 들어오면 이 곳에서 메시지를 읽는다. 따라서 다른 타겟으로 메시지를 처리하는 데 적합한 장소라고 할 수 있다. 즉 메시지를 파일로 저장하던지 대용량 입력이 가능한 하둡이나 NoSQL로 저장하기에 유용하다.

Broker

    - 토픽을 기준으로 메시지 관리

    - broker 는 클러스터로 구성 (이에 대한 분산 처리는 zookeeper 가 처리)

    - Producers와 Consumers가 만날 수 있도록 메시지를 관리하는 서버 클러스터로 Producer에게서 전달받은 메시지를 Topic별로 분류한다. 

    - 여러대의 Broker Cluster로 구성 가능하며, Zookeeper에 의해 각 노드가 모니터링 된다.



그외

Topic

    - 발행(Publish)된 메시지들의 category

    - 유사한 메시지들의 집합이다. 프로듀서는 메시지를 전달할 토픽을 반드시 지정해야 한다.

    - partition 단위로 클러스터 각 서버들에 분산 저장

    - 각 partition 은 0부터 1씩 증가하는 offset 값을 메시지에 부여 (partition 내 메시지 식별)

    - 클러스터내 메시지들은 설정된 기간동안 유지 후 삭제됨 

Log 

    Producers가 생성한 메시지

Consumer API

    - 컨슈머 구현에 두가지 API 제공 Simple Consumer API, High-Level Consumer API  

    - 세부적인 것들은 모두 추상화되어 있어 몇 번의 간단한 함수 호출로 consumer를 구현할 수 있는 High Level Consumer API

    - offset과 같은 세부적인 부분까지 다룰 수 있지만 이 때문에 구현하기가 상당히 까다로운 Simple Consumer API가 제공된다(이름은 simple이지만 전혀 simple하지 않다).

리플리케이션(Replication) 

    - Fault Tolerance 위해 파티션 단위로 복제한다.

Partition

    - 로드밸런싱을 목적으로 토픽을 논리적으로 분할하는 것을 의미한다.



기존의 메시징과 차이점

1 기존 broker -> consumer 로 push 카프카 반대로 pull 로 땡겨옴.

2 이로 인해 필요한 메시지만 broker 에서 가져오므로 최적의 성능. 그리고 pull 방식이라 batch 처리 구현 가능.

3 카프카는 파일로 저장 연속성 상승 (대용량에 적합) - HDD 의 순차읽기는 SSD에 7배 정도만 느리다.

4 AMQP 프로토콜이나 JMS API를 사용하지 않고 단순한 메시지 헤더를 지닌 TCP기반의 프로토콜을 사용하여 오버헤드 감소


기본 클러스터 구조도





기본 예제

# 토픽생성
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".


# log stash 에서 들어올 토픽도 하나 생성해 둔다. 
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logstash_logs
logstash_logs

참고로 토픽 삭제시 --delete 옵션을 주면 된다.

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 토픽이름

# 토픽리스트
# bin/kafka-topics.sh --list --zookeeper localhost:2181
logstash_logs
test
# 프로듀서 메시지 생성
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
message.
line enter.
hello world.
# 컨슈머 메시지 받기 
# 이제 새창에서 위 프로듀서로 메세지를 전송하면 실시간으로 받는다.
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
message.
line enter.
hello world.

hello
hi
asdf
asdf
as
df
as
df


hhahaha
======================================



실행 오류시

실행시 오류가 나온다면 JDK 버젼 및 64bit 여부 확인 

또는 JAVA_HOME 설정 (.profile) 이 되어 있는지 확인한다.




다음은 데이터 생산 스크립트를 실행하는 예다.


kafka-producer-perf-test.sh --topic test_topic \  

--throughtput 700 \

--record-size 2000 \

--num-recored 200000 \

--producer-props \

bootstrap.servers=kafka01.server:9092,kafka02.server:9092  



다음은 데이터 소비 스크립트를 실행하는 예다.

kafka-consumer-perf-test.sh \  

--zookeeper kafka01.server:2181,kafka02.server:2181 \

--messages 200000 \

--topic test_topic \

--threads 2 \

--show-detailed-stats \

--broker-list kafka01.server:9092,kafka02.server:9092 \

--group test_group



참고 

kafka-producer-perf-test.sh 파일과 kafka-consumer-perf-test.sh 파일을 옵션 없이 실행하면 스크립트 사용 방법을 확인할 수 있다.



토픽명 alter 도 가능.

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count


+ Recent posts