51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spark算子实战Java版,学到了

# (一)概述 {#一-概述}

算子从功能上可以分为Transformations转换算子和Action行动算子。转换算子用来做数据的转换操作,比如map、flatMap、reduceByKey等都是转换算子,这类算子通过懒加载执行。行动算子的作用是触发执行,比如foreach、collect、count等都是行动算子,只有程序运行到行动算子时,转换算子才会去执行。

本文将介绍开发过程中常用的转换算子和行动算子,Spark代码基于Java编写,前置代码如下:

public class SparkTransformationTest {
    public static void main(String[] args) {
        // 前置准备
        SparkConf conf = new SparkConf();
        conf.setMaster("local[1]");
        conf.setAppName("SPARK ES");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
    }
}

1
2
3
4
5
6
7
8
9
10

# (二)转换算子 {#二-转换算子}

# map {#map}

map(func):通过函数func传递的每个元素,返回一个新的RDD。

JavaRDD<Object> map = javaRdd.map((Function<String, Object>) 
        item -> "new" + item);
map.foreach(x -> System.out.println(x));

1
2
3

返回一个新的RDD,数据是newa、newb、newc、newd、newe

# filter {#filter}

filter(func):筛选通过func处理后返回 true 的元素,返回一个新的RDD。

JavaRDD<String> filter = javaRdd.filter(item -> item.equals("a") || item.equals("b"));
filter.foreach(x -> System.out.println(x));

1
2

返回的新RDD数据是a和b。

# flatMap {#flatmap}

flatMap(func):类似于 map,但每个输入项可以映射到 0 个或更多输出项。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a,b", "c,d,e", "f,g"));
JavaRDD<String> flatMap = javaRdd.flatMap((FlatMapFunction<String, String>) 
        s -> Arrays.asList(s.split(",")).iterator());
flatMap.foreach(x -> System.out.println(x));

1
2
3
4

入参只有3个,经过flatMap映射后返回了长度为7的RDD。

# mapPartitions {#mappartitions}

mapPartitions(func):类似于map,但该函数是在RDD每个partition上单独运行,因此入参会是Iterator<Object>

JavaRDD<String> mapPartitions = javaRdd.mapPartitions((FlatMapFunction<Iterator<String>, String>) stringIterator -> {
    ArrayList<String> list = new ArrayList<>();
    while (stringIterator.hasNext()) {
        list.add(stringIterator.next());
    }
    return list.iterator();
});
mapPartitions.foreach(x -> System.out.println(x));

1
2
3
4
5
6
7
8

除了是对Iterator 进行处理之外,其他的都和map一样。

# union {#union}

union(otherDataset):返回一个新数据集,包含两个数据集合的并集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
JavaRDD<String> unionRdd = javaRdd.union(newJavaRdd);
unionRdd.foreach(x-> System.out.println(x));

1
2
3

# intersection {#intersection}

intersection(otherDataset):返回一个新数据集,包含两个数据集合的交集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("a", "b", "3", "4", "5"));
JavaRDD<String> intersectionRdd = javaRdd.intersection(newJavaRdd);
intersectionRdd.foreach(x-> System.out.println(x));

1
2
3

# groupByKey {#groupbykey}

groupByKey ([ numPartitions ]):在 (K, V) 对的数据集上调用时,返回 (K, Iterable ) 对的数据集,可以传递一个可选numPartitions参数来设置不同数量的任务。

这里需要了解Java中的另外一种RDD,JavaPairRDD。JavaPairRDD是一种key-value类型的RDD,groupByKey就是针对JavaPairRDD的API。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupByKey = javaPairRDD.groupByKey();
groupByKey.foreach(x -> System.out.println(x._1()+x._2()));

1
2
3
4
5
6
7

最终输出结果:

a[1, 2]
b[1]
c[3]

1
2
3

# reduceByKey {#reducebykey}

reduceByKey(func, [numPartitions]):在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合。和groupByKey不同的地方在于reduceByKey对value进行了聚合处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> reduceRdd = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
reduceRdd.foreach(x -> System.out.println(x._1()+x._2()));

1
2
3
4
5
6
7

最终输出结果:

a3
b1
c3

1
2
3

# aggregateByKey {#aggregatebykey}

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):aggregateByKey这个算子相比上面这些会复杂很多,主要参数有zeroValue、seqOp、combOp,numPartitions可选。

zeroValue是该算子设置的初始值,seqOp函数是将rdd中的value值和zeroValue进行处理,combOp是将相同key的数据进行处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> aggregateRdd = javaPairRDD.aggregateByKey(1,
        // seqOp函数中的第一个入参是 zeroValue,第二个入参是rdd的value,这里对所有的value+1
        (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2,
        // combOp函数是对同一个key的value进行处理,这里对相同key的value进行相加
        (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
aggregateRdd.foreach(x -> System.out.println(x._1()+":"+x._2()));

1
2
3
4
5
6
7
8
9
10
11

最终输出结果如下:

a:4
b:2
c:4

1
2
3

# (三)行动算子 {#三-行动算子}

# reduce {#reduce}

reduce(func):使用函数func聚合数据集的元素(它接受两个参数并返回一个)。 下面这段代码对所有rdd进行相加:

String reduce = javaRdd.reduce((Function2<String, String, String>) (v1, v2) -> {
    System.out.println(v1 + ":" + v2);
    return v1+v2;
});
System.out.println("result:"+reduce);

1
2
3
4
5

最终结果如下,从结果可以看出,每次对v1都是上一次reduce运行之后的结果:

a:b
ab:c
abc:d
abcd:e
result:abcde

1
2
3
4
5

# collect() {#collect}

collect():将driver中的所有元素数据通过集合的方式返回,适合小数据量的场景,大数据量会导致内存溢出。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> collect = javaRdd.collect();

1
2

# count() {#count}

count():返回一个RDD中元素的数量。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
long count = javaRdd.count();

1
2

# first() {#first}

first():返回第一个元素。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
String first = javaRdd.first();

1
2

# take {#take}

take(n):返回前N个元素,take(1)=first()。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.take(3);

1
2

# takeOrdered {#takeordered}

takeOrdered(n, [ordering]):返回自然排序的前N个元素,或者指定排序方法后的前N个元素。首先写一个排序类。

public class MyComparator implements Comparator<String>, Serializable {
    @Override
    public int compare(String o1, String o2) {
        return o2.compareTo(o1);
    }
}

1
2
3
4
5
6

接着是调用方式:

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.takeOrdered(3, new MyComparator());

1
2

# foreach {#foreach}

foreach(func):该函数对数据集的每个RDD运行func函数,foreach算子在上面的代码中已经使用到,这里不再做代码案例展示。

# (四)总结 {#四-总结}

Spark的开发可以用Java或者Scala,Spark本身使用Scala编写,具体使用哪种语言进行开发需要根据项目情况考虑时间和学习成本。具体的API都可以在Spark官网查询:https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

赞(4)
未经允许不得转载:工具盒子 » Spark算子实战Java版,学到了