Postgres Hash Join
理论依据
使用 hybrid hash join
实现的hash join,大意是:
outer 和 inner 无法完全使用内存实现 hash join,所以需要把数据分区 dump 到磁盘中,每个分区称为一个 batch,在
优化阶段,会使用已有的统计信息决定分区大小,但是实际执行中, 如果有偏差,则分区按2倍扩容,分区运行时不会减少
nbatch // number of batches
nbuckets // buckets in the in-memory hash table
bucketno = hashvalue % nbuckets
batchno = (hashvalue / nbuckets) % nbatch
hybrid hash join
batch 0 不会落盘,算是一种优化
大致过程
1. Scan inner table
,构造 hashtable
可以细分为下面几种情况
- 如果判断内存完全可以存下 innert table 的数据,则只使用一个 batch
- 否则计算 hashvalue, 使用 hashvalue 计算 batchno ,不属于 batch 0 的数据写到磁盘中对应批次的文件
- 如果内存中 hahstable 由于实际使用的数据超过内存上限,则需要进行分裂
1. nbatch 翻倍, scan hashtable , 使用新的 nbatch 计算 batchno
2. 属于 batch 0 的数据继续放在内存中,其他的 dump 到磁盘对应的文件
3. 原来磁盘中已有的文件不动, 后续 probe 的时候才需要重新计算 batch,也可能会写文件
2. Scan outer table
,尝试匹配数据
和第一步情况类似
- 读取 outer table 的数据,计算 hashvalue, 使用 nbatch 计算 batchno ,这里需要注意的是 nbatch 是最新值,前面构造 hashtable 的时候可能发生变化
- 如果 batchno 匹配当前 hashtable , 则尝试进行匹配
- 否则写磁盘到对应批次的文件中
- 这里需要注意的是,由于之前 inner table 构造 hashtable 的过程中,如果发生 分裂, 则批次较前的临时文件比较大,因为文件中的数据可能不全 属于这一批, 但是对于 outer table,由于是使用新的 nbatch 计算的,所以 tuple 会更分散,所以直观的感受就是临时文件大小是明显小于 inner table 的文件
- 处理完当前 批次,则读取磁盘中下一个 batch 文件,此时 inner 表的 hahstable 可能是分裂之前的批次,需要使用新的 nbatch 处理,也可能写磁盘
- 重复处理完所有的文件即可
并行
并行需要区分不同的执行计划
- hash join 并行,但是 hash 不并行
inner 侧会在各自得 worker 私有内存中构建 hashtable,然后并行 scan outer,进行 hash join,最终得结果就是 gather 之后得结果 - hash join 并行,hash 并行
inner 侧会在 共享内存中构建一个 hashtable,大小为work_memo * hash_mem_multiplier * num_workers
, 所有的 inner tuple 存入这个 hashtable 中。并行 scan outer,进行 hash join
再加上非并行 hash , hash join 一般有三种情况, inner 并行。 但是 outer 不并行的情况一般没有, 因为 outer 一般是大表
并行有下面几点需要注意:
- hash 是一个单独得算子,所以是否并行是可以单独使用参数 enable_parallel_hash 控制的
- hash 的并行也需要考虑内存中是否能完全处理 inner tuple 的情况
- 如果不能,则除了 batch 0 之外的数据需要 dump 到磁盘
- 由于此时是使用 SharedTuplestore 进行管理,所以可能存在多个 worker 都写如到一个文件的情况, 可能导致文件太大,后面 load 的时候超过内存限制
- 所以判断如果此时文件太大,是否需要增加 nbatch 重新计算位置
- 如果 nbatch 大于1, outer table 也需要 进行类似的分区操作,把不属于 batch 0 的数据 dump 到文件
- nbatch 0 则直接读取即可
- batch 0 消耗完,则读取下一个文件,构造 hashtable
CheckPoint
避免频繁文件读写, 可以适当加大 work_mem, 或者调节
hash_mem_multiplier
优化器会读取统计信息, 避免数据倾斜的情况,如果存在数据倾斜,则优先把倾斜的数据存放在内存中,使用专门的 skewBucket 管理
- 但是也需要注意内存的使用量,如果 hashtable 达到内存限制, 则会把最后一个 skewBucket dump 到其对应的临时文件中
但是对于 NULL 值
- 并不收集 MCV 信息,所以 key 存在大量 NULL ,且是 outer join 得时候, 可能由于估算不准确,导致 hahstable 分裂频繁
- 此时会 NULL 值的 hashtable 会消耗大量 内存
- 并且由于此时 nbatch 增加,所以可能导致数据分散到大量的文件中,可能导致产生大量的临时文件,且由每个文件的大小达不到 flush 的标准,不但会消耗 inode, 又会加剧内存的消耗
对于是否需要保留 null 值取决于是不是 outer join , 使用 HJ_FILL_INNER 和 HJ_FILL_OUTER 判断
- 输出的时候,由状态 HJ_FILL_OUTER_TUPLE 和 HJ_FILL_INNER_TUPLES 管理
对于并行
- 这里是 能者多劳 的原则,当新 worker 需要工作的时候, 如果没有多余的任务,例如空闲的 file ,是不会处于等待状态的,他也会加入到 probe 状态中
- 使用 barrier 划分整个 hash join 为 不同的阶段, 用于不同 worker 之间的任务同步
- 读取文件构造 hashtable 的时候,不一定是一个 worker 独占一个 file 。会存在 不同的 worker 读取同一个 file 的可能
- 假如 batch0 频繁分裂,部分数据已经写到文件了,此时处理后续数据的时候,没有看见重新计算之前文件的 tuple hash,在
ExecParallelHashJoinNewBatch
中的ExecParallelHashTableInsertCurrentBatch
看见Assert(batchno == hashtable->curbatch);
,这是怎么保证的 ?
其他 数据库 Hash join 实现及优化方案
- 比较明显的一点是 Hash join 需要在 inner table 构造完 hahstable 之后才启动 outer table,所以完全可以在构造 inner table 的时候,采集 inner table 的信息,用于提前过滤 outer table 的信息
- 当前部分 MPP 数据库已经实现功能
Runtime Filter
- 可进行动态分区裁剪,和
Runtime Filter
类似的思路
- 当前部分 MPP 数据库已经实现功能
- 特殊 条件下,可以在 build hash table 之后,直接判定 join 是否有结果返回, 例如
left anti semi join (not in)
,gp 实现 Squelch 功能(当然此功能不仅仅只是 hash join 使用) - Partition Hash Join, 对 hash table 进行分区,确保所有分区可以 fit in cache
- Hash Join 最大问题在于 cache miss , 其性能损耗可能超过 70%
- 使用计算掩盖 cache miss 的时间,使用 coroutine 实现,依据就是 协程的切换时间需要小于 cache miss 的 prefetch 时间
- 协程 需要是 无栈协程,有栈协程 可能 没有这个先决条件 ?
- 没有深究,但是有效, mark 一下
Read other posts