Inertia

RDD transformation and actions

Spark에서는 data를 RDD로 abstract한다. RDD는 distributable 하고, 하나의 노드가 사고로 죽었을때에도 다시 생성해 낼수 있다. 캐쉬의 단위이기도 하고 파일에 쓸 수 도 있다. 스파크의 기본 데이터 스트럭쳐 되겠다. RDD는 쉽게 생각해서 row의 묶음 이라고 생각하면 된다. 각 row가 transformation의 대상이 되는 것이고.

이 RDD는 크게 2개의 종류가 있고. 이는 transformation과 actions이다. transformation는 바로 실행하지 않고 defer될 수 있고 action이 일어날때 lazy 하게 실행된다. action은 collect, count, take 등이고, 나머지는 모두 transformation이다. 보면 당장의 결과가 필요한 애들만 action이다. 그래서 transformation이 lazy하게 시행될 수 있는 것이다.

basic

map, flatMap

가장 많이 사용하는 map! 은 말 그대로 하나의 입력을 인자로 주어지는 function을 통해서 다른 입력값으로 map 한다.

아래 예제는 주어진 수를 제곱해주는 예제.

1
2
3
4
val rdd = sc.parallelize(List(1,2,3,4))
val squared = rdd.map( n => n*n).collect
--------------------------------------
squared: Array[Int] = Array(1, 4, 9, 16)

flatMap이 map과 다른 점은 interable한 값을 리턴하는 대신 그것을 flatten해서 변환한다는 것이다. 아래예제를 보면 split을 하게 되면 String array가 만들어지는데 이것을 개별로 리턴해준다. split vs split2 비교해보면 이해가 잘 될듯하다.

1
2
3
4
5
6
val rdd = sc.parallelize(List("a b","v x"))
val split = rdd.map( _.split(" ")).collect
val split2 = rdd.flatMap( n => n.split(" ")).collect
--------------------------------------
split: Array[Array[String]] = Array(Array(a, b), Array(v, x))
split2: Array[String] = Array(a, b, v, x)

filter

filter는 말그대로 true를 리턴하는 값만 가지고 나머지는 다 버리겠다는 것.

1 부터 10까지의 수중 짝수만 남았다

1
2
3
4
val rdd = sc.parallelize((1 to 10).toList)
val filtered = rdd.filter(_ %2==0).collect
--------------------------------------
filtered: Array[Int] = Array(2, 4, 6, 8, 10)

distinct, union, intersection, subtract

distinct는 중복값을 제거한후 리스트를 제공하고, union는 두 rdd의 합집합을, intersection은 교집합, subtract는 원집합에서 교집합을 제거 한다.

1
2
3
4
5
6
7
8
9
10
11
12
val a = sc.parallelize(List(1,1,2,3,3,4))
val b = sc.parallelize(List(3,4,5,5,6))

val distinct_a = a.distinct.collect
val union_ab = a.union(b).collect
val intersect_ab = a.intersection(b).collect
val subtract_ab = a.subtract(b).collect
-------------------------------------
distinct_a: Array[Int] = Array(1, 2, 3, 4)
union_ab: Array[Int] = Array(1, 1, 2, 3, 3, 4, 3, 4, 5, 5, 6)
intersect_ab: Array[Int] = Array(3, 4)
subtract_ab: Array[Int] = Array(1, 1, 2)

cartesian

두 rdd의 Cartesian product을 구한다. pair(a in A RDD,b in B RDD)로 나온다.

1
2
3
4
5
6
// cartesian
val users = sc.parallelize(List("darren", "andrew") )
val products = sc.parallelize(List("mac", "cellphone"))
val cartesian = users.cartesian(products).collect
-------------------------------------
cartesian: Array[(String, String)] = Array((darren,mac), (darren,cellphone), (andrew,mac), (andrew,cellphone))

sample

rdd중에서 주어진 확률값으로 샘플링한 rdd를 리턴한다.

1
2
3
4
val rdd = sc.parallelize(1 to 10, 1)
val sampled = rdd.sample(false, 0.2).collect
-------------------------------------
sampled: Array[Int] = Array(3, 9)

key/value pair

reduceByKey

key가 값은 값을 tuple(u,v) 로 넘겨주면 이것을 어떻게 reduce할지 변환 함수를 주면 된다. 아래 예에서 _+_ 는 그냥 두 값을 concatenate하라는 뜻이다.

1
2
3
4
5
6
val rdd = sc.parallelize(List((20,"darren"), (10,"andrew"), (10, "Joy")))
val reduced = rdd.reduceByKey(_+_).collect
val reduced1 = rdd.reduceByKey( (u,v) => (s"$u|$v")).collect
-------------------------------------
reduced: Array[(Int, String)] = Array((20,darren), (10,andrewJoy))
reduced1: Array[(Int, String)] = Array((20,darren), (10,andrew|Joy))

keyBy

각 값의 키를 만드는 함수를 제공해주면, RDD(K,V)를 리턴해준다.

1
2
3
4
val rdd = sc.parallelize(List("darren", "andrew", "Joy"))
val tuples = rdd.keyBy(_.length).collect
-------------------------------------
tuples: Array[(Int, String)] = Array((6,darren), (6,andrew), (3,Joy))

groupByKey

같은 키값을 가진 것끼리 묶어서 RDD[K, Interable[V]]를 리턴한다.

1
2
3
4
val rdd = sc.parallelize(List((20,"darren"), (10,"andrew"), (10, "Joy")))
val grouped = rdd.groupByKey.collect
-------------------------------------
grouped: Array[(Int, Iterable[String])] = Array((10,CompactBuffer(andrew, Joy)), (20,CompactBuffer(darren)))

join

(,) tuple 이어야만 가능한 rdd operation이다.
join은 교집합만
leftOuterJoin은 left 원본 + 교집합 머지
rightOuterJoin은 right원본 + 교집합 머지
fullOuterJoin은 left/right + 교집합 머지

1
2
3
4
5
6
7
8
9
10
11
12
val t=sc.parallelize(Array(("darren", "1"), ("andrew", "v2")))
val t2 = sc.parallelize(Array( ("darren","33") ))

val joined = t.join(r2).collect
val leftJoined = t.leftOuterJoin(r2).collect
val rightJoined = t.rightOuterJoin(r2).collect
val fullJoined = t.fullOuterJoin(r2).collect
-----------------------------------------------
joined: Array[(String, (String, String))] = Array((darren,(1,33)))
leftJoined: Array[(String, (String, Option[String]))] = Array((andrew,(v2,None)), (darren,(1,Some(33))))
rightJoined: Array[(String, (Option[String], String))] = Array((darren,(Some(1),33)))
fullJoined: Array[(String, (Option[String], Option[String]))] = Array((andrew,(Some(v2),None)), (darren,(Some(1),Some(33))))

revision history

  • 2015/7/2 initial draft
  • 2015/7/5 distinct/union/intersection/subtract added
  • 2015/7/21 sample,reduceByKey, keyBy, groupByKey added
  • 2016/1/25 join, leftOuterJoin, rightOuterJoin added