diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index de6dc416130a..d19fe6a1c61a 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush();
(1 row)
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
- slot_name | spill_txns | spill_count | total_txns | total_bytes
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t | t | t | t
- regression_slot_stats2 | t | t | t | t
- regression_slot_stats3 | t | t | t | t
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Filtered
+-- bytes would be set only when there's a change that was passed to the plugin
+-- but was filtered out. Depending upon the background transactions, filtered
+-- bytes may or may not be zero.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes
+------------------------+------------+-------------+------------+-------------+------------------+------------+----------------
+ regression_slot_stats1 | t | t | t | t | 1 | t | t
+ regression_slot_stats2 | t | t | t | t | 1 | t | t
+ regression_slot_stats3 | t | t | t | t | 1 | t | t
(3 rows)
RESET logical_decoding_work_mem;
@@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
(1 row)
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
- slot_name | spill_txns | spill_count | total_txns | total_bytes
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t | t | f | f
- regression_slot_stats2 | t | t | t | t
- regression_slot_stats3 | t | t | t | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes
+------------------------+------------+-------------+------------+-------------+------------------+------------+----------------
+ regression_slot_stats1 | t | t | f | f | | |
+ regression_slot_stats2 | t | t | t | t | 1 | t | t
+ regression_slot_stats3 | t | t | t | t | 1 | t | t
(3 rows)
-- reset stats for all slots
@@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
(1 row)
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
- slot_name | spill_txns | spill_count | total_txns | total_bytes
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t | t | f | f
- regression_slot_stats2 | t | t | f | f
- regression_slot_stats3 | t | t | f | f
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes
+------------------------+------------+-------------+------------+-------------+------------------+-------------------+-----------------------
+ regression_slot_stats1 | t | t | f | f | | |
+ regression_slot_stats2 | t | t | f | f | | |
+ regression_slot_stats3 | t | t | f | f | | |
(3 rows)
-- verify accessing/resetting stats for non-existent slot does something reasonable
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
- slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+ slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+-------------
+ do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | |
(1 row)
SELECT pg_stat_reset_replication_slot('do-not-exist');
ERROR: replication slot "do-not-exist" does not exist
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
- slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+ slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+-------------
+ do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | |
(1 row)
-- spilling the xact
@@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
- slot_name
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+ slot_name | plugin
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
(3 rows)
-SELECT slot_name FROM pg_stat_replication_slots;
- slot_name
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+ slot_name | plugin
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
(3 rows)
COMMIT;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index a022fe1bf075..1077cea5855a 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,21 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Filtered
+-- bytes would be set only when there's a change that was passed to the plugin
+-- but was filtered out. Depending upon the background transactions, filtered
+-- bytes may or may not be zero.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
RESET logical_decoding_work_mem;
-- reset stats for one slot, others should be unaffected
SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-- reset stats for all slots
SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-- verify accessing/resetting stats for non-existent slot does something reasonable
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
@@ -46,8 +51,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-SELECT slot_name FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
COMMIT;
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d84..76dd86fc420c 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -23,10 +23,16 @@ sub test_slot_stats
my ($node, $expected, $msg) = @_;
+ # If there are background transactions which are filtered out by the output
+ # plugin, plugin_filtered_bytes may be greater than 0. But it's not
+ # guaranteed that such transactions would be present.
my $result = $node->safe_psql(
'postgres', qq[
SELECT slot_name, total_txns > 0 AS total_txn,
- total_bytes > 0 AS total_bytes
+ total_bytes > 0 AS total_bytes,
+ plugin_sent_txns > 0 AS sent_txn,
+ plugin_sent_bytes > 0 AS sent_bytes,
+ plugin_filtered_bytes >= 0 AS filtered_bytes
FROM pg_stat_replication_slots
ORDER BY slot_name]);
is($result, $expected, $msg);
@@ -80,9 +86,9 @@ sub test_slot_stats
# restart.
test_slot_stats(
$node,
- qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+ qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t
+regression_slot3|t|t|t|t|t),
'check replication statistics are updated');
# Test to remove one of the replication slots and adjust
@@ -104,8 +110,8 @@ sub test_slot_stats
# restart.
test_slot_stats(
$node,
- qq(regression_slot1|t|t
-regression_slot2|t|t),
+ qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t),
'check replication statistics after removing the slot file');
# cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index f671a7d4b312..ea5c527644bf 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->only_local = false;
ctx->output_plugin_private = data;
+ ctx->stats = palloc0(sizeof(OutputPluginStats));
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
opt->receive_rewrites = false;
@@ -310,6 +311,7 @@ static void
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
+ ctx->stats->sentTxns++;
if (data->include_xids)
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
else
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index b803a819cf1f..8ac10cda90c7 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -938,6 +938,33 @@ typedef struct OutputPluginOptions
needs to have a state, it can
use ctx->output_plugin_private to store it.
+
+
+ The startup callback may initialize ctx->stats,
+ typically as follows, if it chooses to maintain and report statistics
+ about its activity in pg_stat_replication_slots.
+
+ctx->stats = palloc0(sizeof(OutputPluginStats));
+
+ where OutputPluginStats is defined as follows:
+
+typedef struct OutputPluginStats
+{
+ int64 sentTxns;
+ int64 sentBytes;
+ int64 filteredBytes;
+} OutputPluginStats;
+
+ sentTxns is the number of transactions sent downstream
+ by the output plugin. sentBytes is the amount of data
+ sent downstream by the output plugin.
+ OutputPluginWrite is expected to update this counter
+ if ctx->stats is initialized by the output plugin.
+ filteredBytes is the size of changes in bytes that are
+ filtered out by the output plugin. Function
+ ReorderBufferChangeSize may be used to find the size of
+ filtered ReorderBufferChange.
+
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3f4a27a736e2..e121f55c9c2a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1545,6 +1545,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ plugin text
+
+
+ The base name of the shared object containing the output plugin this
+ logical slot is using. This column is same as the one in
+ pg_replication_slots.
+
+
+
spill_txns bigint
@@ -1644,6 +1655,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ plugin_filtered_bytes bigint
+
+
+ Amount of changes, from total_bytes, filtered
+ out by the output plugin and not sent downstream. Please note that it
+ does not include the changes filtered before a change is handed over to
+ the output plugin, e.g. the changes filtered by origin. The count is
+ maintained by the output plugin mentioned in
+ plugin. It is NULL when statistics is not
+ initialized or immediately after a reset or when not maintained by the
+ output plugin.
+
+
+
+
+
+ plugin_sent_txns bigint
+
+
+ Number of decoded transactions sent downstream for this slot. This
+ counts top-level transactions only, and is not incremented for
+ subtransactions. These transactions are subset of transctions sent to
+ the decoding plugin. Hence this count is expected to be lesser than or
+ equal to total_txns. The count is maintained
+ by the output plugin mentioned in plugin. It
+ is NULL when statistics is not initialized or immediately after a reset or
+ when not maintained by the output plugin.
+
+
+
+
+
+ plugin_sent_bytesbigint
+
+
+ Amount of transaction changes sent downstream for this slot by the
+ output plugin after applying filtering and converting into its output
+ format. The count is maintained by the output plugin mentioned in
+ plugin. It is NULL when statistics is not
+ initialized or immediately after a reset or when not maintained by the
+ output plugin.
+
+
+
+
stats_reset timestamp with time zone
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c77fa0234bb7..d38c21150b0c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1053,6 +1053,7 @@ CREATE VIEW pg_replication_slots AS
CREATE VIEW pg_stat_replication_slots AS
SELECT
s.slot_name,
+ r.plugin,
s.spill_txns,
s.spill_count,
s.spill_bytes,
@@ -1061,6 +1062,9 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_bytes,
s.total_txns,
s.total_bytes,
+ s.plugin_filtered_bytes,
+ s.plugin_sent_txns,
+ s.plugin_sent_bytes,
s.stats_reset
FROM pg_replication_slots as r,
LATERAL pg_stat_get_replication_slot(slot_name) as s
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c68c0481f427..b26ac29e32f2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1952,13 +1952,14 @@ void
UpdateDecodingStats(LogicalDecodingContext *ctx)
{
ReorderBuffer *rb = ctx->reorder;
+ OutputPluginStats *stats = ctx->stats;
PgStat_StatReplSlotEntry repSlotStat;
/* Nothing to do if we don't have any replication stats to be sent. */
if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64,
rb,
rb->spillTxns,
rb->spillCount,
@@ -1967,7 +1968,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamCount,
rb->streamBytes,
rb->totalTxns,
- rb->totalBytes);
+ rb->totalBytes,
+ stats ? "plugin has stats" : "plugin has no stats",
+ stats ? stats->sentTxns : 0,
+ stats ? stats->sentBytes : 0,
+ stats ? stats->filteredBytes : 0);
repSlotStat.spill_txns = rb->spillTxns;
repSlotStat.spill_count = rb->spillCount;
@@ -1977,6 +1982,15 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_bytes = rb->streamBytes;
repSlotStat.total_txns = rb->totalTxns;
repSlotStat.total_bytes = rb->totalBytes;
+ if (stats)
+ {
+ repSlotStat.plugin_has_stats = true;
+ repSlotStat.sent_txns = stats->sentTxns;
+ repSlotStat.sent_bytes = stats->sentBytes;
+ repSlotStat.filtered_bytes = stats->filteredBytes;
+ }
+ else
+ repSlotStat.plugin_has_stats = false;
pgstat_report_replslot(ctx->slot, &repSlotStat);
@@ -1988,6 +2002,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamBytes = 0;
rb->totalTxns = 0;
rb->totalBytes = 0;
+ if (stats)
+ {
+ stats->sentTxns = 0;
+ stats->sentBytes = 0;
+ stats->filteredBytes = 0;
+ }
}
/*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 25f890ddeeda..788967e2ab1f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -89,6 +89,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+ /*
+ * If output plugin has chosen to maintain its stats, update the amount of
+ * data sent downstream.
+ */
+ if (ctx->stats)
+ ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId);
p->returned_rows++;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4736f993c374..12579dff2c15 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
* memory accounting
* ---------------------------------------
*/
-static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
ReorderBufferTXN *txn,
@@ -4436,7 +4435,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Size of a change in memory.
*/
-static Size
+Size
ReorderBufferChangeSize(ReorderBufferChange *change)
{
Size sz = sizeof(ReorderBufferChange);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd3..339babbeb568 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -450,6 +450,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ALLOCSET_SMALL_SIZES);
ctx->output_plugin_private = data;
+ ctx->stats = palloc0(sizeof(OutputPluginStats));
/* This plugin uses binary protocol. */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@@ -591,6 +592,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
txndata->sent_begin_txn = true;
+ ctx->stats->sentTxns++;
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
send_replication_origin);
@@ -1469,7 +1471,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *new_slot = NULL;
if (!is_publishable_relation(relation))
+ {
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
return;
+ }
/*
* Remember the xid for the change in streaming mode. We need to send xid
@@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
+ {
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
return;
+ }
break;
case REORDER_BUFFER_CHANGE_UPDATE:
if (!relentry->pubactions.pubupdate)
+ {
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
return;
+ }
break;
case REORDER_BUFFER_CHANGE_DELETE:
if (!relentry->pubactions.pubdelete)
+ {
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
return;
+ }
/*
* This is only possible if deletes are allowed even when replica
@@ -1505,6 +1519,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!change->data.tp.oldtuple)
{
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
return;
}
break;
@@ -1560,7 +1575,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* of the row filter for old and new tuple.
*/
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+ {
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
goto cleanup;
+ }
/*
* Send BEGIN if we haven't yet.
@@ -1688,6 +1706,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
change->data.truncate.restart_seqs);
OutputPluginWrite(ctx, true);
}
+ else
+ ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
+
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d0..d9217ce49aaa 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1573,6 +1573,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* output previously gathered data in a CopyData packet */
pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
+ /*
+ * If output plugin maintains statistics, update the amount of data sent
+ * downstream.
+ */
+ if (ctx->stats)
+ ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */
+
CHECK_FOR_INTERRUPTS();
/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index ccfb11c49bf8..ed055324a996 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -96,6 +96,13 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
REPLSLOT_ACC(stream_bytes);
REPLSLOT_ACC(total_txns);
REPLSLOT_ACC(total_bytes);
+ statent->plugin_has_stats = repSlotStat->plugin_has_stats;
+ if (repSlotStat->plugin_has_stats)
+ {
+ REPLSLOT_ACC(sent_txns);
+ REPLSLOT_ACC(sent_bytes);
+ REPLSLOT_ACC(filtered_bytes);
+ }
#undef REPLSLOT_ACC
pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index c756c2bebaaa..796dacddcfb7 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
text *slotname_text = PG_GETARG_TEXT_P(0);
NameData slotname;
TupleDesc tupdesc;
@@ -2129,7 +2129,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "plugin_filtered_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_sent_txns",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2154,11 +2160,23 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
values[6] = Int64GetDatum(slotent->stream_bytes);
values[7] = Int64GetDatum(slotent->total_txns);
values[8] = Int64GetDatum(slotent->total_bytes);
+ if (slotent->plugin_has_stats)
+ {
+ values[9] = Int64GetDatum(slotent->filtered_bytes);
+ values[10] = Int64GetDatum(slotent->sent_txns);
+ values[11] = Int64GetDatum(slotent->sent_bytes);
+ }
+ else
+ {
+ nulls[9] = true;
+ nulls[10] = true;
+ nulls[11] = true;
+ }
if (slotent->stat_reset_timestamp == 0)
- nulls[9] = true;
+ nulls[12] = true;
else
- values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+ values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c876..d3b8bc1835df 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5687,9 +5687,9 @@
{ oid => '6169', descr => 'statistics: information about replication slot',
proname => 'pg_stat_get_replication_slot', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'text',
- proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+ proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
{ oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f402b17295c8..87afeaed8a58 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -395,6 +395,10 @@ typedef struct PgStat_StatReplSlotEntry
PgStat_Counter stream_bytes;
PgStat_Counter total_txns;
PgStat_Counter total_bytes;
+ bool plugin_has_stats;
+ PgStat_Counter sent_txns;
+ PgStat_Counter sent_bytes;
+ PgStat_Counter filtered_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_StatReplSlotEntry;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 2e562bee5a9c..010c59f783d6 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext
OutputPluginCallbacks callbacks;
OutputPluginOptions options;
+ OutputPluginStats *stats;
/*
* User specified options
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71887d..02018f0593c3 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -29,6 +29,19 @@ typedef struct OutputPluginOptions
bool receive_rewrites;
} OutputPluginOptions;
+/*
+ * Statistics about the transactions decoded and sent downstream by the output
+ * plugin.
+ */
+typedef struct OutputPluginStats
+{
+ int64 sentTxns; /* number of transactions decoded and sent
+ * downstream */
+ int64 sentBytes; /* amount of data decoded and sent downstream */
+ int64 filteredBytes; /* amount of data from reoder buffer that was
+ * filtered out by the output plugin */
+} OutputPluginStats;
+
/*
* Type of the shared library symbol _PG_output_plugin_init that is looked up
* when loading an output plugin shared library.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f86..3ea2d9885b6f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -715,6 +715,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids);
extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, ReorderBufferChange *change,
bool toast_insert);
+extern Size ReorderBufferChangeSize(ReorderBufferChange *change);
extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
Snapshot snap, XLogRecPtr lsn,
bool transactional, const char *prefix,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 2137c4e5e305..b04a0d9f8db5 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -212,10 +212,10 @@
# Stats exist for stats test slot 1
is( $node_primary->safe_psql(
'postgres',
- qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+ qq(SELECT total_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
),
- qq(t|t),
- qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+ qq(t|t|t),
+ qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
);
# Do reset of stats for stats test slot 1
@@ -233,10 +233,10 @@
is( $node_primary->safe_psql(
'postgres',
- qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+ qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
),
- qq(t|t),
- qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+ qq(t|t|t),
+ qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and plugin_sent_bytes were set to 0 and NULL respectively.)
);
# Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 35e8aad7701b..2a048af3569c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2132,6 +2132,7 @@ pg_stat_replication| SELECT s.pid,
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_replication_slots| SELECT s.slot_name,
+ r.plugin,
s.spill_txns,
s.spill_count,
s.spill_bytes,
@@ -2140,9 +2141,12 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_bytes,
s.total_txns,
s.total_bytes,
+ s.plugin_filtered_bytes,
+ s.plugin_sent_txns,
+ s.plugin_sent_bytes,
s.stats_reset
FROM pg_replication_slots r,
- LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+ LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset)
WHERE (r.datoid IS NOT NULL);
pg_stat_slru| SELECT name,
blks_zeroed,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e90af5b2ad36..8f6af48b04a2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1830,6 +1830,7 @@ OuterJoinClauseInfo
OutputPluginCallbacks
OutputPluginOptions
OutputPluginOutputType
+OutputPluginStats
OverridingKind
PACE_HEADER
PACL