Inertia

Parquet + Avro + Spark

데이터 분석을 위해 파일을 저장해야 할 필요가 있었다. 처음에는 csv파일 형식으로 저장을 했는데, 시간이 지남에 따라서 새로운 컬럼이 생기는 요구사항이 생겼다. 이런 경우 csv는 어떤 정보가 몇번째 컬럼에 있는지를 기술하지 않기 때문에 또 다른 파일에 컬럼 정보를 기록하고 데이터 타입등도 기술을 해줘야 하는 불편함이 생긴다.
언뜻 parquet이 그런 일을 하는것이라 어렴풋이 알고 있었기 때문에 이번에 parquet을 적용해 볼겸 조사를 해봤다.

특징들..

  • 압축 지원. 50% 정도 세이브 할 수 있다고 함. 스토리지 비용이 반이라는 얘기.
  • 여러가지 serialize framework 지원 (Avro, Thrift, protocol buffer)

column based

columnar storage다. 이렇게 접근한 이유는 크게 2가지.

보통 쿼리를 할때 모든 컬럼의 정보가 다 필요한 경우는 많지 않다. row based 라면 전체 열을 다 읽어야 쿼리를 수행할 수 있지만, parquet는 필요한 컬럼만 로드하면 된다. 여기서 속도 향샹이 생긴다.

그리고 컬럼끼리 모아서 압축을 하면 압축률이 더 좋아진다. timestamp를 가지는 컬럼이라고 생각하면 델터 인코딩 방식으로 압축을 하면 좋을 것이고, 각 컬럼의 특징이 살아 있으니 더 유리하다.

2G의 원본 csv를 SNAPPY압축을 이용해서 저장하니 1G로 50%정도나 줄어들었다.

그리고 query performance도 일반 텍스트에 비해서 2배 정도 성능이 좋다고 한다.

단점

마냥 좋기만 한것은 아니다. 분석용으로 최고의 파일 형식이라고 한 것은 데이터의 업데이트가 없다는 뜻이다. 즉, readonly일때 좋은 것이다.

환경 설정

spark + parquet + avro 를 사용하려면 다음과 같은 디펜던시가 필요하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<properties>
<scala.version>2.11.4</scala.version>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<parquet.version>1.8.1</parquet.version>
<avro.version>1.7.7</avro.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
</dependencies>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

avro schema define

자세한 사항은 avro spec을 보면 되고, 아래처럼 정의 하면 된다. 이 파일을 avro-tools.jar를 이용하면 POJO class를 만들 수 있고 이 파일을 이용하면 프로그래밍이 조금더 이뻐질 수 있다. 아래 read/save 예제에서 User class를 사용하는데 이것이 스키마를 바탕으로 생성된 클래스이다. 필수는 아니고 POJO class가 없을때는 GenericRecord를 사용할 수 도 있다.

1
2
3
4
5
6
7
8
9
10
{
"namespace": "com.nberserk.example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "age", "type": "int"},
{"name": "weight", "type":"float"}
]
}

parquet save/read in java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Schema schema = new Schema.Parser().parse(new File("src/test/avro/user.avro"));
File tmp = new File("test.parquet");
Path path = new Path(tmp.getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();

// Write a record with GenericRecord
GenericRecord r = new GenericData.Record(schema);
r.put("uid", "darren");
r.put("age", 22);
r.put("weight", 70.0);
writer.write(r);
writer.close();

/*
// write a record using generated POJO class called User
ParquetWriter<User> writer = AvroParquetWriter.<User>builder(path)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withSchema(schema)
.build();

User p = new Profile();
p.setId("darren");
p.setAge(22);
p.setWeight(70.0);
writer.write(p);
writer.close();
*/
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, Profile.SCHEMA$);
ParquetReader<Profile> reader = AvroParquetReader.<Profile>builder(path)
.withConf(conf)
.build();
Profile p1 = reader.read();

assertEquals("darren", p1.getUid().toString());
assertEquals(22, p1.getAge());
assertEquals(77.0, p1.getWeight());

Spark에서 parquet 읽기

spark 1.5.0 미만에서는 [[https://issues.apache.org/jira/browse/SPARK-6566][parquet로드시 익셉션을 내는 버그가 있었는데]] 그때는 스파크 버전을 1.5.0 이상으로 올리면 해결 된다.

1
2
val p = sqlContext.parquetFile("s3://test.parquet")
val multipleParquet = sqlContext.parquetFile("s3://p1", "s3://p2")

schema merge

스키마가 다른 여러 parquet 파일을 로드할때는 스키마 머지가 필요한데, 스파크 1.5.0 부터는 디폴트로 옵션이 꺼져 있다. 그래서 교집합?만 로드가 된다.
아래처럼 mergeSchema 옵션을 켜고 하면 합집합(?)으로 로드 된다.

1
val p = sqlContext.read.option("mergeSchema", "true").parquetFile("s3://v1.parquet", "s3://v2.parquet")

avro schema evolution

avro를 사용하면 좋은 점은 스키마 변화가 있을때 유연하게 대처할 수 있는 점인데, 아래의 경우를 생각해보자.

Profile은 name필드만 있었는데, v2에서 “create_time”, “newStringNullDefault”, “union” 등이 추가 되었다. create_time, newStringNullDefault은 default값이 있고, union(둘중 하나)은 union으로 선언되어졌다. 이때 Profile 스키마로 저장된 parquet파일을 Profile2로 읽으면 어떻게 될까?

default 값을 가진 필드는 디폴트 값이 채워지고, union들은 생략된다. 아래 예제 참고.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*

// Profile avro spec
{"namespace": "com.nberserk.avro",
"type": "record",
"name": "Profile",
"fields": [
{"name": "name", "type": "string"}
]
}

// Profile2 avro spec
{"namespace": "com.nberserk.avro",
"type": "record",
"name": "Profile2",
"fields": [
{"name": "name", "type": "string"}
{"name": "create_time", "type":"long", "default":0},
{"name": "union", "type": ["null", "string"]},
{"name": "value_default", "type": "string", "default": "null"}
]
}
,*/
Schema schema = new Schema.Parser().parse(new File("src/test/avro/profile.avro"));

File tmp = new File("test2.tmp");
tmp.deleteOnExit();
tmp.delete();

Path file = new Path(tmp.getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
.build();

// Write a record with a null value
GenericRecord r = new GenericData.Record(schema);
for (int i = 0; i < 100; i++) {
r.put("name", "darren");
writer.write(r);
}
writer.close();


// Let's load the same file with v2 schema
Schema v2 = new Schema.Parser().parse(new File("src/test/avro/profile2.avro"));
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, v2);

ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file)
.withConf(conf).build();
GenericRecord gr = reader.read();

assertEquals("darren", gr.get("name").toString());
assertEquals(0L, gr.get("create_time"));
assertEquals("null", gr.get("newStringNullDefault").toString());

performance comparison

원본 사이즈, txt로는 20G, parquet로는 6.5G 정도임 row수는 1억1천 row정도의 데이터.

file size, 20G 파일이 parquet으로 저장하니 6.5G 정도로 작아짐. 50% 이상.
query, 특정 값을 가진 사용자들의 수를 쿼리하는 것을 각각 측정해보니 txt의 경우는 186초. parquet의 경우는 105초 정도가 소요됨.

따라서, 스토리지 용량/ 쿼리 속도 면에서 모두 만족할 만한 결과를 보임.

trouble shooting

writing parquet files to s3 is too slow

csv에서 parquet파일로 바꾸고 난후에 S3에 write 할때 시간이 너무 오래 걸려서 분석해보니 write.parquet()의 시간이 너무 오래 걸렸다. 검색을 해보니 [[http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration][spark.sql.parquet.output.committer.class 을 org.apache.spark.sql.parquet.DirectParquetOutputCommitter 로 설정]]하면 괜찮아진다는 얘기가 있다. 수정해보니 효과가 있었다.

1
2
//sc : SparkContext
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

revision history

  • 2015/12/10 initial draft
  • 2015/12/18 add schema evolution
  • trouble shooting/s3 slow added

RDD and DataFrame

앞서 저장 용량과 분석 속도를 위해서 parquet을 도입했다는 얘기를 했는데, parquet 으로 로드를 하면 DataFrame으로 로딩이 된다. DataFrame은 RDD + Schema로 볼 수 있겠다.

DataFraem의 경우 쿼리에는 최적화 되어 있지만(읽기 전용), 변환을 하고자 한다면 아무래도 rdd가 더 편한것이 사실이다. 이것을 변환을 하려면 RDD로 변경을 하고, 다시 DataFrame으로 변경을 해야 한다. 이것에 관련된 것들을 알아보자.

DataFrame <-> RDD

DataFrame을 뭔가 변환을 하려면, rdd로 변환후 하는게 편한데 변환은 아주 쉽다. DataFrame은 rdd라는 함수를 가지고 있으니 그것만 불러주면 된다.

RDD를 DataFrame으로 변환하는 방법은 크게 2가지,

  • case class를 사용하거나
  • 스키마를 사용하거나

아래 예제 참고

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// from dataframe to rdd
val df = xxx
val rdd = df.rdd

// create DataFrame from RDD[Row]
val schema = StructType(
StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)
val df1 = sqlContext.createDataFrame(rdd, schema)

// create DataFrame from RDD[User] using case class
import sqlContext.implicits._
case class User(name:String, age:Int)
val rddUser = sc.textFile("xx").map(l=> User(l.split(",")(0), l.split(",")(1)))
val df2 = rddUser.toDF()

Row and schema

DataFrame.rdd를 하면 RDD[Row]가 리턴되는데, Row는 getLong(i),getString(i) … 등의 함수를 가지고 있다. 그래서 타입은 DataFrame의 스키마로 알수가 있고 컬럼 명을 알면 값을 구할 수 있다.

1
2
3
4
5
6
val rdd = df.rdd
val idxName = df.schema.fieldIndex("name")
val idxAge = df.schema.fieldIndex("age")

// age만 남게 변환
rdd.map(r => _.getInt(idxAge))

revision history

  • 2015/12/17 initial draft

segment tree

트리. 트리는 알고리즘 계에서 혁신적인 발견의 하나인데, O(N)으로 걸릴 만한 것을 O(logN)으로 바꿔주는 마법 같은 data structure이다. 트리는 여러가지 쓰임새가 있지만 이중에서도 segment tree에 대해서 알아보자.

비슷한 용도로 쓰이는 Binary Indexed Tree가 있는데, 구현이 segment tree보다 간단하기 때문에 더 많이 쓰인다. 따라서, segment tree는 비추!. 나중에 업데이트 할 계획

When

N개의 배열에서 특정 range(start, end)의 구간합을 구한다고 생각해보자. naive로 하면 O(N)이 걸린다.
각 배열의 값이 변하지 않는다고 가정하면 O(1)으로 값을 구할 수 있다. psum[N]을 선언하고 0~n까지의 합을 미리 계산하여 캐쉬하고 있으면 psum[end] - psum[start]로 구간합을 바로 구할 수 있다.
만약 각 배열의 값이 변한다면 가장 효율적인 방법은 무엇일까.. 이때 segement tree를 이용하면 된다.

time complexity
construction O(NlogN)
query O(logN)
update O(logN)

idea

heap이랑 구조가 비슷하다. root는 모든 범위의 합을 가지고 있고(0 to N-1), left node는 왼쪽 반, right node는 오른쪽 반을 가지게 계속 나누어서 트리를 구성하면 된다. 각 노드는 자신의 범위를 가지고 그 범위의 구간합을 계산하여 가진다. left와 right의 값이 같아질때까지 이것을 반복하면 segment tree가 완성되게 된다.
예를 들어, 4개의 원소가 있는 구간트리를 도식화해 보면 아래처럼 구성 되게 된다.

1
2
3
4
5
6
7
8
9
                  +--------+
| (0,3) |
+--------+
+--------+ +--------+
| (0,1) | | (2,3) |
+--------+ +--------+
+--------+ +--------+ +--------+ +--------+
| (0,0) | | (1,1) | | (2,2)| | (3,3) |
+--------+ +--------+ +--------+ +--------+

implementation

기본적으로 [start, end] 구간의 중간을 잡아서 왼쪽,오른쪽을 계속 recursive하게 수행하면 된다. 종료 조건은 start와 end가 같을때이다. 그리고 left child의 인덱스는 2i+1이 되고, 오른쪽 차일드의 인덱스는 2i+2가 된다.
그리고 트리의 길이는 원 배열 길이의 4배 정도를 잡아주면 된다.

build

위 전제조건으로 구현해 보면 아래처럼 포현할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
a: input array
tree: segment tree
i: index of segment tree
*/
int build(int* a, int* tree, int start, int end, int i){
if (start==end){
tree[i] = a[start];
return tree[i];
}

int m = (end-start)/2;
tree[i] = build(a,tree, start, m, 2*i+1) +
build(a, tree, m+1, end, 2*i+2);
return tree[i];
}

query

세그먼트 트리를 만들었으니 이제 임의의 범위를 주면 그 합을 리턴해주는 query함수를 만들어 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// start,end : start/end index of tree
// qs, qe : query start/end index
int query(int*st, int qs, int qe, int start, int end, int i){
// 구간을 벗어남
if (qe<start || qs>start){
return 0;
}
// 쿼리가 범위 안에 있음
if (qs<=start && end<=qe){
return st[i];
}

// 걸쳐 있는 경우
int m = (end-start)/2;
return query(st,qs,qe,start, m, 2*i+1) + query(st, qs, qe, m+1,end,2*i+2);
}

update

원 배열의 값이 바뀔때 트리를 업데이트 해주는 함수. 해당 배열이 포함된 모든 노드의 값을 수정해 준다. O(logN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void update(int*st, int dest, int orgValue, int newValue){
update2(st, dest, newValue-orgValue, 0, N-1, i);
}

void update2(int*st, int dest, int diff, int s,int e, int i){
if (dest<s || dest>e) return;

st[i] += diff;
if (s==e){
return;
}
int m = (e-s)/2;
update2(st, dest, diff, s, m, 2*i+1);
update2(st, dest, diff, m+1, e, 2*i+2);
}

revision history

  • 2015/11/16 initial draft
  • 2016/11/8 typo fixed

ssh tunneling on AWS

AWS EMR를 사용하다보면 보안등의 이유로 로컬로만 웹서비스등을 오픈해 놓았다. 그래서 web ui는 접속이 되더라도 제대로 동작하지 않다. YARN의 Resource Manager등도 개별 노드로 가는 노드들은 다 링크를 찾지 못한다. 각 링크들이 private ip로 되어 있기 때문이다. 제플린도 마찬가지로 웹은 뜨지만 제플린 서버로 접속을 하지 못해서 disconnted상태로 뜬다. 그래서 이럴땐 ssh tunneling을 사용하면 fully 접근하는 길이 열리게 된다.

아마존에서는 아래처럼 이렇게 가이드를 해놓았다.

Hadoop, Ganglia, and other applications publish user interfaces as websites hosted on the master node. For security reasons, these websites are only available on the master node’s local webserver (http://localhost:port) and are not published on the Internet

ssh tunneling

나의 경우는 아래처럼 구성되어 있기 때문에 bastion으로 한번 터널링을 하고 bastion에서 다시 EMR master로 터널링을 해줘야 한다.

1
2
3
+----------+         +---------+        +------------+
|localhost | <----> |bastion | <---> | EMR master |
+----------+ +---------+ +------------+

Zeppelin을 예로 들어서 터널링을 해보자. Zeppelin의 경우 emr-4.1에서 8890포트로 설정이 되어 있어서 우선 내 로컬의 8890포트를 bastion의 8890포트로 포워딩 시켜준다.

1
ssh -i <your.pem>  -L 8890:localhost:8890  ec2-user@bastion

이렇게 하면 bastion으로 ssh 접속이 되었을 것이고 다시 EMR master로 8890 포트를 포워딩 한다.

1
ssh -i <your.pem> -p 22 -L 8890:localhost:8890 hadoop@ec2-??-??-??-???.us-west-2.compute.amazonaws.com

이렇게 하면 이제 EMR master로 ssh 접속이 된 상태일 것이고. 그러고 난 후 http://localhost:8890 에 접속을 하면 제플린 웹에 접속할 수 있게 된다. 오른쪽 상단에 ‘connected’라고 되어 있을 것이다.

options

ssh 명령을 줄때 다음 옵션들을 경우에 맞게 사용하면 아주 유용.

  • -v : verbose. 뭔가 잘 안될때 이 옵션을 사용하자
  • -N : ssh 접속을 하지 않는다.

revision history

  • 2015/10/30 initial draft

ssh config

aws를 사용하다보면 ssh 작업을 많이 하게 되는데, 이 때 알아두면 너무 너무 편리한 팁들이다.

passwordless ssh

나의 public key(.ssh/id_rsa.pub)를 대상 컴의 .ssh/authrized_keys 에 복사해주면
ssh user@server ls /를 패스워드 없이 바로 실행할 수 있다.

cat ~/.ssh/id_rsa.pub | ssh stg ‘cat >> .ssh/authorized_keys’

만약 .ssh 폴더가 없다면, ssh-keygen -t rsa로 생성해 주면 된다.

.ssh/config

ssh의 여러가지 설정을 담고 있는 설정 파일이다.

1
2
3
Host my
User darren
Hostname 10.1.1.1

위 설정 파일을 사용하면 ssh my만 타이핑 하면, ssh darren@10.1.1.1로 번역되어 실행 된다.

1
2
3
4
5
Host bas
User ec2-user
Hostname bastion
Port 352
IdentityFile ~/.ssh/bastion.pem

위 설정 파일은 ssh basssh -i ~/.ssh/bastion.pem ec2-user@bastion -p 352 로 해석 된다.

ssh tunneling

특정 포트를 포트 포워딩할 수도 있다. 보통 회사의 네트웍 같은 경우 80 포트를 제외한 나머지는 모두 방화벽으로 막혀 있는 경우가 많은데 이럴때 유용하게 사용할 수 있다.

1
2
3
4
5
Host myserver
HostName mysql
IdentityFile ~/.ssh/mysql.key
LocalForward 9906 127.0.0.1:3306
User mysqlUser

database컴의 3306 포트가 localhost:9906로 포트 포워딩 된다. localhost:9906에서 mysql의 port에 접근할 수 있다.

ControlPath

control path를 사용하면 ssh connection을 캐쉬 해서 접속 시간을 줄일 수 있어, 반복적으로 ssh 명령을 불러야 할때 사용하면 유용하다.

1
2
Host *
ControlPath ~/.ssh/mux-%r@%h:%p

proxyCommand

클라우드를 사용하면 회사 방화벽때문에 connector(bastion) 서버에 우선 접속한 뒤, 실제 서버로 연결해야 하는 경우가 많이 있다. 예를 들면 아래와 같은 경우.

1
2
3
+----------+         +---------+        +-----------+
|localhost | <----> |bastion | <---> | myserver |
+----------+ +---------+ +-----------+

이런 경우 ssh를 두번 타고 들어가면 myserver까지 접속은 가능하다. 아래 처럼. -tt는 tty를 강제할당하는 명령임.

ssh -tt bas ssh -v -i <key>.pem ec2-user@myserver ls

이렇게 하면 명령어가 길어지므로, 아래처럼 proxyCommand를 사용하면 더 간편해 진다. proxyCommand는 ssh를 하기 전에 실행할 명령을 써 주는 용도로 사용되는데, 여기서는 미리 bas에 접속하여 tcp proxy를 설정하는 역할을 하게 된다. tcp proxy는 nc(netcat)를 통하여 동작한다. 아래처럼 셋팅이 되어 있으면, 위의 명령어가 ssh myserver ls 이렇게 간단해 진다.

1
2
3
4
5
6
Host myserver
User darren
Hostname 10.0.0.99
Port 22
IdentityFile ~/.ssh/mykey.pem
ProxyCommand ssh -qaY bas 'nc -w 14400 %h %p'

run multiple bash commands with ssh

bash의 here document 를 사용하면 된다. 아래 예제에서 -o SendEnv는 로컬의 환경변수를 remote로 전달하기 위해서 사용했다.

1
2
3
4
5
6
7
8
ssh -o SendEnv=AWS_ACCESS_KEY_ID -o SendEnv=AWS_SECRET_ACCESS_KEY dev <<EOF
echo $AWS_ACCESS_KEY_ID, $AWS_SECRET_ACCESS_KEY
cd /home/ec2-user
sudo aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
sudo aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY
sudo aws configure set default.region us-west-2
ll
EOF

Reference

revision history

  • 2015/8/24 initial draft
  • 8/28 ControlPath added
  • 9/1 proxy command added
  • 2017/3/29 added run multiple bash commands

emacs helm

helm은 이맥스상에서 무엇인가를 선택할때(find-files, switch-buffer, recentf) 사용하는 프레임웍이다. 기존에는 ido-mode를 사용하고 있었는데 이번에 helm으로 바꾸면서 정리해 본다. 대부분의 내용은 레퍼런스에 있는 것을 번역한 수준임.

설치

M-x list-packages 에서 `helm’ 패키지를 찾아서 설치한다.

설정

1
2
(require 'helm-config)
(helm-mode 1)

사용법

기존의 tab-completion류의 것이랑은 다른 컨셉이라서 기본 설명이 필요할듯 하다.

  1. helm 모드에 들어가면 pattern을 입력한다.
  2. pattern은 N개 입력이 가능하고, 각각은 공백으로 구분한다. pattern은 regex일 수도 있다
  3. helm은 후보들 중에서 pattern과 매칭하는 후보들을 리스트 업 하고 RET로 선택
  4. C-n/C-p 로 아래/위로, C-v M-v로 아래페이지/위 페이지로 이동
  5. C-spc로 후보를 마크할 수 있다. M-a로 모두 다 선택이다.
  6. mark한 파일 list를 C-c C-i로 현재 버퍼에 카피할 수 있다.
  7. C-t로 helm buffer를 vertical 또는 horizontal로 스위칭

Reference

revision history

  • 2015/8/16 initial draft

maven assembly plugin으로 패키징 해서 배포하기

메이븐으로 프로젝트를 진행하다가 이 것을 배포해야 할때 configuration 파일들이나 헬퍼 스크립트 등을 디렉토리로 구성해서 zip/tar.gz으로 배포해야 할 필요가 있다. 그런것을 메이븐 상에서 가능하게 해주는 것이 maven assembly plugin 이다.

mvn configuration

우선 assembly plugin을 사용하기 위해서 build/plugins 하위에 명시해 준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<descriptors>
<descriptor>src/assemble/distribution.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
  • configuration 하위에 어떻게 assemble을 할지를 기술하는 assembly descriptor를 명시할 수 있다. 물론 복수개도 가능.
  • 디폴트로 지원하는 descriptor도 있음. 예를 들면 jar-with-dependencies 같은 경우. 이것을 명시하면 fat-jar를 만들어 줌.
  • assembly:single goal만 유효함. 이것을 package phase에 실행하려면 위처럼 execution tag 하위에 phase와 goal을 명시하여 주면 됨.

assembly descriptor

moduleSet은 메이븐 모듈의 관점에서 처리를 하는 것이고. 소스, 바이너리등의 처리를 할 수 있다. 아래처럼 하면 소스들을 archive로 카피해 준다.

1
2
3
4
5
6
7
8
9
10
11
<moduleSet>
<useAllReactorProjects>true</useAllReactorProjects> <!--이것은 모든 모듈들에 대해서 처리를 하고 싶을때 사용한다. -->
<includes>
<include>com.nberserk:core</include>
</includes>
<sources>
<outputDirectoryMapping>
${module.basedir.name}
</outputDirectoryMapping>
</sources>
</moduleSet>

반면 dependencySet은 각 모듈의 산출물과 디펜던시를 가지는 것의 산출물들을 어떻게 가져 올것 인지에 대해서 기술한다.

1
2
3
4
5
6
7
8
9
10
11
<dependencySet>
<includes>
<include>com.nberserk:core</include> <!--core를 루트 드렉토리로 카피-->
</includes>
</dependencySet>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<excludes>
<exclude>com.nberserk:core</exclude> <!--core가 refer하는 모든 jar들을 lib으로 카피-->
</excludes>
</dependencySet>

이 외에도 fileSet 을 사용해서 특정파일이나 폴더를 패키징할 수도 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<fileSets>
<fileSet>
<directory>../</directory>
<includes>
<include>*.md</include> <!--*.md 파일을 루트로 복사-->
</includes>
</fileSet>
<fileSet>
<directory>../bin</directory> <!--bin디렉토리에 있는것을 bin으로 복사-->
<directoryMode>0755</directoryMode>
<fileMode>0755</fileMode> <!--파일 모드를 변경해줌-->
</fileSet>
<fileSet>
<directory>../conf</directory>
</fileSet>
</fileSets>

Reference

revision history

  • 2015/7/32 initial draft

Swagger로 rest api 문서화 하기

지난 번에 구현했던 rest api 프로젝트에 Swagger를 통해서 문서화 하고 rest api를 테스트 해 볼 수 있는 web ui를 만드는 방법을 소개합니다.

스웨거는 rest api를 문서화 하고 기술하는 스펙임. swagger.json 혹은 swagger.yaml이 곧 rest api의 명세를 작성하는 방법이 되는 것. 서브 프로젝트로

  • swagger-editor : 스웨거 명세를 작성하게 도와주는 텍스트 에디터. 온라인 데모
  • swagger-ui: swagger api를 문서화 해서 보여주고 직접 rest api call도 해볼 수 있음 온라인 데모
  • swagger-codegen : swagger 명세를 기반으로 각 언어로 소스 뼈대를 만들어줌. Java, Scala, node 등을 지원해 줌.
    등이 있고, 잘 사용하면 유용하게 사용할 수 있다.

swagger integration

먼저 pom.xml 에 swagger dependency를 추가하고

1
2
3
4
5
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-jersey2-jaxrs</artifactId>
<version>1.5.0</version>
</dependency>

web.xml. Jersey2Config는 swagger.json을 위한 것이고 기타 api.version과 basepath등을 여기서 설정해 준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<servlet>
<servlet-name>jersey</servlet-name>
<servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
<init-param>
<param-name>jersey.config.server.provider.packages</param-name>
<param-value>
io.swagger.jaxrs.listing,
com.nberserk.rest
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet>
<servlet-name>Jersey2Config</servlet-name>
<servlet-class>io.swagger.jersey.config.JerseyJaxrsConfig</servlet-class>
<init-param>
<param-name>api.version</param-name>
<param-value>1.0.0</param-value>
</init-param>
<init-param>
<param-name>swagger.api.basepath</param-name>
<param-value>http://localhost:8080/rest</param-value>
</init-param>
<load-on-startup>2</load-on-startup>
</servlet>

Swagger로 annotation추가 하기

이제 문서화를 할 차례다. Hello class에는 @Api, @ApiOperation annotation으로 Model class에는 @ApiModel, @ApiModelProperty로 annotation을 달아주면 나중에 swagger-ui로 보면 어노테이션한 설명들이 표시가 된다.

annotation된 Hello class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Api(value="/hello", description="hello APIs")
@Path("hello")
public class Hello {

static Model model;
static {
model = new Model("Darren");
model.setDesc("description");
}

@ApiOperation(value = "", notes = "Gets model (text)", response = Model.class)
@Path("/text")
@GET
@Produces(MediaType.TEXT_PLAIN)
public String getText(){
return model.toString();
}

@ApiOperation(value = "", notes = "Gets model (json)", response = Model.class)
@Path("/json")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getJson() {
return Response.ok().entity(model).build();
}

@ApiOperation(value = "", notes = "Gets model (xml)", response = Model.class)
@Path("/xml")
@GET
@Produces(MediaType.APPLICATION_XML)
public Response getXml(){
return Response.ok().entity(model).build();
}
}

annotation된 Model class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
ApiModel(description="Model")
@XmlRootElement
public class Model {
String id;
String desc;

public Model(String id){
this.id = id;
}

@ApiModelProperty(required = true, value = "id")
public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public void setDesc(String desc) {
this.desc = desc;
}

@ApiModelProperty(value = "description")
public String getDesc() {
return desc;
}

@Override
public String toString() {
return "id: " + id + ", desc: " + desc;
}
}

Swagger가 잘 설정되었는지를 보려면 브라우저로 rest/swagger.json 여기 들어가서 json 파일이 보면 설정이 잘 된 것이다.

swagger-ui

자 이제 모든 준비는 끝났다. swagger-ui에 가서 http://localhost:8080/rest/swagger.json을 입력하면 annotation에서 기술했던 정보들이 문서화 되어서 나오고, 여기서 바로 rest api를 불러 볼 수 도 있다. 모든 개발자는 문서화를 싫어하고, 소스와 싱크를 맞추는 일 또한 노력과 정성이 들어가기 때문에 소스에 annotation만 잘 해주면 싱크도 맞춰지니까 유용한 툴이 될 것이다. 아래 스크린샷 참고.

swagger-ui screenshot

Reference

실제로 동작하는 프로젝트는 github에 공유되어 있으니 참고하시면 되겠다.

revision history

  • 2015/7/14 initial draft
  • 2015/7/17 rest/swagger 분리

Jersey로 rest api 서버 만들기

Jetty 를 이용해서 rest api 서버를 만들고 Jetty로 실행해 보는 코스.

Jersey 로 rest api 구현하기

아래 처럼 maven 명령을 실행하면 프로젝트 구조를 만들어 준다.

1
mvn archetype:generate -DarchetypeArtifactId=jersey-quickstart-webapp -DarchetypeGroupId=org.glassfish.jersey.archetypes

아래와 같은 파일들이 만들어 지고 ..

1
2
3
4
5
rest/pom.xml
rest/src/main
rest/src/main/java/<your package>/MyResource.java
rest/src/main/webapp/index.jsp
rest/src/main/webapp/WEB-INF/web.xml

이 중 MyResource.java가 리소스 파일이 되는데, 리네임해서 Hello.java로 만들자. 여기에 @Path만 붙여주면 rest api 처리하는 리소스 클래스가 된다. @GET, @POST ,@PUT , @DELETE 어노테이션을 하는 것만으로 쉽게 속성을 수정할 수 있다. 더구나 @Produces 를 사용해서 POJO class를 xml 혹은 json으로 인코딩 해서 보낼 수 있다. 멋지지 않는가? (아래의 Model class는 POJO class이다. 즉, 중요하지 않다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Path("hello")
public class Hello {

static Model model;
static {
model = new Model("Darren");
model.setDesc("description");
}

@Path("/text")
@GET
public String getText(){
return model.toString();
}

@Path("/json")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getJson() {
return Response.ok().entity(model).build();
}

@Path("/xml")
@GET
@Produces(MediaType.APPLICATION_XML)
public Response getXml(){
return Response.ok().entity(model).build();
}

Jetty로 실행해 보기

앞단에서 rest api 구현은 해놨고 이것을 실행을 해야 하는데, tomcat등에 설치해서 하면 되지만 약간 무거운(?) 느낌이 있다. 그래서 light한 Jetty로 이 웹앱을 실행해 보자.

우선 jetty dependency를 pom.xml에 추가하고

1
2
3
4
5
<plugin>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>9.3.0.v20150612</version>
</plugin>

mvn jetty:run을 실행하면 localhost:8080으로 서버를 실행한다. 그러면 직접 rest api를 call 해 볼 수 있다. jetty:run이 무엇을 하는지 궁금하다면 여기 를 보자. 내가 요약해주면 각 기본 디렉토리(src/main/webapp, classes in outdir)에 있는 파일들을 jetty로 deploy 해준다. 특정 파일에 변경이 생기면 업데이트 해주는 센스까지..

그럼 아래 url들을 차례로 브라우저에서 접근해보자. 각각 text, json, xml 포맷으로 response가 온다면 제대로 동작하고 있는 것.

Reference

실제로 동작하는 프로젝트는 github에 공유되어 있으니 참고하시면 되겠다.

revision history

  • 2015/7/14 initial draft
  • 2015/7/17 split to 2 posts. this one + swagger

RDD transformation and actions

Spark에서는 data를 RDD로 abstract한다. RDD는 distributable 하고, 하나의 노드가 사고로 죽었을때에도 다시 생성해 낼수 있다. 캐쉬의 단위이기도 하고 파일에 쓸 수 도 있다. 스파크의 기본 데이터 스트럭쳐 되겠다. RDD는 쉽게 생각해서 row의 묶음 이라고 생각하면 된다. 각 row가 transformation의 대상이 되는 것이고.

이 RDD는 크게 2개의 종류가 있고. 이는 transformation과 actions이다. transformation는 바로 실행하지 않고 defer될 수 있고 action이 일어날때 lazy 하게 실행된다. action은 collect, count, take 등이고, 나머지는 모두 transformation이다. 보면 당장의 결과가 필요한 애들만 action이다. 그래서 transformation이 lazy하게 시행될 수 있는 것이다.

basic

map, flatMap

가장 많이 사용하는 map! 은 말 그대로 하나의 입력을 인자로 주어지는 function을 통해서 다른 입력값으로 map 한다.

아래 예제는 주어진 수를 제곱해주는 예제.

1
2
3
4
val rdd = sc.parallelize(List(1,2,3,4))
val squared = rdd.map( n => n*n).collect
--------------------------------------
squared: Array[Int] = Array(1, 4, 9, 16)

flatMap이 map과 다른 점은 interable한 값을 리턴하는 대신 그것을 flatten해서 변환한다는 것이다. 아래예제를 보면 split을 하게 되면 String array가 만들어지는데 이것을 개별로 리턴해준다. split vs split2 비교해보면 이해가 잘 될듯하다.

1
2
3
4
5
6
val rdd = sc.parallelize(List("a b","v x"))
val split = rdd.map( _.split(" ")).collect
val split2 = rdd.flatMap( n => n.split(" ")).collect
--------------------------------------
split: Array[Array[String]] = Array(Array(a, b), Array(v, x))
split2: Array[String] = Array(a, b, v, x)

filter

filter는 말그대로 true를 리턴하는 값만 가지고 나머지는 다 버리겠다는 것.

1 부터 10까지의 수중 짝수만 남았다

1
2
3
4
val rdd = sc.parallelize((1 to 10).toList)
val filtered = rdd.filter(_ %2==0).collect
--------------------------------------
filtered: Array[Int] = Array(2, 4, 6, 8, 10)

distinct, union, intersection, subtract

distinct는 중복값을 제거한후 리스트를 제공하고, union는 두 rdd의 합집합을, intersection은 교집합, subtract는 원집합에서 교집합을 제거 한다.

1
2
3
4
5
6
7
8
9
10
11
12
val a = sc.parallelize(List(1,1,2,3,3,4))
val b = sc.parallelize(List(3,4,5,5,6))

val distinct_a = a.distinct.collect
val union_ab = a.union(b).collect
val intersect_ab = a.intersection(b).collect
val subtract_ab = a.subtract(b).collect
-------------------------------------
distinct_a: Array[Int] = Array(1, 2, 3, 4)
union_ab: Array[Int] = Array(1, 1, 2, 3, 3, 4, 3, 4, 5, 5, 6)
intersect_ab: Array[Int] = Array(3, 4)
subtract_ab: Array[Int] = Array(1, 1, 2)

cartesian

두 rdd의 Cartesian product을 구한다. pair(a in A RDD,b in B RDD)로 나온다.

1
2
3
4
5
6
// cartesian
val users = sc.parallelize(List("darren", "andrew") )
val products = sc.parallelize(List("mac", "cellphone"))
val cartesian = users.cartesian(products).collect
-------------------------------------
cartesian: Array[(String, String)] = Array((darren,mac), (darren,cellphone), (andrew,mac), (andrew,cellphone))

sample

rdd중에서 주어진 확률값으로 샘플링한 rdd를 리턴한다.

1
2
3
4
val rdd = sc.parallelize(1 to 10, 1)
val sampled = rdd.sample(false, 0.2).collect
-------------------------------------
sampled: Array[Int] = Array(3, 9)

key/value pair

reduceByKey

key가 값은 값을 tuple(u,v) 로 넘겨주면 이것을 어떻게 reduce할지 변환 함수를 주면 된다. 아래 예에서 _+_ 는 그냥 두 값을 concatenate하라는 뜻이다.

1
2
3
4
5
6
val rdd = sc.parallelize(List((20,"darren"), (10,"andrew"), (10, "Joy")))
val reduced = rdd.reduceByKey(_+_).collect
val reduced1 = rdd.reduceByKey( (u,v) => (s"$u|$v")).collect
-------------------------------------
reduced: Array[(Int, String)] = Array((20,darren), (10,andrewJoy))
reduced1: Array[(Int, String)] = Array((20,darren), (10,andrew|Joy))

keyBy

각 값의 키를 만드는 함수를 제공해주면, RDD(K,V)를 리턴해준다.

1
2
3
4
val rdd = sc.parallelize(List("darren", "andrew", "Joy"))
val tuples = rdd.keyBy(_.length).collect
-------------------------------------
tuples: Array[(Int, String)] = Array((6,darren), (6,andrew), (3,Joy))

groupByKey

같은 키값을 가진 것끼리 묶어서 RDD[K, Interable[V]]를 리턴한다.

1
2
3
4
val rdd = sc.parallelize(List((20,"darren"), (10,"andrew"), (10, "Joy")))
val grouped = rdd.groupByKey.collect
-------------------------------------
grouped: Array[(Int, Iterable[String])] = Array((10,CompactBuffer(andrew, Joy)), (20,CompactBuffer(darren)))

join

(,) tuple 이어야만 가능한 rdd operation이다.
join은 교집합만
leftOuterJoin은 left 원본 + 교집합 머지
rightOuterJoin은 right원본 + 교집합 머지
fullOuterJoin은 left/right + 교집합 머지

1
2
3
4
5
6
7
8
9
10
11
12
val t=sc.parallelize(Array(("darren", "1"), ("andrew", "v2")))
val t2 = sc.parallelize(Array( ("darren","33") ))

val joined = t.join(r2).collect
val leftJoined = t.leftOuterJoin(r2).collect
val rightJoined = t.rightOuterJoin(r2).collect
val fullJoined = t.fullOuterJoin(r2).collect
-----------------------------------------------
joined: Array[(String, (String, String))] = Array((darren,(1,33)))
leftJoined: Array[(String, (String, Option[String]))] = Array((andrew,(v2,None)), (darren,(1,Some(33))))
rightJoined: Array[(String, (Option[String], String))] = Array((darren,(Some(1),33)))
fullJoined: Array[(String, (Option[String], Option[String]))] = Array((andrew,(Some(v2),None)), (darren,(Some(1),Some(33))))

revision history

  • 2015/7/2 initial draft
  • 2015/7/5 distinct/union/intersection/subtract added
  • 2015/7/21 sample,reduceByKey, keyBy, groupByKey added
  • 2016/1/25 join, leftOuterJoin, rightOuterJoin added