Commit de5134e

melanieplageman authored; Commitfest Bot committed
Eagerly flush bulkwrite strategy ring
Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably need to flush buffers in the strategy ring in order to reuse them. By eagerly flushing those buffers in a larger run, we encourage larger writes at the kernel level and less interleaving of WAL flushes and data file writes.

The effect is mainly noticeable with multiple parallel COPY FROMs. In that case, client backends achieve higher write throughput and spend less time waiting to acquire the lock to flush WAL. Larger flush operations also mean less time spent waiting on flushes at the kernel level.

The heuristic for eager eviction is to flush only those buffers in the strategy ring which do not require a WAL flush.

This patch is also a step toward AIO writes.

Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Kirill Reshke <reshkekirill@gmail.com> (earlier version)
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
1 parent 3142b4a commit de5134e
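The core of that heuristic appears in PrepareOrRejectEagerFlushBuffer in the bufmgr.c diff below: a ring buffer only joins the eager-flush run if writing it out would not force a WAL flush first. As a rough sketch of just that filter (pinning, header locking, and StartBufferIO omitted; would_eager_flush is a hypothetical helper for illustration and not part of the patch, while the flag macros and XLogNeedsFlush are the ones used in the diff):

/* Hypothetical sketch of the eager-flush filter; not code from the patch. */
static bool
would_eager_flush(uint32 buf_state, XLogRecPtr page_lsn)
{
    /* Only dirty, valid buffers are candidates for writing out. */
    if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID))
        return false;

    /* Skip buffers that other backends are actively using. */
    if (BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
        BUF_STATE_GET_USAGECOUNT(buf_state) > 1)
        return false;

    /*
     * The heuristic itself: if flushing this page would first require
     * flushing WAL up to its LSN, leave it for the lazy path instead.
     */
    if ((buf_state & BM_PERMANENT) && XLogNeedsFlush(page_lsn))
        return false;

    return true;
}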

File tree: 3 files changed (+235, -6 lines)

src/backend/storage/buffer/bufmgr.c

Lines changed: 183 additions & 6 deletions

@@ -534,7 +534,16 @@ static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object
                           IOContext io_context, XLogRecPtr buffer_lsn);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
                         IOObject io_object, IOContext io_context);
-static void CleanVictimBuffer(BufferDesc *bufdesc, uint32 *buf_state,
+static BufferDesc *NextStratBufToFlush(BufferAccessStrategy strategy,
+                                       Buffer sweep_end,
+                                       XLogRecPtr *lsn,
+                                       int *sweep_cursor);
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+                                                   RelFileLocator *rlocator,
+                                                   bool skip_pinned,
+                                                   XLogRecPtr *max_lsn);
+static void CleanVictimBuffer(BufferAccessStrategy strategy,
+                              BufferDesc *bufdesc, uint32 *buf_state,
                               bool from_ring, IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
                                        ForkNumber forkNum,
@@ -2420,7 +2429,7 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 		}
 
 		/* Content lock is released inside CleanVictimBuffer */
-		CleanVictimBuffer(buf_hdr, &buf_state, from_ring, io_context);
+		CleanVictimBuffer(strategy, buf_hdr, &buf_state, from_ring, io_context);
 	}
 
 	if (buf_state & BM_VALID)
@@ -4254,6 +4263,40 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * Returns the buffer descriptor of the buffer containing the next block we
+ * should eagerly flush or NULL when there are no further buffers to consider
+ * writing out.
+ */
+static BufferDesc *
+NextStratBufToFlush(BufferAccessStrategy strategy,
+                    Buffer sweep_end,
+                    XLogRecPtr *lsn, int *sweep_cursor)
+{
+	Buffer		bufnum;
+	BufferDesc *bufdesc;
+
+	while ((bufnum =
+			StrategySweepNextBuffer(strategy, sweep_cursor)) != sweep_end)
+	{
+		/*
+		 * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+		 * buffers in the ring will be invalid.
+		 */
+		if (!BufferIsValid(bufnum))
+			break;
+
+		if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum,
+		                                               InvalidBlockNumber,
+		                                               NULL,
+		                                               true,
+		                                               lsn)) != NULL)
+			return bufdesc;
+	}
+
+	return NULL;
+}
+
 /*
  * Prepare and write out a dirty victim buffer.
  *
@@ -4264,24 +4307,158 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
  * bufdesc and buf_state may be modified.
  */
 static void
-CleanVictimBuffer(BufferDesc *bufdesc, uint32 *buf_state,
+CleanVictimBuffer(BufferAccessStrategy strategy,
+                  BufferDesc *bufdesc, uint32 *buf_state,
                   bool from_ring, IOContext io_context)
 {
 
 	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
 	LWLock	   *content_lock;
+	bool		first_buffer = true;
 
 	Assert(*buf_state & BM_DIRTY);
 
 	/* Set up this victim buffer to be flushed */
 	if (!PrepareFlushBuffer(bufdesc, buf_state, &max_lsn))
 		return;
 
-	DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+	if (from_ring && StrategySupportsEagerFlush(strategy))
+	{
+		Buffer		sweep_end = BufferDescriptorGetBuffer(bufdesc);
+		int			cursor = StrategySweepStart(strategy);
+
+		/* Clean victim buffer and find more to flush opportunistically */
+		do
+		{
+			DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+			content_lock = BufferDescriptorGetContentLock(bufdesc);
+			LWLockRelease(content_lock);
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+			                              &bufdesc->tag);
+			/* We leave the first buffer pinned for the caller */
+			if (!first_buffer)
+				UnpinBuffer(bufdesc);
+			first_buffer = false;
+		} while ((bufdesc = NextStratBufToFlush(strategy, sweep_end,
+		                                        &max_lsn, &cursor)) != NULL);
+	}
+	else
+	{
+		DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+		content_lock = BufferDescriptorGetContentLock(bufdesc);
+		LWLockRelease(content_lock);
+		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+		                              &bufdesc->tag);
+	}
+}
+
+/*
+ * Prepare bufdesc for eager flushing.
+ *
+ * Given bufnum, return the block -- the pointer to the block data in memory
+ * -- which we will opportunistically flush or NULL if this buffer does not
+ * contain a block that should be flushed.
+ *
+ * require is the BlockNumber required by the caller. Some callers may require
+ * a specific BlockNumber to be in bufnum because they are assembling a
+ * contiguous run of blocks.
+ *
+ * If the caller needs the block to be from a specific relation, rlocator will
+ * be provided.
+ */
+BufferDesc *
+PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+                                RelFileLocator *rlocator, bool skip_pinned,
+                                XLogRecPtr *max_lsn)
+{
+	BufferDesc *bufdesc;
+	uint32		buf_state;
+	XLogRecPtr	lsn;
+	BlockNumber blknum;
+	LWLock	   *content_lock;
+
+	if (!BufferIsValid(bufnum))
+		return NULL;
+
+	Assert(!BufferIsLocal(bufnum));
+
+	bufdesc = GetBufferDescriptor(bufnum - 1);
+
+	/* Block may need to be in a specific relation */
+	if (rlocator &&
+		!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag),
+		                      *rlocator))
+		return NULL;
+
+	/* Must do this before taking the buffer header spinlock */
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	ReservePrivateRefCountEntry();
+
+	buf_state = LockBufHdr(bufdesc);
+
+	if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID))
+		goto except_unlock_header;
+
+	/* We don't eagerly flush buffers used by others */
+	if (skip_pinned &&
+		(BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
+		 BUF_STATE_GET_USAGECOUNT(buf_state) > 1))
+		goto except_unlock_header;
+
+	/* Get page LSN while holding header lock */
+	lsn = BufferGetLSN(bufdesc);
+
+	PinBuffer_Locked(bufdesc);
+	CheckBufferIsPinnedOnce(bufnum);
+
+	blknum = BufferGetBlockNumber(bufnum);
+	Assert(BlockNumberIsValid(blknum));
+
+	/* If we'll have to flush WAL to flush the block, we're done */
+	if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn))
+		goto except_unpin_buffer;
+
+	/* We only include contiguous blocks in the run */
+	if (BlockNumberIsValid(require) && blknum != require)
+		goto except_unpin_buffer;
+
 	content_lock = BufferDescriptorGetContentLock(bufdesc);
+	if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+		goto except_unpin_buffer;
+
+	/*
+	 * Now that we have the content lock, we need to recheck if we need to
+	 * flush WAL.
+	 */
+	buf_state = LockBufHdr(bufdesc);
+	lsn = BufferGetLSN(bufdesc);
+	UnlockBufHdr(bufdesc, buf_state);
+
+	if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn))
+		goto except_unlock_content;
+
+	/* Try to start an I/O operation */
+	if (!StartBufferIO(bufdesc, false, true))
+		goto except_unlock_content;
+
+	if (lsn > *max_lsn)
+		*max_lsn = lsn;
+	buf_state = LockBufHdr(bufdesc);
+	buf_state &= ~BM_JUST_DIRTIED;
+	UnlockBufHdr(bufdesc, buf_state);
+
+	return bufdesc;
+
+except_unlock_content:
 	LWLockRelease(content_lock);
-	ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-	                              &bufdesc->tag);
+
+except_unpin_buffer:
+	UnpinBuffer(bufdesc);
+	return NULL;
+
+except_unlock_header:
+	UnlockBufHdr(bufdesc, buf_state);
+	return NULL;
 }
 
 /*

src/backend/storage/buffer/freelist.c

Lines changed: 48 additions & 0 deletions

@@ -156,6 +156,31 @@ ClockSweepTick(void)
 	return victim;
 }
 
+/*
+ * Some BufferAccessStrategies support eager flushing -- which is flushing
+ * buffers in the ring before they are needed. This can lead to better I/O
+ * patterns than lazily flushing buffers immediately before reusing them.
+ */
+bool
+StrategySupportsEagerFlush(BufferAccessStrategy strategy)
+{
+	Assert(strategy);
+
+	switch (strategy->btype)
+	{
+		case BAS_BULKWRITE:
+			return true;
+		case BAS_VACUUM:
+		case BAS_NORMAL:
+		case BAS_BULKREAD:
+			return false;
+		default:
+			elog(ERROR, "unrecognized buffer access strategy: %d",
+			     (int) strategy->btype);
+			return false;
+	}
+}
+
 /*
  * StrategyGetBuffer
  *
@@ -270,6 +295,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	}
 }
 
+/*
+ * Return the next buffer in the ring or InvalidBuffer if the current sweep is
+ * over.
+ */
+Buffer
+StrategySweepNextBuffer(BufferAccessStrategy strategy, int *sweep_cursor)
+{
+	if (++(*sweep_cursor) >= strategy->nbuffers)
+		*sweep_cursor = 0;
+
+	return strategy->buffers[*sweep_cursor];
+}
+
+/*
+ * Return the starting buffer of a sweep of the strategy ring
+ */
+int
+StrategySweepStart(BufferAccessStrategy strategy)
+{
+	return strategy->current;
+}
+
+
 /*
  * StrategySyncStart -- tell BgBufferSync where to start syncing
  *
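Taken together, the sweep helpers above are meant to be driven the way CleanVictimBuffer drives them in bufmgr.c: start a sweep at the ring's current slot and walk it until arriving back at the victim buffer. Roughly (a condensed sketch of the loop already in the patch, where victim stands in for the victim buffer's descriptor; this is not a separate API):

    int     cursor = StrategySweepStart(strategy);
    Buffer  sweep_end = BufferDescriptorGetBuffer(victim);
    Buffer  bufnum;

    while ((bufnum = StrategySweepNextBuffer(strategy, &cursor)) != sweep_end)
    {
        /* The ring fills front to back, so the first invalid slot ends the sweep. */
        if (!BufferIsValid(bufnum))
            break;

        /* ... try to prepare this buffer and add it to the eager-flush run ... */
    }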

src/include/storage/buf_internals.h

Lines changed: 4 additions & 0 deletions

@@ -437,6 +437,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 
 /* freelist.c */
+extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy,
+                                      int *sweep_cursor);
+extern int	StrategySweepStart(BufferAccessStrategy strategy);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
                                      uint32 *buf_state, bool *from_ring);
