同步操作将从 Java精选/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
cogroup的函数实现:这个实现根据两个要进行合并的两个RDD操作,生成一个CoGroupedRDD的实例,这个RDD的返回结果是把相同的key中两个RDD分别进行合并操作,最后返回的RDD的value是一个Pair的实例。
这个实例包含两个Iterable的值,第一个值表示的是RDD1中相同KEY的值,第二个值表示的是RDD2中相同key的值。
由于做cogroup的操作,需要通过partitioner进行重新分区的操作,因此,执行这个流程时,需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd,同时他们对应的partitioner相同时,就不需要执行shuffle。
DAG Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage(血缘),RDD的依赖关系使用Dependency维护。DAG在Spark中的对应的实现为DAGScheduler。
RDD RDD是Spark的灵魂,也称为弹性分布式数据集。一个RDD代表一个可以被分区的只读数据集。RDD内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。
Rdd的五个特征: 1)dependencies: 建立RDD的依赖关系,主要RDD之间是宽窄依赖的关系,具有窄依赖关系的RDD可以在同一个stage中进行计算。
2)partition: 一个RDD会有若干个分区,分区的大小决定了对这个RDD计算的粒度,每个RDD的分区的计算都在一个单独的任务中进行。
3)preferedlocations: 按照“移动数据不如移动计算”原则,在Spark进行任务调度的时候,优先将任务分配到数据块存储的位置。
4)compute: Spark中的计算都是以分区为基本单位的,compute函数只是对迭代器进行复合,并不保存单次计算的结果。
5)partitioner: 只存在于(K,V)类型的RDD中,非(K,V)类型的partitioner的值就是None。
RDD的算子主要分成2类,action和transformation。这里的算子概念,可以理解成就是对数据集的变换。action会触发真正的作业提交,而transformation算子是不会立即触发作业提交的。每一个transformation方法返回一个新的 RDD。只是某些transformation比较复杂,会包含多个子transformation,因而会生成多个RDD。这就是实际RDD个数比我们想象的多一些 的原因。通常是,当遇到action算子时会触发一个job的提交,然后反推回去看前面的transformation算子,进而形成一张有向无环图。
Stage在DAG中又进行stage的划分,划分的依据是依赖是否是shuffle的,每个stage又可以划分成若干task。接下来的事情就是driver发送task到executor,executor自己的线程池去执行这些task,完成之后将结果返回给driver。action算子是划分不同job的依据。
1)本地模式:适用于测试。
2)standalone 模式:使用spark自带的资源调度框架。
3)spark on yarn 模式:最流行的方式,使用yarn集群调度资源。
4)mesos模式:国外使用比较多。
driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来,抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
方法1:
1)按照key对数据进行聚合(groupByKey)
2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)
注意:当数据量太大时,会导致OOM
方法2:
1)取出所有的key
2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
2)对每个分区运用spark的排序算子进行排序
这是因为Yarn支持动态资源配置。Standalone模式只支持简单的固定资源分配策略,每个任务固定数量的core,各Job按顺序依次分配在资源,资源不够的时候就排队。这种模式比较适合单用户的情况,多用户的情境下,会有可能有些用户的任务得不到资源。
Yarn作为通用的种子资源调度平台,除了Spark提供调度服务之外,还可以为其他系统提供调度,如Hadoop MapReduce、Hive等。
资源参数调优
1、num-executors:设置Spark作业总共要用多少个Executor进程来执行
2、executor-memory:设置每个Executor进程的内存
3、executor-cores:设置每个Executor进程的CPU core数量
4、driver-memory:设置Driver进程的内存
5、spark.default.parallelism:设置每个stage的默认task数量
开发调优
1、避免创建重复的RDD
2、尽可能复用同一个RDD
3、对多次使用的RDD进行持久化
4、尽量避免使用shuffle类算子
5、使用map-side预聚合的shuffle操作
6、使用高性能的算子
①使用reduceByKey/aggregateByKey替代groupByKey
②使用mapPartitions替代普通map
③使用foreachPartitions替代foreach
④使用filter之后进行coalesce操作
⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作
7、广播大变量:在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能。
8、使用Kryo优化序列化性能
9、优化数据结构:在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
1)自动的进行内存和磁盘的存储切换;
2)基于Lingage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG TASK调度和资源无关;
7)数据分片的高度弹性
a. 分片很多碎片可以合并成大的
b. par
1、cluster模式会在集群的某个节点上为Spark程序启动一个称为Master的进程,然后Driver程序会运行正在这个Master进程内部,由这种进程来启动Driver程序,客户端完成提交的步骤后就可以退出,不需要等待Spark程序运行结束,这是四一职中适合生产环境的运行方式。
2、client模式也有一个Master进程,但是Driver程序不会运行在这个Master进程内部,而是运行在本地,只是通过Master来申请资源,直到运行结束,这种模式非常适合需要交互的计算。显然Driver在client模式下会对本地资源造成一定的压力。
1、基本原理
1) MapReduce:基于磁盘的大数据批量处理系统
2)Spark:基于RDD(弹性分布式数据集)数据处理,显示将RDD数据存储到磁盘和内存中。
2、模型
1) MapReduce可以处理超大规模的数据,适合日志分析挖掘等较少的迭代的长任务需求,结合了数据的分布式的计算。
2) Spark:适合数据的挖掘,机器学习等多轮迭代式计算任务。
总结来说:
1)基于内存计算,减少低效的磁盘交互;
2)高效的调度算法,基于DAG;
3)容错机制Linage,精华部分就是DAG和Lingae。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。