데이터 분석을 위해 파일을 저장해야 할 필요가 있었다. 처음에는 csv파일 형식으로 저장을 했는데, 시간이 지남에 따라서 새로운 컬럼이 생기는 요구사항이 생겼다. 이런 경우 csv는 어떤 정보가 몇번째 컬럼에 있는지를 기술하지 않기 때문에 또 다른 파일에 컬럼 정보를 기록하고 데이터 타입등도 기술을 해줘야 하는 불편함이 생긴다. 언뜻 parquet이 그런 일을 하는것이라 어렴풋이 알고 있었기 때문에 이번에 parquet을 적용해 볼겸 조사를 해봤다.
특징들..
압축 지원. 50% 정도 세이브 할 수 있다고 함. 스토리지 비용이 반이라는 얘기.
여러가지 serialize framework 지원 (Avro, Thrift, protocol buffer)
column based
columnar storage다. 이렇게 접근한 이유는 크게 2가지.
보통 쿼리를 할때 모든 컬럼의 정보가 다 필요한 경우는 많지 않다. row based 라면 전체 열을 다 읽어야 쿼리를 수행할 수 있지만, parquet는 필요한 컬럼만 로드하면 된다. 여기서 속도 향샹이 생긴다.
그리고 컬럼끼리 모아서 압축을 하면 압축률이 더 좋아진다. timestamp를 가지는 컬럼이라고 생각하면 델터 인코딩 방식으로 압축을 하면 좋을 것이고, 각 컬럼의 특징이 살아 있으니 더 유리하다.
2G의 원본 csv를 SNAPPY압축을 이용해서 저장하니 1G로 50%정도나 줄어들었다.
그리고 query performance도 일반 텍스트에 비해서 2배 정도 성능이 좋다고 한다.
단점
마냥 좋기만 한것은 아니다. 분석용으로 최고의 파일 형식이라고 한 것은 데이터의 업데이트가 없다는 뜻이다. 즉, readonly일때 좋은 것이다.
자세한 사항은 avro spec을 보면 되고, 아래처럼 정의 하면 된다. 이 파일을 avro-tools.jar를 이용하면 POJO class를 만들 수 있고 이 파일을 이용하면 프로그래밍이 조금더 이뻐질 수 있다. 아래 read/save 예제에서 User class를 사용하는데 이것이 스키마를 바탕으로 생성된 클래스이다. 필수는 아니고 POJO class가 없을때는 GenericRecord를 사용할 수 도 있다.
// Write a record with GenericRecord GenericRecord r = new GenericData.Record(schema); r.put("uid", "darren"); r.put("age", 22); r.put("weight", 70.0); writer.write(r); writer.close();
/* // write a record using generated POJO class called User ParquetWriter<User> writer = AvroParquetWriter.<User>builder(path) .withCompressionCodec(CompressionCodecName.SNAPPY) .withSchema(schema) .build(); User p = new Profile(); p.setId("darren"); p.setAge(22); p.setWeight(70.0); writer.write(p); writer.close(); */ Configuration conf = new Configuration(); AvroReadSupport.setAvroReadSchema(conf, Profile.SCHEMA$); ParquetReader<Profile> reader = AvroParquetReader.<Profile>builder(path) .withConf(conf) .build(); Profile p1 = reader.read();
val p = sqlContext.parquetFile("s3://test.parquet") val multipleParquet = sqlContext.parquetFile("s3://p1", "s3://p2")
schema merge
스키마가 다른 여러 parquet 파일을 로드할때는 스키마 머지가 필요한데, 스파크 1.5.0 부터는 디폴트로 옵션이 꺼져 있다. 그래서 교집합?만 로드가 된다. 아래처럼 mergeSchema 옵션을 켜고 하면 합집합(?)으로 로드 된다.
1
val p = sqlContext.read.option("mergeSchema", "true").parquetFile("s3://v1.parquet", "s3://v2.parquet")
avro schema evolution
avro를 사용하면 좋은 점은 스키마 변화가 있을때 유연하게 대처할 수 있는 점인데, 아래의 경우를 생각해보자.
Profile은 name필드만 있었는데, v2에서 “create_time”, “newStringNullDefault”, “union” 등이 추가 되었다. create_time, newStringNullDefault은 default값이 있고, union(둘중 하나)은 union으로 선언되어졌다. 이때 Profile 스키마로 저장된 parquet파일을 Profile2로 읽으면 어떻게 될까?
default 값을 가진 필드는 디폴트 값이 채워지고, union들은 생략된다. 아래 예제 참고.
// Write a record with a null value GenericRecord r = new GenericData.Record(schema); for (int i = 0; i < 100; i++) { r.put("name", "darren"); writer.write(r); } writer.close();
// Let's load the same file with v2 schema Schema v2 = new Schema.Parser().parse(new File("src/test/avro/profile2.avro")); Configuration conf = new Configuration(); AvroReadSupport.setAvroReadSchema(conf, v2);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file) .withConf(conf).build(); GenericRecord gr = reader.read();