wal.watcher应用在remote_write的流程中:
- wal.watcher会实时读取wal最新的数据;
- 然后将其Append给QueueManager;
- 最后由QueueManager发送给remote storage。
一.整体框架
wal.watcher在启动时,记录wal.startTimestamp = now();
- wal.watcher先读checkpoint目录下的segment,调用readSegment()读取wal中的数据;
- wal.watcher再读wal目录下的segment,调用readSegment()读取wal中的数据;
由于wal目录中的数据是实时追加,wal.watcher会实时读取其中的数据。
readSegment()读取wal中数据的流程:
- 若遇到record.Series,则将其添加到QueueManager的series缓存中;
- 若遇到record.Samples,检查其时间是否 > wal.startTimestamp,若是,则将其Append到QueueManager;
二.流程代码
入口是QueueManager.Start():
// storage/remote/queue_manager.go
// Start starts the queue manager. Only the WAL watcher start-up is shown in
// this excerpt; the surrounding steps are elided ("......").
func (t *QueueManager) Start() {
......
// Start the WAL watcher held by this queue manager.
t.watcher.Start()
......
}
其主要工作在w.loop()中:
// tsdb/wal/watcher.go
// Start calls w.setMetrics, logs that the WAL watcher for queue w.name is
// starting, and then launches w.loop in its own goroutine, so Start itself
// returns without waiting for the loop.
func (w *Watcher) Start() {
	w.setMetrics()

	infoLog := level.Info(w.logger)
	infoLog.Log("msg", "Starting WAL watcher", "queue", w.name)

	// The actual tailing work happens in the background loop.
	go w.loop()
}
w.loop()保证除非退出,否则会一直执行w.Run():
// tsdb/wal/watcher.go
func (w *Watcher) loop() {
defer close(w.done)
// We may encounter failures processing the WAL; we should wait and retry.
for !isClosed(w.quit) {
w.SetStartTime(time.Now()) // 设置w.startTimestamp
if err := w.Run(); err != nil {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
}
select {
case
w.Run()是具体的执行流程:
- 首先,使用LiveReader读取checkpoint目录下的segment;
- 然后,使用LiveReader读取wal目录下的segment;
// tsdb/wal/watcher.go
// Run performs one pass of tailing the WAL: it first reads the most recent
// checkpoint (when LastCheckpoint succeeds) and then watches the WAL segments
// one after another, starting from the segment found via the checkpoint index.
// As far as this excerpt shows, it returns the wrapped readCheckpoint error,
// the first error from watch, or nil once w.quit is closed.
// NOTE(review): in this excerpt the errors returned by firstAndLast and
// findSegmentForIndex are never checked, and part of the loop body is elided
// ("...") — confirm against the full source.
func (w *Watcher) Run() error {
_, lastSegment, err := w.firstAndLast()
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
// The checkpoint is only read when LastCheckpoint returned no error.
if err == nil {
if err = w.readCheckpoint(lastCheckpoint); err != nil { // first, read the segments under the checkpoint directory
return errors.Wrap(err, "readCheckpoint")
}
}
w.lastCheckpoint = lastCheckpoint
currentSegment, err := w.findSegmentForIndex(checkpointIndex)
for !isClosed(w.quit) {
// Then read the segments under the WAL directory; the second argument
// (tail) is true once currentSegment has reached lastSegment.
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil {
return err
}
...
currentSegment++
}
return nil
}
读checkpoint目录下的segment:
- 遍历checkpoint目录的所有segment;
- 对每个segment,使用LiveReader读取文件,使用readSegment()解析处理;
// tsdb/wal/watcher.go
// Read all the series records from a Checkpoint directory.
// readCheckpoint reads every segment found in checkpointDir through a
// LiveReader and feeds the records to readSegment with tail=false, using the
// checkpoint number as the segment index.
//
// It returns a wrapped error if the checkpoint number cannot be determined,
// the segments cannot be listed, a segment cannot be opened, or readSegment
// fails with anything other than io.EOF. It returns nil once all segments
// have been read.
func (w *Watcher) readCheckpoint(checkpointDir string) error {
	index, err := checkpointNum(checkpointDir)
	if err != nil {
		return errors.Wrap(err, "checkpointNum")
	}

	// Ensure we read the whole contents of every segment in the checkpoint dir.
	segs, err := w.segments(checkpointDir)
	if err != nil {
		return errors.Wrap(err, "segments")
	}

	for _, seg := range segs {
		// Each segment is handled in its own function scope so that the
		// deferred Close runs at the end of the iteration instead of piling
		// up until readCheckpoint returns.
		err := func() error {
			sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
			if err != nil {
				// Do not touch sr here: it is not usable after a failed open.
				return errors.Wrap(err, "unable to open segment")
			}
			defer sr.Close()

			r := NewLiveReader(w.logger, w.readerMetrics, sr)
			// io.EOF only signals that the segment has been fully consumed.
			if err := w.readSegment(r, index, false); err != nil && err != io.EOF {
				return errors.Wrap(err, "readSegment")
			}
			return nil
		}()
		if err != nil {
			return err
		}
	}
	return nil
}
读wal目录下的segment:
- 调用w.readSegment()持续地读取segmentNum对应的segment;
// tsdb/wal/watcher.go
func (w *Watcher) watch(segmentNum int, tail bool) error {
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
readTicker := time.NewTicker(readPeriod)
defer readTicker.Stop()
.......
gcSem := make(chan struct{}, 1)
for {
select {
case
三.读数据的代码
读取在w.readSegment()中:
- r.Next(): 使用LiveReader读取segment文件;
- 在readSegment()中对读取的数据进行解析、发送:
- 若record是series类型,则将其保存到QueueManager;
- 若record是samples类型,检查sample.T > w.startTimestamp,然后符合条件的samples发送给QueueManager;
// readSegment consumes records from r until r.Next() reports no more records
// or w.quit is closed, and dispatches each record by its type:
//   - series records are decoded and passed to w.writer.StoreSeries together
//     with segmentNum;
//   - sample records are only processed when tail is true; samples whose
//     timestamp is greater than w.startTimestamp are collected and passed to
//     w.writer.Append.
//
// It returns r.Err(). Parts of the body are elided in this excerpt ("......"),
// including the declarations of dec, series, samples and send and the handling
// of decode errors.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
......
for r.Next() && !isClosed(w.quit) { // read one record per iteration
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
series, err := dec.Series(rec, series[:0])
// Store the series information in the writer (the QueueManager, per the surrounding article).
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
// Samples are skipped unless this segment is being tailed (tail == true).
// The break leaves the switch only; the for loop moves on to the next record.
if !tail {
break
}
samples, err := dec.Samples(rec, samples[:0])
for _, s := range samples {
if s.T > w.startTimestamp { // keep only samples newer than the watcher's start timestamp
....
send = append(send, s)
}
}
if len(send) > 0 {
// Hand the collected samples to the writer, then reset the buffer
// while keeping its capacity for the next record.
w.writer.Append(send)
send = send[:0]
}
......
}
}
return r.Err()
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net