Spark에서는 data를 RDD로 abstract한다. RDD는 distributable 하고, 하나의 노드가 사고로 죽었을때에도 다시 생성해 낼수 있다. 캐쉬의 단위이기도 하고 파일에 쓸 수 도 있다. 스파크의 기본 데이터 스트럭쳐 되겠다. RDD는 쉽게 생각해서 row의 묶음 이라고 생각하면 된다. 각 row가 transformation의 대상이 되는 것이고.
이 RDD는 크게 2개의 종류가 있고. 이는 transformation과 actions이다. transformation는 바로 실행하지 않고 defer될 수 있고 action이 일어날때 lazy 하게 실행된다. action은 collect, count, take 등이고, 나머지는 모두 transformation이다. 보면 당장의 결과가 필요한 애들만 action이다. 그래서 transformation이 lazy하게 시행될 수 있는 것이다.
basic
map, flatMap
가장 많이 사용하는 map! 은 말 그대로 하나의 입력을 인자로 주어지는 function을 통해서 다른 입력값으로 map 한다.
아래 예제는 주어진 수를 제곱해주는 예제.
1 2 3 4
val rdd = sc.parallelize(List(1,2,3,4)) val squared = rdd.map( n => n*n).collect -------------------------------------- squared: Array[Int] = Array(1, 4, 9, 16)
flatMap이 map과 다른 점은 interable한 값을 리턴하는 대신 그것을 flatten해서 변환한다는 것이다. 아래예제를 보면 split을 하게 되면 String array가 만들어지는데 이것을 개별로 리턴해준다. split vs split2 비교해보면 이해가 잘 될듯하다.
1 2 3 4 5 6
val rdd = sc.parallelize(List("a b","v x")) val split = rdd.map( _.split(" ")).collect val split2 = rdd.flatMap( n => n.split(" ")).collect -------------------------------------- split: Array[Array[String]] = Array(Array(a, b), Array(v, x)) split2: Array[String] = Array(a, b, v, x)
filter
filter는 말그대로 true를 리턴하는 값만 가지고 나머지는 다 버리겠다는 것.
1 부터 10까지의 수중 짝수만 남았다
1 2 3 4
val rdd = sc.parallelize((1 to 10).toList) val filtered = rdd.filter(_ %2==0).collect -------------------------------------- filtered: Array[Int] = Array(2, 4, 6, 8, 10)
distinct, union, intersection, subtract
distinct는 중복값을 제거한후 리스트를 제공하고, union는 두 rdd의 합집합을, intersection은 교집합, subtract는 원집합에서 교집합을 제거 한다.
1 2 3 4 5 6 7 8 9 10 11 12
val a = sc.parallelize(List(1,1,2,3,3,4)) val b = sc.parallelize(List(3,4,5,5,6))