Resume conflict-relevant data retention automatically.
author	Amit Kapila <akapila@postgresql.org>
Mon, 15 Sep 2025 08:44:54 +0000 (08:44 +0000)
committer	Amit Kapila <akapila@postgresql.org>
Mon, 15 Sep 2025 08:46:55 +0000 (08:46 +0000)
This commit resumes automatic retention of conflict-relevant data for a
subscription. Previously, retention would stop if the apply process failed
to advance its xmin (oldest_nonremovable_xid) within the configured
max_retention_duration, and the user had to manually re-enable the
retain_dead_tuples option. With this change, retention will resume
automatically once the apply worker catches up and begins advancing its
xmin (oldest_nonremovable_xid) within the configured threshold.
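
As a quick, hedged illustration of how the resumed retention can be observed
(the subscription name "sub" is hypothetical; the catalog column and the
conflict-detection slot name below come from this patch and its test):

    -- true once retention has resumed for the subscription
    SELECT subretentionactive FROM pg_subscription WHERE subname = 'sub';

    -- the conflict-detection slot holds an xmin again while retaining
    SELECT xmin IS NOT NULL AS retaining
      FROM pg_replication_slots
     WHERE slot_name = 'pg_conflict_detection';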

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com

doc/src/sgml/ref/create_subscription.sgml
src/backend/replication/logical/launcher.c
src/backend/replication/logical/worker.c
src/test/subscription/t/035_conflicts.pl

index fc3144373110f96c638da01d18723cb486e5e772..ed82cf1809e5526bf97d3bb72425623cb23adbdf 100644 (file)
@@ -538,10 +538,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           <literal>retain_dead_tuples</literal> is enabled, confirm that the
           retention duration has exceeded the
           <literal>max_retention_duration</literal> set within the corresponding
-          subscription. The retention will not be automatically resumed unless a
-          new subscription is created with <literal>retain_dead_tuples =
-          true</literal>, or the user manually re-enables
-          <literal>retain_dead_tuples</literal>.
+          subscription. The retention will automatically resume when at least one
+          apply worker confirms that the retention duration is within the
+          specified limit, or when a new subscription is created with
+          <literal>retain_dead_tuples = true</literal>. Alternatively, retention
+          can be manually resumed by re-enabling <literal>retain_dead_tuples</literal>.
          </para>
          <para>
           Note that overall retention will not stop if other subscriptions that
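
(A hedged sketch of the resumption paths described in this documentation hunk;
the subscription name "sub" is hypothetical, and the ALTER SUBSCRIPTION forms
assume these options are alterable subscription parameters, as the test below
does for max_retention_duration.)

    -- resume automatically by removing the retention limit
    ALTER SUBSCRIPTION sub SET (max_retention_duration = 0);

    -- or resume manually by re-enabling retain_dead_tuples
    ALTER SUBSCRIPTION sub SET (retain_dead_tuples = false);
    ALTER SUBSCRIPTION sub SET (retain_dead_tuples = true);
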
index add2e2e066c38ffabba9a4b3088b22c2a5b3bcd2..c900b6cf3b163ca7cdc47705b94cb1103099ab32 100644 (file)
@@ -1261,24 +1261,30 @@ ApplyLauncherMain(Datum main_arg)
 
            LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
            w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-           LWLockRelease(LogicalRepWorkerLock);
 
            if (w != NULL)
            {
                /*
                 * Compute the minimum xmin required to protect dead tuples
                 * required for conflict detection among all running apply
-                * workers.
+                * workers. This computation is performed while holding
+                * LogicalRepWorkerLock to prevent accessing invalid worker
+                * data, in scenarios where a worker might exit and reset its
+                * state concurrently.
                 */
                if (sub->retaindeadtuples &&
                    sub->retentionactive &&
                    can_update_xmin)
                    compute_min_nonremovable_xid(w, &xmin);
 
+               LWLockRelease(LogicalRepWorkerLock);
+
                /* worker is running already */
                continue;
            }
 
+           LWLockRelease(LogicalRepWorkerLock);
+
            /*
             * Can't advance xmin of the slot unless all the workers
             * corresponding to subscriptions actively retaining dead tuples
index ee6ac22329fdc5c33b102781e6072d562287cbba..9b5885d57cf804b11f957a826580f923a7c4c135 100644 (file)
  *   pg_subscription.subretentionactive is updated to false within a new
  *   transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
  *
+ * - RDT_RESUME_CONFLICT_INFO_RETENTION:
+ *   This phase is required only when max_retention_duration is defined. We
+ *   enter this phase if the retention was previously stopped, and the time
+ *   required to advance the non-removable transaction ID in the
+ *   RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
+ *   (or if max_retention_duration is set to 0). During this phase,
+ *   pg_subscription.subretentionactive is updated to true within a new
+ *   transaction, and the worker will be restarted.
+ *
  * The overall state progression is: GET_CANDIDATE_XID ->
  * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
  * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -381,7 +390,8 @@ typedef enum
    RDT_REQUEST_PUBLISHER_STATUS,
    RDT_WAIT_FOR_PUBLISHER_STATUS,
    RDT_WAIT_FOR_LOCAL_FLUSH,
-   RDT_STOP_CONFLICT_INFO_RETENTION
+   RDT_STOP_CONFLICT_INFO_RETENTION,
+   RDT_RESUME_CONFLICT_INFO_RETENTION,
 } RetainDeadTuplesPhase;
 
 /*
@@ -568,10 +578,14 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
 static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
 static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
 static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static bool update_retention_status(bool active);
 static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
 static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
                                        bool new_xid_found);
 
+static void apply_worker_exit(void);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
                                         ResultRelInfo *relinfo,
@@ -4367,10 +4381,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
    if (!MySubscription->retaindeadtuples)
        return false;
 
-   /* No need to advance if we have already stopped retaining */
-   if (!MySubscription->retentionactive)
-       return false;
-
    return true;
 }
 
@@ -4399,6 +4409,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
        case RDT_STOP_CONFLICT_INFO_RETENTION:
            stop_conflict_info_retention(rdt_data);
            break;
+       case RDT_RESUME_CONFLICT_INFO_RETENTION:
+           resume_conflict_info_retention(rdt_data);
+           break;
    }
 }
 
@@ -4522,7 +4535,10 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
     * retaining conflict information for this worker.
     */
    if (should_stop_conflict_info_retention(rdt_data))
+   {
+       rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
        return;
+   }
 
    if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
        rdt_data->remote_wait_for = rdt_data->remote_nextxid;
@@ -4643,7 +4659,10 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
     * retaining conflict information for this worker.
     */
    if (should_stop_conflict_info_retention(rdt_data))
+   {
+       rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
        return;
+   }
 
    /*
     * Update and check the remote flush position if we are applying changes
@@ -4672,6 +4691,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
    if (last_flushpos < rdt_data->remote_lsn)
        return;
 
+   /*
+    * Reaching this point implies should_stop_conflict_info_retention()
+    * returned false earlier, meaning that the most recent duration for
+    * advancing the non-removable transaction ID is within the
+    * max_retention_duration or max_retention_duration is set to 0.
+    *
+    * Therefore, if conflict info retention was previously stopped due to a
+    * timeout, it is now safe to resume retention.
+    */
+   if (!MySubscription->retentionactive)
+   {
+       rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
+       return;
+   }
+
    /*
     * Reaching here means the remote WAL position has been received, and all
     * transactions up to that position on the publisher have been applied and
@@ -4698,13 +4732,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
  * Check whether conflict information retention should be stopped due to
  * exceeding the maximum wait time (max_retention_duration).
  *
- * If retention should be stopped, transition to the
- * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
- * false.
- *
- * Note: Retention won't be resumed automatically. The user must manually
- * disable retain_dead_tuples and re-enable it after confirming that the
- * replication slot maintained by the launcher has been dropped.
+ * If retention should be stopped, return true. Otherwise, return false.
  */
 static bool
 should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4735,11 +4763,6 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
                                    rdt_data->table_sync_wait_time))
        return false;
 
-   rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
-
-   /* process the next phase */
-   process_rdt_phase_transition(rdt_data, false);
-
    return true;
 }
 
@@ -4748,6 +4771,86 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
  */
 static void
 stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+   /* Stop retention if not already stopped */
+   if (MySubscription->retentionactive)
+   {
+       /*
+        * If the retention status cannot be updated (e.g., due to an active
+        * transaction), skip further processing to avoid inconsistent
+        * retention behavior.
+        */
+       if (!update_retention_status(false))
+           return;
+
+       SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+       MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+       SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+       ereport(LOG,
+               errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
+                      MySubscription->name),
+               errdetail("Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms.",
+                         MySubscription->maxretention));
+   }
+
+   Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
+
+   /*
+    * If retention has been stopped, reset to the initial phase to retry
+    * resuming retention. This reset is required to recalculate the current
+    * wait time and resume retention if the time falls within
+    * max_retention_duration.
+    */
+   reset_retention_data_fields(rdt_data);
+}
+
+/*
+ * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
+ */
+static void
+resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+   /* We can't resume retention without updating retention status. */
+   if (!update_retention_status(true))
+       return;
+
+   ereport(LOG,
+           errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
+                  MySubscription->name),
+           MySubscription->maxretention
+           ? errdetail("Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms.",
+                       MySubscription->maxretention)
+           : errdetail("Retention is re-enabled as max_retention_duration is set to unlimited."));
+
+   /*
+    * Restart the worker to let the launcher initialize
+    * oldest_nonremovable_xid at startup.
+    *
+    * While it's technically possible to derive this value on-the-fly using
+    * the conflict detection slot's xmin, doing so risks a race condition:
+    * the launcher might clean slot.xmin just after retention resumes. This
+    * would make oldest_nonremovable_xid unreliable, especially during xid
+    * wraparound.
+    *
+    * Although this can be prevented by introducing heavyweight locking, the
+    * complexity it would bring doesn't seem worthwhile given how rarely
+    * retention is resumed.
+    */
+   apply_worker_exit();
+}
+
+/*
+ * Updates pg_subscription.subretentionactive to the given value within a
+ * new transaction.
+ *
+ * If already inside an active transaction, skips the update and returns
+ * false.
+ *
+ * Returns true if the update is successfully performed.
+ */
+static bool
+update_retention_status(bool active)
 {
    /*
     * Do not update the catalog during an active transaction. The transaction
@@ -4755,7 +4858,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
     * rollback of catalog updates if the application fails subsequently.
     */
    if (IsTransactionState())
-       return;
+       return false;
 
    StartTransactionCommand();
 
@@ -4765,26 +4868,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
     */
    PushActiveSnapshot(GetTransactionSnapshot());
 
-   /* Set pg_subscription.subretentionactive to false */
-   UpdateDeadTupleRetentionStatus(MySubscription->oid, false);
+   /* Update pg_subscription.subretentionactive */
+   UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
 
    PopActiveSnapshot();
    CommitTransactionCommand();
 
-   SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-   MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
-   SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-   ereport(LOG,
-           errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
-                  MySubscription->name),
-           errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
-                     MySubscription->maxretention));
-
    /* Notify launcher to update the conflict slot */
    ApplyLauncherWakeup();
 
-   reset_retention_data_fields(rdt_data);
+   MySubscription->retentionactive = active;
+
+   return true;
 }
 
 /*
@@ -4809,19 +4904,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
 /*
  * Adjust the interval for advancing non-removable transaction IDs.
  *
- * If there is no activity on the node, we progressively double the interval
- * used to advance non-removable transaction ID. This helps conserve CPU
- * and network resources when there's little benefit to frequent updates.
+ * If there is no activity on the node or retention has been stopped, we
+ * progressively double the interval used to advance non-removable transaction
+ * ID. This helps conserve CPU and network resources when there's little benefit
+ * to frequent updates.
  *
  * The interval is capped by the lowest of the following:
- * - wal_receiver_status_interval (if set),
+ * - wal_receiver_status_interval (if set and retention is active),
  * - a default maximum of 3 minutes,
- * - max_retention_duration.
+ * - max_retention_duration (if retention is active).
  *
- * This ensures the interval never exceeds the retention boundary, even if
- * other limits are higher. Once activity resumes on the node, the interval
- * is reset to lesser of 100ms and max_retention_duration, allowing timely
- * advancement of non-removable transaction ID.
+ * This ensures the interval never exceeds the retention boundary, even if other
+ * limits are higher. Once activity resumes on the node and retention is
+ * active, the interval is reset to the lesser of 100ms and max_retention_duration,
+ * allowing timely advancement of non-removable transaction ID.
  *
  * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
  * consider the other interval or a separate GUC if the need arises.
@@ -4829,7 +4925,7 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
 static void
 adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
 {
-   if (!new_xid_found && rdt_data->xid_advance_interval)
+   if (rdt_data->xid_advance_interval && !new_xid_found)
    {
        int         max_interval = wal_receiver_status_interval
            ? wal_receiver_status_interval * 1000
@@ -4842,6 +4938,18 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
        rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
                                             max_interval);
    }
+   else if (rdt_data->xid_advance_interval &&
+            !MySubscription->retentionactive)
+   {
+       /*
+        * Retention has been stopped, so double the interval, capping it at a
+        * maximum of 3 minutes. The wal_receiver_status_interval is
+        * intentionally not used as an upper bound, since the likelihood of
+        * retention resuming is lower than that of general activity resuming.
+        */
+       rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+                                            MAX_XID_ADVANCE_INTERVAL);
+   }
    else
    {
        /*
@@ -4851,9 +4959,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
        rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
    }
 
-   /* Ensure the wait time remains within the maximum limit */
-   rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
-                                        MySubscription->maxretention);
+   /*
+    * Ensure the wait time remains within the maximum retention time limit
+    * when retention is active.
+    */
+   if (MySubscription->retentionactive)
+       rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+                                            MySubscription->maxretention);
 }
 
 /*
index 880551fc69d74fdc96e35c0e35929cf14fcd072d..f2aee0f70df215a719a133dc5dacae8365cf4ce9 100644 (file)
@@ -631,6 +631,33 @@ $node_B->safe_psql('postgres',
 $node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
 $node_B->reload;
 
+###############################################################################
+# Check that dead tuple retention resumes when max_retention_duration is
+# set to 0.
+###############################################################################
+
+$log_offset = -s $node_A->logfile;
+
+# Set max_retention_duration to 0
+$node_A->safe_psql('postgres',
+   "ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 0);");
+
+# Confirm that the retention resumes
+$node_A->wait_for_log(
+   qr/logical replication worker for subscription "tap_sub_a_b" will resume retaining the information for detecting conflicts
+.*DETAIL:.* Retention is re-enabled as max_retention_duration is set to unlimited.*/,
+   $log_offset);
+
+ok( $node_A->poll_query_until(
+       'postgres',
+       "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+   ),
+   "the xmin value of slot 'pg_conflict_detection' is valid on Node A");
+
+$result = $node_A->safe_psql('postgres',
+   "SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
+is($result, qq(t), 'retention is active');
+
 ###############################################################################
 # Check that the replication slot pg_conflict_detection is dropped after
 # removing all the subscriptions.