语句的具体信息参考pg说明文档

  1. http://postgres.cn/docs/12/routine-vacuuming.html
  2. http://postgres.cn/docs/12/sql-vacuum.html
  3. http://postgres.cn/docs/12/sql-analyze.html

antdb对功能进行增强,可以cn上执行语句然后收集统计信息,更新cn的元数据信息等操作,且数据变动会同步到其他cn节点和gtm 节点

vacuum

cn执行vacuum的入口为ExecVacuum,具体执行函数为vacuum,此函数在后续会被其他节点调用,用来更新本节点的元数据信息,所以他按不同的角色执行不同的代码逻辑

对于当前执行语句的cn,主要任务为

  1. 执行语句
  2. 使用ExecClusterCustomFunction通知其他节点进行vacuum操作
  3. 进行数据中转
    1. 从dn收集统计信息的数据
    2. 转发到其他cn和gtm以及slave

其他cn和gtm

  1. 接受cn的消息,启动vacuum操作,但是由于本地没有数据,所以主要目的是为了能执行相同的代码逻辑,进行元数据更新
  2. 接收cn的数据,更新本节点元数据信息

dn

  1. 接受cn的消息,启动vacuum操作
  2. 更新本节点的元数据信息
  3. 发送数据到cn

大致流程如下图

rplan1

具体的数据交互过程以vacuum t1为例,如下图

rplan1

recvfrom(11, "Q\0\0\0\20vacuum  t1;\0", 8192, 0, NULL, NULL) = 17
sendto(10, "\1\0\0\0 \0\0\0\nX\243\335\215\214\2\0\352\266\233\335\215\214\2\0w2\0\0\0\0\0\0", 32, 0, NULL, 0) = 32
sendto(17, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
sendto(15, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
sendto(50, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
sendto(49, "p\0\0\0\234\v\377\377\377\20\0\0\0cluster_vacuum\0\0\1\0\0"..., 157, 0, NULL, 0) = 157
recvfrom(17, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
recvfrom(15, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
recvfrom(50, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
recvfrom(49, "W\0\0\0\7\1\0\0", 16384, 0, NULL, NULL) = 8
sendto(17, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:59200->b3b87dd4fb6c:65032 (ESTABLISHED) cn1
sendto(15, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:46498->b3b87dd4fb6c:65011 (ESTABLISHED) gtm
sendto(50, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:54044->b3b87dd4fb6c:65013 (ESTABLISHED) dn1
sendto(49, "d\0\0\0\23\200\232\2\0\0public\0t1\0", 20, 0, NULL, 0) = 20 TCP b3b87dd4fb6c:55242->b3b87dd4fb6c:65014 (ESTABLISHED) dn2
recvfrom(50, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 16384, 0, NULL, NULL) = 237
recvfrom(49, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 16384, 0, NULL, NULL) = 237
sendto(17, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 231, 0, NULL, 0) = 231
sendto(15, "d\0\0\0\266T\5\0\0\0\0new_rel_pages\0\377\377\377\377\0\0\0"..., 231, 0, NULL, 0) = 231
recvfrom(17, "d\0\0\0\5Md\0\0\0\5M", 16384, 0, NULL, NULL) = 12
recvfrom(15, "d\0\0\0\5Md\0\0\0\5M", 16384, 0, NULL, NULL) = 12
sendto(10, "\n\0\0\0000\0\0\0w2\0\0\4@\0\0\0\2142\177\262U\0\0y\224\243\335\215\214\2\0"..., 48, 0, NULL, 0) = 48
sendto(17, "c\0\0\0\4", 5, 0, NULL, 0)  = 5
sendto(15, "c\0\0\0\4", 5, 0, NULL, 0)  = 5
sendto(50, "c\0\0\0\4", 5, 0, NULL, 0)  = 5
sendto(49, "c\0\0\0\4", 5, 0, NULL, 0)  = 5
recvfrom(17, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
recvfrom(15, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
recvfrom(50, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
recvfrom(49, "c\0\0\0\4Z\0\0\0\5I", 16384, 0, NULL, NULL) = 11
sendto(10, "\2\0\0\0\230\0\0\0w2\0\0\1\0\0\0\3\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 152, 0, NULL, 0) = 152
sendto(10, "\2\0\0\0\230\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 152, 0, NULL, 0) = 152
sendto(10, "\16\0\0\0H\0\0\0\6\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1\0\0\0\0\0\0\0"..., 72, 0, NULL, 0) = 72
sendto(11, "C\0\0\0\vVACUUM\0Z\0\0\0\5I", 18, 0, NULL, 0) = 18

从具体的数据交互的trace信息可以看出,cn需要使用p消息发送一个函数调用到具体的节点上去执行,cn执行vacuum的函数主要是在vacuum_rel中进行的,这里会把需要操作的表传输到其他节点上去,主要传输的数据如下,他先设置消息类型为CLUSTER_VACUUM_CMD_VACUUM,之后再打包xid和name。消息类型在后面的process_master_vacuum_cmd中会进行处理。CLUSTER_VACUUM_CMD_VACUUM对应vacuum

			appendStringInfoChar(&buf, CLUSTER_VACUUM_CMD_VACUUM);
			appendBinaryStringInfo(&buf, (char*)&xid, sizeof(xid));
			namespace = get_namespace_name(RelationGetNamespace(onerel));
			save_node_string(&buf, namespace);
			save_node_string(&buf, RelationGetRelationName(onerel));

发送数据之后按原来的逻辑继续运行,后面使用lazy_vacuum_rel_ext函数处理vacuum,这里需要注意的是pg支持两种vacuum操作,一种是vacuum lazy,一种是vacuum full,前者不会跨page操作表,只是标记下过期数据然后更新fsm和vm,后者简单的理解会重建的表

(openGauss/PostgreSQL vacuum full源码解析 - 腾讯云开发者社区-腾讯云 (tencent.com))

lazy_vacuum_rel_ext中,则进行具体的vacuum操作,cn会接收dn的数据,转发到其他节点。

	if (!IsToastRelation(onerel) && (onerel->rd_locator_info != NULL))
	{
		if (IsCnMaster() && !IsConnFromCoord())
		{
			if (dn_conns != NIL)
			{
				/* recv vacuum sync info from datanode */
				VACUUM_CLUSTER_DEBUG_LOG((errmsg("cnmaster relname %s wait vacuum sync info from dn\n", RelationGetRelationName(onerel))));
				acquire_vsi_coord_master(dn_conns, &sum_vsi);
				VACUUM_CLUSTER_DEBUG_LOG((errmsg("acquire_vsi_coord_master:\n")));
				PrintDebugVsi(&sum_vsi);
				new_rel_pages = sum_vsi.new_rel_pages;
				new_live_tuples = sum_vsi.new_live_tuples;
				new_rel_allvisible = sum_vsi.new_rel_allvisible;
				new_frozen_xid = sum_vsi.new_frozen_xid;
				new_min_multi = sum_vsi.new_min_multi;
			}

			if (cn_conns != NIL)
			{
				/* send relfreezeid to other coordinator */
				VACUUM_CLUSTER_DEBUG_LOG((errmsg("cnmaster relname %s send vacuum info to other cn slave:\n", RelationGetRelationName(onerel))));
				send_vsi_other_coord(cn_conns, &sum_vsi);
			}

			if (cn_conns)
			{
				VACUUM_CLUSTER_DEBUG_LOG((errmsg("cnmaster relname %s waiter cn slave ack:\n", RelationGetRelationName(onerel))));
				cluster_recv_exec_end(cn_conns);
			}
		}
		else if ((params->options & VACOPT_IN_CLUSTER) == VACOPT_IN_CLUSTER &&
			IS_PGXC_COORDINATOR &&
			IsConnFromCoord())
		{
			/* recv freeze xid from master */
			VACUUM_CLUSTER_DEBUG_LOG((errmsg("cn slave relname %s waiter cn master vacuum info:\n", RelationGetRelationName(onerel))));
			acquire_relvsi_coord_slave(&sum_vsi);
			VACUUM_CLUSTER_DEBUG_LOG((errmsg("cn slave relname %s get cn master vacuum info:\n", RelationGetRelationName(onerel))));
			PrintDebugVsi(&sum_vsi);
			new_rel_pages = sum_vsi.new_rel_pages;
			new_live_tuples = sum_vsi.new_live_tuples;
			new_rel_allvisible = sum_vsi.new_rel_allvisible;
			new_frozen_xid = sum_vsi.new_frozen_xid;
			new_min_multi = sum_vsi.new_min_multi;
		}
	}

对于DN

cn发送的函数为cluster_vacuum,此函数会设置标记VACOPT_IN_CLUSTER,用来区分在vacuum中的角色,之后会调用vacuum函数

void cluster_vacuum(struct StringInfoData *msg)
{
	StringInfoData	buf;
	VacuumParams	params;

	buf.data = mem_toc_lookup(msg, REMOTE_KEY_VACUUM_INFO, &buf.maxlen);
	if (buf.data == NULL)
	{
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("Can not found vacuum info")));
	}
	buf.cursor = 0;
	buf.len = buf.maxlen;

	pq_copymsgbytes(&buf, (char*)&params, sizeof(params));

	params.options |= VACOPT_IN_CLUSTER;
	vacuum(NIL,
		   &params,
		   NULL,
		   true);
}

这里调用的时候relation指定为null,后面SimpleNextCopyFromNewFE会从cn接收数据进行具体的vacuum操作

#ifdef ADB
		if (params->options & VACOPT_IN_CLUSTER)
		{
			ClusterVacuumCmdContext context;
			context.params = params;
			context.use_own_xacts = use_own_xacts;
			context.in_outer_xact = in_outer_xact;

			SimpleNextCopyFromNewFE((SimpleCopyDataFunction)process_master_vacuum_cmd, &context);
		}
#endif /* ADB */

这里的回调函数process_master_vacuum_cmd会按照语句的类型执行不同的方法,如果是vacuum方法,则执行vacuum_rel,最终在lazy_vacuum_rel_ext中,执行完vacuum之后得到具体的统计数据,在更新完pg_class之后,调用send_localvsi_to_coord发送数据到cn。

	if (!IsToastRelation(onerel) && IsConnFromCoord() && IS_PGXC_DATANODE)
	{
		/* send freeze xid to master */
		VACUUM_CLUSTER_DEBUG_LOG((errmsg("dn relname %s send local vacuum info to cn master :\n", RelationGetRelationName(onerel))));
		PrintDebugVsi(&local_vsi);
		send_localvsi_to_coord(&local_vsi);
	}

lazy_vacuum_rel_ext中的流程大致如下

rplan1

analyze

analyze 的大致流程和vacuum类似,vacuum支持使用vacuum analyze命令在进行vacuum之后同时调用analyze收集统计信息。所以在功能上他们是类似的,所以pg把vacuum和analyze实现在同一个函数中,最终在执行的时候,vacuum使用vacuum_rel进行vacuum,之后紧接着使用analyze_rel执行analyze,在analyze_rel中,数据的交互和vacuum类似,在下发到dn的数据打包的时候,标记类型为CLUSTER_VACUUM_CMD_ANALYZE或者CLUSTER_VACUUM_CMD_ANALYZE_FORCE_INH

			initStringInfo(&buf);
			if (onerel->rd_rel->relhassubclass)
				appendStringInfoChar(&buf, CLUSTER_VACUUM_CMD_ANALYZE_FORCE_INH);
			else
				appendStringInfoChar(&buf, CLUSTER_VACUUM_CMD_ANALYZE);
			
			xid = GetTopTransactionId();
			appendBinaryStringInfo(&buf, (char*)&xid, sizeof(xid));
			save_node_string(&buf, namespace);
			save_node_string(&buf, RelationGetRelationName(onerel));

dn上process_master_vacuum_cmd使用analyze_rel函数处理analyze操作,在进行analyze之后,会把数据传送到cn,在analyze_rel中,也区分角色,执行不同的操作流程

	if (acquirefunc == acquire_sample_rows_coord_master ||
		((params->options & VACOPT_IN_CLUSTER) == VACOPT_IN_CLUSTER &&
		 IsCnMaster() &&
		 (onerel->rd_rel->relkind == RELKIND_MATVIEW ||
		  onerel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)))
	{
        ......
		if (onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
		{
			if (dnlist != NIL)
			{
				/* recv relpages from datanode */
				relpages = acquire_relpage_num_coord_master(dnlist);
			}

			if (cnlist != NIL)
			{
				/* send relpages to other coordinator */
				send_relpage_num_to_other_coord(cnlist, relpages);
			}
		}
	}else if (acquirefunc == acquire_sample_rows_coord_slave &&
		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
	{
		/* recv relpages from master */
		relpages = acquire_relpage_num_coord_slave();
	}else if (IS_PGXC_DATANODE &&
		IsConnFromCoord() &&
		(params->options & VACOPT_IN_CLUSTER) == VACOPT_IN_CLUSTER &&
		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
	{
		/* send relpages to master */
		send_relpage_num_to_coord(relpages);
	}

统计信息主要用于指导优化器优化执行计划。元数据中pg_statistic主要存储统计信息,但是不是可以人读的,官网的解释是表中的数据是比较敏感的。所以pg建立了两个视图,pg_statspg_statistic_ext,前者是常规的统计信息,后者主要是自定义表达式的统计信息。

pg_statistic的数据由analyze 收集,具体的收集过程在函数analyze_rel中的do_analyze_rel函数,在analyze_rel前面处理完前置判断之后,do_analyze_rel读表抽样,然后计算统计信息,读表使用的是acquire_sample_rows函数,这里需要注意的是

  1. 抽样比例按表的大小,默认最多只抽取30000行,少于30000则全量统计,有一个参数可以进行控制,
  2. 抽样有具体的算法,和列类型密切相关
  3. 先抽取所有page,然后在page里面在抽样

可以参考代码std_typanalyze里面的片段,其中attstattarget由guc参数default_statistics_target控制,默认为100.

	if (OidIsValid(eqopr) && OidIsValid(ltopr))
	{
		/* Seems to be a scalar datatype */
		stats->compute_stats = compute_scalar_stats;
		/*--------------------
		 * The following choice of minrows is based on the paper
		 * "Random sampling for histogram construction: how much is enough?"
		 * by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
		 * Proceedings of ACM SIGMOD International Conference on Management
		 * of Data, 1998, Pages 436-447.  Their Corollary 1 to Theorem 5
		 * says that for table size n, histogram size k, maximum relative
		 * error in bin size f, and error probability gamma, the minimum
		 * random sample size is
		 *		r = 4 * k * ln(2*n/gamma) / f^2
		 * Taking f = 0.5, gamma = 0.01, n = 10^6 rows, we obtain
		 *		r = 305.82 * k
		 * Note that because of the log function, the dependence on n is
		 * quite weak; even at n = 10^12, a 300*k sample gives <= 0.66
		 * bin size error with probability 0.99.  So there's no real need to
		 * scale for n, which is a good thing because we don't necessarily
		 * know it at this point.
		 *--------------------
		 */
		stats->minrows = 300 * attr->attstattarget;
	}

确定行数之后使用acquire_sample_rows抽取数据,存储到row中,这里需要注意的是他使用不同的随机算法,需要均衡到page和tuples,是有具体的理论(Jeff Vitter)指导的,大致来说就是两阶段采样

第一阶段对数据页进行采样。在这个阶段,数据页是可以准确获得的,使用随机采样法-S算法,相对简单。第二阶段是对元组进行采样由于数据量不可知,采用的是蓄水池算法-Z(Vitter)算法。

之后就是使用具体的统计算法对数据进行处理,当前代码中有

  1. compute_scalar_stats
  2. compute_distinct_stats
  3. compute_trivial_stats

具体适用的场景见注释,大致是否是满足可以使用=>对列进行操作,所以这里的具体的算法选择和列类型有关。

处理完之后使用update_attstats函数更新到 pg_statistics 。

AntDB 在dn处理完自身的任务之后,还会把数据传输到cn节点,由cn再进行计算。cn先是汇总所有的dn抽样之后的数据之后,在此基础上进行30000行的抽样,然后在进行常规的统计信息的计算,他属于抽样之上再抽样。这里不能直接使用dn计算之后的结果直接更新统计信息,因为统计信息需要综合数据,从数据中进行计算,当然如果非常清楚算法的计算过程,然后保留中间结果,然后由cn进行二次计算理论上也可行,只是现在cn上的统计信息则是基于30000*count(dn)计算,应该是可以更准确的。但是没有仔细探究算法实现,所以还不知道是否确实可行。

还有一点问题就是,当前的cn会把数据转发到其他cn或者gtm,由他们在数据上进行二次计算,所以网络io较大,是不是可以直接传输计算的结果,毕竟理论上cn的数据都得保持一致。

使用下面的例子,然后使用strace 观察cn的网络交互,可以明显的看出上面的问题

create table t1 (c1 int , c2 int , c3 int);
insert into t1 select  floor(random() * 100000)::int % 10, floor(random() * 100000)::int % 10, generate_series(1,100000);

analyze verbose t1;
INFO:  "t1":containing 100000 live rows and 0 dead rows;30000 rows in sample, 60000 total sample rows from datanode(s)

strace -p  1443 -e trace=network

总结

  1. vacuum和analyze执行代码在上层是相同的,底层使用analyze_relvacuum_rel执行不同的操作,或者在vacuum analyze table中一起执行。
  2. antdb在其逻辑上进行扩展,在原有的逻辑上添加接口,和其他节点进行数据交互。
  3. vacuum full使用cluster_rel实现,没有实现分布式vacuum功能。执行vacuum full之后,cn的pg_class数据不变,但是dn的是变动的。
  4. vacuum是把dn的数据进行汇总相加,然后更新到元数据信息。
  5. analyze也是类似的操作,但是analyze需要在接收到抽样数据之后在cn上进行抽样再计算,因为pg_statsitics 表中的信息是直方图,直接汇总相加会不正确。
  6. analyze时cn会收集所有dn的抽样数据,转发到其他节点,而不是计算之后的数据,理论上所有cn上的元数据信息应该一致,所以这里是不是可以传输计算之后的数据而不是rows