1.RDD基础

Spark对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。RDD其实就是分布式的元素集合。在 Spark中,对数据的所有操作不外乎创建 RDD、转化已有RDD以及调用RDD操作进行求值。而在这一切背后,Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。

2. 创建RDD

Spark提供了两种创建 RDD 的方式:

  • 读取外部数据集
1
val lines = sc.textFile("/path/to/README.md")
  • 在程序中对一个集合进行并行化
1
val lines = sc.parallelize(List("pandas", "i like pandas"))

3. RDD操作

RDD 支持两种操作: 转化操作 transformation行动操作 action

3.1 转化操作

转化操作是返回一个新的 RDD 的操作,比如 map() 和 filter()
map() 接收一个函数,把这个函数用于 RDD 中的每个元素, 将函数的返回结果作为结果RDD中对应元素的值。 如用Scala 实现map计算 RDD中各值的平方:

1
2
3
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

flatMap()和 map() 类似,函数被分别应用到了输入 RDD 的每个元素上,不过返回的不是一个元素,而是一个返回值序列的迭代器。即对每个输入元素生成多个输出元素,得到由各列表中的元素组成的 RDD。
filter() 则接收一个函数,并将 RDD 中满足该函数的 元素放入新的 RDD 中返回,如用 Scala 实现 filter() 转化操作:

1
2
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
函数 功能 示例 结果
map() 将函数应用于 RDD 中的每个元素,将返回值构成新的RDD rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() 将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来切分单词 rdd.flatMap(x => x.to(3)) {1, 2, 3,2, 3, 3, 3}
filter() 返回一个由通过传给 filter() 的函数的元素组成的 RDD rdd.filter(x => x != 1) {2, 3, 3}
distinct() 去重 rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, [seed]) 对 RDD 采样,以及是否替换 rdd.sample(false, 0.5) 非确定的

3.2 行动操作

行动操作则是向程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first()
在 Scala 中使用行动操作对错误进行计数:

1
2
3
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)

reduce()接收一个函数作为参数,这个函数操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数 + ,可以用它来对RDD进行累加。
Scala 中的 reduce():

1
val sum = rdd.reduce((x, y) => x + y)

fold() 和 reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个 “初始值”来作为每个分区第一次调用时的结果。例如 + 对应的 0, * 对应的 1,或拼接操作对应的空列表

collect() 函数可以用来获取整个 RDD 中的数据,但collect() 不能用在大规模数据集上,仅当整个数据集能在单台机器的内存中放得下时才能使用。
saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来。

4. RDD持久化

默认情况下RDD的内容是临时的,但Spark提供了在RDD中持久化数据的机制。第一次调用动作并计算出RDD内容后,RDD的内容可以存储在集群的内存或磁盘上。这样下一次需要调用依赖该RDD的动作时,就不需要从依赖关系中重新计算RDD,数据可以从缓存分区中直接返回:

cached.cache()
cached.count()
cached.take(10)

在上述代码中,cache方法调用指示在下次计算RDD后,要把RDD存储起来。调用count会导致第一次计算RDD。采取(take)这个动作返回一个本地的Array,包含RDD的前10个元素。但调用take时,访问的是cached已经缓存好的元素,而不是从cached的依赖关系中重新计算出来的。

当Spark持久化存储一个RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。Spark为持久化RDD定义了几种不同的机制,用不同的StorageLevel值表示。

rdd.cache()是rdd.persist(StorageLevel.MEMORY)的简写,它将RDD存储为未序列化的对象。

本文地址: http://easonlv.github.io/2016/10/03/spark学习之RDD/