Commit c33771e

Cary Huang authored and Commitfest Bot committed
v7 parallel TID range scan patch
1 parent 6551a05 commit c33771e

14 files changed, +377 -18 lines changed

src/backend/access/heap/heapam.c

Lines changed: 13 additions & 0 deletions
@@ -1478,6 +1478,19 @@ heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
 	/* Set the start block and number of blocks to scan */
 	heap_setscanlimits(sscan, startBlk, numBlks);
 
+	/*
+	 * If parallel mode is used, store startBlk and numBlks in the parallel
+	 * scan descriptor as well.
+	 */
+	if (scan->rs_base.rs_parallel != NULL)
+	{
+		ParallelBlockTableScanDesc bpscan = NULL;
+
+		bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+		bpscan->phs_startblock = startBlk;
+		bpscan->phs_numblock = numBlks;
+	}
+
 	/* Finally, set the TID range in sscan */
 	ItemPointerCopy(&lowestItem, &sscan->st.tidrange.rs_mintid);
 	ItemPointerCopy(&highestItem, &sscan->st.tidrange.rs_maxtid);
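
For context, the hunk above publishes the block range that heap_set_tidrange() derives from the caller's TID range. A minimal self-contained sketch of that mapping (a simplified model for illustration only; PostgreSQL's real ItemPointerData layout and the empty-range handling in heap_set_tidrange() differ):

#include <stdint.h>
#include <stdio.h>

typedef uint32_t BlockNumber;

/* Simplified stand-in for ItemPointerData: block number plus offset. */
typedef struct
{
	BlockNumber block;
	uint16_t	offset;
} TidModel;

/*
 * Clamp an inclusive TID range to the block range the scan must visit;
 * heap_set_tidrange() computes startBlk/numBlks along these lines before
 * storing them in the parallel scan descriptor.
 */
static void
tid_range_to_blocks(TidModel lo, TidModel hi,
					BlockNumber *startBlk, BlockNumber *numBlks)
{
	*startBlk = lo.block;
	*numBlks = hi.block - lo.block + 1;
}

int
main(void)
{
	TidModel	lo = {100, 1};
	TidModel	hi = {104, 20};
	BlockNumber start, num;

	tid_range_to_blocks(lo, hi, &start, &num);
	printf("scan starts at block %u, spans %u blocks\n", start, num);
	return 0;
}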

src/backend/access/table/tableam.c

Lines changed: 44 additions & 1 deletion
@@ -188,6 +188,34 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 									pscan, flags);
 }
 
+TableScanDesc
+table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan)
+{
+	Snapshot	snapshot;
+	uint32		flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
+	TableScanDesc sscan;
+
+	Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
+
+	if (!pscan->phs_snapshot_any)
+	{
+		/* Snapshot was serialized -- restore it */
+		snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
+		RegisterSnapshot(snapshot);
+		flags |= SO_TEMP_SNAPSHOT;
+	}
+	else
+	{
+		/* SnapshotAny passed by caller (not serialized) */
+		snapshot = SnapshotAny;
+	}
+
+	sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+											 pscan, flags);
+
+	return sscan;
+}
+
 
 /* ----------------------------------------------------------------------------
  * Index scan related functions.
@@ -398,6 +426,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
 		bpscan->phs_nblocks > NBuffers / 4;
 	SpinLockInit(&bpscan->phs_mutex);
 	bpscan->phs_startblock = InvalidBlockNumber;
+	bpscan->phs_numblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
 
 	return sizeof(ParallelBlockTableScanDescData);
@@ -577,8 +606,22 @@ table_block_parallelscan_nextpage(Relation rel,
 		pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
 	}
 
+	/*
+	 * In a parallel TID range scan, 'pbscan->phs_numblock' is the number of
+	 * blocks to scan when an upper TID range limit is specified, or
+	 * InvalidBlockNumber when there is no limit; it never exceeds
+	 * 'pbscan->phs_nblocks', the total number of blocks in the relation.
+	 *
+	 * The scan can terminate early once 'nallocated' reaches
+	 * 'pbscan->phs_numblock', even if blocks remain in the relation; this
+	 * ensures parallel workers scan only the subset of blocks that fall
+	 * within the TID range.
+	 */
 	if (nallocated >= pbscan->phs_nblocks)
-		page = InvalidBlockNumber;	/* all blocks have been allocated */
+		page = InvalidBlockNumber;	/* all blocks have been allocated */
+	else if (pbscan->phs_numblock != InvalidBlockNumber &&
+			 nallocated >= pbscan->phs_numblock)
+		page = InvalidBlockNumber;	/* upper scan limit reached */
 	else
 		page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
 
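
The new early-termination branch is easiest to see in isolation. Below is a self-contained model of the allocation decision in table_block_parallelscan_nextpage() (chunked allocation and the atomic counter are omitted; names mirror the descriptor fields, but this is an illustration, not the patch's code):

#include <stdint.h>
#include <stdio.h>

typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/*
 * Model of the block-allocation decision: 'nallocated' is the number of
 * blocks handed out so far, 'numblock' the TID range limit (or
 * InvalidBlockNumber when there is no limit).
 */
static BlockNumber
next_page(uint64_t nallocated, BlockNumber nblocks,
		  BlockNumber startblock, BlockNumber numblock)
{
	if (nallocated >= nblocks)
		return InvalidBlockNumber;	/* all blocks have been allocated */
	if (numblock != InvalidBlockNumber && nallocated >= numblock)
		return InvalidBlockNumber;	/* upper scan limit reached */
	return (BlockNumber) ((nallocated + startblock) % nblocks);
}

int
main(void)
{
	/* Hypothetical 1000-block table, TID range covering blocks 100..104 */
	for (uint64_t n = 0; n < 7; n++)
		printf("allocation %llu -> block %u\n",
			   (unsigned long long) n,
			   next_page(n, 1000, 100, 5));
	return 0;
}

The sixth and seventh allocations come back as InvalidBlockNumber: the scan stops after five blocks even though the table has 1000.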

src/backend/executor/execParallel.c

Lines changed: 21 additions & 1 deletion
@@ -41,6 +41,7 @@
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "executor/nodeTidrangescan.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
 #include "pgstat.h"
@@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			ExecForeignScanEstimate((ForeignScanState *) planstate,
 									e->pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
+										 e->pcxt);
+			break;
 		case T_AppendState:
 			if (planstate->plan->parallel_aware)
 				ExecAppendEstimate((AppendState *) planstate,
@@ -493,6 +499,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
 										 d->pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
+											  d->pcxt);
+			break;
 		case T_AppendState:
 			if (planstate->plan->parallel_aware)
 				ExecAppendInitializeDSM((AppendState *) planstate,
@@ -994,6 +1005,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 			ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
 										   pcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
+												pcxt);
+			break;
 		case T_AppendState:
 			if (planstate->plan->parallel_aware)
 				ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
@@ -1020,7 +1036,6 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 		case T_MemoizeState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
-
 		default:
 			break;
 	}
@@ -1362,6 +1377,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
 											pwcxt);
 			break;
+		case T_TidRangeScanState:
+			if (planstate->plan->parallel_aware)
+				ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
+												 pwcxt);
+			break;
 		case T_AppendState:
 			if (planstate->plan->parallel_aware)
 				ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);

src/backend/executor/nodeTidrangescan.c

Lines changed: 81 additions & 0 deletions
@@ -405,3 +405,84 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
 	 */
 	return tidrangestate;
 }
+/* ----------------------------------------------------------------
+ *						Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanEstimate
+ *
+ *		Compute the amount of space we'll need in the parallel
+ *		query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanEstimate(TidRangeScanState *node,
+						 ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+
+	node->trss_pscanlen = table_parallelscan_estimate(node->ss.ss_currentRelation,
+													  estate->es_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, node->trss_pscanlen);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanInitializeDSM
+ *
+ *		Set up a parallel TID range scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeDSM(TidRangeScanState *node,
+							  ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+	ParallelTableScanDesc pscan;
+
+	pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen);
+	table_parallelscan_initialize(node->ss.ss_currentRelation,
+								  pscan,
+								  estate->es_snapshot);
+	/* disable syncscan in parallel tid range scan. */
+	pscan->phs_syncscan = false;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+	node->ss.ss_currentScanDesc =
+		table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
+								ParallelContext *pcxt)
+{
+	ParallelTableScanDesc pscan;
+
+	pscan = node->ss.ss_currentScanDesc->rs_parallel;
+	table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
+
+}
+
+/* ----------------------------------------------------------------
+ *		ExecTidRangeScanInitializeWorker
+ *
+ *		Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
+								 ParallelWorkerContext *pwcxt)
+{
+	ParallelTableScanDesc pscan;
+
+	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+	node->ss.ss_currentScanDesc =
+		table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan);
+}

src/backend/optimizer/path/costsize.c

Lines changed: 23 additions & 11 deletions
@@ -1367,7 +1367,8 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 	Selectivity selectivity;
 	double		pages;
 	Cost		startup_cost = 0;
-	Cost		run_cost = 0;
+	Cost		cpu_run_cost = 0;
+	Cost		disk_run_cost = 0;
 	QualCost	qpqual_cost;
 	Cost		cpu_per_tuple;
 	QualCost	tid_qual_cost;
@@ -1396,11 +1397,7 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 
 	/*
 	 * The first page in a range requires a random seek, but each subsequent
-	 * page is just a normal sequential page read. NOTE: it's desirable for
-	 * TID Range Scans to cost more than the equivalent Sequential Scans,
-	 * because Seq Scans have some performance advantages such as scan
-	 * synchronization and parallelizability, and we'd prefer one of them to
-	 * be picked unless a TID Range Scan really is better.
+	 * page is just a normal sequential page read.
 	 */
 	ntuples = selectivity * baserel->tuples;
 	nseqpages = pages - 1.0;
@@ -1417,32 +1414,47 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 							  &spc_seq_page_cost);
 
 	/* disk costs; 1 random page and the remainder as seq pages */
-	run_cost += spc_random_page_cost + spc_seq_page_cost * nseqpages;
+	disk_run_cost += spc_random_page_cost + spc_seq_page_cost * nseqpages;
 
 	/* Add scanning CPU costs */
 	get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost);
 
 	/*
 	 * XXX currently we assume TID quals are a subset of qpquals at this
 	 * point; they will be removed (if possible) when we create the plan, so
-	 * we subtract their cost from the total qpqual cost.  (If the TID quals
+	 * we subtract their cost from the total qpqual cost. (If the TID quals
 	 * can't be removed, this is a mistake and we're going to underestimate
 	 * the CPU cost a bit.)
 	 */
 	startup_cost += qpqual_cost.startup + tid_qual_cost.per_tuple;
 	cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple -
 		tid_qual_cost.per_tuple;
-	run_cost += cpu_per_tuple * ntuples;
+	cpu_run_cost += cpu_per_tuple * ntuples;
 
 	/* tlist eval costs are paid per output row, not per tuple scanned */
 	startup_cost += path->pathtarget->cost.startup;
-	run_cost += path->pathtarget->cost.per_tuple * path->rows;
+	cpu_run_cost += path->pathtarget->cost.per_tuple * path->rows;
+
+	/* Adjust costing for parallelism, if used. */
+	if (path->parallel_workers > 0)
+	{
+		double		parallel_divisor = get_parallel_divisor(path);
+
+		/* The CPU cost is divided among all the workers. */
+		cpu_run_cost /= parallel_divisor;
+
+		/*
+		 * In the case of a parallel plan, the row count needs to represent
+		 * the number of tuples processed per worker.
+		 */
+		path->rows = clamp_row_est(path->rows / parallel_divisor);
+	}
 
 	/* we should not generate this path type when enable_tidscan=false */
 	Assert(enable_tidscan);
 	path->disabled_nodes = 0;
 	path->startup_cost = startup_cost;
-	path->total_cost = startup_cost + run_cost;
+	path->total_cost = startup_cost + cpu_run_cost + disk_run_cost;
 }
 
 /*
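
The split into cpu_run_cost and disk_run_cost matters because only the CPU component is divided among processes; the disk component is paid once regardless of worker count. A worked example of that division, mirroring what get_parallel_divisor() computes elsewhere in costsize.c when the leader participates (the 0.3-per-worker leader discount is existing PostgreSQL behavior, not something this patch adds; the cost figures are hypothetical):

#include <stdio.h>

/*
 * Mirrors costsize.c's get_parallel_divisor(): the divisor is the worker
 * count plus a leader contribution that shrinks by 0.3 per worker,
 * clamped at zero (assuming parallel_leader_participation is on).
 */
static double
parallel_divisor(int workers)
{
	double		divisor = workers;
	double		leader_contribution = 1.0 - 0.3 * workers;

	if (leader_contribution > 0)
		divisor += leader_contribution;
	return divisor;
}

int
main(void)
{
	double		cpu_run_cost = 1200.0;	/* hypothetical CPU run cost */
	int			workers = 2;
	double		d = parallel_divisor(workers);

	/*
	 * With 2 workers: divisor = 2 + (1 - 0.6) = 2.4, so the per-process
	 * CPU cost drops from 1200 to 500; the disk cost is left whole.
	 */
	printf("divisor %.1f, per-process CPU cost %.1f\n",
		   d, cpu_run_cost / d);
	return 0;
}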

src/backend/optimizer/path/tidpath.c

Lines changed: 17 additions & 1 deletion
@@ -47,6 +47,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/restrictinfo.h"
+#include "optimizer/cost.h"
 
 
 /*
@@ -553,7 +554,22 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
 
 		add_path(rel, (Path *) create_tidrangescan_path(root, rel,
 														tidrangequals,
-														required_outer));
+														required_outer,
+														0));
+
+		/* If appropriate, consider parallel tid range scan. */
+		if (rel->consider_parallel && required_outer == NULL)
+		{
+			int			parallel_workers;
+
+			parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+													   max_parallel_workers_per_gather);
+			if (parallel_workers > 0)
+			{
+				add_partial_path(rel, (Path *) create_tidrangescan_path(root, rel, tidrangequals,
+																		required_outer, parallel_workers));
+			}
+		}
 	}
 
 /*

src/backend/optimizer/util/pathnode.c

Lines changed: 4 additions & 3 deletions
@@ -1262,7 +1262,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  */
 TidRangePath *
 create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
-						 List *tidrangequals, Relids required_outer)
+						 List *tidrangequals, Relids required_outer,
+						 int parallel_workers)
 {
 	TidRangePath *pathnode = makeNode(TidRangePath);
 
@@ -1271,9 +1272,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.pathtarget = rel->reltarget;
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
-	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_aware = (parallel_workers > 0);
 	pathnode->path.parallel_safe = rel->consider_parallel;
-	pathnode->path.parallel_workers = 0;
+	pathnode->path.parallel_workers = parallel_workers;
 	pathnode->path.pathkeys = NIL;	/* always unordered */
 
 	pathnode->tidrangequals = tidrangequals;

src/include/access/relscan.h

Lines changed: 2 additions & 0 deletions
@@ -96,6 +96,8 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_numblock;	/* # blocks to scan, or InvalidBlockNumber if
+								 * no limit */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 } ParallelBlockTableScanDescData;

src/include/access/tableam.h

Lines changed: 10 additions & 0 deletions
@@ -1125,6 +1125,16 @@ extern void table_parallelscan_initialize(Relation rel,
 extern TableScanDesc table_beginscan_parallel(Relation relation,
 											  ParallelTableScanDesc pscan);
 
+/*
+ * Begin a parallel tidrange scan. `pscan` needs to have been initialized with
+ * table_parallelscan_initialize(), for the same relation. The initialization
+ * does not need to have happened in this backend.
+ *
+ * Caller must hold a suitable lock on the relation.
+ */
+extern TableScanDesc table_beginscan_parallel_tidrange(Relation relation,
+													   ParallelTableScanDesc pscan);
+
 /*
  * Restart a parallel scan. Call this in the leader process. Caller is
  * responsible for making sure that all workers have finished the scan
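
To make the contract concrete, here is a condensed sketch of how a leader and a worker would use this entry point, following the nodeTidrangescan.c hooks above. It is backend-only code that assumes the usual parallel-query plumbing (ParallelContext, shm_toc) and compiles only inside the server; the leader_begin_tidrange/worker_attach_tidrange helpers are hypothetical names for illustration:

#include "postgres.h"

#include "access/parallel.h"
#include "access/tableam.h"
#include "storage/shm_toc.h"

/* Leader: size, allocate, initialize, and publish the shared descriptor. */
static TableScanDesc
leader_begin_tidrange(Relation rel, ParallelContext *pcxt,
					  int node_id, Snapshot snapshot)
{
	Size		len = table_parallelscan_estimate(rel, snapshot);
	ParallelTableScanDesc pscan = shm_toc_allocate(pcxt->toc, len);

	table_parallelscan_initialize(rel, pscan, snapshot);
	pscan->phs_syncscan = false;	/* syncscan is disabled for TID range */
	shm_toc_insert(pcxt->toc, node_id, pscan);
	return table_beginscan_parallel_tidrange(rel, pscan);
}

/* Worker: look up the descriptor the leader published and attach to it. */
static TableScanDesc
worker_attach_tidrange(Relation rel, shm_toc *toc, int node_id)
{
	ParallelTableScanDesc pscan = shm_toc_lookup(toc, node_id, false);

	return table_beginscan_parallel_tidrange(rel, pscan);
}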
