Programming

아파치 스파크 : map vs mapPartitions?

procodes 2020. 7. 18. 23:06
반응형

아파치 스파크 : map vs mapPartitions?


RDD mapmapPartitions방법 차이점은 무엇입니까 ? 그리고 flatMap좋아 map하거나 좋아 mapPartitions합니까? 감사.

(편집) 즉, 의미 적으로 또는 실행 측면에서 차이점은 무엇입니까?

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

과:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }

RDD의 map과 mapPartitions 메소드의 차이점은 무엇입니까?

메소드 은 함수를 적용하여 소스 RDD의 요소 를 결과 RDD의 단일 요소로 변환 합니다. mapPartitions소스 RDD의 파티션 을 결과의 여러 요소 (아마도 없음)로 변환합니다.

그리고 flatMap은 map 또는 mapPartitions처럼 동작합니까?

flatMap 은 단일 요소 (as map) 에서 작동 하지 않으며 결과의 여러 요소 (as )를 생성합니다 mapPartitions.


꼬마 도깨비. 팁 :

RDD요소 당 한 번이 아니라 여러 요소에 대해 한 번만 수행해야하는 강력한 초기화가있을 때 RDD, 타사 라이브러리에서 객체 생성과 같은이 초기화를 직렬화 할 수없는 경우 (Spark에서 클러스터로 전송할 수 있도록) 작업자 노드) mapPartitions()대신을 사용하십시오 map(). 들어 데이터 요소 mapPartitions()당 한 번이 아닌 작업자 작업 / 스레드 / 파티션 당 한 번 초기화를 수행 할 수 있습니다 ( 예 : 아래 참조).RDD

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. 않는 flatMap행동하라지도처럼 나처럼 mapPartitions?

예. flatmap.. 자체 설명 의 예 2를 참조하십시오 .

Q1. RDD의 차이 무엇 mapmapPartitions

mapmapPartitions파티션 레벨에서 기능 수행하는 동안 요소 별 레벨에서 사용되는 기능을 작동시킵니다 .

시나리오 예 : 특정RDD파티션에 100K 요소가있는경우 사용할 때 매핑 변환에 사용되는 함수를 100K 회 실행합니다map.

반대로 사용 mapPartitions하면 특정 함수를 한 번만 호출하지만 모든 100K 레코드를 전달하고 한 번의 함수 호출로 모든 응답을 다시 가져옵니다.

이후 성능 향상이있을 것입니다 map함수가 뭔가 우리가 (의 경우 한 번에 모든 요소에 전달 된 경우가 수행 할 필요가 없습니다 것이라고 비싼마다하고있다 특히, 특정 기능을 너무 여러 번에 작품 mappartitions).

지도

RDD의 각 항목에 변환 함수를 적용하고 결과를 새 RDD로 리턴합니다.

변형 나열

데프 맵 [U : ClassTag] (f : T => U) : RDD [U]

예 :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

This is a specialized map that is called only once for each partition. The entire content of the respective partitions is available as a sequential stream of values via the input argument (Iterarator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.

Listing Variants

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Example 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Example 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

The above program can also be written using flatMap as follows.

Example 2 using flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusion :

mapPartitions transformation is faster than map since it calls your function once/partition, not once/element..

Further reading : foreach Vs foreachPartitions When to use What?


Map :

  1. It processes one row at a time , very similar to map() method of MapReduce.
  2. You return from the transformation after every row.

MapPartitions

  1. It processes the complete partition in one go.
  2. You can return from the function only once after processing the whole partition.
  3. All intermediate results needs to be held in memory till you process the whole partition.
  4. Provides you like setup() map() and cleanup() function of MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

참고URL : https://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions

반응형