Wilder's Blog.

spark 笔记

字数统计: 1.2k阅读时长: 4 min
2018/10/12 Share

Spark

什么是RDD?

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,

partitioned collection of elements that can be operated on in parallel. This class contains the

basic operations available on all RDDs, such as map, filter, and persist.

从官方文档的解释上来看,RDD是一个弹性分布式数据集。RDD是一个不可变对象,也就是说一个RDD定义之后就不能够发生改变,Spark通过操作之后的结果应该产生的是一个新的RDD。

而且RDD是一个抽象类,不可以直接使用,我们要操纵RDD时是对RDD的子类进行操作。这个类包含了对RDD的所有基本操作,包括map、filter、persist。

我们继续看官方文档的说明,了解一下RDD的五个特点:

Internally, each RDD is characterized by five main properties :

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

RDD是一个划分的集合,也就是说将数据集划分为多个分区(partitions);

对RDD实现了一个功能,这个时候是对RDD里面所有区块进行计算;

RDD之间是可以相互依赖的,比如说RDDA=>RDDB=>RDDC,B依赖于A,C依赖于B,那么C也依赖于A,这样的话就有一个优点,当RDDC里面的数据出现丢失等情况,那么可以通过重新计算B分区中的数据来得到C分区,这就体现了RDD弹性的特点;

RDD分区上存放的是键值对的信息;

preferred locations,对每一个分片计算之后的结果会寻找一个合适的位置进行存放,这里是locations(多个地址),就像HDFS一样,对于一个块来说可能会存放于多个位置中,就像HDFS一样。

我们来看一下RDD的代码头:

1
2
3
4
5
6
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

}

从这里我们可以看到RDD有一下几大特点:

1、抽象类 abstract:RDD必然是有子类实现的,我们直接使用其子类即可

2、序列化 Serializable

3、Spark 的核心类 SparkContext

4、@transient

如何创建RDD?

There are two ways to create RDDs:

  • parallelizing an existing collection in your driver program
  • referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat

1、从一个已经存在的集合里面进行构建。

1
2
3
val data = Array(1,2,3,4,5)
//这些方法以后再说,要知道通过已经存在的集合构建了RDD
val RDD = sc.parallelize(data)

2、基于一个外部的存储数据集,比如一个共享文件系统,HDFS、HBase或者其它基于Hadoop InputFormat的数据来源。

1
2
3
4
5
6
7
8
9
10
// 从hdfs文件中获取资源,从hdfs文件系统的tmp目录下面读取test.txt文件到RDD中
val hdfsRDD = sc.textFile("hdfs:///tmp/test.txt")

//从本地文件中读取到RDD中
val localRDD = sc.textFile("file:///tmp/test.txt")

//通过collect方法可以打印出RDD中的类型
localRDD.collect
//将处理的文件结果输出到本地中
hdfsRDD.saveAsFile("file:///tmp/out/result.txt")

解释一下,sc是SparkContext,这是scala语言中对sparkContext的定义,textFile 是sc中的一个方法,这个方法可以读取hdfs文件,也可以读取本地文件,除了文件以外他还可以读取文件夹。

RDD 的一些操作

RDD 有以下几类操作:

  • transformations,有如下的几个操作

    • map 函数:map函数中的操作将会对RDD中的每一个元素进行相同的操作,操作结果之后将会返回一个新的RDD(因为RDD的val变量,也就是不可变的变量),比如说RDD(1,2,3,4,5),执行了map(x=>x+1)的操作之后,返回的RDD结果为(2,3,4,5,6)

    从官方文档中可以了解到,spark的 transformations 操作都是lazy,它的意思就是说并不是当你调用这个方法就立刻去执行,仅仅是记住对应的操作,只有等你需要返回一个结果之后才会进行相应的操作。这种设计能够使得我们Spark运行更加高效。

    比如以下操作:

    1
    val resultRDD = RDD.map().filter()

    Spark只会记住对这个RDD将会执行 map 和 filter 两个操作,当你需要返回resultRDD的时候才会进行相应的转换和过滤。

  • actions

    • reduce 函数:作为聚合操作,聚合RDD里面的所有元素,然后返回一个结果,比如对RDD(1,2,3,4,5),执行reduce(x,y=>x+y),就是对RDD中所有的元素进行相加得到一个结果:15

未完待续……

CATALOG
  1. 1. Spark
  2. 2. 什么是RDD?
  3. 3. 如何创建RDD?
  4. 4. RDD 的一些操作