vacuum 和 analyze 过程分析
语句的具体信息参考pg说明文档
- http://postgres.cn/docs/12/routine-vacuuming.html
- http://postgres.cn/docs/12/sql-vacuum.html
- http://postgres.cn/docs/12/sql-analyze.html
antdb对功能进行增强,可以cn上执行语句然后收集统计信息,更新cn的元数据信息等操作,且数据变动会同步到其他cn节点和gtm 节点
vacuum
cn执行vacuum的入口为ExecVacuum
,具体执行函数为vacuum
,此函数在后续会被其他节点调用,用来更新本节点的元数据信息,所以他按不同的角色执行不同的代码逻辑
对于当前执行语句的cn,主要任务为
- 执行语句
- 使用
ExecClusterCustomFunction
通知其他节点进行vacuum操作 - 进行数据中转
- 从dn收集统计信息的数据
- 转发到其他cn和gtm以及slave
其他cn和gtm
- 接受cn的消息,启动vacuum操作,但是由于本地没有数据,所以主要目的是为了能执行相同的代码逻辑,进行元数据更新
- 接收cn的数据,更新本节点元数据信息
dn
- 接受cn的消息,启动vacuum操作
- 更新本节点的元数据信息
- 发送数据到cn
大致流程如下图
具体的数据交互过程以vacuum t1
为例,如下图
recvfrom(11, "Q\0\0\0\20vacuum t1;\0", 8192, 0, NULL, NULL) = 17
sendto(10, "\1\0\0\0 \0\0\0\nX\243\335\215\214\2\0\352\266\233\335\215\214\2\0w2\0\0\0\0\0\0", 32, 0, NULL, 0) = 32
sendto(17, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
sendto(15, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
sendto(50, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
sendto(49, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
recvfrom(17, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
recvfrom(15, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
recvfrom(50, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
recvfrom(49, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
sendto(17, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:59200->b3b87dd4fb6c:65032 (ESTABLISHED) cn1
sendto(15, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:46498->b3b87dd4fb6c:65011 (ESTABLISHED) gtm
sendto(50, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:54044->b3b87dd4fb6c:65013 (ESTABLISHED) dn1
sendto(49, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:55242->b3b87dd4fb6c:65014 (ESTABLISHED) dn2
recvfrom(50, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 16384, 0, NULL, NULL) = 237
recvfrom(49, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 16384, 0, NULL, NULL) = 237
sendto(17, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 231, 0, NULL, 0) = 231
sendto(15, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 231, 0, NULL, 0) = 231
recvfrom(17, "d\0\0\0\5Md\0\0\0\5M", 16384, 0, NULL, NULL) = 12
recvfrom(15, "d\0\0\0\5Md\0\0\0\5M", 16384, 0, NULL, NULL) = 12
sendto(10, "\n\0\0\0000\0\0\0w2\0\0\4@\0\0\0\2142\177\262U\0\0y\224\243\335\215\214\2\0"..., 48, 0, NULL, 0) = 48
sendto(17, "c\0\0\0\4", 5, 0, NULL, 0) = 5
sendto(15, "c\0\0\0\4", 5, 0, NULL, 0) = 5
sendto(50, "c\0\0\0\4", 5, 0, NULL, 0) = 5
sendto(49, "c\0\0\0\4", 5, 0, NULL, 0) = 5
recvfrom(17, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
recvfrom(15, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
recvfrom(50, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
recvfrom(49, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
sendto(10, "\2\0\0\0\230\0\0\0w2\0\0\1\0\0\0\3\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 152, 0, NULL, 0) = 152
sendto(10, "\2\0\0\0\230\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 152, 0, NULL, 0) = 152
sendto(10, "\16\0\0\0H\0\0\0\6\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0"..., 72, 0, NULL, 0) = 72
sendto(11, "C\0\0\0\vVACUUM\0Z\0\0\0\5I", 18, 0, NULL, 0) = 18
从具体的数据交互的trace信息可以看出,cn需要使用p消息发送一个函数调用到具体的节点上去执行,cn执行vacuum的函数主要是在vacuum_rel
中进行的,这里会把需要操作的表传输到其他节点上去,主要传输的数据如下,他先设置消息类型为CLUSTER_VACUUM_CMD_VACUUM
,之后再打包xid和name。消息类型在后面的process_master_vacuum_cmd
中会进行处理。CLUSTER_VACUUM_CMD_VACUUM
对应vacuum
appendStringInfoChar(&buf, CLUSTER_VACUUM_CMD_VACUUM);
appendBinaryStringInfo(&buf, (char*)&xid, sizeof(xid));
namespace = get_namespace_name(RelationGetNamespace(onerel));
save_node_string(&buf, namespace);
save_node_string(&buf, RelationGetRelationName(onerel));
发送数据之后按原来的逻辑继续运行,后面使用lazy_vacuum_rel_ext
函数处理vacuum,这里需要注意的是pg支持两种vacuum操作,一种是vacuum lazy
,一种是vacuum full
,前者不会跨page操作表,只是标记下过期数据然后更新fsm和vm,后者简单的理解会重建的表
(openGauss/PostgreSQL vacuum full源码解析 - 腾讯云开发者社区-腾讯云 (tencent.com))
在lazy_vacuum_rel_ext
中,则进行具体的vacuum操作,cn会接收dn的数据,转发到其他节点。
if (!IsToastRelation(onerel) && (onerel->rd_locator_info != NULL))
{
if (IsCnMaster() && !IsConnFromCoord())
{
if (dn_conns != NIL)
{
/* recv vacuum sync info from datanode */
VACUUM_CLUSTER_DEBUG_LOG((errmsg("cnmaster relname %s wait vacuum sync info from dn\n", RelationGetRelationName(onerel))));
acquire_vsi_coord_master(dn_conns, &sum_vsi);
VACUUM_CLUSTER_DEBUG_LOG((errmsg("acquire_vsi_coord_master:\n")));
PrintDebugVsi(&sum_vsi);
new_rel_pages = sum_vsi.new_rel_pages;
new_live_tuples = sum_vsi.new_live_tuples;
new_rel_allvisible = sum_vsi.new_rel_allvisible;
new_frozen_xid = sum_vsi.new_frozen_xid;
new_min_multi = sum_vsi.new_min_multi;
}
if (cn_conns != NIL)
{
/* send relfreezeid to other coordinator */
VACUUM_CLUSTER_DEBUG_LOG((errmsg("cnmaster relname %s send vacuum info to other cn slave:\n", RelationGetRelationName(onerel))));
send_vsi_other_coord(cn_conns, &sum_vsi);
}
if (cn_conns)
{
VACUUM_CLUSTER_DEBUG_LOG((errmsg("cnmaster relname %s waiter cn slave ack:\n", RelationGetRelationName(onerel))));
cluster_recv_exec_end(cn_conns);
}
}
else if ((params->options & VACOPT_IN_CLUSTER) == VACOPT_IN_CLUSTER &&
IS_PGXC_COORDINATOR &&
IsConnFromCoord())
{
/* recv freeze xid from master */
VACUUM_CLUSTER_DEBUG_LOG((errmsg("cn slave relname %s waiter cn master vacuum info:\n", RelationGetRelationName(onerel))));
acquire_relvsi_coord_slave(&sum_vsi);
VACUUM_CLUSTER_DEBUG_LOG((errmsg("cn slave relname %s get cn master vacuum info:\n", RelationGetRelationName(onerel))));
PrintDebugVsi(&sum_vsi);
new_rel_pages = sum_vsi.new_rel_pages;
new_live_tuples = sum_vsi.new_live_tuples;
new_rel_allvisible = sum_vsi.new_rel_allvisible;
new_frozen_xid = sum_vsi.new_frozen_xid;
new_min_multi = sum_vsi.new_min_multi;
}
}
对于DN
cn发送的函数为cluster_vacuum,此函数会设置标记VACOPT_IN_CLUSTER
,用来区分在vacuum中的角色,之后会调用vacuum函数
void cluster_vacuum(struct StringInfoData *msg)
{
StringInfoData buf;
VacuumParams params;
buf.data = mem_toc_lookup(msg, REMOTE_KEY_VACUUM_INFO, &buf.maxlen);
if (buf.data == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("Can not found vacuum info")));
}
buf.cursor = 0;
buf.len = buf.maxlen;
pq_copymsgbytes(&buf, (char*)¶ms, sizeof(params));
params.options |= VACOPT_IN_CLUSTER;
vacuum(NIL,
¶ms,
NULL,
true);
}
这里调用的时候relation指定为null,后面SimpleNextCopyFromNewFE
会从cn接收数据进行具体的vacuum操作
#ifdef ADB
if (params->options & VACOPT_IN_CLUSTER)
{
ClusterVacuumCmdContext context;
context.params = params;
context.use_own_xacts = use_own_xacts;
context.in_outer_xact = in_outer_xact;
SimpleNextCopyFromNewFE((SimpleCopyDataFunction)process_master_vacuum_cmd, &context);
}
#endif /* ADB */
这里的回调函数process_master_vacuum_cmd
会按照语句的类型执行不同的方法,如果是vacuum方法,则执行vacuum_rel
,最终在lazy_vacuum_rel_ext
中,执行完vacuum之后得到具体的统计数据,在更新完pg_class
之后,调用send_localvsi_to_coord
发送数据到cn。
if (!IsToastRelation(onerel) && IsConnFromCoord() && IS_PGXC_DATANODE)
{
/* send freeze xid to master */
VACUUM_CLUSTER_DEBUG_LOG((errmsg("dn relname %s send local vacuum info to cn master :\n", RelationGetRelationName(onerel))));
PrintDebugVsi(&local_vsi);
send_localvsi_to_coord(&local_vsi);
}
lazy_vacuum_rel_ext
中的流程大致如下
analyze
analyze 的大致流程和vacuum类似,vacuum支持使用vacuum analyze
命令在进行vacuum之后同时调用analyze收集统计信息。所以在功能上他们是类似的,所以pg把vacuum和analyze实现在同一个函数中,最终在执行的时候,vacuum使用vacuum_rel
进行vacuum,之后紧接着使用analyze_rel
执行analyze,在analyze_rel
中,数据的交互和vacuum类似,在下发到dn的数据打包的时候,标记类型为CLUSTER_VACUUM_CMD_ANALYZE
或者CLUSTER_VACUUM_CMD_ANALYZE_FORCE_INH
initStringInfo(&buf);
if (onerel->rd_rel->relhassubclass)
appendStringInfoChar(&buf, CLUSTER_VACUUM_CMD_ANALYZE_FORCE_INH);
else
appendStringInfoChar(&buf, CLUSTER_VACUUM_CMD_ANALYZE);
xid = GetTopTransactionId();
appendBinaryStringInfo(&buf, (char*)&xid, sizeof(xid));
save_node_string(&buf, namespace);
save_node_string(&buf, RelationGetRelationName(onerel));
dn上process_master_vacuum_cmd
使用analyze_rel函数处理analyze操作,在进行analyze之后,会把数据传送到cn,在analyze_rel中,也区分角色,执行不同的操作流程
if (acquirefunc == acquire_sample_rows_coord_master ||
((params->options & VACOPT_IN_CLUSTER) == VACOPT_IN_CLUSTER &&
IsCnMaster() &&
(onerel->rd_rel->relkind == RELKIND_MATVIEW ||
onerel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)))
{
......
if (onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
{
if (dnlist != NIL)
{
/* recv relpages from datanode */
relpages = acquire_relpage_num_coord_master(dnlist);
}
if (cnlist != NIL)
{
/* send relpages to other coordinator */
send_relpage_num_to_other_coord(cnlist, relpages);
}
}
}else if (acquirefunc == acquire_sample_rows_coord_slave &&
onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
{
/* recv relpages from master */
relpages = acquire_relpage_num_coord_slave();
}else if (IS_PGXC_DATANODE &&
IsConnFromCoord() &&
(params->options & VACOPT_IN_CLUSTER) == VACOPT_IN_CLUSTER &&
onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
{
/* send relpages to master */
send_relpage_num_to_coord(relpages);
}
统计信息主要用于指导优化器优化执行计划。元数据中pg_statistic主要存储统计信息,但是不是可以人读的,官网的解释是表中的数据是比较敏感的。所以pg建立了两个视图,pg_stats
和pg_statistic_ext
,前者是常规的统计信息,后者主要是自定义表达式的统计信息。
pg_statistic的数据由analyze 收集,具体的收集过程在函数analyze_rel
中的do_analyze_rel
函数,在analyze_rel
前面处理完前置判断之后,do_analyze_rel
读表抽样,然后计算统计信息,读表使用的是acquire_sample_rows
函数,这里需要注意的是
- 抽样比例按表的大小,默认最多只抽取30000行,少于30000则全量统计,有一个参数可以进行控制,
- 抽样有具体的算法,和列类型密切相关
- 先抽取所有page,然后在page里面在抽样
可以参考代码std_typanalyze
里面的片段,其中attstattarget由guc参数default_statistics_target控制,默认为100.
if (OidIsValid(eqopr) && OidIsValid(ltopr))
{
/* Seems to be a scalar datatype */
stats->compute_stats = compute_scalar_stats;
/*--------------------
* The following choice of minrows is based on the paper
* "Random sampling for histogram construction: how much is enough?"
* by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
* Proceedings of ACM SIGMOD International Conference on Management
* of Data, 1998, Pages 436-447. Their Corollary 1 to Theorem 5
* says that for table size n, histogram size k, maximum relative
* error in bin size f, and error probability gamma, the minimum
* random sample size is
* r = 4 * k * ln(2*n/gamma) / f^2
* Taking f = 0.5, gamma = 0.01, n = 10^6 rows, we obtain
* r = 305.82 * k
* Note that because of the log function, the dependence on n is
* quite weak; even at n = 10^12, a 300*k sample gives <= 0.66
* bin size error with probability 0.99. So there's no real need to
* scale for n, which is a good thing because we don't necessarily
* know it at this point.
*--------------------
*/
stats->minrows = 300 * attr->attstattarget;
}
确定行数之后使用acquire_sample_rows
抽取数据,存储到row中,这里需要注意的是他使用不同的随机算法,需要均衡到page和tuples,是有具体的理论(Jeff Vitter)指导的,大致来说就是两阶段采样
第一阶段对数据页进行采样。在这个阶段,数据页是可以准确获得的,使用随机采样法-S算法,相对简单。第二阶段是对元组进行采样由于数据量不可知,采用的是蓄水池算法-Z(Vitter)算法。
之后就是使用具体的统计算法对数据进行处理,当前代码中有
compute_scalar_stats
compute_distinct_stats
compute_trivial_stats
具体适用的场景见注释,大致是否是满足可以使用=
和>
对列进行操作,所以这里的具体的算法选择和列类型有关。
处理完之后使用update_attstats
函数更新到 pg_statistics 。
AntDB 在dn处理完自身的任务之后,还会把数据传输到cn节点,由cn再进行计算。cn先是汇总所有的dn抽样之后的数据之后,在此基础上进行30000行的抽样,然后在进行常规的统计信息的计算,他属于抽样之上再抽样。这里不能直接使用dn计算之后的结果直接更新统计信息,因为统计信息需要综合数据,从数据中进行计算,当然如果非常清楚算法的计算过程,然后保留中间结果,然后由cn进行二次计算理论上也可行,只是现在cn上的统计信息则是基于30000*count(dn)
计算,应该是可以更准确的。但是没有仔细探究算法实现,所以还不知道是否确实可行。
还有一点问题就是,当前的cn会把数据转发到其他cn或者gtm,由他们在数据上进行二次计算,所以网络io较大,是不是可以直接传输计算的结果,毕竟理论上cn的数据都得保持一致。
使用下面的例子,然后使用strace 观察cn的网络交互,可以明显的看出上面的问题
create table t1 (c1 int , c2 int , c3 int);
insert into t1 select floor(random() * 100000)::int % 10, floor(random() * 100000)::int % 10, generate_series(1,100000);
analyze verbose t1;
INFO: "t1":containing 100000 live rows and 0 dead rows;30000 rows in sample, 60000 total sample rows from datanode(s)
strace -p 1443 -e trace=network
总结
- vacuum和analyze执行代码在上层是相同的,底层使用
analyze_rel
和vacuum_rel
执行不同的操作,或者在vacuum analyze table
中一起执行。 - antdb在其逻辑上进行扩展,在原有的逻辑上添加接口,和其他节点进行数据交互。
vacuum full
使用cluster_rel
实现,没有实现分布式vacuum功能。执行vacuum full
之后,cn的pg_class数据不变,但是dn的是变动的。- vacuum是把dn的数据进行汇总相加,然后更新到元数据信息。
- analyze也是类似的操作,但是analyze需要在接收到抽样数据之后在cn上进行抽样再计算,因为pg_statsitics 表中的信息是直方图,直接汇总相加会不正确。
- analyze时cn会收集所有dn的抽样数据,转发到其他节点,而不是计算之后的数据,理论上所有cn上的元数据信息应该一致,所以这里是不是可以传输计算之后的数据而不是rows