'주니어 개발자'에 해당되는 글 2건

  1. 2015.11.26 주니어 개발자의 storm kafka 시작하기 2
  2. 2013.11.27 주니어 개발자의 첫걸음

최근 회사에서 storm kafka에 대해 공부하면서 작은 미니 프로젝트를 했는데 공부했던 경험을 정리도 할겸 포스팅을 한다.

먼저 storm과 kafka에 대해 간단히 알아보도록 하자. 


storm 


storm은 실시간 분산 처리 시스템이고, 방대한 양에 데이터 스트림을 안정적으로 처리한다. storm은 실시간 분석, 머신러닝 등에 사용된다. storm 클러스터는 Hadoop 클러스터와 표면적으로 유사한데 Hadoop에서  "MapReduce job"을 실행하는 반면에, storm은 "topology"를 실행시킨다.  "jobs" 와 "topology"는 매우 다른데 한가지 핵심적으로 다른 점은 MapReduce job 은 결국 끝나게 되지만 토폴로지는 kill하지 않는 이상 계속 message를 처리한다. 

storm에 대한 좀 더 자세한 설명은 링크링크2 를 확인해 보기 바란다.








kafka


kafka는 LinkedIn에서 자신들의 내부 데이터 처리를 위해 개발한 Distriubted Processing Message System이다. 전통적인 메세지큐시스템과는 다르게 Apache Kafka는 Broker Cluster를 여러 대의 Machine으로 구성하여 분산처리가 가능하다는 장점을 가지고 있다. Big Data시장의 발전과 함께 가장 주목받고 있는 Queue System이기도 하다. 

Kafka에 대한 좀 더 자세한 설명은 링크를 확인해 보기 바란다.  

아래의 사이트도 설명이 잘 되어있다.

http://epicdevs.com/17 

http://kafka.apache.org/documentation.html#introduction




storm kafka에 대한 간단한 소개를 마쳤으니, 이제 내가 했던 작은 프로젝트에 대해 간단히 설명하고 프로젝트에 대한 step 하나 하나 설명해 보도록 하겠다. 


storm kafka 미니 프로젝트 


producer에서 로그 파일을 읽어  message를 broker(연습 용이므로 broker는 하나만 사용한다)로 publish하고  storm spout에서 consume하여 bolt에서 처리하도록 한는것이 프로젝트 목표이다.




로그 파일의 log는 다음과 같은 형태로 되어 있다고 가정한다.

@timestamp : 2015-11-10T15:32:06.046+09:00; doctype : sns; key : 974cfc83-99e0-420e-bfd1-2262e4e82dbd; appid : com.facebook.katana; appversion : 48


개발환경은 ubuntu 14.04를 기반으로 한다. 


step 1. kafka 준비 (kafka 다운로드 페이지)


다운 받은 kafka 앞축을 풀고 설치된 kafka디렉토리로 이동한다. 


(1)zookeeper 서버 실행 (zookeepr에 대한 자세한 설명은 여기 링크를 참조 하기 바란다.  )

$ bin/zookeeper-server-start.sh config/zookeeper.properties


(2)kafka서버 실행 

$ bin/kafka-server-start.sh config/server.properties


(3)topic 만들기 

kafka의 broker는 topic이라는 기준으로 메시지를 관리한다. producer에서 특정 topic의 메시지를 생성 한 후 broker에 전달하면 broker는 전달받은 메시지를 topic별로 분류하여 쌓아놓는다.  "onlytest"라는 이름의 topic을 생성하도록 하겠다. 

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic onlytest



step 2. producer 준비 



log 파일을 읽어 message를 send하는 간단한 코드이다. 추가 properties 설정에 대한 자세한 사항은 이 링크에서 확인해 보기 바란다.


import java.io.File;

import java.util.Properties;

import org.apache.commons.io.input.Tailer;

import org.apache.commons.io.input.TailerListenerAdapter;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


public class TestProducer {

private static final int SLEEP = 500;

public kafka.javaapi.producer.Producer<String,String> producer;

public void setConfig(){

Properties properties = new Properties();

       properties.put("metadata.broker.list","localhost:9092"); // broker list 필수!

       properties.put("serializer.class","kafka.serializer.StringEncoder"); //메시지를 serialize할때 사용하는 인코더

       ProducerConfig producerConfig = new ProducerConfig(properties);

       producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);

}

public static void main(String[] args) throws InterruptedException{

      

       TestProducer testProducer = new TestProducer();

       testProducer.setConfig();

       testProducer.run();

   }

private void run() throws InterruptedException {

OnlyLogListenter onlyLogListenter = new OnlyLogListenter(producer);

       Tailer tailer = Tailer.create(new File("your file path"), onlyLogListenter,SLEEP);

       while(true){

        Thread.sleep(SLEEP);

       }

   }

public class OnlyLogListenter extends TailerListenerAdapter{

kafka.javaapi.producer.Producer<String,String> producer;

public OnlyLogListenter(kafka.javaapi.producer.Producer<String,String> producer){

this.producer = producer;

}

@Override

public void handle(String line){

System.err.println(line);

KeyedMessage<String, String> message =new KeyedMessage<String, String>("onlytest",line);

    producer.send(message);

}

}

}

 

step 3. storm 준비 


(1) storm 다운받기 

storm은 요기 링크에서 다운로드 받으면 된다.(source code가 아닌 release 버전으로 다운로드 한다.) 나는 0.9.5버전을 다운로드 받았다.압축을 풀고 해당 디렉토리로 이동한다. 

나의 경우는 local에서 production cluster 모드로 테스트 하려고 하기 때문에  conf 디렉토리에 storm.yaml을 수정하도록 하겠다.

$ vi ~스톰 디렉토리/conf/storm.yaml


(2) storm.yaml 파일 수정하기 

아래와 같은 항목을 storm.yaml 파일에 추가하도록 하자  nimbus라는 것을 발견할 수 있는데 이에 대한 설명은 다음 링크에서 확인해 볼수 있다.

#zookeeper 서버 설정. local에서 production cluster모드로 테스트 해볼 것이니 실제 아이피 주소로 셋팅하자.

storm.zookeeper.servers:

      - 192.168.0.11


#nimbus host 및 seed 설정. local에서 production cluster모드로 테스트 해볼 것이니 실제 아이피 주소로 셋팅하자.

nimbus.host: 192.168.0.11

nimbus.seed: "192.168.0.11"


#storm local dir 설정 

storm.local.dir: "storm local dir path "


#storm ui  

ui.port : 8087


위와 같이 파일을 수정하고 저장한다. 


step 4. storm topology 만들기  


kafka broker에 전달된 메세지를 spout에 consume하여 bolt에서 처리 하도록 하는 코드이다. 




(1) zookeeper url을 설정한다. 

storm을 local cluster 모드로 테스트 해보자고 한다면 zookeeper 는 "localhost:2181"이 될것이다. 만약에 production cluster모드로 local에서 테스트 해보자고 한다면 "127.0.0.1:2181" 이 아닌 자신의 "실제  ip 주소 : 2181"로 셋팅하면 된다. 

String zkUrl = "zookeeper url:2181";


(2)kafkaspout설정을 해준다. 

아래 코드에 대해 좀더 자세히 알아 보고 싶으면 다음 링크에서 확인해 볼수 있다.

ZkHosts hosts = new ZkHosts(zkUrl);

SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


(3)spout과 bolt설정을 해준다. 

shuffleGrouping,fieldsGrouping 이라고 보이는데 이것을 stream grouping이라하고 토폴로지에  두개의 컴포넌트사이에서 어떻게 튜플을 send할지 알려주는 것을 말한다. 여기서 쓴 shuffleGrouping은 튜플을 무작위로 동일한 비율로 나눠서 볼트에 task를 할당하는 것이고, fieldsgrouping은 튜플에 있는 필드 값을 기준으로 파티셔닝되어 각 볼트 task에 튜플을 할당한다.  그 외 다른 stream grouping을 살펴보고 싶다면 다음 링크에서 확인해 볼 수 있다. 

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", kafkaSpout, 1);

builder.setBolt("cutbolt", new CutLogBolt(), 8).shuffleGrouping("spout");

builder.setBolt("classifybolt", new ClassifyKeyBolt(), 8).fieldsGrouping("cutbolt",new Fields("key","doctype"));

builder.setBolt("docbolt", new DoctypeCountBolt(), 8).fieldsGrouping("classifybolt",new Fields("subdoctype"));


(4) nimbus host,storm local dir 등등을 설정해준다.

local mode와 cluster mode를 제량에 따라 설정할 수 있는데 나는 local에서 production cluster mode로 실행 하려고 했기 때문에 아래와 같이 설정했다. 

//=============================

// local mode

//=============================

// LocalCluster cluster = new LocalCluster();

// cluster.submitTopology("log-stat", conf, builder.createTopology());

// Thread.sleep(10000);

// cluster.shutdown();

//=============================

// cluster mode

//=============================

conf.put(Config.NIMBUS_HOST, "nimbus url");

conf.put(Config.STORM_LOCAL_DIR,"your storm local dir");

conf.put(Config.NIMBUS_THRIFT_PORT,6627);

conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[] {"zookeeper url"}));

// conf.setNumWorkers(20);

// conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());


아래는 최종 topology 코드이다. 

import java.util.ArrayList;

import java.util.Arrays;

import java.util.List;

import java.util.UUID;


import soeun.storm.kafka.bolt.ClassifyKeyBolt;

import soeun.storm.kafka.bolt.CutLogBolt;

import soeun.storm.kafka.bolt.DoctypeCountBolt;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;


public class StormKafakaSimpleTopology {

  

   public static void main(String[] args) throws Exception {


       String zkUrl = "zookeeper url:2181";        // zookeeper url

       String brokerUrl = "localhost:9092";


       if (args.length > 2 || (args.length == 1 && args[0].matches("^-h|--help$"))) {

           System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");

           System.out.println("   E.g TridentKafkaWordCount [" + zkUrl + "]" + " [" + brokerUrl + "]");

           System.exit(1);

       } else if (args.length == 1) {

           zkUrl = args[0];

       } else if (args.length == 2) {

           zkUrl = args[0];

           brokerUrl = args[1];

       }


       System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);


       ZkHosts hosts = new ZkHosts(zkUrl);

       SpoutConfig spoutConfig = new SpoutConfig(hosts, "onlytest", "/onlytest", UUID.randomUUID().toString());

       spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

       KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

       TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", kafkaSpout, 1);

builder.setBolt("cutbolt", new CutLogBolt(), 8).shuffleGrouping("spout");

builder.setBolt("classifybolt", new ClassifyKeyBolt(), 8).fieldsGrouping("cutbolt",new Fields("key","doctype"));

builder.setBolt("docbolt", new DoctypeCountBolt(), 8).fieldsGrouping("classifybolt",new Fields("subdoctype"));

Config conf = new Config();

conf.setDebug(true);

List<String> nimbus_seeds = new ArrayList<String>();

nimbus_seeds.add("nimbus url");


if (args != null && args.length > 0) {

conf.setNumWorkers(3);


StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

}

else {


//=============================

// local mode

//=============================

// LocalCluster cluster = new LocalCluster();

// cluster.submitTopology("log-stat", conf, builder.createTopology());

// Thread.sleep(10000);

// cluster.shutdown();

//=============================

// cluster mode

//=============================

conf.put(Config.NIMBUS_HOST, "nimbus url");

conf.put(Config.STORM_LOCAL_DIR,"your storm local dir");

conf.put(Config.NIMBUS_THRIFT_PORT,6627);

conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[] {"zookeeper url"}));

// conf.setNumWorkers(20);

// conf.setMaxSpoutPending(5000);

StormSubmitter.submitTopology("onlytest", conf, builder.createTopology());


}

}

}


CutLogBolt.java

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class CutLogBolt extends BaseBasicBolt{

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String[] splitArray = input.getString(0).split(";");

String key = "";

String doctype = "";

for(int i = 0; i < splitArray.length; i++){

if(splitArray[i].contains("key"))

key  = splitArray[i];

if(splitArray[i].contains("doctype"))

doctype = splitArray[i];

}

collector.emit(new Values(key,doctype));

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("key","doctype"));

}


}


ClassifyKeyBolt.java

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class ClassifyKeyBolt extends BaseBasicBolt{


@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String[] splitdoctype = input.getStringByField("doctype").split(":");

String[] splitkey = input.getStringByField("key").split(":");

if(splitkey.length == 2 && splitdoctype.length == 2){

String doctype  = splitdoctype[1].trim();

String key  = splitkey[1].trim();

// System.err.println(key + ":" + doctype);

collector.emit(new Values(key + ":" + doctype));

}

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("subdoctype"));

}

}


DoctypeCountBolt.java 

import java.util.HashMap;

import java.util.Map;


import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;


public class DoctypeCountBolt extends BaseBasicBolt {

Map<String,Integer> docMap = new HashMap<String,Integer>();

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

String doctype = input.getStringByField("subdoctype");

Integer count = docMap.get(doctype);

if(count == null)

count = 0;

count++;

docMap.put(doctype, count);

System.out.println(docMap);

collector.emit(new Values(docMap));

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("docmap"));

}

}


(5) pom.xml 


pom.xml 설정할 때 주의 할 것은 storm과 kafka의 버전의 따른 dependency 를 꼭 확인해 보고 pom.xml에 추가해야 한다는 것이다.

현재 나는 storm 버전이 0.9.5이기 때문에 

storm-core : 0.9.5

storm-kafka : 0.9.5

앞서 설치한 kafka는  kafka_2.10_0.8.1 이기 때문에 버전에 맞게 설정했다. 버전에 맞지 않으면 엄청난 삽질을 하게 된다. 

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

 <modelVersion>4.0.0</modelVersion>

 <groupId>com.soeun.storm</groupId>

 <artifactId>ministorm</artifactId>

 <version>0.0.1-SNAPSHOT</version>

 <packaging>jar</packaging>

 <name>ministorm</name>

 <url>http://maven.apache.org</url>

 <properties>

   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

 </properties>

<dependencies>

 <dependency>

   <groupId>junit</groupId>

   <artifactId>junit</artifactId>

   <version>4.11</version>

   <scope>test</scope>

 </dependency>

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-core</artifactId>

<version>0.9.5</version>

<scope>provided</scope>

</dependency>

 <dependency>

     <groupId>org.apache.storm</groupId>

     <artifactId>storm-kafka</artifactId>

     <version>0.9.5</version>

   </dependency>

   <dependency>

<groupId>org.testng</groupId>

<artifactId>testng</artifactId>

<version>6.8</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.mockito</groupId>

<artifactId>mockito-all</artifactId>

<version>1.9.0</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.10</artifactId>

<version>0.8.1</version>

<exclusions>

       <exclusion>

         <groupId>org.apache.zookeeper</groupId>

         <artifactId>zookeeper</artifactId>

       </exclusion>

       <exclusion>

               <groupId>com.sun.jmx</groupId>

               <artifactId>jmxri</artifactId>

           </exclusion>

           <exclusion>

               <groupId>com.sun.jdmk</groupId>

               <artifactId>jmxtools</artifactId>

           </exclusion>

           <exclusion>

               <groupId>javax.jms</groupId>

               <artifactId>jms</artifactId>

           </exclusion>

     </exclusions>

</dependency>

</dependencies>

<build>

 <plugins>

   <plugin>

     <artifactId>maven-assembly-plugin</artifactId>

     <version>2.2.1</version>

     <configuration>

       <descriptorRefs>

         <descriptorRef>jar-with-dependencies</descriptorRef>

       </descriptorRefs>

       <archive>

         <manifest>

           <mainClass />

         </manifest>

       </archive>

     </configuration>

     <executions>

       <execution>

         <id>make-assembly</id>

         <phase>package</phase>

         <goals>

           <goal>single</goal>

         </goals>

       </execution>

     </executions>

   </plugin>

 </plugins>

</build>

</project>


(6) maven install 


앞에 코드가 작성되어있는 프로젝트 디렉토리에 이동한후 maven clean install해준다.  

$ mvn clean install


target폴더에  ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar 파일이 생긴 것을 확인 할 수 있을 것이다. 


step 5. storm nimbus,supervisor,ui 실행 


스톰이 설치 되어 있는 디렉토리로 이동하여 nimbus,supervisor,ui를 실행 시킨다. 

$ bin/storm nimbus


$ bin/storm supervisor


$ bin/storm ui


ui를 실행시키면 http://localhost:8087로 접근하여 topology 상황을 웹으로 확인해 볼 수 있다. 



step 6.  ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar summit 


ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar 를 아래와 같이 실행 시킨다 


$ bin/storm jar ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar {main class path} 


ex)

bin/storm jar ministorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar soeun.storm.kafka.topology.StormKafakaSimpleTopology 


정상적으로 submit 하면 다음과 같은 메세지가 출력된다.

..............


[main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: onlytest


http://localhost:8087로 접근하면 아래와 같은 화면이 나올것이다 Topology summary에 "onlytest"라는 이름이 있으면 성공한 것이다.




step 7.  test

 

이제 위에서 작성하였던 TestProducer를 실행시켜 "onlytest"라는 topic의 메세지가 broker에 전송하여 storm worker 로그에 잘 찍히는지 확인해 보자 log는 storm디렉토리/logs/worker-.. .log에서 확인해 볼수 있다. 

$ tail -f worker-{}.log 

 또한 storm ui 에 Topology Visualization에서도 확인해 볼 수 있다. 




예제 소스는 다음 링크에 존재한다. 


-끝-


p.s storm kafka에 대한 좀더 자세한 사항을  알아보고자 한다면 다음 레퍼런스를 참고하기 바란다. 


참고 레퍼런스:

http://storm.apache.org/index.html

https://storm.apache.org/documentation/Setting-up-development-environment.html

http://bcho.tistory.com/995

https://github.com/apache/storm/blob/master/docs/documentation/Tutorial.md

https://github.com/apache/storm/tree/master/examples/storm-starter

https://storm.apache.org/documentation/Tutorial.html

Posted by 알 수 없는 사용자
,

대학교 4학년 1학기 재학 중 머리도 안 감고 모자 꾹 눌러쓴 날 갑작스럽게 인턴의 기회가 찾아 왔다.

덥석 물어 잡아 그해 여름 방학부터 인턴을 시작하게 되었다.

힘들지는 않았지만, 날 좌절에 빠지게 만든건.. 

큰 실력 차이, 점점 작아지는 기분, 성적만 좋은 멍청이가 된 느낌.

슬퍼2

고교시절 수능을 본 후 다시는 후회할 짓 하지 말자던 굳은 다짐을 하고, 매 학기 시작마다 직전 학기를 뒤돌아보며 ‘잘했다.’, ‘수고했다.’며 스스로 머리를 쓰담쓰담하며 지내온 대학 생활 3년 반이 ‘좀 더 열심히 할껄..’이란 문장 하나로 다 와르르 무너져 내렸다.

하지만! 초 긍정 마인드로 곧 극복!

슈퍼맨

홀로 속앓이 많았던 인턴 생활동안 어떤 일을 했는지 적어보고자 한다.



난 아직 대학교를 졸업도 안한 새내기이기 때문에 주니어 개발자로서 Big Data관련하여 일명 ‘Storm기반의 Real-Time Stream Processing Prototype’이라는 Storm관련 프로젝트 중 일부분을 맡았다.

그림1 Project Architecture


정확히 말하자면 ‘Storm기반의 Real-Time Stream Processing Prototype’시스템의 성능 테스트를 위한 도구인 Data ThrowerData Receiver 개발을 했다.

이 성능 테스트를 위한 도구를 개발하는 과정이 어땠는지 살펴보려 한다.

과정은 총 4단계로 1)Task 분류, 2)요구사항 작성, 3)설계, 4)구현 순으로 진행했다.



1. Task 분류


1) 일의 전,후 관계를 명확

2) 일의 할당 용이

3) 프로젝트의 총 개발 기간 예상

하기 위해 Task를 분류했다.





그림2 Task Level


분류는 쪼개고 쪼개서 한 사람이 충분히 해낼 수 있는 가장 작은 일을 기준으로 했다.

먼저 상화 차장님을 도와 Task를 분류 했다.

가장 큰 틀로 쪼개니 6개의 제목이 나왔다. 그 각각의 제목들을 한번 더 쪼개니 제목 안에 프로젝트 기간동안 해야할 항목들이 나열되었다. 그 항목들을 바탕으로 Task를 적고 항목들에 대한 목적과 목표를 적었다.


분류를 했으니 프로젝트의 총 개발 기간을 예상하기 위해 하나의 Task를 해결하는데 걸리는 시간을 적어보았다. 큰 프로젝트 개발 경험이 많이 없어서 Task를 해결하는데 걸리는 시간을 예상하기가 쉽지 않았다. 내 나름의 기준을 세우고 예상 시간을 적어야 하는데 해본 것이 많지 않으니 기준을 세우는 것부터 어려웠다.

일단 내가 먼저 적고 부족한 부분은 상화차장님께서 수정해 주셨다.



2. 요구사항 작성


프로그램을 구현할 때 고객이 원하는 바와 다르게 가지 않도록 실수를 줄이고, 고객이 원하는 프로그램을 만들기 위한 고려 사항은 무엇인지 명확히 하기 위해 요구사항을 작성했다.


그림3 Data Thrower 요구사항


요구사항이라는 말 그대로 이 프로그램이 요구하는 것을 생각하며 적었다. 지금 보면 Data Thrower의 기능이 제대로 잘 수행될 경우인 Happy Path에 대한 사항만 적은 것이 좀 아쉽다.



3. 설계


앞서 작성한 요구사항을 바탕으로 프로그램을 어떻게 만들 것인지 구상하며 설계를 했다.


 

그림4 Data Thrower 설계


위의 설계에서는

1) Data ThrowerArchitecture

2) Data Thrower의 기능

3) ,출력 Data 형태

등을 기술했다.


구현 전 위와 같은 작업(Task분류, 요구사항 작성, 설계)을 거치며 처음보다 ‘내가 만들어야할 것이 무엇인가’에 대해 명확하게 머릿속에 정리가 되었다.



4. 구현


다음으로 전 작업에서 만든 요구사항 및 설계를 바탕으로 구현작업에 들어갔다.

먼저 Data Thrower를 개발했다.

그림5 Project Architecture_Data Thrower


그림6 Data Thrower 구조


Data Thrower

1) Data Thrower는 사용자 설정에 따라 일정한 양의 Log Message를 생성

2) 생성된 Log MessageInput Queue에 전송

하는 두가지 기능을 가지고 있다.


1) 앞서 요구사항과 설계를 했다 하여도 막상 이 두 가지의 기능을 구현하자니 많은 기능이 있는 것도 아닌데 막막했다. ‘사용자 설정에 따라’라는데 이건 어떤 식으로 구현해야 하는건지, 일정한 양의 Message는 어떻게 조절하도록 해야할지 고민스러웠다. 그때 주현 팀장님께서 ‘javaProperties라는 Class가 있다’라고 알려주셨다.

Properties Class를 이용하여 Configuration을 만드는 과정은 의외로 간단했다.

(참고 자료 : http://www.okjsp.net/bbs?seq=38761)

잘 알아두면 앞으로도 유용하게 쓰일 것 같다.


2) 사용자 설정에 맞게 Log Message를 생성했으니, Input Queue에 넣어볼 차례이다.

QueueMessage를 넣는 방법은 RabbitMQ 홈페이지 Tutorials1. Hello World!(Java)에 잘 설명되어 있어 참고해서 구현했다.

(http://www.rabbitmq.com/tutorials/tutorial-one-java.html)



그림7 Queue로 데이터 전송하는 예제



막상 구현을 끝내고나니 처음 겁먹었던 것보다 크게 어렵지 않았다고 느꼈다.

이렇게 Data Thrower를 마무리 짓고 다음으로 Data Receiver를 개발했다.



그림8 Project Architecture_Data Receiver


그림9 Data Receiver 구조


Data Receiver

1) Output Queue에 있는 Log Message를 가져옴

2) 가져온 Log Message를 출력

하는 두가지 기능을 가지고 있다.

Data Thrower를 개발한 후라서 Receiver는 어려움 없이 개발 할 수 있었다.

1) Queue에 있는 Message를 가져와서 출력하는 부분도 Data Thrower와 마찬가지로 RabbitMQ 홈페이지 Tutorials1. Hello World!(Java)에 잘 설명되어있다.


(http://www.rabbitmq.com/tutorials/tutorial-one-java.html)



그림10 Queue에서 데이터 받아오는 예제


2) 처음에는 Queue에서 가져온 Message를 파일로 저장했지만 메모리 및 반응 속도가 느려지는 문제 등으로 인해 에코 출력으로 변경하였다.


그림11 Data Receiver Message 출력


-참고 자료-


1. RabbitMQ Homepage

http://www.rabbitmq.com/


2. RabbitMQ Tutorials

http://www.rabbitmq.com/getstarted.html


3. RabbitMQ “Hello World!”

http://www.rabbitmq.com/getstarted.html


4. Properties Class

http://www.rabbitmq.com/getstarted.html

Posted by 알 수 없는 사용자
,