누구나 쉽게 구축할 수 있는 하둡 기반 분산시스템
아파치 하둡을 활용한 로그 저장 및 처리
오픈소스 진영이 주도하고 있는 하둡은 올해 들어 여러 벤더의 제품에 편입되고 있다. 클라우데라의 배포판을 그대로 이용하거나 일부 코드를 수정하기도 한다. 몇몇 벤더를 제외하면 하둡의 코드를 수정하기보단 기본 인프라스트럭처를 그대로 이용하기도 하며, 일부는 하드웨어와 결합해 하둡을 판매하기도 한다. 이렇듯 하둡은 오픈소스 진영뿐 아니라 상용 벤더에게도 굉장히 중요한 솔루션으로서 각광받고 있다.
이 글에서는 아파치 하둡의 HDFS와 맵리듀스에 대해 좀더 심도 있게 살펴보고, 아파치 플룸을 이용해 수집된 데이터를 HDFS에 직접 저장하고 하이브에서 가공할 수 있는 시퀀스 파일로 변환하는 맵리듀스 프로그램을 개발한다. 여기서 시퀀스 파일은 간단하게 키 밸류 구조의 바이너리 파일로, 로그 데이터처럼 대용량 파일의 경우 일반 텍스트 형태보다 더 효율적으로 공간을 절약할 수 있는 이점이 있다.
아파치 하둡 설치
하둡을 설치할 때에는 몇 가지 제약사항에 유의해야 한다. 하둡은 윈도우 OS를 지원하지 않아 리눅스나 유닉스 기반의 OS에만 설치할 수 있다. 또한 분산 환경에서 하둡을 설치하기 위해서는 반드시 SSH가 필요하다. 이는 하나의 마스터 노드가 전체 노드를 시작하거나 중지, 재가동 등을 모두 담당하기 위해서다. 여기에는 관리상의 이유도 있다. SSH는 한 대의 노드에서 하둡이 설치된 모든 서버의 하둡 어플리케이션을 시작하거나 재기동 또는 종료 시키기 위해서 사용된다.
설치에 앞서 리눅스 환경의 3대 이상의 컴퓨터(노드)를 준비하자. 가능하다면 각 노드는 동일한 계정으로 세팅하는 것이 좋으며, 참고로 이 글에서는 mhb8436이란 계정을 설정했다. SSH 환경 설정에 앞서 각 노드에 SSH 서버를 먼저 설치해야 하며, 작업의 편의성을 위해 hosts 파일에 각각 노드를 등록해 노드 이름을 관리하자. vi 에디터로 /etc/host 파일을 열고 다음처럼 각자 환경에 맞게 4대의 서버를 추가한다.
# Hadoop Hosts
192.168.0.10 knode01
192.168.0.11 knode02
192.168.0.12 knode03
192.168.0.13 knode04
아이피와 노드 이름을 환경에 맞게 수정했다면 이제 커맨드 라인에 ‘$ssh-keygen -t rsa’ 명령어를 입력해 키를 생성한다. 이제 ‘$ssh-copy-id -i ~/.ssh/id_rsa.pub mhb8436@knode01’ 명령어를 통해 생성한 공개키를 각 서버에 배포해야 한다. 주의할 점은 각 환경에 맞춰 서버 노드 이름을 변경해야 한다.
하둡은 크게 네임노드와 데이터노드로 구성된다. 네임노드는 분산 환경에서 파일 구조 정보를 가지고 있으며, 한 대의 머신에 설치돼 여러 대의 데이터노드를 관리한다. 실제 데이터를 가지고 있는 것은 데이터노드다. 각 노드의 역할에 따라 서버를 분리하는 데, 필자의 경우 네임노드를 knode01, 나머지 노드에는 데이터노드를 할당했다. 두 노드 모두 데이터 폴더가 필요하므로 <리스트 1>처럼 데이터 폴더를 생성하자.
<리스트 1> 네임노드와 데이터노드를 위한 데이터 폴더 생성
#네임노드
$mkdir -p /data/name
$chown mhb8436.mhb8436 /data
#데이터노드
$mkdir -p /data01
$mkdir -p /data02
$chown mhb8436.mhb8436 /data01
$chown mhb8436.mhb8436 /data02
데이터를 저장할 네임노드의 데이터 폴더는 /data, 데이터노드의 경우 /data01, /data02로 설정하고 소유권을 공통된 계정(이 글의 경우 mhb8436)으로 변경한다. 향후 하둡에서 관리하는 모든 데이터 파일은 이 폴더에 저장된다. 하둡은 클라우데라 배포판 중 CDH3의 Tarball을 설치한다. 자동설치 버전도 있지만 직접 일일이 설정하는 것이 하둡을 더 빠르게 이해하는 데 도움이 된다. https://ccp.cloudera.com/display/SUPPORT/Downloads에서 CDH3를 클릭하고 Tarball 패키지 중 Hadoop0-0.20.2를 다운로드한다. 이후 다음의 폴더 구조로 설치하자.
/Users/mhb8436/gas/flume -> flume 설치 폴더
/Users/mhb8436/gas/hadoop -> hadoop 설치 폴더
/Users/mhb8436/gas/hive -> hive 설치 폴더
여기서 hadoop 폴더 하단에 다운로드한 파일의 압축을 풀고 하둡 홈 폴더로 설정한다(/Users/mhb8436/gas/hadoop/). 이제 네임노드와 데이터노드 각각에 열할을 부여할 차례다. 환경설정을 위해서는 하둡이 설치된 폴더 하위에 있는 conf 폴더의 hadoop-env.sh 파일을 열고 JAVA_HOME 설정 구문(export JAVA_HOME=/usr/lib/j2sdk1.6-sun)을 추가한다(jdk 폴더를 각자 환경에 맞게 수정해야 한다)
앞서 생성한 하둡 홈 폴더를 HADOOP_HOME으로 지칭하기 위해서는 사용자 계정 홈 폴더에 있는 .profile이나 .bashrc 파일을 vi로 열어 <리스트 2>처럼 구문을 추가한다.
<리스트 2> HADOOP_HOME 설정
$vi .profile or vi .bashrc
export HADOOP_HOME=/Users/mhb8436/gas/hadoop/
$source .profile or source .bashrc
source .profile 또는 source .bashrc를 실행해 변경된 프로파일을 저장한다. conf/core-site.xml 파일은 vi로 <리스트 3>처럼, conf/hdfs-site.xml 파일의 경우 <리스트 4>를 참고해 수정한다.
<리스트 3> core-site.xml 설정
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://knode01:9000</value> <!-- 네임 노드 정보 -->
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop-$user.name</value> <!-- 하둡이 사용할 임시 폴더-->
</property>
</configuration>
<리스트 4> hdfs-site.xml 설정
<configuration>
<property>
<name>fs.default.name</name> <!-- 네임노드주소 정보-->
<value>hdfs://knode01:9000</value>
</property>
<property>
<name>dfs.name.dir</name> <!-- 네임노드 폴더 -->
<value>/data/name</value>
</property>
<property>
<name>dfs.data.dir</name><!-- 데이터노드 폴더 -->
<value>/data01/hdfs,/data02/hdfs</value>
</property>
<property>
<name>dfs.replication</name><!-- 데이터 복제 계수 기본 3개 -->
<value>3</value>
</property>
</configuration>
네임노드와 데이터노드를 어떻게 사용할 것인가는 하둡의 설정 파일에 입력한다. $HADOOP_HOME/conf/masters 파일과 slaves 파일을 각각 vi로 <리스트 5>처럼 수정하자.
<리스트 5> masters와 slave 파일 설정
vi masters # $HADOOP_HOME/conf/maters 파일을 vi로 열고
knode01 # masters 파일에 knode1을 설정한다.
vi slaves # $HADOOP_HOME/conf/slaves 파일을 vi로 열고
knode02 # slaves 파일에 knode02, knode03, knode04을 설정한다.
knode03
knode04
하둡 다운로드, 하둡 홈 폴더 설정, core-site.xml과 hdfs-site.xml 수정, HADOOP_HOME 설정 등의 앞선 작업은 knode01, knode02, knode03, knode04에 동일하게 수행해야 한다. 이제 다음의 명령어를 실행해 데이터노드에서 하둡이 사용할 폴더를 생성하자.
$HADOOP_HOME/bin/slaves.sh mkdir -p /data01/hdfs
$HADOOP_HOME/bin/slaves.sh mkdir -p /data02/hdfs
이제 하둡 파일 시스템을 포맷하자. 네임노드로 이동하고 ‘$HADOOP_HOME/bin/hadoop namenode -format’ 명령어로 네임노드를 포맷한다. 포맷 후 ‘$HADOOP_HOME/bin/start-dfs.sh’ 명령어로 전체 노드에 하둡을 실행한다.
다소 복잡할 수 있지만 하둡 설치가 모두 끝났다. 모든 과정이 정상적으로 진행됐다면 네임노드 서버의 주소(http://knode1:50070/)로 접속해 각 노드에 대한 상태를 확인할 수 있다. 만약 SSH 관련 에러 메시지가 발생한면 구글에서 해당 메시지를 검색해 해결해보자.
아파치 하둡의 HDFS에 대해
지금까지 클라우데라 배포판 CHD3의 tarball 패키지를 이용해 아파치 하둡을 분산 환경에 설치했다. 이제부터 아파치 하둡의 HDFS에 대해 본격적으로 살펴보자. 기본적으로 HDFS의 아키텍처는 마스터와 슬레이브로 구성됐다. 이는 한 개의 마스터가 여러 개의 슬레이브를 관리할 수 있게 고안된 구조로, HDFS에서 마스터 역할을 하는 네임노드는 파일시스템의 네임스페이스와 비슷한 역할을 한다. 네임노드는 파일과 폴더를 열닫고 이름을 변경하는 역할을 수행하며, 물리적으로 클라이언트에서 파일에 데이터를 읽고 쓰는 역할은 데이터노드가 담당한다. 데이터 노드는 네임노드의 지시에 따라 블록 생성 및 삭제하고 복제한다.
HDFS는 설계부터 대용량 파일의 저장과 읽기에 적합하게 설계됐다. 이런 이유로 저용량의 파일 여러 개를 HDFS에 저장하는 것을 전체적인 효율을 크게 떨어뜨리며, 이는 하둡의 네임노드에는 주로 파일명과 파일의 위치 정보가 보관되고 이 정보들은 메모리에 저장되기 때문이다. 64~128MB 이상의 대용량 파일을 HDFS에 저장하면 이 파일은 HDFS에서 지정한 청크(Chunk) 사이즈로 분할돼 저장된다. 각각의 청크는 별도의 데이터노드에 저장된다.
<그림 2> HDFS 구조
네임노드와 데이터노드는 리눅스 OS 기반의 일반 서버에서 동작한다. 또한 HDFS는 자바로 개발돼 자바 VM을 지원하는 머신에서 사용할 수 있고, 네임노드와 데이터노드는 모두 동일한 소프트웨어로 구동된다. <그림 2>처럼 하나의 랙에 여러 대의 서버가 설치돼 있는 경우 각각의 서버에 블록들이 저장된다. 만약 하둡 클라이언트가 hadoop fs -cat /root/text1.txt와 같은 명령어를 실행해 네임노드에 /root/text1.txt 파일의 존재 여부에 대한 확인을 요청했다고 가정하자. 해당하는 메타 정보가 존재하면 해당 파일의 블록이 존재하는 데이터노드의 위치가 클라이언트에게 반환되며, 클라이언트는 다시 파일이 존재하는 데이터노드에 접속해 파일을 읽고 그 내용을 출력한다. 그렇다면 파일을 쓰는 경우에는 어떻게 될까?
hadoop fs -copyFromLocal /Users/mhb8436/test02.txt /root/test02.txt
위 명령어는 로컬 노드에 있는 파일을 HDFS에 저장한다. 하둡 클라이언트는 네임노드에게 어느 노드에 파일을 기록할 것인지를 묻고 네임노드가 어느 데이터노드에 파일을 쓸지를 결정한 후 결과를 클라이언트에게 반환한다. 클라이언트는 전달 받은 정보를 바탕으로 해당 데이터노드에 직접 접속해 파일을 쓴다.
지금까지 HDFS 상에서 파일의 I/O가 내부적으로 어떻게 동작하는지 살펴봤다. HDFS 상에 파일을 쓰면 앞서 설정한 복제본 개수에 따라서 데이터가 복제된다. HDFS는 대규모로 구성된 여러 머신에 대용량의 파일을 분산 저장하기에 적합하도록 설계됐다. 파일은 블록으로 나눠 저장되며 파일의 모든 블록은 모두 크기가 동일하다(단 마지막 블록은 크기가 다를 수 있다). 블록 크기와 복제될 개수는 앞서 살펴본 설정 파일에서 설정할 수 있으니 참고하자.
네임노드는 모든 블록의 본제본을 관리하며, 주기적으로 데이터노드로부터 허트비트(Heartbeat, 대상을 주기적으로 호출해 생존여부를 체크)를 받으며, 이때 블록에 대한 목록과 상태 정보를 함께 받는다. 여기서 복제본을 어디에 보관하는가는 신뢰성과 성능에 영향을 끼친다. HDFS에서 복제본 배치 전략은 다른 분산 파일 시스템과는 다른 중요한 역할을 한다.
HDFS에서는 cat이나 copyFormLocal과 같은 CLI(Command Line Interface) 외에도 여러 CLI를 제공한다. HDFS가 지원하는 대표적인 CLI을 정리한 결과는 <표 1>과 같다.
<표 1> HDFS에서 제공하는 CLI 목록
명령어 |
설명 |
hadoop fs -cat URI [URI …] |
코드 파일이 표준 모드로 출력 |
hadoop fs -chgrp [-R] GROUP URI [URI …] |
파일의 소유그룹 변경 |
hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI …] |
파일의 퍼미션 변경 |
hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ] |
파일의 소유권 변경 |
hadoop fs -copyFromLocal <localsrc> URI |
로컬 파일을 하둡에 복사 |
hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst> |
하둡 파일을 로컬에 복사 |
hadoop fs -cp URI [URI …] <dest> |
소스에서 타겟으로 파일 복사 |
hadoop fs -ls <args> |
파일목록 출력 |
hadoop fs -lsr <args> |
파일과 하위 폴더의 목록 출력 |
hadoop fs -mkdir <paths> |
폴더 생성 |
hadoop fs -mv URI [URI …] <dest> |
소스에서 타겟으로 파일 이동 |
hadoop fs -put <localsrc> ... <dst> |
로컬 파일을 하둡에 복사 |
hadoop fs -rm URI [URI …] |
URI의 파일 삭제 |
hadoop fs -rmr URI [URI …] |
URI의 파일과 하위폴더를 모두 삭제 |
hadoop fs -tail [-f] URI |
파일의 마지막 KB를 표준 모드로 출력 |
hadoop fs -text <src> |
소스 파일을 텍스트로 출력 |
지금까지 HDFS에 대해 살펴봤다. HDFS는 리눅스 파일 구조나 도스에 익숙한 사용자라면 별다른 지식 없이도 쉽게 사용할 수 있을 만큼 쉽다. 그러나 실제 운영 환경에서 성능과 신뢰성을 높이기 위해서는 많은 경험과 튜닝이 필요하다.
하둡의 병렬처리 프레임워크, 맵리듀스
HDFS는 파일의 저장과 관리에 중점을 뒀다면 앞으로 살펴볼 맵리듀스는 HDFS에 저장된 파일에서 데이터를 추출해 가공, 분석, 병합해 의미 있는 결과를 도출하기 위한 일종의 프로그램이다. 자바 언어를 지원하는 맵리듀스 프레임워크는 분산 환경에서 병렬 처리를 쉽게 할 수 있게 돕는다. 개발자는 맵리듀스에서 제공하는 몇 개의 클래스를 상속받아 규칙에 따라 코딩하면 HDFS에 저장된 데이터를 병렬 처리할 수 있다.
이름에서 알 수 있듯 맵리듀스는 맵과 리듀스로 구성된 프로그램이다. 맵은 한 개의 데이터노드에서 실행돼 어떤 결과물을 생성하며, 리듀스는 여러 개의 노드에서 맵들이 수행한 결과를 최종적으로 처리한다. 물론, 리듀스도 지정된 한 개의 노드에서 실행됨을 참고하자.
예컨대 사원 정보가 앞서 설치한 knode02, knod03에 분산돼 있다고 가정해보자. knode02에는 ‘ㄱ’부터 ‘ㅁ’성까지, knode03에는 ‘ㅇ’부터 ‘ㅎ’성까지 보관돼 있다. 여기서 우리는 각 성별로 해당 성을 가진 사람의 숫자를 세는 맵리듀스 프로그램을 만든다.
이 경우 맵리듀스 프로그램은 knode02와 knode03에서 각각 실행되며, 그 역할은 각 성별로 동일한 성을 지닌 사람을 센다. <표 2>는 가상의 knode02와 knode03에서 실행된 맵 프로그램의 실행 결과다.
<표 2> knode02와 knode03에서 수행되는 맵 프로그램 결과(가정)
knode02 |
knode03 |
ㄱ 100 ㄴ 81 ㄷ 20 … ㅁ 29 |
ㅇ 120 ㅈ 20 ㅊ 1 … ㅎ 23 |
knode02와 knode03 각각에 데이터를 기준으로 동일한 성을 가진 사람의 수를 센다. 즉 맵 함수는 네트워크를 부하를 최소화하고 데이터 처리 성능을 높이기 위해 관련 데이터가 저장된 데이터노드에 수행된다. 그렇다면 리듀스 함수는 어떤 역할을 할까? 리듀스는 함수는 2개의 노드에서 실행한 결과를 정렬해 보여주며 그 결과는 <리스트 6>과 같다.
<리스트 6> 리듀스 함수의 출력물
ㄱ 100
ㄴ 81
ㄷ 20
…
ㅇ 120
ㅈ 20
ㅊ 1
…
ㅎ 23
독자의 이해를 돕기 위한 예 덕분에 맵과 리듀스에 대한 개념을 쉽게 이해할 수 있었을 것이다. 실제 맵리듀스 프로그램은 다양하게 조합할 수 있어 더 복잡하다. <리스트 7>를 통해 맵리듀스를 보다 자세히 살펴보자.
<리스트 7> 아주 간단한 맵 리듀스 프로그램(워드 카운트 v1.0)
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens())
word.set(tokenizer.nextToken());
output.collect(word, one);
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
int sum = 0;
while (values.hasNext())
sum += values.next().get();
output.collect(key, new IntWritable(sum));
public static void main(String[] args) throws Exception
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
워드의 숫자를 세는 <리스트 7>은 비교적 간단한 맵리듀스 프로그램이다. 이를 실행하기 위해서는 컴파일하고 그 결과를 jar 파일로 변환해야 한다. 이 파일을 컴파일하기 위해서는 hadoop-HADOOP_VERSION-core.jar 파일이 필요하며, 이클립스에서 개발할 경우 해당 파일을 빌드 패스에 설정하고 컴파일한 후 export 기능을 활용해 jar 파일을 생성하면 된다. 이 jar 파일을 실행하기에 앞서 데이터 파일을 준비하자. 하둡이 설치된 네임노드로 이동해 <리스트 8>와 같이 입출력 폴더와 입력 파일을 생성하자.
<리스트 8> jar 파일을 실행하기 위한 입출력 폴더와 입력 파일
# 입출력 폴더
/usr/mhb8436/wordcount/input - HDFS의 입력 폴더(mhb8436은 각자의 사용자 계정에 맞게 변경)
/usr/mhb8436/wordcount/output - HDFS의 출력 폴더(mhb8436은 각자의 사용자 계정에 맞게 변경)
# 입력 파일 생성 및 저장
$vi file01
Hello World Bye World
:wq
$vi file02
Hello Hadoop Goodbye Hadoop
:wq
# 생성된 입력파일을 하둡에 저장
$bin/hadoop fs -copyFromLocal file01 /usr/mhb8436/wordcount/input/file01
$bin/hadoop fs -copyFromLocal file02 /usr/mhb8436/wordcount/input/file02
# 업로드 이상 유무 확인
$bin/hadoop fs -ls /usr/mhb8436/wordcount/input
file01
file02
입력 파일까지 모두 준비됐다면 <리스트 9> 명령어를 통해 맵리듀스 프로그램을 실제로 실행해보자. 실행 결과는 앞서 지정한 output 폴더에 텍스트 형태의 part-*란 이름의 파일로 저장된다(<리스트 10> 참조).
<리스트 9> wordcount 맵 리듀스 프로그램 실행
$bin/hadoop jar /usr/mhb8436/wordcount.jar org.myorg.WordCount /usr/mhb8436/wordcount/input /usr/mhb8436/wordcount/output
<리스트 10> wordcount 예제의 결과
$bin/hadoop fs -cat /usr/mhb8436/wordcount/output/part-0000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
<리스트 7>에는 세 가지 이너 클래스가 존재한다. 앞서 살펴본 맵 역할을 수행하는 Map 클래스는 mapper 인터페이스를 구현한다. 이 클래스는 반드시 map 메소드가 필요하며, 인수로는 LongWritable의 key, 텍스트 타입의 value, OutputCollector 타입의 Output과 reporter Reporter 변수가 있다. key와 value에는 맵을 실행할 때 받게 되는 인수를 대입한다. 예컨대 <리스트 8>를 실행할 경우 value에는 ‘Hello World Bye World’이 입력되며, file02도 인수로 주어졌으므로 ‘Hello Hadoop Goodbye Hadoop’도 함께 받게 된다. 앞으론 이해가 쉽도록 file01만 처리한다고 가정하겠다.
만약 입력 값이 여러 줄일 경우 map 메소드는 반복 실행된다. map 메소드를 자세히 살펴보면 StringTokenizer로 스페이스(빈칸)를 델리미터로 구분해 단어를 분리한 후 output 변수에 단어와 IntWritable 타입의 one이란 변수를 함께 대입한다. 여기서 LongWritable과 IntWritable은 하둡에서의 Long 객체와 Int 객체로 생각할 수 있다. 결과적으로 맵 클래스에서는 ‘Hello World Bye World’를 스페이스 단위로 잘라 저장하며 그 중간 값은 <리스트 11>과 같다.
<리스트 11> map 메소드 실행 결과(file01과 file02)
#file01 Hello 1 World 1 Bye 1 World 1 |
#file02 Hello 1 Hadoop 1 Goodbye 1 Hadoop 1 |
맵 함수의 중간 결과는 맵리듀스 프레임워크를 통해 Reduce 클레스에 전달된다. Reduce 클래스는 Reducer의 구현체로 사전에 reduce 메소드가 반드시 선언돼 있어야 한다. map 메소드에서 준 output 결과는 reduce 메소드의 key와 values 값으로 전달된다. reduce 메소드의 경우 전달된 key로 반복되는 횟수를 합쳐 이를 output 출력 변수에 전달하며, 그 최종 결과는 <리스트 12>와 같다.
<리스트 12> 리듀스 까지 실행된 최종 결과
#file01 Bye 1 Hello 1 World 2 |
#file02 Goodbye 1 Hadoop 2 Hello 1 |
마지막으로 main 함수를 살펴보자. 객체를 생성하는 JobConf는 맵리듀스의 잡을 설정하기 위한 객체다. JobConf에서는 Mapper의 인수에 대한 정보, Reduce의 정보, 리듀스의 인수에 대한 정보와 입출력 파일의 경로를 설정할 수 있다. 또한 최종적으로 JobClient 객체의 runJob을 수행해 맵리듀스를 실행할 수 있다.
다소 복잡하지만 실제 맵리듀스 프로그램이 어떻게 수행되는지에 대한 동작 방식과 구조를 블록다이어그램을 통해 살펴보자(<그림 3> 참조).
<그림 3> 맵리듀스 동작 구조
앞서 생성한 jar 파일을 하둡 jar을 통해 실행하면 잡트래커(JobTracker)는 job.xml을 참고해 태스크트래커(TaskTracker)에게 잡의 맵과 리듀스 태스크를 분리해 각각 할당한다. 태스크트래커는 잡트래커와 주기적으로 허트비트로 통신하면서 자신의 작업 상태를 보고한다. 잡트래커는 통상 네임노드에, 태스크트래커의 경우 각 데이터노드에 함께 있다고 볼 수 있다.
실제로는 Map 클래스를 실행하는 맵태스크, Reduce 클래스를 수행하는 리튜스태스크가 각각 존재한다. 태스크트래커는 각 태스크의 작업 상태를 관리하며, 태스크 수행 중에 이상이 발생하면 다시 처리하거나 잡트래커에 보고해 다른 노드에서 수행할 수 있도록 처리한다.
지금까지 간단한 맵리듀스 프로그램을 작성하고 동작 원리를 살펴봤다. 지금부터는 하둡 기반의 분석 시스템을 구축하기 위한 맵리듀스 프로그램을 개발해보자.
맵리듀스 예제 프로그램 작성
맵리듀스 프로그램을 개발하기 위해서는 앞서 연재된 플룸 예제에서 완성하지 못한 작업을 이어서 완료해야 한다. 이전 연재에서 마지막에 아파치 플룸의 커넥터에서 싱크의 주소를 일반 파일 시스템으로 설정했으며, 이를 하둡의 주소로 변경해야 한다. 플룸 쉘을 열고 <리스트 13>처럼 콜렉터의 싱크 부분을 수정하자.
<리스트 13> 플룸 콜렉터 싱크 수정
exec config collector 'collectorSource(35853)' 'collectorSink("hdfs://knode01:54310/user/mhb8436/flume/","tbl_smart_meter")'
collectorSink 앞에 인수에는 HDFS를 지정하면 되며, 이 때 각자의 환경에 맞게 인수 경로를 설정한다. collectorSink의 마지막 인수는 저장될 파일의 prefix이다. 지정된 prefix 이름이 파일명 앞에 붙게 된다. 예를 들어 리스트 13처럼 설정했을 경우에는 저장될 파일 명은 /user/mhb8436/flume/tbl_smart_device20120222-095349208+0900.90558084780533.00000038와 같은 형태로 저장이 된다. tbl_smart_device뒤에 숫자는 날짜와 플룸에서 생성한 임의의 코드들이다. 위의 설정을 마치고 플룸 파일을 변경하면 데이터가 HDFS에 저장되는 것을 확인할 수 있다. HDFS에 저장된 파일을 ‘hadoop fs –cat 생성된파일명’으로 확인하면 <리스트 14>와 비슷한 포맷의 데이터를 확인할 수 있다.
<리스트 14> 플룸을 통해 HDFS에 저장된 로그 파일
"body":"Y,null,20110512,7999999999,7999999991,2000,null,110146",
"timestamp":1329872026102,
"pri":"INFO",
"nanos":4907458791788,
"host":"bj",
"fields": "AckTag":"20120222-095346070+0900.4907427134760.00000023",
"AckType":"msg",
"AckChecksum":"83><96>",
"rolltag":"20120222-095349208+0900.90558084780533.00000038"
리스트 14의 json 데이터 중에서 로그 파일의 실제 데이터는 body의 값 부분인 “Y,null,20110512,7999999999,7999999991,2000,null,110146” 이다. 나머지는 플룸에서 관리를 위해 덧붙여진 항목이다. timestamp, fields, host와 같은 플룸에서 관리하는 항목은 우리가 구지 분석할 필요가 없기 때문에 body의 값 부분만 별도로 추려서 하이브에서 읽을 수 있도록 할 것이다. 우리는 하이브에서 읽기 쉽도록 시퀀스 파일 포맷으로 body의 값 부분에 있는 로그데이터를 시퀀스파일로 만드는 맵리듀스 프로그램을 만들어 보자(<리스트 15> 참조).
<리스트 15> 플룸 저장 파일을 시퀀스 파일로 만드는 맵리듀스
public class RollingSmartDevice extends Configured implements Tool
public class Fields
String AckTag;
String AckType;
String AckChecksum;
String rolltag;
@Override
public String toString()
return "Fields [AckTag=" + AckTag + ", AckType=" + AckType
+ ", AckChecksum=" + AckChecksum + ", rolltag=" + rolltag
+ "]";
public class Event
String body;
String timestamp;
String pri;
String nanos;
String host;
Fields fields;
@Override
public String toString()
return "SmartDeviceEvt [body=" + body + ", timestamp=" + timestamp
+ ", pri=" + pri + ", nanos=" + nanos + ", host=" + host
+ ", fields=" + fields + "]";
public static class RollMapper extends MapReduceBase implements
Mapper<LongWritable, Text, IntWritable, Text>
@Override
public void map(LongWritable key, Text value,
OutputCollector<IntWritable, Text> output, Reporter reporter)
throws IOException
String line = value.toString();
Gson gson = new Gson();
Event evt = gson.fromJson(line, Event.class);
int maxValue = Integer.MIN_VALUE;
output.collect(new IntWritable(maxValue),new Text(evt.body));
@Override
public int run(String args[]) throws IOException
JobConf conf = new JobConf(getConf(),getClass());
FileInputFormat.addInputPath(conf, new Path(args[1]));
FileOutputFormat.setOutputPath(conf, new Path(args[2]));
if(conf == null)
return -1;
conf.setMapperClass(RollMapper.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setNumReduceTasks(0);
conf.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
JobClient.runJob(conf);
return 0;
public static void main(String args[]) throws Exception
int exitCode = ToolRunner.run(new RollingSmartDevice(), args);
System.exit(exitCode);
<리스트 15>는 코드 길이가 길지만 주의 깊게 살펴볼 부분은 많지 않다. 이 예제는 단순히 파일 내용을 추출해 시퀀스 파일을 생성하기 때문에 맵 클래스만이 사용됐다. Map 클래스의 map 함수를 살펴보면, json 형식을 다루기 위해 Gson 라이브러리가 이용됐다. map 함수의 value 값으로 받은 스트링을 Gson 객체로 변환하고 이를 Event 객체로 변환한다. Event 객체는 <리스트 14>의 구조에 매핑된 객체이다. body, timestamp, pri, host와 같은 변수들은 <리스트 14>의 json 스트링의 각 항목과 대응됨을 쉽게 알 수 있을 것이다. <리스트 14>의 json 스트링을 이 구조체로 변환하여 관리가 쉽도록 별도의 Event 객체를 만들었다. 맵 함수에서는 출력의 키는 정수형 객체로 설정했고 value는 이벤트 객체의 body 부분이 출력으로 설정됐다.
main 함수의 JobConf 설정에서 출력 포맷은 SequenceFileOutputFormat을 이용해 body의 내용을 SequenceFile로 출력한다. 이 파일을 정상적으로 컴파일하기 위해서는 google-gson와 hadoop-core 라이브러리가 필요하다. 이클립스에서 빌드 패스를 설정하거나 커맨드 라인에서 명령어를 입력해 컴파일할 수 있다. 컴파일 후에는 jar 파일로 압축한 뒤 맵리듀스 프로그램을 실행해보자(<리스트 16> 참조).
<리스트 16> 시퀀스 파일을 만들기 위한 맵리듀스 예제 실행
$hadoop jar rollingsmartdevicejob.jar RollingSmartDevice hdfs://knode0:54310/user/ktds/flume/tbl_smart_device* hdfs://knode0:54310/user/ktds/flume/tbl_smart_device-seq
여기서 jar 파일명과 클래스명 그리고 입출력 파일의 위치 등은 각자 환경에 맞게 변경한 후 실행해야 한다. 설정된 출력 폴더를 hadoop fs -lsr을 이용해 조회하면 출력 결과를 확인할 수 있다. 드디어 하이브에서 시퀀스 파일을 읽기 위한 모든 준비가 끝났다.
실제 하둡을 이용해 분석 시스템을 구축하기 위해서는 지금까지 배운 내용을 바탕으로 안정성과 작업 효율성, 신뢰성을 높이기 위한 튜닝과 많은 테스트가 필요하다. 추가적으로 깊이 공부하길 원한다면 클라우데라나 아파치 하둡 웹사이트 등을 참고하자. 다음 글에서는 하이브에 대한 기본 개념과 UDF 작성법을 살펴보고, 앞서 저장한 시퀀스 파일을 아파치 하이브 상에 임포팅해 하둡 기반의 분산시스템을 완성해본다.
'클라우드 스크랩!' 카테고리의 다른 글
유용한 자바스크립트 들! (0) | 2013.03.12 |
---|---|
하둡 설정 파일 동기화 ! (0) | 2013.03.08 |
클라우데라 한번에 설치 (0) | 2013.03.08 |
클라우데라! (0) | 2013.03.08 |
빅데이터 분석에 관련 스크랩. (0) | 2013.03.02 |