최근 회사에서 storm kafka에 대해 공부하면서 작은 미니 프로젝트를 했는데 공부했던 경험을 정리도 할겸 포스팅을 한다.

먼저 storm과 kafka에 대해 간단히 알아보도록 하자. 


storm 


storm은 실시간 분산 처리 시스템이고, 방대한 양에 데이터 스트림을 안정적으로 처리한다. storm은 실시간 분석, 머신러닝 등에 사용된다. storm 클러스터는 Hadoop 클러스터와 표면적으로 유사한데 Hadoop에서  "MapReduce job"을 실행하는 반면에, storm은 "topology"를 실행시킨다.  "jobs" 와 "topology"는 매우 다른데 한가지 핵심적으로 다른 점은 MapReduce job 은 결국 끝나게 되지만 토폴로지는 kill하지 않는 이상 계속 message를 처리한다. 

storm에 대한 좀 더 자세한 설명은 링크링크2 를 확인해 보기 바란다.








kafka


kafka는 LinkedIn에서 자신들의 내부 데이터 처리를 위해 개발한 Distriubted Processing Message System이다. 전통적인 메세지큐시스템과는 다르게 Apache Kafka는 Broker Cluster를 여러 대의 Machine으로 구성하여 분산처리가 가능하다는 장점을 가지고 있다. Big Data시장의 발전과 함께 가장 주목받고 있는 Queue System이기도 하다. 

Kafka에 대한 좀 더 자세한 설명은 링크를 확인해 보기 바란다.  

아래의 사이트도 설명이 잘 되어있다.

http://epicdevs.com/17 

http://kafka.apache.org/documentation.html#introduction




storm kafka에 대한 간단한 소개를 마쳤으니, 이제 내가 했던 작은 프로젝트에 대해 간단히 설명하고 프로젝트에 대한 step 하나 하나 설명해 보도록 하겠다. 


storm kafka 미니 프로젝트 


producer에서 로그 파일을 읽어  message를 broker(연습 용이므로 broker는 하나만 사용한다)로 publish하고  storm spout에서 consume하여 bolt에서 처리하도록 한는것이 프로젝트 목표이다.




로그 파일의 log는 다음과 같은 형태로 되어 있다고 가정한다.

@timestamp : 2015-11-10T15:32:06.046+09:00; doctype : sns; key : 974cfc83-99e0-420e-bfd1-2262e4e82dbd; appid : com.facebook.katana; appversion : 48


개발환경은 ubuntu 14.04를 기반으로 한다. 


step 1. kafka 준비 (kafka 다운로드 페이지)


다운 받은 kafka 앞축을 풀고 설치된 kafka디렉토리로 이동한다. 


(1)zookeeper 서버 실행 (zookeepr에 대한 자세한 설명은 여기 링크를 참조 하기 바란다.  )

$ bin/zookeeper-server-start.sh config/zookeeper.properties


(2)kafka서버 실행 

$ bin/kafka-server-start.sh config/server.properties


(3)topic 만들기 

kafka의 broker는 topic이라는 기준으로 메시지를 관리한다. producer에서 특정 topic의 메시지를 생성 한 후 broker에 전달하면 broker는 전달받은 메시지를 topic별로 분류하여 쌓아놓는다.  "onlytest"라는 이름의 topic을 생성하도록 하겠다. 

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic onlytest



step 2. producer 준비 



log 파일을 읽어 message를 send하는 간단한 코드이다. 추가 properties 설정에 대한 자세한 사항은 이 링크에서 확인해 보기 바란다.


import java.io.File;

import java.util.Properties;

import org.apache.commons.io.input.Tailer;

import org.apache.commons.io.input.TailerListenerAdapter;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


public class TestProducer {

private static final int SLEEP = 500;

public kafka.javaapi.producer.Producer<String,String> producer;

public void setConfig(){

Properties properties = new Properties();

       properties.put("metadata.broker.list","localhost:9092"); // broker list 필수!

       properties.put("serializer.class","kafka.serializer.StringEncoder"); //메시지를 serialize할때 사용하는 인코더

       ProducerConfig producerConfig = new ProducerConfig(properties);

       producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);

}

public static void main(String[] args) throws InterruptedException{

      

       TestProducer testProducer = new TestProducer();

       testProducer.setConfig();

       testProducer.run();

   }

private void run() throws InterruptedException {

OnlyLogListenter onlyLogListenter = new OnlyLogListenter(producer);

       Tailer tailer = Tailer.create(new File("your file path"), onlyLogListenter,SLEEP);

       while(true){

        Thread.sleep(SLEEP);

       }

   }

public class OnlyLogListenter extends TailerListenerAdapter{

kafka.javaapi.producer.Producer<String,String> producer;

public OnlyLogListenter(kafka.javaapi.producer.Producer<String,String> producer){

this.producer = producer;

}

@Override

public void handle(String line){

System.err.println(line);

KeyedMessage<String, String> message =new KeyedMessage<String, String>("onlytest",line);

    producer.send(message);

}

}

}

 

step 3. storm 준비 


(1) storm 다운받기 

storm은 요기 링크에서 다운로드 받으면 된다.(source code가 아닌 release 버전으로 다운로드 한다.) 나는 0.9.5버전을 다운로드 받았다.압축을 풀고 해당 디렉토리로 이동한다. 

나의 경우는 local에서 production cluster 모드로 테스트 하려고 하기 때문에  conf 디렉토리에 storm.yaml을 수정하도록 하겠다.

$ vi ~스톰 디렉토리/conf/storm.yaml


(2) storm.yaml 파일 수정하기 

아래와 같은 항목을 storm.yaml 파일에 추가하도록 하자  nimbus라는 것을 발견할 수 있는데 이에 대한 설명은 다음 링크에서 확인해 볼수 있다.

#zookeeper 서버 설정. local에서 production cluster모드로 테스트 해볼 것이니 실제 아이피 주소로 셋팅하자.

storm.zookeeper.servers:

      - 192.168.0.11


#nimbus host 및 seed 설정. local에서 production cluster모드로 테스트 해볼 것이니 실제 아이피 주소로 셋팅하자.

nimbus.host: 192.168.0.11

nimbus.seed: "192.168.0.11"


#storm local dir 설정 

storm.local.dir: "storm local dir path "


#storm ui  

ui.port : 8087


위와 같이 파일을 수정하고 저장한다. 


step 4. storm topology 만들기  


kafka broker에 전달된 메세지를 spout에 consume하여 bolt에서 처리 하도록 하는 코드이다. 




(1) zookeeper url을 설정한다. 

storm을 local cluster 모드로 테스트 해보자고 한다면 zookeeper 는 "localhost:2181"이 될것이다. 만약에 production cluster모드로 local에서 테스트 해보자고 한다면 "127.0.0.1:2181" 이 아닌 자신의 "실제  ip 주소 : 2181"로 셋팅하면 된다. 

String zkUrl = "zookeeper url:2181";


(2)kafkaspout설정을 해준다. 

아래 코드에 대해 좀더 자세히 알아 보고 싶으면 다음 링크에서 확인해 볼수 있다.

ZkHosts hosts = new ZkHosts(zkUrl);

SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


(3)spout과 bolt설정을 해준다. 

shuffleGrouping,fieldsGrouping 이라고 보이는데 이것을 stream grouping이라하고 토폴로지에  두개의 컴포넌트사이에서 어떻게 튜플을 send할지 알려주는 것을 말한다. 여기서 쓴 shuffleGrouping은 튜플을 무작위로 동일한 비율로 나눠서 볼트에 task를 할당하는 것이고, fieldsgrouping은 튜플에 있는 필드 값을 기준으로 파티셔닝되어 각 볼트 task에 튜플을 할당한다.  그 외 다른 stream grouping을 살펴보고 싶다면 다음 링크에서 확인해 볼 수 있다. 

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", kafkaSpout, 1);

builder.setBolt("cutbolt", new CutLogBolt(), 8).shuffleGrouping("spout");

builder.setBolt("classifybolt", new ClassifyKeyBolt(), 8).fieldsGrouping("cutbolt",new Fields("key","doctype"));

builder.setBolt("docbolt", new DoctypeCountBolt(), 8).fieldsGrouping("classifybolt",new Fields("subdoctype"));


(4) nimbus host,storm local dir 등등을 설정해준다.

local mode와 cluster mode를 제량에 따라 설정할 수 있는데 나는 local에서 production cluster mode로 실행 하려고 했기 때문에 아래와 같이 설정했다. 

//=============================

// local mode

//=============================

// LocalCluster cluster = new LocalCluster();

// cluster.submitTopology("log-stat", conf, builder.createTopology());

// Thread.sleep(10000);

// cluster.shutdown();

//=============================

// cluster mode

//=============================

conf.put(Config.NIMBUS_HOST, "nimbus url");

conf.put(Config.STORM_LOCAL_DIR,"your storm local dir");

conf.put(Config.NIMBUS_THRIFT_PORT,6627);

conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[] {"zookeeper url"}));

// conf.setNumWorkers(20);

// conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());


아래는 최종 topology 코드이다. 

import java.util.ArrayList;

import java.util.Arrays;

import java.util.List;

import java.util.UUID;


import soeun.storm.kafka.bolt.ClassifyKeyBolt;

import soeun.storm.kafka.bolt.CutLogBolt;

import soeun.storm.kafka.bolt.DoctypeCountBolt;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;


public class StormKafakaSimpleTopology {

  

   public static void main(String[] args) throws Exception {


       String zkUrl = "zookeeper url:2181";        // zookeeper url

       String brokerUrl = "localhost:9092";


       if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) {

           System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");

           System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]");

           System.exit(1);

       } else if (args.length == 1) {

           zkUrl = args[0];

       } else if (args.length == 2) {

           zkUrl = args[0];

           brokerUrl = args[1];

       }


       System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);


       ZkHosts hosts = new ZkHosts(zkUrl);

       SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());

       spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

       KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

       TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", kafkaSpout, 1);

builder.setBolt("cutbolt", new CutLogBolt(), 8).shuffleGrouping("spout");

builder.setBolt("classifybolt", new ClassifyKeyBolt(), 8).fieldsGrouping("cutbolt",new Fields("key","doctype"));

builder.setBolt("docbolt", new DoctypeCountBolt(), 8).fieldsGrouping("classifybolt",new Fields("subdoctype"));

Config conf = new Config();

conf.setDebug(true);

List<String> nimbus_seeds = new ArrayList<String>();

nimbus_seeds.add("nimbus url");


if (args != null && args.length > 0) {

conf.setNumWorkers(3);


StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

}

else {


//=============================

// local mode

//=============================

// LocalCluster cluster = new LocalCluster();

// cluster.submitTopology("log-stat", conf, builder.createTopology());

// Thread.sleep(10000);

// cluster.shutdown();

//=============================

// cluster mode

//=============================

conf.put(Config.NIMBUS_HOST, "nimbus url");

conf.put(Config.STORM_LOCAL_DIR,"your storm local dir");

conf.put(Config.NIMBUS_THRIFT_PORT,6627);

conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[] {"zookeeper url"}));

// conf.setNumWorkers(20);

// conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());


}

}

}


CutLogBolt.java

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class CutLogBolt extends BaseBasicBolt{

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String[] splitArray = input.getString(0).split(";");

String key = "";

String doctype = "";

for(int i = 0; i < splitArray.length; i++){

if(splitArray[i].contains("key"))

key  = splitArray[i];

if(splitArray[i].contains("doctype"))

doctype = splitArray[i];

}

collector.emit(new Values(key,doctype));

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("key","doctype"));

}


}


ClassifyKeyBolt.java

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class ClassifyKeyBolt extends BaseBasicBolt{


@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String[] splitdoctype = input.getStringByField("doctype").split(":");

String[] splitkey = input.getStringByField("key").split(":");

if(splitkey.length == 2 && splitdoctype.length == 2){

String doctype  = splitdoctype[1].trim();

String key  = splitkey[1].trim();

// System.err.println(key + ":" + doctype);

collector.emit(new Values(key + ":" + doctype));

}

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("subdoctype"));

}

}


DoctypeCountBolt.java 

import java.util.HashMap;

import java.util.Map;


import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class DoctypeCountBolt extends BaseBasicBolt {

Map<String,Integer> docMap = new HashMap<String,Integer>();

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String doctype = input.getStringByField("subdoctype");

Integer count = docMap.get(doctype);

if(count == null)

count = 0;

count++;

docMap.put(doctype, count);

System.out.println(docMap);

collector.emit(new Values(docMap));

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("docmap"));

}

}


(5) pom.xml 


pom.xml 설정할 때 주의 할 것은 storm과 kafka의 버전의 따른 dependency 를 꼭 확인해 보고 pom.xml에 추가해야 한다는 것이다.

현재 나는 storm 버전이 0.9.5이기 때문에 

storm-core : 0.9.5

storm-kafka : 0.9.5

앞서 설치한 kafka는  kafka_2.10_0.8.1 이기 때문에 버전에 맞게 설정했다. 버전에 맞지 않으면 엄청난 삽질을 하게 된다. 

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

 <modelVersion>4.0.0</modelVersion>

 <groupId>com.soeun.storm</groupId>

 <artifactId>ministorm</artifactId>

 <version>0.0.1-SNAPSHOT</version>

 <packaging>jar</packaging>

 <name>ministorm</name>

 <url>http://maven.apache.org</url>

 <properties>

   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

 </properties>

<dependencies>

 <dependency>

   <groupId>junit</groupId>

   <artifactId>junit</artifactId>

   <version>4.11</version>

   <scope>test</scope>

 </dependency>

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-core</artifactId>

<version>0.9.5</version>

<scope>provided</scope>

</dependency>

 <dependency>

     <groupId>org.apache.storm</groupId>

     <artifactId>storm-kafka</artifactId>

     <version>0.9.5</version>

   </dependency>

   <dependency>

<groupId>org.testng</groupId>

<artifactId>testng</artifactId>

<version>6.8</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.mockito</groupId>

<artifactId>mockito-all</artifactId>

<version>1.9.0</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.1</version>

<exclusions>

       <exclusion>

         <groupId>org.apache.zookeeper</groupId>

         <artifactId>zookeeper</artifactId>

       </exclusion>

       <exclusion>

               <groupId>com.sun.jmx</groupId>

               <artifactId>jmxri</artifactId>

           </exclusion>

           <exclusion>

               <groupId>com.sun.jdmk</groupId>

               <artifactId>jmxtools</artifactId>

           </exclusion>

           <exclusion>

               <groupId>javax.jms</groupId>

               <artifactId>jms</artifactId>

           </exclusion>

     </exclusions>

</dependency>

</dependencies>

<build>

 <plugins>

   <plugin>

     <artifactId>maven-assembly-plugin</artifactId>

     <version>2.2.1</version>

     <configuration>

       <descriptorRefs>

         <descriptorRef>jar-with-dependencies</descriptorRef>

       </descriptorRefs>

       <archive>

         <manifest>

           <mainClass />

         </manifest>

       </archive>

     </configuration>

     <executions>

       <execution>

         <id>make-assembly</id>

         <phase>package</phase>

         <goals>

           <goal>single</goal>

         </goals>

       </execution>

     </executions>

   </plugin>

 </plugins>

</build>

</project>


(6) maven install 


앞에 코드가 작성되어있는 프로젝트 디렉토리에 이동한후 maven clean install해준다.  

$ mvn clean install


target폴더에  ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar 파일이 생긴 것을 확인 할 수 있을 것이다. 


step 5. storm nimbus,supervisor,ui 실행 


스톰이 설치 되어 있는 디렉토리로 이동하여 nimbus,supervisor,ui를 실행 시킨다. 

$ bin/storm nimbus


$ bin/storm supervisor


$ bin/storm ui


ui를 실행시키면 http://localhost:8087로 접근하여 topology 상황을 웹으로 확인해 볼 수 있다. 



step 6.  ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar summit 


ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar 를 아래와 같이 실행 시킨다 


$ bin/storm jar ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar {main class path} 


ex)

bin/storm jar ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar soeun.storm.kafka.topology.StormKafakaSimpleTopology 


정상적으로 submit 하면 다음과 같은 메세지가 출력된다.

..............


[main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: onlytest


http://localhost:8087로 접근하면 아래와 같은 화면이 나올것이다 Topology summary에 "onlytest"라는 이름이 있으면 성공한 것이다.




step 7.  test

 

이제 위에서 작성하였던 TestProducer를 실행시켜 "onlytest"라는 topic의 메세지가 broker에 전송하여 storm worker 로그에 잘 찍히는지 확인해 보자 log는 storm디렉토리/logs/worker-.. .log에서 확인해 볼수 있다. 

$ tail -f worker-{}.log 

 또한 storm ui 에 Topology Visualization에서도 확인해 볼 수 있다. 




예제 소스는 다음 링크에 존재한다. 


-끝-


p.s storm kafka에 대한 좀더 자세한 사항을  알아보고자 한다면 다음 레퍼런스를 참고하기 바란다. 


참고 레퍼런스:

http://storm.apache.org/index.html

https://storm.apache.org/documentation/Setting-up-development-environment.html

http://bcho.tistory.com/995

https://github.com/apache/storm/blob/master/docs/documentation/Tutorial.md

https://github.com/apache/storm/tree/master/examples/storm-starter

https://storm.apache.org/documentation/Tutorial.html

Posted by 알 수 없는 사용자
,