데이터 분석을 위해 파일을 저장해야 할 필요가 있었다. 처음에는 csv파일 형식으로 저장을 했는데, 시간이 지남에 따라서 새로운 컬럼이 생기는 요구사항이 생겼다. 이런 경우 csv는 어떤 정보가 몇번째 컬럼에 있는지를 기술하지 않기 때문에 또 다른 파일에 컬럼 정보를 기록하고 데이터 타입등도 기술을 해줘야 하는 불편함이 생긴다. 언뜻 parquet이 그런 일을 하는것이라 어렴풋이 알고 있었기 때문에 이번에 parquet을 적용해 볼겸 조사를 해봤다.
특징들..
압축 지원. 50% 정도 세이브 할 수 있다고 함. 스토리지 비용이 반이라는 얘기.
여러가지 serialize framework 지원 (Avro, Thrift, protocol buffer)
column based
columnar storage다. 이렇게 접근한 이유는 크게 2가지.
보통 쿼리를 할때 모든 컬럼의 정보가 다 필요한 경우는 많지 않다. row based 라면 전체 열을 다 읽어야 쿼리를 수행할 수 있지만, parquet는 필요한 컬럼만 로드하면 된다. 여기서 속도 향샹이 생긴다.
그리고 컬럼끼리 모아서 압축을 하면 압축률이 더 좋아진다. timestamp를 가지는 컬럼이라고 생각하면 델터 인코딩 방식으로 압축을 하면 좋을 것이고, 각 컬럼의 특징이 살아 있으니 더 유리하다.
2G의 원본 csv를 SNAPPY압축을 이용해서 저장하니 1G로 50%정도나 줄어들었다.
그리고 query performance도 일반 텍스트에 비해서 2배 정도 성능이 좋다고 한다.
단점
마냥 좋기만 한것은 아니다. 분석용으로 최고의 파일 형식이라고 한 것은 데이터의 업데이트가 없다는 뜻이다. 즉, readonly일때 좋은 것이다.
자세한 사항은 avro spec을 보면 되고, 아래처럼 정의 하면 된다. 이 파일을 avro-tools.jar를 이용하면 POJO class를 만들 수 있고 이 파일을 이용하면 프로그래밍이 조금더 이뻐질 수 있다. 아래 read/save 예제에서 User class를 사용하는데 이것이 스키마를 바탕으로 생성된 클래스이다. 필수는 아니고 POJO class가 없을때는 GenericRecord를 사용할 수 도 있다.
// Write a record with GenericRecord GenericRecord r = new GenericData.Record(schema); r.put("uid", "darren"); r.put("age", 22); r.put("weight", 70.0); writer.write(r); writer.close();
/* // write a record using generated POJO class called User ParquetWriter<User> writer = AvroParquetWriter.<User>builder(path) .withCompressionCodec(CompressionCodecName.SNAPPY) .withSchema(schema) .build(); User p = new Profile(); p.setId("darren"); p.setAge(22); p.setWeight(70.0); writer.write(p); writer.close(); */ Configuration conf = new Configuration(); AvroReadSupport.setAvroReadSchema(conf, Profile.SCHEMA$); ParquetReader<Profile> reader = AvroParquetReader.<Profile>builder(path) .withConf(conf) .build(); Profile p1 = reader.read();
val p = sqlContext.parquetFile("s3://test.parquet") val multipleParquet = sqlContext.parquetFile("s3://p1", "s3://p2")
schema merge
스키마가 다른 여러 parquet 파일을 로드할때는 스키마 머지가 필요한데, 스파크 1.5.0 부터는 디폴트로 옵션이 꺼져 있다. 그래서 교집합?만 로드가 된다. 아래처럼 mergeSchema 옵션을 켜고 하면 합집합(?)으로 로드 된다.
1
val p = sqlContext.read.option("mergeSchema", "true").parquetFile("s3://v1.parquet", "s3://v2.parquet")
avro schema evolution
avro를 사용하면 좋은 점은 스키마 변화가 있을때 유연하게 대처할 수 있는 점인데, 아래의 경우를 생각해보자.
Profile은 name필드만 있었는데, v2에서 “create_time”, “newStringNullDefault”, “union” 등이 추가 되었다. create_time, newStringNullDefault은 default값이 있고, union(둘중 하나)은 union으로 선언되어졌다. 이때 Profile 스키마로 저장된 parquet파일을 Profile2로 읽으면 어떻게 될까?
default 값을 가진 필드는 디폴트 값이 채워지고, union들은 생략된다. 아래 예제 참고.
// Write a record with a null value GenericRecord r = new GenericData.Record(schema); for (int i = 0; i < 100; i++) { r.put("name", "darren"); writer.write(r); } writer.close();
// Let's load the same file with v2 schema Schema v2 = new Schema.Parser().parse(new File("src/test/avro/profile2.avro")); Configuration conf = new Configuration(); AvroReadSupport.setAvroReadSchema(conf, v2);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file) .withConf(conf).build(); GenericRecord gr = reader.read();
앞서 저장 용량과 분석 속도를 위해서 parquet을 도입했다는 얘기를 했는데, parquet 으로 로드를 하면 DataFrame으로 로딩이 된다. DataFrame은 RDD + Schema로 볼 수 있겠다.
DataFraem의 경우 쿼리에는 최적화 되어 있지만(읽기 전용), 변환을 하고자 한다면 아무래도 rdd가 더 편한것이 사실이다. 이것을 변환을 하려면 RDD로 변경을 하고, 다시 DataFrame으로 변경을 해야 한다. 이것에 관련된 것들을 알아보자.
DataFrame <-> RDD->
DataFrame을 뭔가 변환을 하려면, rdd로 변환후 하는게 편한데 변환은 아주 쉽다. DataFrame은 rdd라는 함수를 가지고 있으니 그것만 불러주면 된다.
RDD를 DataFrame으로 변환하는 방법은 크게 2가지,
case class를 사용하거나
스키마를 사용하거나
아래 예제 참고
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// from dataframe to rdd val df = xxx val rdd = df.rdd
// create DataFrame from RDD[Row] val schema = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, true) :: Nil) val df1 = sqlContext.createDataFrame(rdd, schema)
// create DataFrame from RDD[User] using case class import sqlContext.implicits._ caseclassUser(name:String, age:Int) valrddUser= sc.textFile("xx").map(l=> User(l.split(",")(0), l.split(",")(1))) val df2 = rddUser.toDF()
Row and schema
DataFrame.rdd를 하면 RDD[Row]가 리턴되는데, Row는 getLong(i),getString(i) … 등의 함수를 가지고 있다. 그래서 타입은 DataFrame의 스키마로 알수가 있고 컬럼 명을 알면 값을 구할 수 있다.
1 2 3 4 5 6
val rdd = df.rdd val idxName = df.schema.fieldIndex("name") val idxAge = df.schema.fieldIndex("age")
트리. 트리는 알고리즘 계에서 혁신적인 발견의 하나인데, O(N)으로 걸릴 만한 것을 O(logN)으로 바꿔주는 마법 같은 data structure이다. 트리는 여러가지 쓰임새가 있지만 이중에서도 segment tree에 대해서 알아보자.
비슷한 용도로 쓰이는 Binary Indexed Tree가 있는데, 구현이 segment tree보다 간단하기 때문에 더 많이 쓰인다. 따라서, segment tree는 비추!. 나중에 업데이트 할 계획
When
N개의 배열에서 특정 range(start, end)의 구간합을 구한다고 생각해보자. naive로 하면 O(N)이 걸린다. 각 배열의 값이 변하지 않는다고 가정하면 O(1)으로 값을 구할 수 있다. psum[N]을 선언하고 0~n까지의 합을 미리 계산하여 캐쉬하고 있으면 psum[end] - psum[start]로 구간합을 바로 구할 수 있다. 만약 각 배열의 값이 변한다면 가장 효율적인 방법은 무엇일까.. 이때 segement tree를 이용하면 된다.
heap이랑 구조가 비슷하다. root는 모든 범위의 합을 가지고 있고(0 to N-1), left node는 왼쪽 반, right node는 오른쪽 반을 가지게 계속 나누어서 트리를 구성하면 된다. 각 노드는 자신의 범위를 가지고 그 범위의 구간합을 계산하여 가진다. left와 right의 값이 같아질때까지 이것을 반복하면 segment tree가 완성되게 된다. 예를 들어, 4개의 원소가 있는 구간트리를 도식화해 보면 아래처럼 구성 되게 된다.
기본적으로 [start, end] 구간의 중간을 잡아서 왼쪽,오른쪽을 계속 recursive하게 수행하면 된다. 종료 조건은 start와 end가 같을때이다. 그리고 left child의 인덱스는 2i+1이 되고, 오른쪽 차일드의 인덱스는 2i+2가 된다. 그리고 트리의 길이는 원 배열 길이의 4배 정도를 잡아주면 된다.
build
위 전제조건으로 구현해 보면 아래처럼 포현할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** a: input array tree: segment tree i: index of segment tree */ intbuild(int* a, int* tree, int start, int end, int i){ if (start==end){ tree[i] = a[start]; return tree[i]; }
int m = (end-start)/2; tree[i] = build(a,tree, start, m, 2*i+1) + build(a, tree, m+1, end, 2*i+2); return tree[i]; }
query
세그먼트 트리를 만들었으니 이제 임의의 범위를 주면 그 합을 리턴해주는 query함수를 만들어 보자.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// start,end : start/end index of tree // qs, qe : query start/end index intquery(int*st, int qs, int qe, int start, int end, int i){ // 구간을 벗어남 if (qe<start || qs>start){ return0; } // 쿼리가 범위 안에 있음 if (qs<=start && end<=qe){ return st[i]; }
// 걸쳐 있는 경우 int m = (end-start)/2; return query(st,qs,qe,start, m, 2*i+1) + query(st, qs, qe, m+1,end,2*i+2); }
update
원 배열의 값이 바뀔때 트리를 업데이트 해주는 함수. 해당 배열이 포함된 모든 노드의 값을 수정해 준다. O(logN)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
voidupdate(int*st, int dest, int orgValue, int newValue){ update2(st, dest, newValue-orgValue, 0, N-1, i); }
voidupdate2(int*st, int dest, int diff, int s,int e, int i){ if (dest<s || dest>e) return;
st[i] += diff; if (s==e){ return; } int m = (e-s)/2; update2(st, dest, diff, s, m, 2*i+1); update2(st, dest, diff, m+1, e, 2*i+2); }
AWS EMR를 사용하다보면 보안등의 이유로 로컬로만 웹서비스등을 오픈해 놓았다. 그래서 web ui는 접속이 되더라도 제대로 동작하지 않다. YARN의 Resource Manager등도 개별 노드로 가는 노드들은 다 링크를 찾지 못한다. 각 링크들이 private ip로 되어 있기 때문이다. 제플린도 마찬가지로 웹은 뜨지만 제플린 서버로 접속을 하지 못해서 disconnted상태로 뜬다. 그래서 이럴땐 ssh tunneling을 사용하면 fully 접근하는 길이 열리게 된다.
아마존에서는 아래처럼 이렇게 가이드를 해놓았다.
Hadoop, Ganglia, and other applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local webserver (http://localhost:port) and are not published on the Internet
ssh tunneling
나의 경우는 아래처럼 구성되어 있기 때문에 bastion으로 한번 터널링을 하고 bastion에서 다시 EMR master로 터널링을 해줘야 한다.
이런 경우 ssh를 두번 타고 들어가면 myserver까지 접속은 가능하다. 아래 처럼. -tt는 tty를 강제할당하는 명령임.
ssh -tt bas ssh -v -i <key>.pem ec2-user@myserver ls
이렇게 하면 명령어가 길어지므로, 아래처럼 proxyCommand를 사용하면 더 간편해 진다. proxyCommand는 ssh를 하기 전에 실행할 명령을 써 주는 용도로 사용되는데, 여기서는 미리 bas에 접속하여 tcp proxy를 설정하는 역할을 하게 된다. tcp proxy는 nc(netcat)를 통하여 동작한다. 아래처럼 셋팅이 되어 있으면, 위의 명령어가 ssh myserver ls 이렇게 간단해 진다.
1 2 3 4 5 6
Host myserver User darren Hostname 10.0.0.99 Port 22 IdentityFile ~/.ssh/mykey.pem ProxyCommand ssh -qaY bas 'nc -w 14400 %h %p'
run multiple bash commands with ssh
bash의 here document 를 사용하면 된다. 아래 예제에서 -o SendEnv는 로컬의 환경변수를 remote로 전달하기 위해서 사용했다.
1 2 3 4 5 6 7 8
ssh -o SendEnv=AWS_ACCESS_KEY_ID -o SendEnv=AWS_SECRET_ACCESS_KEY dev <<EOF echo$AWS_ACCESS_KEY_ID, $AWS_SECRET_ACCESS_KEY cd /home/ec2-user sudo aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID sudo aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY sudo aws configure set default.region us-west-2 ll EOF
helm은 이맥스상에서 무엇인가를 선택할때(find-files, switch-buffer, recentf) 사용하는 프레임웍이다. 기존에는 ido-mode를 사용하고 있었는데 이번에 helm으로 바꾸면서 정리해 본다. 대부분의 내용은 레퍼런스에 있는 것을 번역한 수준임.
설치
M-x list-packages 에서 `helm’ 패키지를 찾아서 설치한다.
설정
1 2
(require 'helm-config) (helm-mode1)
사용법
기존의 tab-completion류의 것이랑은 다른 컨셉이라서 기본 설명이 필요할듯 하다.
helm 모드에 들어가면 pattern을 입력한다.
pattern은 N개 입력이 가능하고, 각각은 공백으로 구분한다. pattern은 regex일 수도 있다
메이븐으로 프로젝트를 진행하다가 이 것을 배포해야 할때 configuration 파일들이나 헬퍼 스크립트 등을 디렉토리로 구성해서 zip/tar.gz으로 배포해야 할 필요가 있다. 그런것을 메이븐 상에서 가능하게 해주는 것이 maven assembly plugin 이다.
mvn configuration
우선 assembly plugin을 사용하기 위해서 build/plugins 하위에 명시해 준다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
<plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.5</version> <configuration> <descriptors> <descriptor>src/assemble/distribution.xml</descriptor> </descriptors> </configuration> <executions> <execution> <id>make-assembly</id><!-- this is used for inheritance merges --> <phase>package</phase><!-- bind to the packaging phase --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
configuration 하위에 어떻게 assemble을 할지를 기술하는 assembly descriptor를 명시할 수 있다. 물론 복수개도 가능.
디폴트로 지원하는 descriptor도 있음. 예를 들면 jar-with-dependencies 같은 경우. 이것을 명시하면 fat-jar를 만들어 줌.
assembly:single goal만 유효함. 이것을 package phase에 실행하려면 위처럼 execution tag 하위에 phase와 goal을 명시하여 주면 됨.
assembly descriptor
moduleSet은 메이븐 모듈의 관점에서 처리를 하는 것이고. 소스, 바이너리등의 처리를 할 수 있다. 아래처럼 하면 소스들을 archive로 카피해 준다.
1 2 3 4 5 6 7 8 9 10 11
<moduleSet> <useAllReactorProjects>true</useAllReactorProjects><!--이것은 모든 모듈들에 대해서 처리를 하고 싶을때 사용한다. --> <includes> <include>com.nberserk:core</include> </includes> <sources> <outputDirectoryMapping> ${module.basedir.name} </outputDirectoryMapping> </sources> </moduleSet>
반면 dependencySet은 각 모듈의 산출물과 디펜던시를 가지는 것의 산출물들을 어떻게 가져 올것 인지에 대해서 기술한다.
이제 문서화를 할 차례다. Hello class에는 @Api, @ApiOperation annotation으로 Model class에는 @ApiModel, @ApiModelProperty로 annotation을 달아주면 나중에 swagger-ui로 보면 어노테이션한 설명들이 표시가 된다.
@ApiModelProperty(value = "description") public String getDesc(){ return desc; }
@Override public String toString(){ return"id: " + id + ", desc: " + desc; } }
Swagger가 잘 설정되었는지를 보려면 브라우저로 rest/swagger.json 여기 들어가서 json 파일이 보면 설정이 잘 된 것이다.
swagger-ui
자 이제 모든 준비는 끝났다. swagger-ui에 가서 http://localhost:8080/rest/swagger.json을 입력하면 annotation에서 기술했던 정보들이 문서화 되어서 나오고, 여기서 바로 rest api를 불러 볼 수 도 있다. 모든 개발자는 문서화를 싫어하고, 소스와 싱크를 맞추는 일 또한 노력과 정성이 들어가기 때문에 소스에 annotation만 잘 해주면 싱크도 맞춰지니까 유용한 툴이 될 것이다. 아래 스크린샷 참고.
이 중 MyResource.java가 리소스 파일이 되는데, 리네임해서 Hello.java로 만들자. 여기에 @Path만 붙여주면 rest api 처리하는 리소스 클래스가 된다. @GET, @POST ,@PUT , @DELETE 어노테이션을 하는 것만으로 쉽게 속성을 수정할 수 있다. 더구나 @Produces 를 사용해서 POJO class를 xml 혹은 json으로 인코딩 해서 보낼 수 있다. 멋지지 않는가? (아래의 Model class는 POJO class이다. 즉, 중요하지 않다.)
mvn jetty:run을 실행하면 localhost:8080으로 서버를 실행한다. 그러면 직접 rest api를 call 해 볼 수 있다. jetty:run이 무엇을 하는지 궁금하다면 여기 를 보자. 내가 요약해주면 각 기본 디렉토리(src/main/webapp, classes in outdir)에 있는 파일들을 jetty로 deploy 해준다. 특정 파일에 변경이 생기면 업데이트 해주는 센스까지..
그럼 아래 url들을 차례로 브라우저에서 접근해보자. 각각 text, json, xml 포맷으로 response가 온다면 제대로 동작하고 있는 것.
Spark에서는 data를 RDD로 abstract한다. RDD는 distributable 하고, 하나의 노드가 사고로 죽었을때에도 다시 생성해 낼수 있다. 캐쉬의 단위이기도 하고 파일에 쓸 수 도 있다. 스파크의 기본 데이터 스트럭쳐 되겠다. RDD는 쉽게 생각해서 row의 묶음 이라고 생각하면 된다. 각 row가 transformation의 대상이 되는 것이고.
이 RDD는 크게 2개의 종류가 있고. 이는 transformation과 actions이다. transformation는 바로 실행하지 않고 defer될 수 있고 action이 일어날때 lazy 하게 실행된다. action은 collect, count, take 등이고, 나머지는 모두 transformation이다. 보면 당장의 결과가 필요한 애들만 action이다. 그래서 transformation이 lazy하게 시행될 수 있는 것이다.
basic
map, flatMap
가장 많이 사용하는 map! 은 말 그대로 하나의 입력을 인자로 주어지는 function을 통해서 다른 입력값으로 map 한다.
아래 예제는 주어진 수를 제곱해주는 예제.
1 2 3 4
val rdd = sc.parallelize(List(1,2,3,4)) val squared = rdd.map( n => n*n).collect -------------------------------------- squared: Array[Int] = Array(1, 4, 9, 16)
flatMap이 map과 다른 점은 interable한 값을 리턴하는 대신 그것을 flatten해서 변환한다는 것이다. 아래예제를 보면 split을 하게 되면 String array가 만들어지는데 이것을 개별로 리턴해준다. split vs split2 비교해보면 이해가 잘 될듯하다.
1 2 3 4 5 6
val rdd = sc.parallelize(List("a b","v x")) val split = rdd.map( _.split(" ")).collect val split2 = rdd.flatMap( n => n.split(" ")).collect -------------------------------------- split: Array[Array[String]] = Array(Array(a, b), Array(v, x)) split2: Array[String] = Array(a, b, v, x)
filter
filter는 말그대로 true를 리턴하는 값만 가지고 나머지는 다 버리겠다는 것.
1 부터 10까지의 수중 짝수만 남았다
1 2 3 4
val rdd = sc.parallelize((1 to 10).toList) val filtered = rdd.filter(_ %2==0).collect -------------------------------------- filtered: Array[Int] = Array(2, 4, 6, 8, 10)
distinct, union, intersection, subtract
distinct는 중복값을 제거한후 리스트를 제공하고, union는 두 rdd의 합집합을, intersection은 교집합, subtract는 원집합에서 교집합을 제거 한다.
1 2 3 4 5 6 7 8 9 10 11 12
val a = sc.parallelize(List(1,1,2,3,3,4)) val b = sc.parallelize(List(3,4,5,5,6))