同步操作将从 dearHaoGeGe/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
map:用于遍历RDD,将函数应用于每一个元素,返回新的RDD(transformation算子)
foreach:用于遍历RDD,将函数应用于每一个元素,无返回值(action算子)
mapPatitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)
foreachPatition:用于遍历操作RDD中的每一个分区,无返回值(action算子)
简单总结:一般使用mapPatitions和foreachPatition算子比map和foreach更加高效,推荐使用。
管理当前节点内存,CPU的使用情况,接受master发送过来的资源指令,通过executorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务,需要注意的是:
1)worker会不会汇报当前信息给master?
worker心跳给master主要只有workid,不会以心跳的方式发送资源信息给master,这样master就知道worker是否存活,只有故障的时候才会发送资源信息;
2)worker不会运行代码,具体运行的是executor,可以运行具体application斜的业务逻辑代码,操作代码的节点,不会去运行代码。
collect、reduce、take、count、saveAsTextFile等。
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD中的数据可以存储在内存或者是磁盘,而且RDD中的分区是可以改变的。
序列化可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU。
join常见分为两类:map-side join 和 reduce-side join。
当大表和小表join时,用map-side join能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的join操作一般会将所有数据根据key发送到所有的reduce分区中去,也就是shuffle的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为reduce-side-join。
如果其中有张表较小的话,则可以自身实现在 map端实现数据关联,跳过大量数据进行shuffle的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。
spark.default.parallelism只有在处理RDD时有效;而spark.sql.shuffle.partitions则是只对SparkSQL有效。
spark.sql.shuffle.partitions: 设置的是 RDD1做shuffle处理后生成的结果RDD2的分区数。
默认值:200
**spark.default.parallelism: ** 设置的是 RDD1做shuffle处理/并行处理(窄依赖算子)后生成的结果RDD2的分区数。
默认值:
对于分布式的shuffle算子, 默认值使用了结果RDD2所依赖的所有父RDD中分区数最大的, 作为自己的分区数。
对于并行处理算子(窄依赖的),有父依赖的,结果RDD分区数=父RDD分区数,没有父依赖的看集群配置:
Local mode:给定的core个数
Mesos fine grained mode: 8
Others: max(RDD分区数为总core数, 2)
1)从high-level的角度来看,两者并没有大的差别。都是将mapper(Spark 里是 ShuffleMapTask)的输出进行partition,不同的partition送到不同的reducer(Spark 里reducer可能是下一个stage里的ShuffleMapTask,也可能是 ResultTask)。Reducer以内存作缓冲区,边shuffle边 aggregate 数据,等到数据 aggregate 好以后进行 reduce()(Spark 里可能是后续的一系列操作)。
2)从low-level的角度来看,两者差别不小。Hadoop MapReduce是sort-based,进入combine()和reduce()的records必须先sort。这样的好处在于combine/reduce()可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer的shuffle对排好序的每段数据做归并)。
目前的Spark默认选择的是hash-based,通常使用HashMap来对shuffle来的数据进行aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似sortByKey()的操作;如果是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。Hadoop MapReduce将处理流程划分出明显的几个阶段:map()、spill、merge、shuffle、sort、reduce()等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在Spark中,没有这样功能明确的阶段,只有不同的stage和一系列的transformation(),所以spill、merge、aggregate等操作需要蕴含在transformation()中。
如果将map端划分数据、持久化数据的过程称为shuffle write,而将reducer读入数据、aggregate数据的过程称为shuffle read。那么在Spark中,问题就变为怎么在job的逻辑或者物理执行图中加入shuffle write和shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?Shuffle write由于不要求数据有序,shuffle write的任务很简单:将数据partition好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了fault-tolerance。
资源参数调优
1、num-executors:设置Spark作业总共要用多少个Executor进程来执行
2、executor-memory:设置每个Executor进程的内存
3、executor-cores:设置每个Executor进程的CPU core数量
4、driver-memory:设置Driver进程的内存
5、spark.default.parallelism:设置每个stage的默认task数量
开发调优
1、避免创建重复的RDD
2、尽可能复用同一个RDD
3、对多次使用的RDD进行持久化
4、尽量避免使用shuffle类算子
5、使用map-side预聚合的shuffle操作
6、使用高性能的算子
①使用reduceByKey/aggregateByKey替代groupByKey
②使用mapPartitions替代普通map
③使用foreachPartitions替代foreach
④使用filter之后进行coalesce操作
⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作
7、广播大变量:在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能。
8、使用Kryo优化序列化性能
9、优化数据结构:在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
Spark Streaming与Storm都可以用于进行实时流计算。但是他们两者的区别是非常大的。
Spark Streaming和Storm的计算模型完全不一样,Spark Streaming是基于RDD的,因此需要将一小段时间内的,比如1秒内的数据,收集起来,作为一个RDD,然后再针对这个batch的数据进行处理。而Storm却可以做到每来一条数据,都可以立即进行处理和计算。因此,Spark Streaming实际上严格意义上来说,只能称作准实时的流计算框架;而Storm是真正意义上的实时计算框架。
Storm支持的一项高级特性,是Spark Streaming暂时不具备的,即Storm支持在分布式流式计算程序(Topology)在运行过程中,可以动态地调整并行度,从而动态提高并发处理能力。而Spark Streaming是无法动态调整并行度的。但是Spark Streaming也有其优点,首先Spark Streaming由于是基于batch进行处理的,因此相较于 Storm 基于单条数据进行处理,具有数倍甚至数十倍的吞吐量。
Spark Streaming由于也身处于Spark生态圈内,因此Spark Streaming可以与Spark Core、Spark SQL,甚至是Spark MLlib、Spark GraphX进行无缝整合。流式处理完的数据,可以立即进行各种map、reduce转换操作,可以立即使用sql进行查询,甚至可以立即使用machine learning或者图计算算法进行处理。这种一站式的大数据处理功能和优势,是Storm无法匹敌的。 因此,综合上述来看,通常在对实时性要求特别高,而且实时数据量不稳定,比如在白天有高峰期的情况下,可以选择使用Storm。但是如果是对实时性要求一般,允许1秒的准实时处理,而且不要求动态调整并行度的话,选择Spark Streaming是更好的选择。
对比点 | Storm | Spark Streaming |
---|---|---|
实时计算模型 | 纯实时,来一条数据,处理一条数据 | 准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理 |
实时计算延迟度 | 毫秒级 | 秒级 |
吞吐量 | 低 | 高 |
事务机制 | 支持完善 | 支持,但不够完善 |
健壮性 / 容错性 | ZooKeeper,Acker,非常强 | Checkpoint,WAL,一般 |
动态调整并行度 | 支持 | 不支持 |
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。