HOME> 世界杯第二> Hadoop/Spark的Shuffle过程为什么需要进行排序?

Hadoop/Spark的Shuffle过程为什么需要进行排序?

世界杯第二 2025-06-05 15:26:23
可能很多小伙伴和我一样会有这样的好奇,对大量的数据进行排序非常耗时,我的业务场景明明不需要进行排序,为什么shuffle过程还少不了排序...

可能很多小伙伴和我一样会有这样的好奇,对大量的数据进行排序非常耗时,我的业务场景明明不需要进行排序,为什么shuffle过程还少不了排序呢?以下介绍一些我个人对shuffle排序的理解~包括:

什么是shuffle?

shuffle过程为什么需要进行排序?

Spark的shuffle到底需不需要排序?

(如有问题欢迎帮忙指正!)

一,什么是shuffle

shuffle是并行计算的解决方案之一MapReduce必不可少的一步,它是Map和Reduce的桥梁。

试想这样一个场景,你有十台机器,每台机器分别存储了一份数据,这些数据记录了每个顾客的购物流水。此时,如果需要统计这些顾客的购物总金额,你首先会想到每台机器先在本地并行进行数据整理,整理为 [顾客A :消费金额A] 的格式。这样的行为就是Map操作,即可以并行执行的操作。

但是顾客的信息是分散在不同机器的,如果想统计每个顾客的消费总额,一种办法是将十台机器的数据合并到一台机器上,然后在最终的机器上统计每个顾客的购物金额。当数据量小的时候,这样的方式不失为一种好办法,但当面对海量数据时,单台机器想要统计这样的海量数据几乎很难实现~为此,MapReduce提出了一种解决方式,就是先对数据进行大洗牌(shuffle),让数据按照要求进行分组存储,然后各台机器并行统计各组内的数据情况(Reduce)。由此可以看出,shuffle的主要目的是:1,对Map机器上的数据进行重新分组;2,让每个Reduce知道它需要的数据分别在每个Map机器的哪里。

二、shuffle过程为什么需要进行排序

注意,Hadoop和Spark都是分布式并行计算!意味着所有计算和数据都分布在多台机器上(虽然它们对使用者来说可以像在单台机器上进行操作一样),那么一台机器想要处理另一台机器的数据,就需要通过数据拉取+本地计算的方式(它不可能在别的机器上进行计算)。

一种简单的办法是每个Reducer机器都拉取全部Map的全部数据,然后进行遍历去寻找自己需要的内容,这样显然非常低效。于是,shuffle自然而然的提出它可以对Map机器上的数据按照设定的hash key进行排序,此时每个Map机器shuffle后的结果就像一个有序长数组一样,相同Hash key的数据按顺序存储在一起,我们只需要记录每个Reducer需要的key起始和结束位置(偏移量)。这样每个Reducer都可以得知自己要处理的数据是哪些,直接拉取和计算对应的数据,避免了大量无用数据的存储和计算!

这里我们由此知,shuffle过程之所以要排序,不是为了满足业务需要,而是为了计算每个Reducer需要数据的偏移量,告诉Reducer他们需要的数据位置。

三、Spark的shuffle到底需不需要排序

先说结论,Spark shuffle方式不止一种,有的需要排序,有的不需要。

1,不需要排序,把每个Reducer需要的数据存储在不同文件中,文件内部无序。这种方式由HashShuffleManager实现,是spark1.2之前默认的shuffle方式。这种方法听起来很美好,把排序的过程转移为分文件,对于一个Mapper,将每一个Reducer需要的内容拆分成不同的文件,如果有N个Mapper,M个Reducer,那么shuffle产生的文件数量将是N*M。100个Mapper和200个Reducer产生的文件就是20000个!虽然减少了排序的步骤,可是有增加了非常多磁盘IO的消耗!为此,spark提出了一种consolidate的方式,同一个CPU Core产生的文件还是进行一次合并(spark.Shuffle.consolidateFiles)。

2,同Hadoop一致,需要排序,用偏移量来表示Reducer需要数据的位置。另外,spark可以可以设置bypass阈值(spark.Shuffle.sort.bypassMergeThreshold),当reduce数量小于阈值时使用上面的hash方式,不过会多一步合并的过程,把Map结果进行合并并创建索引文件。当超过阈值时会直接进行排序,按照sort的方式处理,以防产生的中间文件太多。