아파치 스파크 : map vs mapPartitions?
RDD map
와 mapPartitions
방법 의 차이점은 무엇입니까 ? 그리고 flatMap
좋아 map
하거나 좋아 mapPartitions
합니까? 감사.
(편집) 즉, 의미 적으로 또는 실행 측면에서 차이점은 무엇입니까?
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
과:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
RDD의 map과 mapPartitions 메소드의 차이점은 무엇입니까?
메소드 맵 은 함수를 적용하여 소스 RDD의 각 요소 를 결과 RDD의 단일 요소로 변환 합니다. mapPartitions 는 소스 RDD의 각 파티션 을 결과의 여러 요소 (아마도 없음)로 변환합니다.
그리고 flatMap은 map 또는 mapPartitions처럼 동작합니까?
flatMap 은 단일 요소 (as map
) 에서 작동 하지 않으며 결과의 여러 요소 (as )를 생성합니다 mapPartitions
.
꼬마 도깨비. 팁 :
RDD
요소 당 한 번이 아니라 여러 요소에 대해 한 번만 수행해야하는 강력한 초기화가있을 때RDD
, 타사 라이브러리에서 객체 생성과 같은이 초기화를 직렬화 할 수없는 경우 (Spark에서 클러스터로 전송할 수 있도록) 작업자 노드)mapPartitions()
대신을 사용하십시오map()
. 예 를 들어 데이터 요소mapPartitions()
당 한 번이 아닌 작업자 작업 / 스레드 / 파티션 당 한 번 초기화를 수행 할 수 있습니다 ( 예 : 아래 참조).RDD
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2. 않는
flatMap
행동하라지도처럼 나처럼mapPartitions
?
예. flatmap
.. 자체 설명 의 예 2를 참조하십시오 .
Q1. RDD의 차이 무엇
map
과mapPartitions
map
mapPartitions
파티션 레벨에서 기능 을 수행하는 동안 요소 별 레벨에서 사용되는 기능을 작동시킵니다 .
시나리오 예 : 특정RDD
파티션에 100K 요소가있는경우 사용할 때 매핑 변환에 사용되는 함수를 100K 회 실행합니다map
.
반대로 사용 mapPartitions
하면 특정 함수를 한 번만 호출하지만 모든 100K 레코드를 전달하고 한 번의 함수 호출로 모든 응답을 다시 가져옵니다.
이후 성능 향상이있을 것입니다 map
함수가 뭔가 우리가 (의 경우 한 번에 모든 요소에 전달 된 경우가 수행 할 필요가 없습니다 것이라고 비싼마다하고있다 특히, 특정 기능을 너무 여러 번에 작품 mappartitions
).
지도
RDD의 각 항목에 변환 함수를 적용하고 결과를 새 RDD로 리턴합니다.
변형 나열
데프 맵 [U : ClassTag] (f : T => U) : RDD [U]
예 :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
This is a specialized map that is called only once for each partition. The entire content of the respective partitions is available as a sequential stream of values via the input argument (Iterarator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose.
preservesPartitioning
indicates whether the input function preserves the partitioner, which should befalse
unless this is a pair RDD and the input function doesn't modify the keys.Listing Variants
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Example 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Example 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
The above program can also be written using flatMap as follows.
Example 2 using flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Conclusion :
mapPartitions
transformation is faster than map
since it calls your function once/partition, not once/element..
Further reading : foreach Vs foreachPartitions When to use What?
Map :
- It processes one row at a time , very similar to map() method of MapReduce.
- You return from the transformation after every row.
MapPartitions
- It processes the complete partition in one go.
- You can return from the function only once after processing the whole partition.
- All intermediate results needs to be held in memory till you process the whole partition.
- Provides you like setup() map() and cleanup() function of MapReduce
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/
참고URL : https://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions
'Programming' 카테고리의 다른 글
CRC가 MD5 / SHA1보다 사용하기에 더 적합한시기는 언제입니까? (0) | 2020.07.18 |
---|---|
jQuery가 요소를 찾지 못했는지 확인 (0) | 2020.07.18 |
Windows 배치 스크립트에서“@”의 의미 (0) | 2020.07.18 |
JavaScript에서 REST 웹 서비스 API를 호출하는 방법은 무엇입니까? (0) | 2020.07.18 |
Objective-C에서 객체를 캐스팅하는 방법 (0) | 2020.07.18 |