首先要做的是为事务日志(XLOG)申请共享内存,并初始化一些重要的结构体。
本节主要内容如下:
- 计算XLOG需要申请多少共享内存:XLOGShmemSize函数
- 两个重要结构体:XLogCtlData和XLogCtlInsert
- XLOG初始化共享内存:XLOGShmemInit函数
一、 XLOGShmemSize函数
用户可以通过wal_buffers参数来指定XLOG BUFFER中缓存页面的数量。wal_buffers默认值为-1,表示pg通过启发式算法(XLOGChooseNumBuffers函数)计算出需要缓存页面的数量。
除了XLOG BUFFER,还需要申请一部分共享内存保存XLOG的控制信息,因此在函数中可以看到size是几部分相加的结果。
/*
 * XLOGShmemSize
 *
 * Compute how much shared memory XLOG needs. The result is the sum of the
 * control structure, the WAL insertion locks, the xlblocks array, one block
 * of alignment padding, and the WAL page buffers themselves.
 *
 * Note: the size of ControlFileData is not included here.
 */
Size
XLOGShmemSize(void)
{
    Size        total;
    Size        lock_bytes;
    Size        xlblocks_bytes;
    Size        page_bytes;

    /*
     * wal_buffers == -1 means "choose automatically": ask
     * XLOGChooseNumBuffers() for a value and install it as the setting.
     */
    if (XLOGbuffers == -1)
    {
        char        valstr[32];

        snprintf(valstr, sizeof(valstr), "%d", XLOGChooseNumBuffers());
        SetConfigOption("wal_buffers", valstr, PGC_POSTMASTER, PGC_S_OVERRIDE);
    }

    /* wal_buffers is a page count, so it must be positive by now */
    Assert(XLOGbuffers > 0);

    /* XLogCtl: the control structure itself */
    total = sizeof(XLogCtlData);

    /* WAL insertion locks, plus one extra slot for alignment */
    lock_bytes = mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1);
    total = add_size(total, lock_bytes);

    /* xlblocks array: one XLogRecPtr per WAL buffer page */
    xlblocks_bytes = mul_size(sizeof(XLogRecPtr), XLOGbuffers);
    total = add_size(total, xlblocks_bytes);

    /* extra alignment padding for the XLOG I/O buffers */
    total = add_size(total, XLOG_BLCKSZ);

    /* and the buffers themselves */
    page_bytes = mul_size(XLOG_BLCKSZ, XLOGbuffers);
    total = add_size(total, page_bytes);

    return total;
}
从上述代码中可以看出,这块共享内存被分成了5个部分:
- 第一部分:XLogCtl
- 第二部分:LSN数组,数组元素个数和log buffer的页面数一致(XLOGbuffers)
- 第三部分:WALInsertLockPadded数组,数组元素个数为NUM_XLOGINSERT_LOCKS(申请空间时按NUM_XLOGINSERT_LOCKS + 1计算,多出的一个用于对齐)
- 第四部分:对齐位
- 第五部分:log buffer,数组元素个数为XLOGbuffers
二、 两个重要结构体
1. XLogCtlData
XLogCtlData中保存当前WAL的写入状态、刷盘状态、buffer页面的状态信息等。
/*
* Total shared-memory state for XLOG.
*/
typedef struct XLogCtlData
{
XLogCtlInsert Insert; /* embedded XLogCtlInsert structure */
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst; /* LSNs up to which WAL is requested to be written and flushed */
XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr (redo LSN of a recent checkpoint) */
FullTransactionId ckptFullXid; /* nextXid of latest checkpoint */
XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */
/*
 * Fake LSN counter, for unlogged relations. Protected by ulsn_lck.
 * (Per the original annotation, currently used only by GiST.)
 */
XLogRecPtr unloggedLSN;
slock_t ulsn_lck;
/*
 * Time and LSN of last xlog segment switch. Protected by WALWriteLock.
 */
pg_time_t lastSegSwitchTime;
XLogRecPtr lastSegSwitchLSN;
/*
 * LSNs up to which WAL has already been written and flushed.
 * Protected by info_lck and WALWriteLock (you must hold either lock to
 * read it, but both to update).
 */
XLogwrtResult LogwrtResult;
/*
 * Latest initialized page in the cache (last byte position + 1).
 */
XLogRecPtr InitializedUpTo;
/*
 * These values do not change after startup, although the pointed-to pages
 * and xlblocks values certainly do. xlblocks values are protected by
 * WALBufMappingLock.
 */
char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index, i.e. number of buffer pages - 1 */
/*
 * Timeline information.
 */
TimeLineID ThisTimeLineID; /* current timeline */
TimeLineID PrevTimeLineID; /* previous timeline; equal to ThisTimeLineID if no branch has occurred */
/*
 * SharedRecoveryState indicates if we're still in crash or archive
 * recovery. Protected by info_lck.
 */
RecoveryState SharedRecoveryState;
/*
 * SharedHotStandbyActive indicates if we allow hot standby queries to be
 * run. Protected by info_lck.
 */
bool SharedHotStandbyActive;
/*
 * SharedPromoteIsTriggered indicates if a standby promotion has been
 * triggered. Protected by info_lck.
 */
bool SharedPromoteIsTriggered;
/*
 * WalWriterSleeping indicates whether the WAL writer is currently in
 * low-power mode (and hence should be nudged if an async commit occurs).
 * Protected by info_lck.
 */
bool WalWriterSleeping;
/*
 * recoveryWakeupLatch is used to wake up the startup process to continue
 * WAL replay, if it is waiting for WAL to arrive or failover trigger file
 * to appear.
 */
Latch recoveryWakeupLatch;
/*
 * During recovery, we keep a copy of the latest checkpoint record here.
 */
XLogRecPtr lastCheckPointRecPtr; /* start of the checkpoint record */
XLogRecPtr lastCheckPointEndPtr; /* end+1 of the checkpoint record; used when the checkpointer needs to create a restartpoint */
CheckPoint lastCheckPoint; /* copy of the latest checkpoint record */
/*
 * lastReplayedEndRecPtr points to end+1 of the last record successfully
 * replayed. When we're currently replaying a record, ie. in a redo
 * function, replayEndRecPtr points to the end+1 of the record being
 * replayed, otherwise it's equal to lastReplayedEndRecPtr.
 */
XLogRecPtr lastReplayedEndRecPtr;
TimeLineID lastReplayedTLI;
XLogRecPtr replayEndRecPtr;
TimeLineID replayEndTLI;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
/*
 * timestamp of when we started replaying the current chunk of WAL data,
 * only relevant for replication or archive recovery
 */
TimestampTz currentChunkStartTime;
/* Recovery pause state */
RecoveryPauseState recoveryPauseState;
ConditionVariable recoveryNotPausedCV;
/*
 * lastFpwDisableRecPtr points to the start of the last replayed
 * XLOG_FPW_CHANGE record that instructs full_page_writes is disabled.
 */
XLogRecPtr lastFpwDisableRecPtr;
slock_t info_lck; /* locks shared variables shown above */
} XLogCtlData;
/* File-local pointer to the XLogCtlData structure; NULL until XLOGShmemInit assigns it. */
static XLogCtlData *XLogCtl = NULL;
2. XLogCtlInsert
XLogCtlInsert中保存日志记录写入buffer时所需的各种变量。
/*
* Shared state data for WAL insertion.
*/
typedef struct XLogCtlInsert
{
slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
uint64 CurrBytePos; /* position at which the next record will be inserted */
uint64 PrevBytePos; /* position of the preceding record, which each new record must record */
/*
 * Keep the frequently-modified variables above on one cache line and the
 * rarely-modified variables below on another, so that frequent updates
 * above do not invalidate the cache line holding the fields below and
 * slow down reads of them.
 */
char pad[PG_CACHE_LINE_SIZE];
/*
 * Variables related to full-page writes.
 */
XLogRecPtr RedoRecPtr; /* current redo point for insertions */
bool forcePageWrites; /* forcing full-page writes for PITR? */
bool fullPageWrites; /* are full-page writes enabled? */
/*
 * Online backup support (pg_start_backup/pg_stop_backup).
 */
ExclusiveBackupState exclusiveBackupState; /* state of the exclusive backup */
int nonExclusiveBackups; /* counter of streaming base backups currently in progress */
XLogRecPtr lastBackupStart; /* redo location of the latest checkpoint used as the starting point of an online backup */
/*
 * WAL insertion locks. NUM_XLOGINSERT_LOCKS locks are allocated to raise
 * the concurrency of inserting records into the WAL buffers.
 * NOTE(review): per the original annotation, backends pick a lock based on
 * MyProc->pgprocno; that logic is not shown in this excerpt.
 */
WALInsertLockPadded *WALInsertLocks;
} XLogCtlInsert;
三、 XLOGShmemInit函数
用于XLOG初始化共享内存
void XLOGShmemInit(void)
{
bool foundCFile,
foundXLog;
char *allocptr;
int i;
ControlFileData *localControlFile; // 控制文件内容
#ifdef WAL_DEBUG // 如果启用了WAL_DEBUG参数
/*
* 为WAL debug创建内存上下文,如果内存分配失败,DB可能进入PANIC状态,不过wal_debug本来就不是用于生产环境的参数,所以问题也不大
*/
if (walDebugCxt == NULL)
{
walDebugCxt = AllocSetContextCreate(TopMemoryContext,
"WAL Debug",
ALLOCSET_DEFAULT_SIZES);
MemoryContextAllowInCriticalSection(walDebugCxt, true);
}
#endif
// 初始化共享内存结构体XLogCtlData的对象XLogCtl
XLogCtl = (XLogCtlData *)
ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
// 初始化共享内存结构体ControlFileData的对象ControlFile,以及localControlFile
localControlFile = ControlFile;
ControlFile = (ControlFileData *)
ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
// 如果存在控制文件或XLOG文件
if (foundCFile || foundXLog)
{
/* both should be present or neither,两者都存在或都不存在 */
Assert(foundCFile && foundXLog);
/* Initialize local copy of WALInsertLocks,初始化WALInsertLocks的本地copy */
WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
/* 如果localControlFile已存在,释放其占用的内存 */
if (localControlFile)
pfree(localControlFile);
return;
}
// 为XLogCtl分配内存
memset(XLogCtl, 0, sizeof(XLogCtlData));
/*
* Already have read control file locally, unless in bootstrap mode. Move
* contents into shared memory. 如果本地已读取过控制文件,除非是bootstrap模式,否则将其内容移至共享内存
*/
if (localControlFile)
{
memcpy(ControlFile, localControlFile, sizeof(ControlFileData));
pfree(localControlFile);
}
/*
* Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
* multiple of the alignment for same, so no extra alignment padding is
* needed here.
* 由于XLogCtlData包含XLogRecPtr字段,它的size应该是多次相同大小的分配,不需要额外的填充字段
*/
allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
/* WAL insertion locks. Ensure they're aligned to the full padded size,WAL插入锁 */
allocptr += sizeof(WALInsertLockPadded) -
((uintptr_t) allocptr) % sizeof(WALInsertLockPadded);
WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
(WALInsertLockPadded *) allocptr;
allocptr += sizeof(WALInsertLockPadded) * NUM_XLOGINSERT_LOCKS;
for (i = 0; i pages = allocptr;
memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
/*
* Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
* in additional info.) 初始化 XLogCtl 共享数据,StartupXLOG函数会填充其余的字段
*/
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
XLogCtl->SharedRecoveryState = RECOVERY_STATE_CRASH;
XLogCtl->SharedHotStandbyActive = false;
XLogCtl->SharedPromoteIsTriggered = false;
XLogCtl->WalWriterSleeping = false;
// XLogCtl 中的锁是通过spinlock来实现的
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
}
参考
《PostgreSQL技术内幕:事务处理深度探索》第4章
https://www.jianshu.com/p/69323c1c9994
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net