SparkSQL大数据实战金沙官网线上:揭开Join的神秘

本文来自 网易云社区 。

本文由网易云 发布。

 

在之前的文章中简要介绍了Join在大数据领域中的使用背景以及常用的几种算法-broadcast hash join 、shuffle hash join以及 sort merge join等,对每一种算法的核心应用场景也做了相关介绍,这里再重点说明一番:大表与小表进行join会使用broadcast hash join,一旦小表稍微大点不再适合广播分发就会选择shuffle hash join,最后,两张大表的话无疑选择sort merge join。

Join操作是数据库和大数据计算中的高级特性,大多数场景都需要进行复杂的Join操作,本文从原理层面介绍了SparkSQL支持的常见Join算法及其适用场景。

好了,问题来了,说是这么一说,但到底选择哪种算法归根结底是SQL执行引擎干的事情,按照上文逻辑,SQL执行引擎肯定要知 道参与Join的两表大小,才能选择最优的算法喽!那么斗胆问一句,怎么知道两表大小?衡量两表大小的是物理大小还是纪录多少 抑或两者都有?其实,这是另一门学问-基于代价优化(Cost Based Optimization,简称CBO),它不仅能够解释Join算法的选 择问题,更重要的,它还能确定多表联合Join场景下的Join顺序问题。

Join背景介绍

Join是数据库查询永远绕不开的话题,传统查询SQL技术总体可以分为简单操作(过滤操作-where、排序操作-limit等),聚合操作-groupby以及Join操作等。其中Join操作是最复杂、代价最大的操作类型,也是OLAP场景中使用相对较多的操作。因此很有必要对其进行深入研究。

 

另外,从业务层面来讲,用户在数仓建设的时候也会涉及Join使用的问题。通常情况下,数据仓库中的表一般会分为“低层次表”和“高层次表”。

 

所谓“低层次表”,就是数据源导入数仓之后直接生成的表,单表列值较少,一般可以明显归为维度表或事实表,表和表之间大多存在外健依赖,所以查询起来会遇到大量Join运算,查询效率很差。而“高层次表”是在“低层次表”的基础上加工转换而来,通常做法是使用SQL语句将需要Join的表预先进行合并形成“宽表”,在宽表上的查询不需要执行大量Join,效率很高。但宽表缺点是数据会有大量冗余,且相对生成较滞后,查询结果可能并不及时。

 

为了获得时效性更高的查询结果,大多数场景都需要进行复杂的Join操作。Join操作之所以复杂,主要是通常情况下其时间空间复杂度高,且有很多算法,在不同场景下需要选择特定算法才能获得最好的优化效果。本文将介绍SparkSQL所支持的几种常见的Join算法及其适用场景。

是不是对CBO很期待呢?好吧,这里先刨个坑,下一个话题我们再聊。那今天要聊点什么呢?Join算法选择、Join顺序选择确实对 Join性能影响极大,但,还有一个很重要的因素对Join的性能至关重要,那就是Join算法优化!无论是broadcast hash join、 shuffle hash join还是sort merge join,都是最基础的join算法,有没有什么优化方案呢?还真有,这就是今天要聊的主角- Runtime Filter。

Join常见分类以及基本实现机制

当前SparkSQL支持三种Join算法:shuffle hash join、broadcast hash join以及sort merge join。其中前两者归根到底都属于hash join,只不过在hash join之前需要先shuffle还是先broadcast。其实,hash join算法来自于传统数据库,而shuffle和broadcast是大数据的皮(分布式),两者一结合就成了大数据的算法了。因此可以说,大数据的根就是传统数据库。既然hash join是“内核”,那就刨出来看看,看完把“皮”再分析一下。

RF预备知识:bloom filter

hash join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。现在假设这个Join采用的是hash join算法,整个过程会经历三步:

  1. 确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table。
  2. 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。
  3. 探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起。

金沙官网线上 1

 

基本流程可以参考上图,这里有两个小问题需要关注:

  1. hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条街。
  2. 为什么Build Table选择小表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。

上文说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过一定的分布式改造,就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。hash join分布式改造一般有两种经典方案:

  1. broadcast hash join:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景。
  2. shuffler hash join:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。

下面分别进行详细讲解。

RF说白了是使用bloomfilter对参与join的表进行过滤,减少实际参与join的数据量。为了下文详细解释整个流程,有必要先解释一 下bloomfilter这个数据结构(对之熟悉的看官可以绕道)。Bloom Filter使用位数组来实现过滤,初始状态下位数组每一位都为 0,如下图所示:

broadcast hash join

如下图所示,broadcast hash join可以分为两步:

  1. broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于BitTorrent的TorrentBroadcast。
  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探。

金沙官网线上 2

        3.SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。

金沙官网线上 3

shuffle hash join

在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步:

  1. shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle。
  2. hash join阶段:每个分区节点上的数据单独执行单机hash join算法。

金沙官网线上 4

 

看到这里,可以初步总结出来如果两张小表join可以直接使用单机版hash join;如果一张大表join一张极小表,可以选择broadcast hash join算法;而如果是一张大表join一张小表,则可以选择shuffle hash join算法;那如果是两张大表进行join呢?

假如此时有一个集合S = {x1,x2,...,xn},Bloom Filter使用k个独立的hash函数,分别将集合中的每一个元素映射到{1,…,m}的范围。 对于任何一个元素,被映射到的数字作为对应的位数组的索引,该位会被置为1。比如元素x1被hash函数映射到数字8,那么位数组 的第8位就会被置为1。下图中集合S只有两个元素x和y,分别被3个hash函数进行映射,映射到的位置分别为和,对 应的位会被置为1:

sort merge join

SparkSQL对两张大表join采用了全新的算法-sort-merge join,如下图所示,整个过程分为三个步骤:

金沙官网线上 5

 

  1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理。
  2. sort阶段:对单个分区节点的两表数据,分别进行排序。
  3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边。如下图所示:

金沙官网线上 6

 

经过上文的分析,很明显可以得出来这几种Join的代价关系:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join),数据仓库设计时最好避免大表与大表的join查询,SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调大,让更多join实际执行为broadcast hash join。

金沙官网线上 7

总结

Join操作是数据库和大数据计算中的高级特性,因为其独特的复杂性,很少有同学能够讲清楚其中的原理。本文试图带大家真正走进Join的世界,了解常用的几种Join算法以及各自的适用场景。后面两篇文章将会在此基础上不断深入Join内部,一点一点地揭开它的面纱,敬请关注!

 

本文已由作者范欣欣授权网易云社区发布,原文链接:SparkSQL大数据实战:揭开Join的神秘面纱

现在假如要判断另一个元素是否是在此集合中,只需要被这3个hash函数进行映射,查看对应的位置是否有0存在,如果有的话,表 示此元素肯定不存在于这个集合,否则有可能存在。下图所示就表示z肯定不在集合{x,y}中:

金沙官网线上 8

RF算法理论

为了更好地说明整个过程,这里使用一个SQL示例对RF算法进行完整讲解,SQL:select item.name,order.*金沙官网线上, from order,item where order.item_id =item.idand item.category = ‘book’,其中order为订单表,item为商品表,两张表根据商品id字段 进行join,该SQL意为取出商品类别为书籍的所有订单详情。假设商品类型为书籍的商品并不多,join算法因此确定为broadcast hash join。整个流程如下图所示:

金沙官网线上 9

Step 1:将item表的join字段经过多个hash函数映射处理为一个bloomfilter(如果对bloomfilter不了解,自行 google);

Step 2:将映射好的bloomfilter分别广播到order表的所有partition上,准备进行过滤;

Step 3:以Partition2为例,存储进程(比如DataNode进程)将order表中join列(order.item_id)数据一条一条读出来,使用 bloomfilter进行过滤。淘汰该订单数据不是书籍相关商品的订单,这条数据直接跳过;否则该条订单数据有可能是待检索订单,将 该行数据全部扫描出来;

Step 4:将所有未被bloomfilter过滤掉的订单数据,通过本地socket通信发送到计算进程;

Step 5:再将所有书籍商品数据广播到所有Partition节点与step4所得订单数据进行真正的hashjoin操作,得到最终的选择结果。

RF算法分析

上面通过一个SQL示例简单演示了整个RF算法在broadcast hash join中的操作流程,根据流程对该算法进行一下理论层次分析:

RF本质:通过谓词( bloomfilter)下推,在存储层通过bloomfilter对数据进行过滤,可以从三个方面实现对Join的优化。其一, 如果可以跳过很多记录,就可以减少了数据IO扫描次数。这点需要重点解释一下,许多朋友会有这样的疑问:既然需要把数据扫描 出来使用BloomFilter进行过滤,为什么还会减少IO扫描次数呢?这里需要关注一个事实:大多数表存储行为都是列存,列之间独 立存储,扫描过滤只需要扫描join列数据,如果某一列被过滤掉了,其他对应的同一行的列就不需要扫描了,这 样减少IO扫描次数。其二,减少了数据从存储层通过socket发送到计算层的开销,其三,减少了最终hash join执行的 开销。

RF代价:对照未使用RF的Broadcast Hash Join来看,前者主要增加了bloomfilter的生成、广播以及大表根据bloomfilter进行过 滤这三个开销。通常情况下,这几个步骤在小表较小的情况下代价并不大,基本可以忽略。

RF优化效果:基本取决于bloomfilter的过滤效果,如果大量数据被过滤掉了,那么join的性能就会得到极大提升;否则性能提升就 会有限。

RF实现:和常见的谓词下推(’=‘,’>’,’<‘等)一样,RF实现需要在计算层以及存储层分别进行相关逻辑实现,计算层 要构造bloomfilter并将bloomfilter下传到存储层,存储层要实现使用该bloomfilter对指定数据进行过滤。

RF效果验证

事实上,RF这个东东的优化效果是在组内同事何大神做impala on parquet以及impala on kudu的基准对比测试的时候分析发现 的。实际测试中,impala on parquet 比之impala on kudu性能有明显优势,目测至少10倍性能提升。同一SQL解析引擎,不同 存储引擎,性能竟然天壤之别!为了分析具体原因,同事就使用impala的执行计划分析工具对两者的执行计划分别进行了分析,才 透过蛛丝马迹发现前者使用了RF,而后者并没有(当然可能还有其他因素,但RF肯定是原因之一)。

简单复盘一下这次测试吧,基准测试使用TPCDS测试,数据规模为1T,本文使用测试过程中的一个典型SQL作为示例对RF 的神奇功效进行回放演示。下图是Q40的对比性能,直观上来看RF可以直接带来40x的性能提升,40倍哎,这到底是怎么做到的?

金沙官网线上 10

先来简单看看Q40的SQL语句,如下所示,看起来比较复杂,核心涉及到3个表(catalog_sales join date_dim 、catalog_sales join warehouse 、catalog_sales join item)的join操作:

select

w_state, i_item_id,

sum(case when (cast(d_date as date) <

cast ('1998-04-08' as date))

then cs_sales_price –

coalesce(cr_refunded_cash,0)

else 0 end) as sales_before,

本文由金沙官网线上发布于数据库,转载请注明出处:SparkSQL大数据实战金沙官网线上:揭开Join的神秘

您可能还会对下面的文章感兴趣: