# (一)概述 {#一-概述}
算子从功能上可以分为Transformations转换算子和Action行动算子。转换算子用来做数据的转换操作,比如map、flatMap、reduceByKey等都是转换算子,这类算子通过懒加载执行。行动算子的作用是触发执行,比如foreach、collect、count等都是行动算子,只有程序运行到行动算子时,转换算子才会去执行。
本文将介绍开发过程中常用的转换算子和行动算子,Spark代码基于Java编写,前置代码如下:
public class SparkTransformationTest {
public static void main(String[] args) {
// 前置准备
SparkConf conf = new SparkConf();
conf.setMaster("local[1]");
conf.setAppName("SPARK ES");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
}
}
1
2
3
4
5
6
7
8
9
10
# (二)转换算子 {#二-转换算子}
# map {#map}
map(func):通过函数func传递的每个元素,返回一个新的RDD。
JavaRDD<Object> map = javaRdd.map((Function<String, Object>)
item -> "new" + item);
map.foreach(x -> System.out.println(x));
1
2
3
返回一个新的RDD,数据是newa、newb、newc、newd、newe
# filter {#filter}
filter(func):筛选通过func处理后返回 true 的元素,返回一个新的RDD。
JavaRDD<String> filter = javaRdd.filter(item -> item.equals("a") || item.equals("b"));
filter.foreach(x -> System.out.println(x));
1
2
返回的新RDD数据是a和b。
# flatMap {#flatmap}
flatMap(func):类似于 map,但每个输入项可以映射到 0 个或更多输出项。
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a,b", "c,d,e", "f,g"));
JavaRDD<String> flatMap = javaRdd.flatMap((FlatMapFunction<String, String>)
s -> Arrays.asList(s.split(",")).iterator());
flatMap.foreach(x -> System.out.println(x));
1
2
3
4
入参只有3个,经过flatMap映射后返回了长度为7的RDD。
# mapPartitions {#mappartitions}
mapPartitions(func):类似于map,但该函数是在RDD每个partition上单独运行,因此入参会是Iterator<Object>
。
JavaRDD<String> mapPartitions = javaRdd.mapPartitions((FlatMapFunction<Iterator<String>, String>) stringIterator -> {
ArrayList<String> list = new ArrayList<>();
while (stringIterator.hasNext()) {
list.add(stringIterator.next());
}
return list.iterator();
});
mapPartitions.foreach(x -> System.out.println(x));
1
2
3
4
5
6
7
8
除了是对Iterator 进行处理之外,其他的都和map一样。
# union {#union}
union(otherDataset):返回一个新数据集,包含两个数据集合的并集。
JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
JavaRDD<String> unionRdd = javaRdd.union(newJavaRdd);
unionRdd.foreach(x-> System.out.println(x));
1
2
3
# intersection {#intersection}
intersection(otherDataset):返回一个新数据集,包含两个数据集合的交集。
JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("a", "b", "3", "4", "5"));
JavaRDD<String> intersectionRdd = javaRdd.intersection(newJavaRdd);
intersectionRdd.foreach(x-> System.out.println(x));
1
2
3
# groupByKey {#groupbykey}
groupByKey ([ numPartitions ]):在 (K, V) 对的数据集上调用时,返回 (K, Iterable ) 对的数据集,可以传递一个可选numPartitions参数来设置不同数量的任务。
这里需要了解Java中的另外一种RDD,JavaPairRDD。JavaPairRDD是一种key-value类型的RDD,groupByKey就是针对JavaPairRDD的API。
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
String[] split = s.split(":");
return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupByKey = javaPairRDD.groupByKey();
groupByKey.foreach(x -> System.out.println(x._1()+x._2()));
1
2
3
4
5
6
7
最终输出结果:
a[1, 2]
b[1]
c[3]
1
2
3
# reduceByKey {#reducebykey}
reduceByKey(func, [numPartitions]):在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合。和groupByKey不同的地方在于reduceByKey对value进行了聚合处理。
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
String[] split = s.split(":");
return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> reduceRdd = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
reduceRdd.foreach(x -> System.out.println(x._1()+x._2()));
1
2
3
4
5
6
7
最终输出结果:
a3
b1
c3
1
2
3
# aggregateByKey {#aggregatebykey}
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):aggregateByKey这个算子相比上面这些会复杂很多,主要参数有zeroValue、seqOp、combOp,numPartitions可选。
zeroValue是该算子设置的初始值,seqOp函数是将rdd中的value值和zeroValue进行处理,combOp是将相同key的数据进行处理。
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
String[] split = s.split(":");
return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> aggregateRdd = javaPairRDD.aggregateByKey(1,
// seqOp函数中的第一个入参是 zeroValue,第二个入参是rdd的value,这里对所有的value+1
(Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2,
// combOp函数是对同一个key的value进行处理,这里对相同key的value进行相加
(Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
aggregateRdd.foreach(x -> System.out.println(x._1()+":"+x._2()));
1
2
3
4
5
6
7
8
9
10
11
最终输出结果如下:
a:4
b:2
c:4
1
2
3
# (三)行动算子 {#三-行动算子}
# reduce {#reduce}
reduce(func):使用函数func聚合数据集的元素(它接受两个参数并返回一个)。 下面这段代码对所有rdd进行相加:
String reduce = javaRdd.reduce((Function2<String, String, String>) (v1, v2) -> {
System.out.println(v1 + ":" + v2);
return v1+v2;
});
System.out.println("result:"+reduce);
1
2
3
4
5
最终结果如下,从结果可以看出,每次对v1都是上一次reduce运行之后的结果:
a:b
ab:c
abc:d
abcd:e
result:abcde
1
2
3
4
5
# collect() {#collect}
collect():将driver中的所有元素数据通过集合的方式返回,适合小数据量的场景,大数据量会导致内存溢出。
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> collect = javaRdd.collect();
1
2
# count() {#count}
count():返回一个RDD中元素的数量。
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
long count = javaRdd.count();
1
2
# first() {#first}
first():返回第一个元素。
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
String first = javaRdd.first();
1
2
# take {#take}
take(n):返回前N个元素,take(1)=first()。
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.take(3);
1
2
# takeOrdered {#takeordered}
takeOrdered(n, [ordering]):返回自然排序的前N个元素,或者指定排序方法后的前N个元素。首先写一个排序类。
public class MyComparator implements Comparator<String>, Serializable {
@Override
public int compare(String o1, String o2) {
return o2.compareTo(o1);
}
}
1
2
3
4
5
6
接着是调用方式:
JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.takeOrdered(3, new MyComparator());
1
2
# foreach {#foreach}
foreach(func):该函数对数据集的每个RDD运行func函数,foreach算子在上面的代码中已经使用到,这里不再做代码案例展示。
# (四)总结 {#四-总结}
Spark的开发可以用Java或者Scala,Spark本身使用Scala编写,具体使用哪种语言进行开发需要根据项目情况考虑时间和学习成本。具体的API都可以在Spark官网查询:https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html