理论依据

使用 hybrid hash join 实现的hash join,大意是:
outer 和 inner 无法完全使用内存实现 hash join,所以需要把数据分区 dump 到磁盘中,每个分区称为一个 batch,在 优化阶段,会使用已有的统计信息决定分区大小,但是实际执行中, 如果有偏差,则分区按2倍扩容,分区运行时不会减少

nbatch    // number of batches 
nbuckets  // buckets in the in-memory hash table

bucketno = hashvalue % nbuckets
batchno = (hashvalue / nbuckets) % nbatch

hybrid hash join batch 0 不会落盘,算是一种优化

大致过程

1. Scan inner table ,构造 hashtable

可以细分为下面几种情况

  1. 如果判断内存完全可以存下 innert table 的数据,则只使用一个 batch
  2. 否则计算 hashvalue, 使用 hashvalue 计算 batchno ,不属于 batch 0 的数据写到磁盘中对应批次的文件
  3. 如果内存中 hahstable 由于实际使用的数据超过内存上限,则需要进行分裂
    1. nbatch 翻倍, scan hashtable , 使用新的 nbatch 计算 batchno
    2. 属于 batch 0 的数据继续放在内存中,其他的 dump 到磁盘对应的文件
    3. 原来磁盘中已有的文件不动, 后续 probe 的时候才需要重新计算 batch,也可能会写文件

2. Scan outer table ,尝试匹配数据

和第一步情况类似

  1. 读取 outer table 的数据,计算 hashvalue, 使用 nbatch 计算 batchno ,这里需要注意的是 nbatch 是最新值,前面构造 hashtable 的时候可能发生变化
  2. 如果 batchno 匹配当前 hashtable , 则尝试进行匹配
  3. 否则写磁盘到对应批次的文件中
    • 这里需要注意的是,由于之前 inner table 构造 hashtable 的过程中,如果发生 分裂, 则批次较前的临时文件比较大,因为文件中的数据可能不全 属于这一批, 但是对于 outer table,由于是使用新的 nbatch 计算的,所以 tuple 会更分散,所以直观的感受就是临时文件大小是明显小于 inner table 的文件
  4. 处理完当前 批次,则读取磁盘中下一个 batch 文件,此时 inner 表的 hahstable 可能是分裂之前的批次,需要使用新的 nbatch 处理,也可能写磁盘
  5. 重复处理完所有的文件即可

并行

并行需要区分不同的执行计划

  1. hash join 并行,但是 hash 不并行
    inner 侧会在各自得 worker 私有内存中构建 hashtable,然后并行 scan outer,进行 hash join,最终得结果就是 gather 之后得结果
  2. hash join 并行,hash 并行
    inner 侧会在 共享内存中构建一个 hashtable,大小为 work_memo * hash_mem_multiplier * num_workers, 所有的 inner tuple 存入这个 hashtable 中。并行 scan outer,进行 hash join

再加上非并行 hash , hash join 一般有三种情况, inner 并行。 但是 outer 不并行的情况一般没有, 因为 outer 一般是大表

并行有下面几点需要注意:

  • hash 是一个单独得算子,所以是否并行是可以单独使用参数 enable_parallel_hash 控制的
  • hash 的并行也需要考虑内存中是否能完全处理 inner tuple 的情况
    • 如果不能,则除了 batch 0 之外的数据需要 dump 到磁盘
    • 由于此时是使用 SharedTuplestore 进行管理,所以可能存在多个 worker 都写如到一个文件的情况, 可能导致文件太大,后面 load 的时候超过内存限制
      • 所以判断如果此时文件太大,是否需要增加 nbatch 重新计算位置
  • 如果 nbatch 大于1, outer table 也需要 进行类似的分区操作,把不属于 batch 0 的数据 dump 到文件
    • nbatch 0 则直接读取即可
  • batch 0 消耗完,则读取下一个文件,构造 hashtable

CheckPoint

  1. 避免频繁文件读写, 可以适当加大 work_mem, 或者调节 hash_mem_multiplier

  2. 优化器会读取统计信息, 避免数据倾斜的情况,如果存在数据倾斜,则优先把倾斜的数据存放在内存中,使用专门的 skewBucket 管理

    • 但是也需要注意内存的使用量,如果 hashtable 达到内存限制, 则会把最后一个 skewBucket dump 到其对应的临时文件中
  3. 但是对于 NULL 值

    • 并不收集 MCV 信息,所以 key 存在大量 NULL ,且是 outer join 得时候, 可能由于估算不准确,导致 hahstable 分裂频繁
    • 此时会 NULL 值的 hashtable 会消耗大量 内存
    • 并且由于此时 nbatch 增加,所以可能导致数据分散到大量的文件中,可能导致产生大量的临时文件,且由每个文件的大小达不到 flush 的标准,不但会消耗 inode, 又会加剧内存的消耗
  4. 对于是否需要保留 null 值取决于是不是 outer join , 使用 HJ_FILL_INNER 和 HJ_FILL_OUTER 判断

    • 输出的时候,由状态 HJ_FILL_OUTER_TUPLE 和 HJ_FILL_INNER_TUPLES 管理
  5. 对于并行

    • 这里是 能者多劳 的原则,当新 worker 需要工作的时候, 如果没有多余的任务,例如空闲的 file ,是不会处于等待状态的,他也会加入到 probe 状态中
    • 使用 barrier 划分整个 hash join 为 不同的阶段, 用于不同 worker 之间的任务同步
    • 读取文件构造 hashtable 的时候,不一定是一个 worker 独占一个 file 。会存在 不同的 worker 读取同一个 file 的可能
    • 假如 batch0 频繁分裂,部分数据已经写到文件了,此时处理后续数据的时候,没有看见重新计算之前文件的 tuple hash,在 ExecParallelHashJoinNewBatch 中的 ExecParallelHashTableInsertCurrentBatch 看见 Assert(batchno == hashtable->curbatch); ,这是怎么保证的 ?

其他 数据库 Hash join 实现及优化方案

  • 比较明显的一点是 Hash join 需要在 inner table 构造完 hahstable 之后才启动 outer table,所以完全可以在构造 inner table 的时候,采集 inner table 的信息,用于提前过滤 outer table 的信息
    • 当前部分 MPP 数据库已经实现功能 Runtime Filter
    • 可进行动态分区裁剪,和 Runtime Filter 类似的思路
  • 特殊 条件下,可以在 build hash table 之后,直接判定 join 是否有结果返回, 例如 left anti semi join (not in),gp 实现 Squelch 功能(当然此功能不仅仅只是 hash join 使用)
  • Partition Hash Join, 对 hash table 进行分区,确保所有分区可以 fit in cache
  • Hash Join 最大问题在于 cache miss , 其性能损耗可能超过 70%
    • 使用计算掩盖 cache miss 的时间,使用 coroutine 实现,依据就是 协程的切换时间需要小于 cache miss 的 prefetch 时间
    • 协程 需要是 无栈协程,有栈协程 可能 没有这个先决条件 ?
    • 没有深究,但是有效, mark 一下

  • 需要继续了解 pg 的内存模型以及进程间的消息通信机制

  • ref1

  • ref2

  • ref3

  • ref4