diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index de6dc416130a..d19fe6a1c61a 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | t | t - regression_slot_stats2 | t | t | t | t - regression_slot_stats3 | t | t | t | t +-- total_txns may vary based on the background activity but sent_txns should +-- always be 1 since the background transactions are always skipped. Filtered +-- bytes would be set only when there's a change that was passed to the plugin +-- but was filtered out. Depending upon the background transactions, filtered +-- bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes +------------------------+------------+-------------+------------+-------------+------------------+------------+---------------- + regression_slot_stats1 | t | t | t | t | 1 | t | t + regression_slot_stats2 | t | t | t | t | 1 | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t (3 rows) RESET logical_decoding_work_mem; @@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | f | f - regression_slot_stats2 | t | t | t | t - regression_slot_stats3 | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes +------------------------+------------+-------------+------------+-------------+------------------+------------+---------------- + regression_slot_stats1 | t | t | f | f | | | + regression_slot_stats2 | t | t | t | t | 1 | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t (3 rows) -- reset stats for all slots @@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | f | f - regression_slot_stats2 | t | t | f | f - regression_slot_stats3 | t | t | f | f +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes +------------------------+------------+-------------+------------+-------------+------------------+-------------------+----------------------- + regression_slot_stats1 | t | t | f | f | | | + regression_slot_stats2 | t | t | f | f | | | + regression_slot_stats3 | t | t | f | f | | | (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) -- spilling the xact @@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) COMMIT; diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index a022fe1bf075..1077cea5855a 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,16 +15,21 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +-- total_txns may vary based on the background activity but sent_txns should +-- always be 1 since the background transactions are always skipped. Filtered +-- bytes would be set only when there's a change that was passed to the plugin +-- but was filtered out. Depending upon the background transactions, filtered +-- bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); @@ -46,8 +51,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; -SELECT slot_name FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; COMMIT; diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 0de62edb7d84..76dd86fc420c 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -23,10 +23,16 @@ sub test_slot_stats my ($node, $expected, $msg) = @_; + # If there are background transactions which are filtered out by the output + # plugin, plugin_filtered_bytes may be greater than 0. But it's not + # guaranteed that such transactions would be present. my $result = $node->safe_psql( 'postgres', qq[ SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes + total_bytes > 0 AS total_bytes, + plugin_sent_txns > 0 AS sent_txn, + plugin_sent_bytes > 0 AS sent_bytes, + plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name]); is($result, $expected, $msg); @@ -80,9 +86,9 @@ sub test_slot_stats # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t -regression_slot3|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t +regression_slot3|t|t|t|t|t), 'check replication statistics are updated'); # Test to remove one of the replication slots and adjust @@ -104,8 +110,8 @@ sub test_slot_stats # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t), 'check replication statistics after removing the slot file'); # cleanup diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index f671a7d4b312..ea5c527644bf 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->only_local = false; ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; opt->receive_rewrites = false; @@ -310,6 +311,7 @@ static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) { OutputPluginPrepareWrite(ctx, last_write); + ctx->stats->sentTxns++; if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b803a819cf1f..8ac10cda90c7 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -938,6 +938,33 @@ typedef struct OutputPluginOptions needs to have a state, it can use ctx->output_plugin_private to store it. + + + The startup callback may initialize ctx->stats, + typically as follows, if it chooses to maintain and report statistics + about its activity in pg_stat_replication_slots. + +ctx->stats = palloc0(sizeof(OutputPluginStats)); + + where OutputPluginStats is defined as follows: + +typedef struct OutputPluginStats +{ + int64 sentTxns; + int64 sentBytes; + int64 filteredBytes; +} OutputPluginStats; + + sentTxns is the number of transactions sent downstream + by the output plugin. sentBytes is the amount of data + sent downstream by the output plugin. + OutputPluginWrite is expected to update this counter + if ctx->stats is initialized by the output plugin. + filteredBytes is the size of changes in bytes that are + filtered out by the output plugin. Function + ReorderBufferChangeSize may be used to find the size of + filtered ReorderBufferChange. + diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3f4a27a736e2..e121f55c9c2a 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1545,6 +1545,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + plugin text + + + The base name of the shared object containing the output plugin this + logical slot is using. This column is same as the one in + pg_replication_slots. + + + spill_txns bigint @@ -1644,6 +1655,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + plugin_filtered_bytes bigint + + + Amount of changes, from total_bytes, filtered + out by the output plugin and not sent downstream. Please note that it + does not include the changes filtered before a change is handed over to + the output plugin, e.g. the changes filtered by origin. The count is + maintained by the output plugin mentioned in + plugin. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + + + + + + plugin_sent_txns bigint + + + Number of decoded transactions sent downstream for this slot. This + counts top-level transactions only, and is not incremented for + subtransactions. These transactions are subset of transctions sent to + the decoding plugin. Hence this count is expected to be lesser than or + equal to total_txns. The count is maintained + by the output plugin mentioned in plugin. It + is NULL when statistics is not initialized or immediately after a reset or + when not maintained by the output plugin. + + + + + + plugin_sent_bytesbigint + + + Amount of transaction changes sent downstream for this slot by the + output plugin after applying filtering and converting into its output + format. The count is maintained by the output plugin mentioned in + plugin. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c77fa0234bb7..d38c21150b0c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1053,6 +1053,7 @@ CREATE VIEW pg_replication_slots AS CREATE VIEW pg_stat_replication_slots AS SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -1061,6 +1062,9 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_bytes, s.total_txns, s.total_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c68c0481f427..b26ac29e32f2 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1952,13 +1952,14 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; + OutputPluginStats *stats = ctx->stats; PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1967,7 +1968,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamCount, rb->streamBytes, rb->totalTxns, - rb->totalBytes); + rb->totalBytes, + stats ? "plugin has stats" : "plugin has no stats", + stats ? stats->sentTxns : 0, + stats ? stats->sentBytes : 0, + stats ? stats->filteredBytes : 0); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; @@ -1977,6 +1982,15 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_bytes = rb->streamBytes; repSlotStat.total_txns = rb->totalTxns; repSlotStat.total_bytes = rb->totalBytes; + if (stats) + { + repSlotStat.plugin_has_stats = true; + repSlotStat.sent_txns = stats->sentTxns; + repSlotStat.sent_bytes = stats->sentBytes; + repSlotStat.filtered_bytes = stats->filteredBytes; + } + else + repSlotStat.plugin_has_stats = false; pgstat_report_replslot(ctx->slot, &repSlotStat); @@ -1988,6 +2002,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamBytes = 0; rb->totalTxns = 0; rb->totalBytes = 0; + if (stats) + { + stats->sentTxns = 0; + stats->sentBytes = 0; + stats->filteredBytes = 0; + } } /* diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 25f890ddeeda..788967e2ab1f 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -89,6 +89,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len)); tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + + /* + * If output plugin has chosen to maintain its stats, update the amount of + * data sent downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId); p->returned_rows++; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4736f993c374..12579dff2c15 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t * memory accounting * --------------------------------------- */ -static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, @@ -4436,7 +4435,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Size of a change in memory. */ -static Size +Size ReorderBufferChangeSize(ReorderBufferChange *change) { Size sz = sizeof(ReorderBufferChange); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 80540c017bd3..339babbeb568 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -450,6 +450,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ALLOCSET_SMALL_SIZES); ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -591,6 +592,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); txndata->sent_begin_txn = true; + ctx->stats->sentTxns++; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -1469,7 +1471,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *new_slot = NULL; if (!is_publishable_relation(relation)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * Remember the xid for the change in streaming mode. We need to send xid @@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * This is only possible if deletes are allowed even when replica @@ -1505,6 +1519,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!change->data.tp.oldtuple) { elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; } break; @@ -1560,7 +1575,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * of the row filter for old and new tuple. */ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); goto cleanup; + } /* * Send BEGIN if we haven't yet. @@ -1688,6 +1706,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, change->data.truncate.restart_seqs); OutputPluginWrite(ctx, true); } + else + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); + MemoryContextSwitchTo(old); MemoryContextReset(data->context); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d0..d9217ce49aaa 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1573,6 +1573,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); + /* + * If output plugin maintains statistics, update the amount of data sent + * downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */ + CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ccfb11c49bf8..ed055324a996 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -96,6 +96,13 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_bytes); REPLSLOT_ACC(total_txns); REPLSLOT_ACC(total_bytes); + statent->plugin_has_stats = repSlotStat->plugin_has_stats; + if (repSlotStat->plugin_has_stats) + { + REPLSLOT_ACC(sent_txns); + REPLSLOT_ACC(sent_bytes); + REPLSLOT_ACC(filtered_bytes); + } #undef REPLSLOT_ACC pgstat_unlock_entry(entry_ref); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index c756c2bebaaa..796dacddcfb7 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 10 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 13 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2129,7 +2129,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "plugin_filtered_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_sent_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2154,11 +2160,23 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[6] = Int64GetDatum(slotent->stream_bytes); values[7] = Int64GetDatum(slotent->total_txns); values[8] = Int64GetDatum(slotent->total_bytes); + if (slotent->plugin_has_stats) + { + values[9] = Int64GetDatum(slotent->filtered_bytes); + values[10] = Int64GetDatum(slotent->sent_txns); + values[11] = Int64GetDatum(slotent->sent_bytes); + } + else + { + nulls[9] = true; + nulls[10] = true; + nulls[11] = true; + } if (slotent->stat_reset_timestamp == 0) - nulls[9] = true; + nulls[12] = true; else - values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 03e82d28c876..d3b8bc1835df 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5687,9 +5687,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f402b17295c8..87afeaed8a58 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -395,6 +395,10 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_bytes; PgStat_Counter total_txns; PgStat_Counter total_bytes; + bool plugin_has_stats; + PgStat_Counter sent_txns; + PgStat_Counter sent_bytes; + PgStat_Counter filtered_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9c..010c59f783d6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext OutputPluginCallbacks callbacks; OutputPluginOptions options; + OutputPluginStats *stats; /* * User specified options diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 8d4d5b71887d..02018f0593c3 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -29,6 +29,19 @@ typedef struct OutputPluginOptions bool receive_rewrites; } OutputPluginOptions; +/* + * Statistics about the transactions decoded and sent downstream by the output + * plugin. + */ +typedef struct OutputPluginStats +{ + int64 sentTxns; /* number of transactions decoded and sent + * downstream */ + int64 sentBytes; /* amount of data decoded and sent downstream */ + int64 filteredBytes; /* amount of data from reoder buffer that was + * filtered out by the output plugin */ +} OutputPluginStats; + /* * Type of the shared library symbol _PG_output_plugin_init that is looked up * when loading an output plugin shared library. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index fa0745552f86..3ea2d9885b6f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -715,6 +715,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids); extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert); +extern Size ReorderBufferChangeSize(ReorderBufferChange *change); extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 2137c4e5e305..b04a0d9f8db5 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -212,10 +212,10 @@ # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) + qq(t|t|t), + qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) ); # Do reset of stats for stats test slot 1 @@ -233,10 +233,10 @@ is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.) + qq(t|t|t), + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) ); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 35e8aad7701b..2a048af3569c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2132,6 +2132,7 @@ pg_stat_replication| SELECT s.pid, JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -2140,9 +2141,12 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_bytes, s.total_txns, s.total_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e90af5b2ad36..8f6af48b04a2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1830,6 +1830,7 @@ OuterJoinClauseInfo OutputPluginCallbacks OutputPluginOptions OutputPluginOutputType +OutputPluginStats OverridingKind PACE_HEADER PACL