Inertia

Parquet + Avro + Spark

데이터 분석을 위해 파일을 저장해야 할 필요가 있었다. 처음에는 csv파일 형식으로 저장을 했는데, 시간이 지남에 따라서 새로운 컬럼이 생기는 요구사항이 생겼다. 이런 경우 csv는 어떤 정보가 몇번째 컬럼에 있는지를 기술하지 않기 때문에 또 다른 파일에 컬럼 정보를 기록하고 데이터 타입등도 기술을 해줘야 하는 불편함이 생긴다.
언뜻 parquet이 그런 일을 하는것이라 어렴풋이 알고 있었기 때문에 이번에 parquet을 적용해 볼겸 조사를 해봤다.

특징들..

  • 압축 지원. 50% 정도 세이브 할 수 있다고 함. 스토리지 비용이 반이라는 얘기.
  • 여러가지 serialize framework 지원 (Avro, Thrift, protocol buffer)

column based

columnar storage다. 이렇게 접근한 이유는 크게 2가지.

보통 쿼리를 할때 모든 컬럼의 정보가 다 필요한 경우는 많지 않다. row based 라면 전체 열을 다 읽어야 쿼리를 수행할 수 있지만, parquet는 필요한 컬럼만 로드하면 된다. 여기서 속도 향샹이 생긴다.

그리고 컬럼끼리 모아서 압축을 하면 압축률이 더 좋아진다. timestamp를 가지는 컬럼이라고 생각하면 델터 인코딩 방식으로 압축을 하면 좋을 것이고, 각 컬럼의 특징이 살아 있으니 더 유리하다.

2G의 원본 csv를 SNAPPY압축을 이용해서 저장하니 1G로 50%정도나 줄어들었다.

그리고 query performance도 일반 텍스트에 비해서 2배 정도 성능이 좋다고 한다.

단점

마냥 좋기만 한것은 아니다. 분석용으로 최고의 파일 형식이라고 한 것은 데이터의 업데이트가 없다는 뜻이다. 즉, readonly일때 좋은 것이다.

환경 설정

spark + parquet + avro 를 사용하려면 다음과 같은 디펜던시가 필요하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<properties>
<scala.version>2.11.4</scala.version>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<parquet.version>1.8.1</parquet.version>
<avro.version>1.7.7</avro.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
</dependencies>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

avro schema define

자세한 사항은 avro spec을 보면 되고, 아래처럼 정의 하면 된다. 이 파일을 avro-tools.jar를 이용하면 POJO class를 만들 수 있고 이 파일을 이용하면 프로그래밍이 조금더 이뻐질 수 있다. 아래 read/save 예제에서 User class를 사용하는데 이것이 스키마를 바탕으로 생성된 클래스이다. 필수는 아니고 POJO class가 없을때는 GenericRecord를 사용할 수 도 있다.

1
2
3
4
5
6
7
8
9
10
{
"namespace": "com.nberserk.example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "age", "type": "int"},
{"name": "weight", "type":"float"}
]
}

parquet save/read in java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Schema schema = new Schema.Parser().parse(new File("src/test/avro/user.avro"));
File tmp = new File("test.parquet");
Path path = new Path(tmp.getPath());

ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();

// Write a record with GenericRecord
GenericRecord r = new GenericData.Record(schema);
r.put("uid", "darren");
r.put("age", 22);
r.put("weight", 70.0);
writer.write(r);
writer.close();

/*
// write a record using generated POJO class called User
ParquetWriter<User> writer = AvroParquetWriter.<User>builder(path)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withSchema(schema)
.build();

User p = new Profile();
p.setId("darren");
p.setAge(22);
p.setWeight(70.0);
writer.write(p);
writer.close();
*/
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, Profile.SCHEMA$);
ParquetReader<Profile> reader = AvroParquetReader.<Profile>builder(path)
.withConf(conf)
.build();
Profile p1 = reader.read();

assertEquals("darren", p1.getUid().toString());
assertEquals(22, p1.getAge());
assertEquals(77.0, p1.getWeight());

Spark에서 parquet 읽기

spark 1.5.0 미만에서는 [[https://issues.apache.org/jira/browse/SPARK-6566][parquet로드시 익셉션을 내는 버그가 있었는데]] 그때는 스파크 버전을 1.5.0 이상으로 올리면 해결 된다.

1
2
val p = sqlContext.parquetFile("s3://test.parquet")
val multipleParquet = sqlContext.parquetFile("s3://p1", "s3://p2")

schema merge

스키마가 다른 여러 parquet 파일을 로드할때는 스키마 머지가 필요한데, 스파크 1.5.0 부터는 디폴트로 옵션이 꺼져 있다. 그래서 교집합?만 로드가 된다.
아래처럼 mergeSchema 옵션을 켜고 하면 합집합(?)으로 로드 된다.

1
val p = sqlContext.read.option("mergeSchema", "true").parquetFile("s3://v1.parquet", "s3://v2.parquet")

avro schema evolution

avro를 사용하면 좋은 점은 스키마 변화가 있을때 유연하게 대처할 수 있는 점인데, 아래의 경우를 생각해보자.

Profile은 name필드만 있었는데, v2에서 “create_time”, “newStringNullDefault”, “union” 등이 추가 되었다. create_time, newStringNullDefault은 default값이 있고, union(둘중 하나)은 union으로 선언되어졌다. 이때 Profile 스키마로 저장된 parquet파일을 Profile2로 읽으면 어떻게 될까?

default 값을 가진 필드는 디폴트 값이 채워지고, union들은 생략된다. 아래 예제 참고.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/*

// Profile avro spec
{"namespace": "com.nberserk.avro",
"type": "record",
"name": "Profile",
"fields": [
{"name": "name", "type": "string"}
]
}

// Profile2 avro spec
{"namespace": "com.nberserk.avro",
"type": "record",
"name": "Profile2",
"fields": [
{"name": "name", "type": "string"}
{"name": "create_time", "type":"long", "default":0},
{"name": "union", "type": ["null", "string"]},
{"name": "value_default", "type": "string", "default": "null"}
]
}
,*/
Schema schema = new Schema.Parser().parse(new File("src/test/avro/profile.avro"));

File tmp = new File("test2.tmp");
tmp.deleteOnExit();
tmp.delete();

Path file = new Path(tmp.getPath());
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(file)
.withSchema(schema)
.build();

// Write a record with a null value
GenericRecord r = new GenericData.Record(schema);
for (int i = 0; i < 100; i++) {
r.put("name", "darren");
writer.write(r);
}
writer.close();


// Let's load the same file with v2 schema
Schema v2 = new Schema.Parser().parse(new File("src/test/avro/profile2.avro"));
Configuration conf = new Configuration();
AvroReadSupport.setAvroReadSchema(conf, v2);

ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file)
.withConf(conf).build();
GenericRecord gr = reader.read();

assertEquals("darren", gr.get("name").toString());
assertEquals(0L, gr.get("create_time"));
assertEquals("null", gr.get("newStringNullDefault").toString());

performance comparison

원본 사이즈, txt로는 20G, parquet로는 6.5G 정도임 row수는 1억1천 row정도의 데이터.

file size, 20G 파일이 parquet으로 저장하니 6.5G 정도로 작아짐. 50% 이상.
query, 특정 값을 가진 사용자들의 수를 쿼리하는 것을 각각 측정해보니 txt의 경우는 186초. parquet의 경우는 105초 정도가 소요됨.

따라서, 스토리지 용량/ 쿼리 속도 면에서 모두 만족할 만한 결과를 보임.

trouble shooting

writing parquet files to s3 is too slow

csv에서 parquet파일로 바꾸고 난후에 S3에 write 할때 시간이 너무 오래 걸려서 분석해보니 write.parquet()의 시간이 너무 오래 걸렸다. 검색을 해보니 [[http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration][spark.sql.parquet.output.committer.class 을 org.apache.spark.sql.parquet.DirectParquetOutputCommitter 로 설정]]하면 괜찮아진다는 얘기가 있다. 수정해보니 효과가 있었다.

1
2
//sc : SparkContext
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

revision history

  • 2015/12/10 initial draft
  • 2015/12/18 add schema evolution
  • trouble shooting/s3 slow added