同步操作将从 Java精选/Ebooks 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
这是因为Yarn支持动态资源配置。Standalone模式只支持简单的固定资源分配策略,每个任务固定数量的core,各Job按顺序依次分配在资源,资源不够的时候就排队。这种模式比较适合单用户的情况,多用户的情境下,会有可能有些用户的任务得不到资源。
Yarn作为通用的种子资源调度平台,除了Spark提供调度服务之外,还可以为其他系统提供调度,如Hadoop MapReduce、Hive等。
Spark是一种快速、通用、可扩展的大数据分析引擎。
2009年诞生于加州大学伯克利分校AMPLab。2010年开源,2013年6月成为Apache孵化项目。2014年2月成为Apache顶级项目。
目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、SparkStreaming、GraphX、MLlib等子项目。
Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。
当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。
当发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序。
一、数据问题造成的数据倾斜
找出异常的key,如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。 选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个。
比如
df.select("key").sample(false,0.1)
.(k=>(k,1)).reduceBykey(+)
.map(k=>(k._2,k._1))
.sortByKey(false).take(10)
如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。
经过分析,倾斜的数据主要有以下三种情况:
1)null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
2)无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
3)有效数据,业务导致的正常数据分布。
解决办法
第1,2种情况,直接对数据进行过滤即可(因为该数据对当前业务不会产生影响)。
第3种情况则需要进行一些特殊操作,常见的有以下几种做法
1)隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
2)对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
3)使用reduceByKey 代替 groupByKey(reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义)
4)使用map join。
案例
如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:
1)将原始的 key 转化为 key + 随机值(例如Random.nextInt)
2)对数据进行 reduceByKey(func)
3)将key+随机值 转成 key
4)再对数据进行reduceByKey(func)
案例操作流程分析:
假设说有倾斜的Key,给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么解决?进行map操作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作,这样就可以把原本倾斜的Key通过分而治之方案分散开来,最后又进行了全局聚合。
注意1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。
注意2: 单独处理异常数据时,可以配合使用Map Join解决。
二、Spark使用不当造成的数据倾斜
提高shuffle并行度
1)dataFrame和sparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。
2)rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。
3)局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。
使用map join 代替reduce join
在小表不是特别大(取决于executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰。这是因为局限性的问题,也就是先将小数据发送到每个executor上,所以数据量不能太大。
map:用于遍历RDD,将函数应用于每一个元素,返回新的RDD(transformation算子)
foreach:用于遍历RDD,将函数应用于每一个元素,无返回值(action算子)
mapPatitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)
foreachPatition:用于遍历操作RDD中的每一个分区,无返回值(action算子)
简单总结:一般使用mapPatitions和foreachPatition算子比map和foreach更加高效,推荐使用。
Spark Streaming与Storm都可以用于进行实时流计算。但是他们两者的区别是非常大的。
Spark Streaming和Storm的计算模型完全不一样,Spark Streaming是基于RDD的,因此需要将一小段时间内的,比如1秒内的数据,收集起来,作为一个RDD,然后再针对这个batch的数据进行处理。而Storm却可以做到每来一条数据,都可以立即进行处理和计算。因此,Spark Streaming实际上严格意义上来说,只能称作准实时的流计算框架;而Storm是真正意义上的实时计算框架。
Storm支持的一项高级特性,是Spark Streaming暂时不具备的,即Storm支持在分布式流式计算程序(Topology)在运行过程中,可以动态地调整并行度,从而动态提高并发处理能力。而Spark Streaming是无法动态调整并行度的。但是Spark Streaming也有其优点,首先Spark Streaming由于是基于batch进行处理的,因此相较于 Storm 基于单条数据进行处理,具有数倍甚至数十倍的吞吐量。
Spark Streaming由于也身处于Spark生态圈内,因此Spark Streaming可以与Spark Core、Spark SQL,甚至是Spark MLlib、Spark GraphX进行无缝整合。流式处理完的数据,可以立即进行各种map、reduce转换操作,可以立即使用sql进行查询,甚至可以立即使用machine learning或者图计算算法进行处理。这种一站式的大数据处理功能和优势,是Storm无法匹敌的。 因此,综合上述来看,通常在对实时性要求特别高,而且实时数据量不稳定,比如在白天有高峰期的情况下,可以选择使用Storm。但是如果是对实时性要求一般,允许1秒的准实时处理,而且不要求动态调整并行度的话,选择Spark Streaming是更好的选择。
对比点 | Storm | Spark Streaming |
---|---|---|
实时计算模型 | 纯实时,来一条数据,处理一条数据 | 准实时,对一个时间段内的数据收集起来,作为一个RDD,再处理 |
实时计算延迟度 | 毫秒级 | 秒级 |
吞吐量 | 低 | 高 |
事务机制 | 支持完善 | 支持,但不够完善 |
健壮性 / 容错性 | ZooKeeper,Acker,非常强 | Checkpoint,WAL,一般 |
动态调整并行度 | 支持 | 不支持 |
1)从high-level的角度来看,两者并没有大的差别。都是将mapper(Spark 里是 ShuffleMapTask)的输出进行partition,不同的partition送到不同的reducer(Spark 里reducer可能是下一个stage里的ShuffleMapTask,也可能是 ResultTask)。Reducer以内存作缓冲区,边shuffle边 aggregate 数据,等到数据 aggregate 好以后进行 reduce()(Spark 里可能是后续的一系列操作)。
2)从low-level的角度来看,两者差别不小。Hadoop MapReduce是sort-based,进入combine()和reduce()的records必须先sort。这样的好处在于combine/reduce()可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer的shuffle对排好序的每段数据做归并)。
目前的Spark默认选择的是hash-based,通常使用HashMap来对shuffle来的数据进行aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似sortByKey()的操作;如果是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。Hadoop MapReduce将处理流程划分出明显的几个阶段:map()、spill、merge、shuffle、sort、reduce()等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在Spark中,没有这样功能明确的阶段,只有不同的stage和一系列的transformation(),所以spill、merge、aggregate等操作需要蕴含在transformation()中。
如果将map端划分数据、持久化数据的过程称为shuffle write,而将reducer读入数据、aggregate数据的过程称为shuffle read。那么在Spark中,问题就变为怎么在job的逻辑或者物理执行图中加入shuffle write和shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?Shuffle write由于不要求数据有序,shuffle write的任务很简单:将数据partition好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了fault-tolerance。
1、技术角度上,面向的数据集类型不同
ML的API是面向Dataset的(Dataframe 是 Dataset的子集,也就是Dataset[Row]), mllib是面对RDD的。
Dataset和RDD有啥不一样呢?
Dataset的底端是RDD。Dataset对RDD进行了更深一层的优化,比如说有sql语言类似的黑魔法,Dataset支持静态类型分析所以在compile time就能报错,各种combinators(map,foreach 等)性能会更好,等等。
2、编程过程上,构建机器学习算法的过程不同
ML提倡使用pipelines,把数据想成水,水从管道的一段流入,从另一端流出。ML是1.4比 Mllib更高抽象的库,它解决如果简洁的设计一个机器学习工作流的问题,而不是具体的某种机器学习算法。未来这两个库会并行发展。
1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的入口点。
2)功能:负责向集群申请资源,向master注册信息,负责了作业的调度,,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
1、窄依赖指的是每一个parent RDD的partition最多被子RDD的一个partition使用(一子一亲)
2、宽依赖指的是多个子RDD的partition会依赖同一个parent RDD的partition(多子一亲) RDD作为数据结构,本质上是一个只读的分区记录集合。一个RDD可以包含多个分区,每个分区就是一个dataset片段。RDD可以相互依赖。
首先,窄依赖可以支持在同一个cluster node上,以pipeline形式执行多条命令(也叫同一个 stage 的操作),例如在执行了map后,紧接着执行filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。
其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的parent partition即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会分配到多个节点进行),相反的是宽依赖牵涉RDD各级的多个parent partition。
driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来,抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。