2.2 Spark 2.4 Core
本节讲解第二代Tungsten引擎、SparkSession和累加器API的使用。
2.2.1 第二代Tungsten引擎
Spark备受瞩目的原因之一在于它的高性能,Spark开发者为了保持这个优势,一直在不断地进行各种层次的优化,其中最令人兴奋的莫过于钨丝计划(Project Tungsten),因为钨丝计划的提出给Spark带来了极大的性能提升,并且在一定程度上引导了Spark的发展方向。
Spark是使用Scala和Java语言开发的,不可避免地运行于JVM之上。当然,内存管理也是依赖于JVM的内存管理机制,而对于大数据量的基于内存的处理,JVM对象模型对内存的额外开销,以及频繁的GC和Full GC都是非常致命的问题。另外,随着网络带宽和磁盘I/O的不断提升,内存和CPU又重新作为性能瓶颈受到关注,JVM对象的序列化、反序列化带来的性能损耗亟待解决。Spark 1.5版本加入的钨丝计划从3大方面着手解决这些问题:
(1)统一内存管理模型和二进制处理(Binary Processing)。统一内存管理模型代替之前基于JVM的静态内存管理,引入Page来管理堆内存和堆外内存(on-heap和off-heap),并且直接操作内存中的二进制数据,而不是Java对象,很大程度上摆脱了JVM内存管理的限制。
(2)基于缓存感知的计算(Cache-aware Computation)。Spark内存读取操作也会带来一部分性能损耗,钨丝计划便设计了缓存友好的算法和数据结构来提高缓存命中率,充分利用L1/L2/L3三级缓存,大幅提高了内存读取速度,进而缩短了内存中整个计算过程的时间。
(3)代码生成(Code Generation)。在JVM中,所有代码的执行由解释器一步步地解释执行,CodeGeneration这一功能则在Spark运行时动态生成用于部分算子求值的bytecode,减少了对基础数据类型的封装,并且缓解了调用虚函数的额外开销。
Spark 2.0升级了第二代Tungsten引擎。其中最重要的一点是把CodeGeneration作用于全阶段的SparkSQL和DataFrame之上(即全阶段代码生成Whole Stage Code Generation),为常见的算子带来10倍左右的性能提升。
2.2.2 SparkSession
加入SparkSession,取代原来的SQLContext和HiveContext,为了兼容两者,仍然保留。SparkSession使用方法如下:
首先获得SparkSession的Builder,然后使用Builder为SparkSession设置参数,最后使用getOrCreate方法检测当前线程是否有一个已经存在的Thread-local级别的SparkSession,如果有,则返回它;如果没有,则检测是否有全局级别的SparkSession,有,则返回,没有,则创建新的SparkSession。
在程序中如果要使用SparkContext,调用sparkSession.sparkContext即可。在程序的最后我们需要调用sparkContext.stop方法,这个方法会调用sparkContext.stop来关闭sparkContext。
从Spark 2.0开始,DataFrame和DataSet既可以容纳静态、有限的数据,也可以容纳无限的流数据,所以用户也可以使用SparkSession像创建静态数据集一样来创建流式数据集,并且可以使用相同的操作算子。这样,整合了实时流处理和离线处理的框架,结合其他容错、扩展等特性就形成了完整的Lambda架构。
2.2.3 累加器API
Spark 2.0引入了一个更加简单和更高性能的累加器API,如在1.X版本中可以这样使用累加器。
在Spark 2.X版本里使用SparkContext里内置的累加器。
只使用SparkContext里内置的累加器功能肯定不能满足略微复杂的业务类型,此时我们就可以自定义累加器。在1.X版本中的做法是(下面是官网的例子):
上面的累加器元素和返回类型是相同的,在Scala中还有另外一种方式来自定义累加器,用户只需要继承Accumulable,就可以把元素和返回值定义为不同的类型,这样我们就可以完成添加操作(如往Int类型的List里添加整数,此时元素为Int类型,而返回类型为List)。
在Spark 2.X中加入一个新的抽象类——AccumulatorV2,继承这个类要实现以下几种方法。
add方法:指定元素相加操作。
copy方法:指定对自定义累加器的复制操作。
isZero方法:返回该累加器的值是否为“零”。
merge方法:合并两个相同类型的累加器。
reset方法:重置累加器。
value方法:返回累加器当前的值。
重写这几种方法之后,只需实例化自定义累加器,并连同累加器名字一起传给sparkContext.register方法。
下面简单实现一个把字符串合并为数组的累加器。
运行结果显示的ArrayBuffer里的值顺序是不固定的,取决于各个Executor的值到达Driver的顺序。