Reference

1. Storm-starter : https://github.com/apache/storm/tree/master/examples/storm-starter

2. Storm : https://github.com/apache/storm

3. Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm : http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/


소개

회사 업무로 Apache Storm(이하 Storm)과 관련된 프로젝트를 완료하고 휴식중에, Storm 기초를 설명할 수 있을만한 예제를 만들어보기 위해서 Storm Starter를 참조하여 간단한 프로젝트를 만들었다.

이 프로젝트는 Twitter Sample Public Status API(https://dev.twitter.com/streaming/reference/get/statuses/sample)를 사용하여 Twitter realtime stream data의 일부를 Input으로 하고, HashTag 정보를 추출한 후 일정 시간 간격(emit frequency)으로 일정 시간 동안(window length)의 일정 갯수(TOP_N)의 Top HashTag를 생성하여 출력하는 프로젝트이다.

Storm-starter project에서 많은 소스코드를 가져 왔으며 Twitter Library는 Twitter4J를 사용한다.

Project source : https://github.com/forcemax/storm_twitter_hashtag


실행하기

0. Prerequisites

Java 1.7 이상, Storm 0.9.5, Maven, Git

(Twitter API를 사용하기 위한 consumerKey, consumerSecret, accessToken, accessTokenSecret을 변경하지 않으면 실행이 안된다.)

1. 소스 가져오기

git clone https://github.com/forcemax/storm_twitter_hashtag

2. 소스 빌드하기

$ mvn clean package

3. Storm Cluster에 Topology submit

$ storm jar StormTwitterHashtag-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.embian.forcemax.twitter.StormTwitterHashtagTopologyRunner server

4. Storm UI에서 확인하기



그림. topology 목록



그림. topology 상세 정보


설명

- Rolling Count Algorithm은 Reference 3번 사이트를 참조

- Topology는 다음과 같이 구성되어 있다. Twitter API를 사용하기 위해서는 Twitter에 App 등록을 해야하며, App 등록을 하면 consumerKey, consumerSecret, accessToken, accessTokenSecret 값을 얻을 수 있다. 다음 코드에 해당 값을 넣어서 사용한다.



그림. Topology 구성도

- TwitterSpout은 Twitter4J Library를 사용하며, LinkedBlockingQueue를 사용해서 새로운 Public Status가 있을때 Queue에 저장한다. nextTuple() 호출시에 Queue에서 꺼내서 ExtractHashTagBolt에 넘긴다.

- ExtractHashTagBolt는 받은 Public Status에서 HashTag만 뽑아내서, RollingCountBolt로 넘긴다. HashTag의 갯수 만큼 emit이 발생한다.

- RollingCountBolt는 생성할 때 인자로 받은 window length와 emit frequency 값을 바탕으로, emit frequency마다 window length에 속하는 데이터에서 word별 count를 계산해서 IntermediateRankingsBolt로 넘긴다. 이때, emit frequency마다 emit을 하기 위해서 TickTuple을 사용하는데, TickTuple은 Storm 0.8에 새로 들어간 기능이며 Component(Spout, Bolt)내에서 일정 주기 마다 Tuple을 발생시키는 기능이다.

IntermediateRankingsBolt와 TotalRankingsBolt는 생성할때 인자로 topN, emit frequency를 받으며, 입력된 word별 count를 바탕으로 상위 topN개의 word와 count를 뽑아내고 emit frequency 마다 emit한다. IntermediateRankingsBolt는 parallelism hint를 크게 주어 map-reduce 구조에서 map의 역할을 하고, TotalRankingsBolt는 parallelism hint를 1로 주고 reduce의 역할을 한다. emit frequency마다 emit을 하기 위해서 RollingCountBolt와 마찬가지로 TickTuple을 사용한다.

- 마지막으로 PrinterBolt는 TotalRankingsBolt에서 emit한 Tuple을 출력하기 위해서 사용하며 특별한 기능은 없다.


이 프로젝트는 Storm의 Spout(nextTuple과 open), Bolt(execute, prepare)와 Topology wiring에 대한 이해만 있다면 코드를 보는데 아무 무리가 없을 정도로 간단한 예제이다. 그러나 외부 서비스(Twitter)과의 연계를 통한 Spout 구성, Atomic하게 역할을 분리한 Bolt, parallelism hint를 조절하여 성능을 향상시키는 방법을 확인해 보기에 알맞은 예제이다.

Twitter Sample Public Status API는 10분에 20000 statuses 정도의 데이터밖에 제공하지 않으므로, 한대의 Storm에서 처리하기에 충분하다.

초보자가 알아보기 쉽게 코드가 구성되어 있으니, Storm을 이용하여 realtime CEP 엔진을 공부하려는 분들에게 많은 도움이 되었으면 한다.


posted by forcemax

Posted by 알 수 없는 사용자
,



Apache Kafka 성능 테스트


조석현(zoe@embian.com)
Business Developer / Consultant at Embian Inc.



Apache KafkaLinkedIn에서 자신들의 내부 데이터 처리를 위해 개발한 DPMS(Distributed Processing Message System)입니다. 전통적 메세지 큐 시스템(Message Queue System)과는 다르게 Apache Kafka는 Broker Cluster를 여러 대의 Machine으로 구성하여 분산처리(Distributed Processing)가 가능하다는 장점을 가지고 있습니다. Big Data시장의 발전과 함께 가장 주목받고 있는 Queue System이기도 합니다.

본 포스팅에서는 Apache Kafka의 주요 특징을 살펴보고 분산 처리 성능을 확인하기 위한 성능 테스트와 결론을 정리해 보았습니다.



Index

1. About Apache Kafka

2. Performance Test

3. Conclusion




1. About Apache Kafka

Apache KafkaLinkedIn의 Legacy System인 ActiveMQSplunk를 대체하기 위하여 개발된 Message Queue System입니다. LinkedIn은 2009년 서비스 가입자가 폭증하며 자신들의 내부 데이터 처리 과정에서 한계를 느끼고, 그들만의 Messaging System을 개발하게 되었습니다. 현재 LinkedIn에서 사용자들에게 잘 알려진 PYMK(People You May Know)등의 서비스가 대표적인 적용사례입니다. 해외에서는 Twitter, Netflix, Tumblr, Foursquare 등의 기업에서 Apache Kafka를 도입하여 사용하고 있습니다.

Kafka는 개발과정에서 Open Source로 관리되었으며 Apache Incubator를 지나 현재는 Apache 정식 프로젝트로 등록되어 관리되고 있습니다. 게시물 작성일 기준으로 0.8.0 버젼이 Stable Release입니다.



1.1. Kafka's Features

Apache Kafka는 LinkedIn에서 설계 시점부터 Big Data를 염두에 두고 Kafka를 설계하였습니다. Kafka 공식 홈페이지에서 자랑하는 Kafka의 주요 특징은 아래와 같습니다.


  • Scala/Java로 작성되어 있다.
  • 분산처리(Distributed Processing)가 가능하다.
  • Log Data를 디스크에 기록한다.
  • Publish-Subscribe구조로 되어 있다.
  • 확장 가능(Scalablility)하다.
  • 처리량이 높다.(High Throughput)
  • 응답속도가 빠르다.(Low Latency)


Apache Kafka의 가장 큰 특징은 확장 가능(Scalablitiy)이라고 할 수 있습니다. 그리고 Log Data를 메모리가 아닌 디스크로 저장함으로서 In-Memory기반의 메세지 큐에 비해 Data의 안정성을 높여주며, Message Queue의 역할과 Log Aggregation의 역할을 함께 할 수 있다는 특징이 있습니다. 



1.2. Kafka Architecture

Apache Kafka는 전통적 메세지 큐 방식인 Publish-Subscribe의 구조로 구성되어 있습니다. Producer는 Message를 Broker로 발행(Publish)하고, Consumer는 Broker에서 메세지를 구독(Consume)하는 형태입니다. 각 Broker는 여러 대의 Broker Cluster로 구성 가능하며, Zookeeper에 의해 각 노드가 모니터링됩니다. 기존 메세지 큐와 가장 큰 차이점은 여러 개의 Partition으로 메세지가 분산처리된다는 점입니다.


Apache Kafka에서 제공하는 Kafka의 기본 컨셉은 아래의 그림과 같습니다.


Apache Kafka의 Basic Concept과 Architecture, 사용법 등의 내용들은 향후 Posting할 예정입니다.




2. Performance Test


2.1. 테스트 배경 및 목적

Kafka를 도입하려는 많은 기업의 경영자와 엔지니어들은 Kafka를 도입하기 위하여 Kafka의 가치를 검토할 필요가 있습니다. End-User에게 보다 좋은 서비스를 제공하기 위하여, Application의 환경 또한 중요한 요소이며, Kafka 도입을 통해 기업이 얻는 Benefits을 판단하는 것은 중요한 Decision Point입니다.

앞서 언급한대로, Kafka는 Scale Out을 통한 분산처리가 핵심이라고 할 수 있습니다. Kafka Broker Cluster를 구성하였을때, Broker 수가 증가함에 따라 Throughput이 얼마나 증가하는지, Kafka Cluster Set의 최대 처리량은 언제 어떤 시점에서 나타나는지, Log를 기록하는 디스크의 성능에 따라 얼마나 성능이 차이나는지 확인할 필요가 있습니다.


"그래서 Apache Kafka는 대체 성능이 얼마나 좋은건데??"


이를 위하여 성능측정의 가이드라인이 필요하다고 판단하였고, 저 역시도 기술스택 도입 이전의 검토단계로서 "Apache Kafka의 성능이 얼마나 되는 것인가?" 라는 막연한 질문에서 출발하여 자료를 찾아보기 시작했습니다. Kafka의 성능 수치에 대한 검색결과는 내부 측정 결과와 다소 차이가 있었고, 환경 구성이나 배경에 대한 설명이 부족하여 검색결과를 신뢰하기 어려워 직접 Performance Test를 수행하게 되었습니다.


Performance Test의 목적은 아래와 같이 정의하였습니다.


 

1. Broker 수 증가에 따른 Kafka의 분산처리 성능 확인

 2. Kafka의 Network 대역폭 한계 Throughput 관찰

 3. SSD vs HDD의 Throughput 비교




2.2. 수행 테스트 목록

수행 테스트는 아래 3가지로 정의하였으며,
Producer - Broker구간으로 한정하여 테스트를 수행하였습니다.



2.3. 테스트 툴 선택 / 평가기준

Kafka에서는 Benchmark Test를 위해 기본적으로 Performance 측정 도구를 제공합니다.

Kafka를 설치하면 {$KAFKA_HOME}/bin에 kafka-producer-perf-test.sh 파일과 kafka-consumer-perf-test.sh 파일을 확인할 수 있는데, 이 파일을 실행하여 Performance Test를 수행할 수 있습니다. 테스트 범위가 Producer-Broker 구간이므로, kafka-producer-perf-test.sh 쉘 스크립트를 이용하여 테스트를 수행하였습니다.


개별 메세지를 전송할 때 평가 기준은 처리량(Throughput)과 응답시간(Latency)을 기준으로 최초 정의하였으나, Kafka에서 제공하는 테스트 도구에서는 응답시간을 지원하지 않고 Producer - Broker구간에서는 응답속도(Latency)가 필요하지 않아 평가 기준에서는 배제하였습니다.


평가기준 : Throughput(초당 처리량)

   - 단위 : MegaByte/second




2.4. 테스트 시나리오 및 수행방법

  Test 1. Broker 수 증가 테스트(SSD)

Producer Machine을 3대로 고정하고, Broker Machine의 대수를 1대부터 3대까지 증가시키며 Throughput의 변화를 확인합니다. 각 Broker의 Storage는 SSD를 장착하였습니다.


  Test 2. Broker 수 증가 테스트(HDD)

Producer Machine을 3대로 고정하고, Broker Machine의 대수를 1대부터 3대까지 증가시키며 Throughput의 변화를 확인합니다. 각 Broker의 Storage를 HDD로 교체하여 장착하였습니다.


  Test 3. 최대 처리량 측정 테스트

Producer Machine과 Broker Machine의 대수를 동시에 1대부터 5대까지 증가시키며 Throughput의 변화를 확인합니다. 각 Broker의 Storage는 최대 성능을 고려하여 SSD로 교체하여 장착하였습니다.

* Producer - Broker 구성이 5대 - 5대 로 구성되어야 하나, 장비 제약으로 인해 4대 - 5대 구성으로 대체하였습니다.



2.5. 테스트 환경구성

테스트를 위한 Network 구성과 Machine 상세는 아래 그림과 같습니다.


* Broker Machine의 Storage는 테스트 방법에 따라 SSD type과 HDD type을 사용하였습니다.



각 Machine에 설치된 OS/Application은 아래와 같습니다.

OS : Ubuntu Desktop 12.04( Precise Pangolin) 64bit

Java : Oracle Java 1.7.0_51-b13

Kafka : Apache Kafka 0.8.0 Release(src)

Network Monitor Support Tool : sar, htop 1.0.1


Zookeeper의 환경설정( {$KAFKA_HOME}/config/zookeeper.properties )과 Broker( {$KAFKA_HOME}/config/server.properties )의 환경설정은 설치시 Default Setting으로 테스트를 수행했습니다.

Kafka Official Site에서는 Zookeeper Cluster를 독립 Machine에서 사용할 것을 권고하고 있으나, 테스트 장비의 한계상 Broker Machine과 함께 구성하였습니다.



2.6. 테스트 제한사항

Broker의 수를 증가시키기 위해 예비 테스트를 수행해 본 결과 간단한 설정값이 Broker 성능에 큰 영향을 미칠 수 있음을 확인했습니다. 객관성있는 테스트를 위해 서버의 설정값과 테스트 변수들은 고정하였습니다.


본 테스트에 사용된 모든 Producer와 Broker Machine에서는 Kafka 0.8.0 Stable Release 버젼을 이용하여 테스트를 수행하였습니다.


Broker의 환경설정 파일은 아래 내용과 같습니다.


[{$KAFKA_HOME}/config/server.properties]

broker.id=1

port=9092

num.network.threads=2

num.io.threads=2

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=1

log.flush.interval.messages=10000

log.flush.interval.ms=1000

log.retention.hours=168

log.segment.bytes=536870912

zookeeper.connection.timeout.ms=1000000

zk.connect=zookeeper:2181



Kafka에서 제공하는 Performance 도구는 아래 명령으로 실행하였습니다.


 

bin/kafka-producer-perf-test.sh --broker-list=embian4:9092 --messages 20000000 --topic test --message-size 800 --threads 24 --batch-size 200 --compression-codec 0 --request-num-acks 0 --message-send-gap-ms 0 --hide-header



여기서 중요한 설정은 producer.type인데 async로 설정하였습니다. 명령어에서는  producer.type에 관한 내용이 생략되어 있으며, 옵션이 없을시 async로 동작합니다.

비동기 모드(async)는 Throughput 측면에서 Kafka의 성능을 최대한 활용할 수 있어 필수적입니다. 이 모드에서 각 Producer는 Broker에게 chunk 단위로 메세지를 보내기 위하여 In-memory queue를 가지며, 미리 설정된 Batch Size나 Time Interval에 도달하면 batch로 메세지를 전송합니다. 이는 압축을 훨씬 더 효율적으로 만들며, 네트워크도 더 효율적으로 사용할 수 있는 장점이 있습니다.


알려진 Use-Case에 따르면 Topic의 수가 증가함에 따라 Throughput의 변화가 없었기 때문에 구성된 Broker Cluster에서 topic은 1개로 지정하였고, Kafka는 Broker Cluster에 Replication 기능을 지원하여 데이터의 안정성과 신뢰성을 보장할 수 있지만, 모든 메세지에 대해 Broker로부터 답신 응답(acknowledgement response)를 기다리게 됩니다. 이에 따라, Throughput의 효율성 측면에서는 처리속도가 느려진다는 단점이 있어 본 테스트에서는 Replication 기능을 사용하지 않았습니다.


본 포스팅에 언급되지 않은 모든 기타 설정들은 Default Setting으로 설정하였습니다.



2.7. 테스트 결과


- Test 1. Broker 수 증가에 따른 Throughput (SSD)


Broker의 수가 증가함에 따라 Throughput이 증가하는 모습을 확인하였습니다.

Broker가 1대에서 2대로 증가시 약 33%의 Throughput이 증가하였고, 2대에서 3대로 증가시 약 28% Throughput이 증가하고 있습니다.



- Test 2. Broker 수 증가에 따른 Throughput(HDD)


SSD를 사용할때보다 Throughput이 확연히 낮아진 모습을 보여주고 있습니다. 하지만, Broker의 수가 증가함에 따라 Throughput이 증가하는 모습을 보여줍니다.

Broker가 1대에서 2대로 증가시 약 75% Throughput이 증가하였고, 2대에서 3대로 증가시 약 26% Throughput이 증가하였습니다.



Kafka Official Site에서는 디스크의 IO성능이 Kafka Broker의 성능에 큰 영향을 미칠 수 있음을 언급하고 있습니다. 앞서 수행한 Test 1과 Test 2를 비교하여 SSD와 HDD를 장착하였을 경우의 성능을 비교해 본 그래프는 아래와 같습니다.


SSD 사용시 HDD 대비 최대 2.7배 Throughput이 향상되는 것을 확인했습니다.



 - Test 3. 최대 처리량 측정 테스트


Producer 4대 / Broker 5대의 구성에서 초당 최대 처리량인 129.7MB/sec를 측정하였습니다.





3. Conclusion


위의 테스트를 통해 얻은 수치에 근거하여 얻은 결론은 아래와 같습니다.


1. Broker 수 증가에 따라 Throughput이 증가한다(1대->2대 : 33%, 2대->3대: 28%, SSD 사용시).

2. SSD를 사용하는 것이 HDD를 사용하는 것보다 우수한 성능을 보인다(최대 2.7배).

3. Producer 4대 / Broker 5대의 구성에서부터 Gigabit 대역폭에 가까운 Throughput을 보인다(129.7MB/sec).



기본적으로 Broker 수 증가에 따른 Scaling이 이루어지는 모습을 확인할 수 있었습니다. 다만, Broker Machine의 수를 증가시키는 비율대로 Throughput이 향상되지는 않고 있어서(1대->2대: 33% 증가, 2대->3대: 28% 증가, SSD 사용시), Broker Machine의 추가에 따른 비용 대비 성능비는 그리 높지 못하다고 보여집니다. 
따라서 Broker Machine의 무조건적인 추가보다는 topic 별로 Broker Machine을 분리하여 운영하는 방식이 더 효율적이라고 생각합니다.


Kafka는 기본적으로 Sequential Access를 사용합니다. 이 때문에, 일반적으로 HDD에 비해 2배정도 Disk IO가 좋은 것으로 알려진 SSD가 최대 2.7배 높은 성능을 보여주고 있습니다. Kafka 도입시 Machine의 Disk의 성능에 대해 충분히 고려해야 합니다.


최대 Throughput 측정에 있어서는 장비 구성의 물리적 제약 때문에 Saturation Point를 측정하지는 못하였습니다.  다만 Producer 4대와 Broker 5를 사용하여 총 9대의 장비를 구성하였을 경우, 129.7MB/sec가 측정되어 Gigabit 대역폭에 가까운 Throughput을 확인하였습니다.




※ Embian은 본 포스팅에 대한 Feedback을 환영합니다. 만약 포스팅 내용에 대한 문의사항이 있으시다면, 블로그 하단 Comment 또는 이메일(zoe@embian.com)을 통해 연락주시면 Feedback을 드리도록 하겠습니다.




[Reference]

1. Apache Kafka Official Site http://kafka.apache.org/

2. Kafka: A Distributed Messaging System for Log Processing, Jay Kreps, Neha Narkhede, Jun Rao from LinkedIn, at NetDB workshop 2011

3. Building LinkedIn’s Real-time Activity Data Pipeline, Ken Goodhope, Joel Koshy, Jay Kreps, Neha Narkhede, Richard Park, Jun Rao, Victor Yang Ye

4. KAFKA 0.8 PRODUCER PERFORMANCEPiotr Kozikowski, Apr 8 2013

5. Performance testingAdded by Jay Kreps, last edited by John Fung on Jan 29, 2013

6. Tom's Hardware - Benchmark Results: Sequential Read/WriteBy Patrick Schmid and Achim Roos

Posted by 알 수 없는 사용자
,