The lotus message sync process
What services does lotus start during construction?
Message sync needs the chain component, and the chain component in turn depends on the libp2p components. lotus/node/builder_chain.go lays out the chain component's construction process:
- Config checks
- Network bootstrapping
- Crypto dependency checks
- VM creation
- Chain store and chain state manager startup
- Chain sync service startup
- Attaching the chain network to p2p
- Chain/miner API dependencies
- Message pool service startup
- Wallet service startup
- Payment channel service startup
- Startup of the three kinds of market services
- Whether some of the above services' methods start depends on the lotus node type
The sync-related logic lives mainly in "chain sync service startup" and in "whether some of the above services' methods start depends on the lotus node type" (the data-ingress part). Chain sync only exists when the lotus node is a FULLNODE. A condensed sketch of this wiring follows.
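For orientation, here is a condensed sketch of the sync-related wiring in lotus/node/builder_chain.go. The Override names follow lotus's dependency-injection convention, but the exact set and order vary across versions, so treat this as illustrative rather than verbatim:
// Illustrative excerpt of the ChainNode options, not a verbatim copy.
ChainNode = Options(
	// chain store and chain state manager
	Override(new(*store.ChainStore), modules.ChainStore),
	Override(new(*stmgr.StateManager), modules.StateManager),
	// chain sync service: the SyncManager constructor plus the Syncer itself
	Override(new(chain.SyncManagerCtor), func() chain.SyncManagerCtor { return chain.NewSyncManager }),
	Override(new(*chain.Syncer), modules.NewSyncer),
	// message pool service
	Override(new(*messagepool.MessagePool), modules.MessagePool),
)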
lotus sync
- Start the libp2p service
- Create a Sync.SyncManager, passing Syncer.Sync in as its argument
- Create a Syncer via NewSyncer plus the Sync.SyncManager
- Start the Syncer, which in turn starts the Sync.SyncManager
- Subscribe to the network's message topics (this is where it matters that a node which skips a mandatory upgrade keeps the stale network name, and therefore cannot receive the latest block messages)
- Run Sync on the messages that arrive
NewSyncer
Before the new syncer is created, a SyncManagerCtor is created. Its type is shown below, and it is used during Syncer construction. Its job is simply to return a SyncManager, but building one requires the Syncer's Sync implementation, so it works a bit like a callback.
type SyncFunc func(context.Context, *types.TipSet) error
type SyncManagerCtor func(syncFn SyncFunc) SyncManager
// NewSyncer creates a new Syncer object.
func NewSyncer(ds dtypes.MetadataDS,
sm *stmgr.StateManager,
exchange exchange.Client,
syncMgrCtor SyncManagerCtor,
connmgr connmgr.ConnManager,
self peer.ID,
beacon beacon.Schedule,
gent Genesis,
consensus consensus.Consensus) (*Syncer, error) {
s := &Syncer{
ds: ds, // metadata store
beacon: beacon, // beacon schedule (slice of beacon points)
bad: NewBadBlockCache(), // cache of blocks known to be bad
Genesis: gent, // genesis tipset
consensus: consensus, // consensus interface
Exchange: exchange, // libp2p-based exchange.Client
store: sm.ChainStore(), // chain store
sm: sm, // state manager
self: self, // our own peer ID
receiptTracker: newBlockReceiptTracker(), // tracks which peers delivered which blocks
connmgr: connmgr, // the connection manager held by our own host
incoming: pubsub.New(50), // in-process pubsub with a buffer of 50
}
s.syncmgr = syncMgrCtor(s.Sync) // s.Sync is passed in to obtain a SyncManager; s.Sync is the chain-head switching logic, and the actual switch is driven through s.syncmgr.doSync
return s, nil
}
Syncer.Sync
This Syncer.Sync function is crucial.
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
trace.Int64Attribute("height", int64(maybeHead.Height())),
)
}
hts := syncer.store.GetHeaviestTipSet()
if hts.ParentWeight().GreaterThan(maybeHead.ParentWeight()) {
return nil
}
if syncer.Genesis.Equals(maybeHead) || hts.Equals(maybeHead) {
return nil
}
if err := syncer.collectChain(ctx, maybeHead, hts, false); err != nil {
span.AddAttributes(trace.StringAttribute("col_error", err.Error()))
span.SetStatus(trace.Status{
Code: 13,
Message: err.Error(),
})
return xerrors.Errorf("collectChain failed: %w", err)
}
// At this point we have accepted and synced to the new `maybeHead`
// (`StageSyncComplete`).
if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
span.AddAttributes(trace.StringAttribute("put_error", err.Error()))
span.SetStatus(trace.Status{
Code: 13,
Message: err.Error(),
})
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
}
peers := syncer.receiptTracker.GetPeers(maybeHead)
if len(peers) > 0 {
syncer.connmgr.TagPeer(peers[0], "new-block", 40)
for _, p := range peers[1:] {
syncer.connmgr.TagPeer(p, "new-block", 25)
}
}
return nil
}
The logic above is fairly clear:
- If the current tipset's parent weight is greater than the candidate tipset's parent weight, return nil immediately and do nothing
- Likewise, if the candidate tipset equals the current tipset or the genesis tipset, return nil and do nothing
- syncer.collectChain(ctx, maybeHead, hts, false) performs the chain-head switch
- syncer.store.PutTipSet(ctx, maybeHead) does the post-switch work
- Score the peers that delivered the new block (this shows up in lotus net scores)
The most important piece in all this is the chain-head switching logic.
// collectChain tries to advance our view of the chain to the purported head.
//
// It goes through various stages:
//
// 1. StageHeaders: we proceed in the sync process by requesting block headers
// from our peers, moving back from their heads, until we reach a tipset
// that we have in common (such a common tipset must exist, though it may
// simply be the genesis block).
//
// If the common tipset is our head, we treat the sync as a "fast-forward",
// else we must drop part of our chain to connect to the peer's head
// (referred to as "forking").
//
// 2. StagePersistHeaders: now that we've collected the missing headers,
// augmented by those on the other side of a fork, we persist them to the
// BlockStore.
//
// 3. StageMessages: having acquired the headers and found a common tipset,
// we then move forward, requesting the full blocks, including the messages.
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet, hts *types.TipSet, ignoreCheckpoint bool) error {
// ts is the candidate head tipset; hts is our current head tipset
ctx, span := trace.StartSpan(ctx, "collectChain")
defer span.End()
ss := extractSyncState(ctx)
// Create a syncState, with base set to the current head and target set to the candidate head
ss.Init(hts, ts)
// Determine which head actually sits on the heavier chain (tip: skip the internals for now, they are fairly involved; essentially it keeps walking back until the tipsets converge and then computes chain weight. The walk is usually short, and usually the target head is heavier, otherwise we would never have gotten here)
headers, err := syncer.collectHeaders(ctx, ts, hts, ignoreCheckpoint)
if err != nil {
ss.Error(err)
return err
}
span.AddAttributes(trace.Int64Attribute("syncChainLength", int64(len(headers))))
if !headers[0].Equals(ts) {
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
}
// The next few lines persist the tipsets
ss.SetStage(api.StagePersistHeaders)
toPersist := make([]*types.BlockHeader, 0, len(headers)*int(build.BlocksPerEpoch))
for _, ts := range headers {
toPersist = append(toPersist, ts.Blocks()...)
}
if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
err = xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
ss.Error(err)
return err
}
toPersist = nil
ss.SetStage(api.StageMessages)
// The chain has switched, so the messages need to be re-verified and persisted
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
err = xerrors.Errorf("collectChain syncMessages: %w", err)
ss.Error(err)
return err
}
ss.SetStage(api.StageSyncComplete)
log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids()))
return nil
}
Once the chain-head switch completes, internal state has to be updated: that is syncer.store.PutTipSet(ctx, maybeHead), and the important part inside it is MaybeTakeHeavierTipSet.
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
for _, b := range ts.Blocks() {
if err := cs.PersistBlockHeaders(b); err != nil {
return err
}
}
expanded, err := cs.expandTipset(ts.Blocks()[0])
if err != nil {
return xerrors.Errorf("errored while expanding tipset: %w", err)
}
log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids())
if err := cs.MaybeTakeHeavierTipSet(ctx, expanded); err != nil {
return xerrors.Errorf("MaybeTakeHeavierTipSet failed in PutTipSet: %w", err)
}
return nil
}
// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our
// internal state as our new head, if and only if it is heavier than the current
// head and does not exceed the maximum fork length.
func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error {
for {
cs.heaviestLk.Lock()
// reorgChBuf is a constant 32; cs.reorgCh is a channel of capacity 32 whose element struct carries two fields: the old tipset and the new tipset
if len(cs.reorgCh) < reorgChBuf/2 {
break
}
cs.heaviestLk.Unlock()
// This is where problems tend to surface: cs.reorgCh is not being drained fast enough. The draining logic is covered below; to find these events, search the logs for:
log.Errorf("reorg channel is heavily backlogged, waiting a bit before trying to take process new tipsets")
select {
case <-time.After(time.Second / 2):
case <-ctx.Done():
return ctx.Err()
}
}
// The code below is essentially a weight comparison; if the target tipset is heavier, cs.takeHeaviestTipSet(ctx, ts) runs, and that is where the cs.reorgCh entries mentioned above are produced
defer cs.heaviestLk.Unlock()
w, err := cs.weight(ctx, cs.StateBlockstore(), ts)
if err != nil {
return err
}
heaviestW, err := cs.weight(ctx, cs.StateBlockstore(), cs.heaviest)
if err != nil {
return err
}
heavier := w.GreaterThan(heaviestW)
if w.Equals(heaviestW) && !ts.Equals(cs.heaviest) {
log.Errorw("weight draw", "currTs", cs.heaviest, "ts", ts)
heavier = breakWeightTie(ts, cs.heaviest)
}
if heavier {
// TODO: don't do this for initial sync. Now that we don't have a
// difference between 'bootstrap sync' and 'caught up' sync, we need
// some other heuristic.
exceeds, err := cs.exceedsForkLength(cs.heaviest, ts)
if err != nil {
return err
}
if exceeds {
return nil
}
return cs.takeHeaviestTipSet(ctx, ts)
}
return nil
}
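Before the consumer, a quick look at the producer: takeHeaviestTipSet is what feeds cs.reorgCh. A simplified sketch (the real function also writes the new head to the datastore, records metrics, and so on; treat this as an outline, not verbatim lotus code):
// Simplified sketch of the reorgCh producer (not verbatim lotus code).
func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error {
	// Hand the reorg to the reorgWorker goroutine shown below. If the
	// channel is full this blocks, which is exactly the backlog that
	// MaybeTakeHeavierTipSet guards against above.
	select {
	case cs.reorgCh <- reorg{old: cs.heaviest, new: ts}:
	case <-ctx.Done():
		return ctx.Err()
	}
	// ... persist the new head to the datastore ...
	cs.heaviest = ts
	return nil
}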
How does cs.reorgCh get consumed?
A consuming function is wired up when the structure is created; the implementation lives at lotus/chain/store/store.go:548:
func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNotifee) chan<- reorg {
out := make(chan reorg, reorgChBuf)
notifees := make([]ReorgNotifee, len(initialNotifees))
copy(notifees, initialNotifees)
cs.wg.Add(1)
go func() {
defer cs.wg.Done()
defer log.Warn("reorgWorker quit")
for {
select {
case n := <-cs.reorgNotifeeCh:
notifees = append(notifees, n)
case r := <-out:
// The actual consumption
revert, apply, err := cs.ReorgOps(r.old, r.new)
if err != nil {
log.Error("computing reorg ops failed: ", err)
continue
}
// The head_change record in the journal; a simple example: {"System":"sync","Event":"head_change","Timestamp":"2021-11-11T16:13:30.689264594+08:00","Data":{"From":[{"/":"bafy2bzacec5bq4rrn4eegxmd7o4sazskzoff5jv7vtzgv5dlvvj6zizt3v77k"},{"/":"bafy2bzacebsbatugdhaolromri7u66y4ddfgpbvo46s6l2eyjbsqnbeijzes6"},{"/":"bafy2bzaceb4rbcdkguslably3xhtnr2y24zo5d5anviswcpbt3l6fcxv3mrjm"},{"/":"bafy2bzacedc2s354eavxed5njvfpr76amo2xrw67wmp33mu672fwxc62scr5y"},{"/":"bafy2bzacedflehntxhkghi6kuhwvwsfk5e3a3lmpcuwrk2jbb7g6kthjaoy2y"},{"/":"bafy2bzacedg7icplqjg2vcyd4nye4zdmt2djp37bieqsga4alb3i4pylvb2ei"},{"/":"bafy2bzacecx54ljem5274tdh5qld7gkurk25eyokposiaacvg5cy4mqfai4wu"}],"FromHeight":1277067,"To":[{"/":"bafy2bzacec5bq4rrn4eegxmd7o4sazskzoff5jv7vtzgv5dlvvj6zizt3v77k"},{"/":"bafy2bzacebsbatugdhaolromri7u66y4ddfgpbvo46s6l2eyjbsqnbeijzes6"},{"/":"bafy2bzaceb4rbcdkguslably3xhtnr2y24zo5d5anviswcpbt3l6fcxv3mrjm"},{"/":"bafy2bzacedc2s354eavxed5njvfpr76amo2xrw67wmp33mu672fwxc62scr5y"},{"/":"bafy2bzacedflehntxhkghi6kuhwvwsfk5e3a3lmpcuwrk2jbb7g6kthjaoy2y"},{"/":"bafy2bzaceddhdwajwq7xl2c55l6scrimd7uzqgncwgmgslouefqdicyha6oiu"},{"/":"bafy2bzacedg7icplqjg2vcyd4nye4zdmt2djp37bieqsga4alb3i4pylvb2ei"},{"/":"bafy2bzacecx54ljem5274tdh5qld7gkurk25eyokposiaacvg5cy4mqfai4wu"}],"ToHeight":1277067,"RevertCount":1,"ApplyCount":1}}
// It can be queried with: jq -r 'select(.Event == "head_change" and .Data.ApplyCount > 1 and .Data.RevertCount > 0) | "\(.Timestamp), \(.Data.ApplyCount), \(.Data.RevertCount), \(.Data.FromHeight)"' repo/journal/latest.ndjson
cs.journal.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
return HeadChangeEvt{
From: r.old.Key(),
FromHeight: r.old.Height(),
To: r.new.Key(),
ToHeight: r.new.Height(),
RevertCount: len(revert),
ApplyCount: len(apply),
}
})
...
}
}
}()
return out
}
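The elided part at the end of the case ("...") is where each registered ReorgNotifee is invoked with the revert/apply lists computed above; that is the real consumption. For completeness, a minimal sketch of how an observer registers, assuming lotus's ReorgNotifee signature func(revert, apply []*types.TipSet) error:
// Minimal sketch: subscribing to head changes (the callback body is illustrative).
cs.SubscribeHeadChanges(func(revert, apply []*types.TipSet) error {
	for _, ts := range revert {
		log.Infow("reverted tipset", "height", ts.Height())
	}
	for _, ts := range apply {
		log.Infow("applied tipset", "height", ts.Height())
	}
	return nil
})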
Sync.SyncManager
The previous section walked through Syncer.Sync, which is maybe only a third of the whole sync story: it covers what happens once a target tipset is in hand. So how are new target tipsets discovered? For that, this section goes back up one level, to Sync.SyncManager.
// sync manager interface
func NewSyncManager(sync SyncFunc) SyncManager {
ctx, cancel := context.WithCancel(context.Background())
return &syncManager{
ctx: ctx,
cancel: cancel,
workq: make(chan peerHead),
statusq: make(chan workerStatus),
heads: make(map[peer.ID]*types.TipSet),
state: make(map[uint64]*workerState),
recent: newSyncBuffer(RecentSyncBufferSize),
history: make([]*workerState, SyncWorkerHistory),
doSync: sync,
}
}
type peerHead struct {
p peer.ID
ts *types.TipSet
}
type workerStatus struct {
id uint64
err error
}
// The function that actually performs a sync
type SyncFunc func(context.Context, *types.TipSet) error
// Generator for the sync manager: pass in the sync function, get back a sync manager
type SyncManagerCtor func(syncFn SyncFunc) SyncManager
// The sync manager
type SyncManager interface {
// Start starts the SyncManager.
Start()
// Stop stops the SyncManager.
Stop()
// SetPeerHead informs the SyncManager that the supplied peer reported the
// supplied tipset.
SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)
// State retrieves the state of the sync workers.
State() []SyncerStateSnapshot
}
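Taken together, the lifecycle from the Syncer's point of view, assembled from the snippets in this article (illustrative only; ctx, peerID and tipset are placeholders, not variables from lotus):
// Condensed SyncManager lifecycle, assembled from the surrounding snippets.
mgr := NewSyncManager(s.Sync)        // wrap the Syncer's Sync as the SyncFunc
mgr.Start()                          // spawns the scheduler goroutine
mgr.SetPeerHead(ctx, peerID, tipset) // one call per validated peer head; feeds sm.workq
mgr.Stop()                           // on shutdown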
// Creating the sync manager (this s.Sync is the function covered at length above)
s.syncmgr = syncMgrCtor(s.Sync)
OK, with that step done, syncmgr is fully created; next it waits for the Syncer to start.
func (syncer *Syncer) Start() {
tickerCtx, tickerCtxCancel := context.WithCancel(context.Background())
// Start the syncmgr
syncer.syncmgr.Start()
syncer.tickerCtxCancel = tickerCtxCancel
go syncer.runMetricsTricker(tickerCtx)
}
func (sm *syncManager) Start() {
go sm.scheduler()
}
func (sm *syncManager) scheduler() {
ticker := time.NewTicker(time.Minute)
tickerC := ticker.C
for {
select {
case head := <-sm.workq:
// Spawn a worker for the peerHead and perform the sync
sm.handlePeerHead(head)
case status := <-sm.statusq:
// Act on a worker's post-sync status: delete its state, and so on
sm.handleWorkerStatus(status)
case <-tickerC:
// In handleWorkerStatus below: if a worker's sync took less than 15 minutes, initial sync counts as done and the following runs (exactly once)
if sm.initialSyncDone {
ticker.Stop()
tickerC = nil
sm.handleInitialSyncDone()
}
case <-sm.ctx.Done():
return
}
}
}
Of this loop, the important parts are the three handlers.
handlePeerHead
func (sm *syncManager) handlePeerHead(head peerHead) {
log.Debugf("new peer head: %s %s", head.p, head.ts)
// have we started syncing yet?
// sm.nextWorker only changes when a worker is spawned, so at startup we always enter this branch
if sm.nextWorker == 0 {
// track the peer head until we start syncing
sm.heads[head.p] = head.ts
// not yet; do we have enough peers?
// BootstrapPeerThreshold = 4; we must collect 4 peer heads before going further
if len(sm.heads) < BootstrapPeerThreshold {
log.Debugw("not tracking enough peers to start sync worker", "have", len(sm.heads), "need", BootstrapPeerThreshold)
// not enough peers; track it and wait
return
}
// we are ready to start syncing; select the sync target and spawn a worker
// Pick the best sync target; again this boils down to weight, see the sketch after this function
target, err := sm.selectInitialSyncTarget()
if err != nil {
log.Errorf("failed to select initial sync target: %s", err)
return
}
log.Infof("selected initial sync target: %s", target)
// The method that actually spawns workers; inside it, Syncer.Sync is eventually called to switch the chain head
sm.spawnWorker(target)
return
}
// we have started syncing, add peer head to the queue if applicable and maybe spawn a worker
// if there is work to do (possibly in a fork)
// After initial sync has completed we land here instead; it mainly checks whether the peerHead relates to what is already buffered, returns data accordingly, and then decides whether to spawn a new worker
target, work, err := sm.addSyncTarget(head.ts)
if err != nil {
log.Warnf("failed to add sync target: %s", err)
return
}
if work {
log.Infof("selected sync target: %s", target)
sm.spawnWorker(target)
}
}
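As the comment above promised, a rough sketch of selectInitialSyncTarget (simplified from lotus/chain/sync_manager.go, imports like sort elided; the real code also resets sm.heads and warns when peers disagree about the chain):
// Rough sketch of initial sync target selection (simplified).
func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) {
	var buckets syncBucketSet
	// Sort the tracked peer heads by height so that related tipsets
	// land in the same bucket.
	var heads []*types.TipSet
	for _, ts := range sm.heads {
		heads = append(heads, ts)
	}
	sort.Slice(heads, func(i, j int) bool { return heads[i].Height() < heads[j].Height() })
	for _, ts := range heads {
		buckets.Insert(ts)
	}
	// Once again it comes down to weight: the heaviest tipset across
	// all buckets becomes the initial sync target.
	return buckets.Heaviest(), nil
}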
func (sm *syncManager) spawnWorker(target *types.TipSet) {
// The first row shown by lotus sync status is this worker id
id := sm.nextWorker
sm.nextWorker++
ws := &workerState{
id: id,
ts: target,
ss: new(SyncerState),
}
ws.ss.data.WorkerID = id
sm.mx.Lock()
sm.state[id] = ws
sm.mx.Unlock()
go sm.worker(ws)
}
func (sm *syncManager) worker(ws *workerState) {
log.Infof("worker %d syncing in %s", ws.id, ws.ts)
start := build.Clock.Now()
ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss)
// doSync is the Syncer.Sync function
err := sm.doSync(ctx, ws.ts)
ws.dt = build.Clock.Since(start)
log.Infof("worker %d done; took %s", ws.id, ws.dt)
select {
// Send the worker's status back on sm.statusq, which brings us to handleWorkerStatus
case sm.statusq <- workerStatus{id: ws.id, err: err}:
case <-sm.ctx.Done():
}
}
handleWorkerStatus
func (sm *syncManager) handleWorkerStatus(status workerStatus) {
log.Debugf("worker %d done; status error: %s", status.id, status.err)
// Clean up the state map
sm.mx.Lock()
ws := sm.state[status.id]
delete(sm.state, status.id)
// we track the last few workers for debug purposes
sm.history[sm.historyI] = ws
sm.historyI++
sm.historyI %= len(sm.history)
sm.mx.Unlock()
if status.err != nil {
// we failed to sync this target -- log it and try to work on an extended chain
// if there is nothing related to be worked on, we stop working on this chain.
log.Errorf("error during sync in %s: %s", ws.ts, status.err)
} else {
// add to the recently synced buffer
sm.recent.Push(ws.ts)
// if we are still in initial sync and this was fast enough, mark the end of the initial sync
// The only place sm.initialSyncDone can become true: some worker finished, and it took less than 15 minutes
if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold {
sm.initialSyncDone = true
}
}
// we are done with this target, select the next sync target and spawn a worker if there is work
// to do, because of an extension of this chain.
// This handler fires fairly often, so on the way out it also does a bit of worker-spawning work
target, work, err := sm.selectSyncTarget(ws.ts)
if err != nil {
log.Warnf("failed to select sync target: %s", err)
return
}
if work {
log.Infof("selected sync target: %s", target)
sm.spawnWorker(target)
}
}
handleInitialSyncDone
// Entered from the one-minute ticker once initial sync is done; its logic is simple: spawn additional workers, up to MaxSyncWorkers (= 5), to keep the whole sync pipeline flowing
func (sm *syncManager) handleInitialSyncDone() {
// we have just finished the initial sync; spawn some additional workers in deferred syncs
// as needed (and up to MaxSyncWorkers) to ramp up chain sync
for len(sm.state) < MaxSyncWorkers {
target, work, err := sm.selectDeferredSyncTarget()
if err != nil {
log.Errorf("error selecting deferred sync target: %s", err)
return
}
if !work {
return
}
log.Infof("selected deferred sync target: %s", target)
sm.spawnWorker(target)
}
}
Data ingress
All of the logic above depends on data arriving on sm.workq, i.e. on peerHead entries coming in. Once sm.workq has data, a worker is spawned, and the worker can call Syncer.Sync to switch and sync the chain head. The data comes in at the last step of the build:
// Full node API / service startup
ApplyIf(isFullNode,
Override(new(messagepool.Provider), messagepool.NewProvider),
Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))),
Override(new(full.ChainModuleAPI), From(new(full.ChainModule))),
Override(new(full.GasModuleAPI), From(new(full.GasModule))),
Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))),
Override(new(full.StateModuleAPI), From(new(full.StateModule))),
Override(new(stmgr.StateManagerAPI), From(new(*stmgr.StateManager))),
Override(RunHelloKey, modules.RunHello),
Override(RunChainExchangeKey, modules.RunChainExchange),
Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), // handle incoming messages
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), // handle incoming blocks
),
HandleIncomingMessages essentially just sets up a message subscription; the part relevant to sync is HandleIncomingBlocks.
HandleIncomingBlocks
func HandleIncomingBlocks(mctx helpers.MetricsCtx,
lc fx.Lifecycle,
ps *pubsub.PubSub,
s *chain.Syncer,
bserv dtypes.ChainBlockService,
chain *store.ChainStore,
cns consensus.Consensus,
h host.Host,
nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc)
v := sub.NewBlockValidator(
h.ID(), chain, cns,
func(p peer.ID) {
ps.BlacklistPeer(p)
h.ConnManager().TagPeer(p, "badblock", -1000)
})
if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
panic(err)
}
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
if err != nil {
panic(err)
}
// The actual logic
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
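Note that the pubsub topic embeds the network name, which is why a node that misses a mandatory upgrade (and thus keeps the old network name) stops receiving new blocks, as mentioned earlier. In lotus's build package the topic is derived roughly like this:
// Roughly how lotus builds the blocks topic: it embeds the network name.
func BlocksTopic(netName dtypes.NetworkName) string {
	return "/fil/blocks/" + string(netName)
}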
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
// BlockDelaySecs=30 PropagationDelaySecs=18
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
for {
// Fetch the next message
msg, err := bsub.Next(ctx)
if err != nil {
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
return
}
log.Error("error from block subscription: ", err)
continue
}
// Sanity-check the type of the already-validated data
blk, ok := msg.ValidatorData.(*types.BlockMsg)
if !ok {
log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
return
}
// Get the peer ID
src := msg.GetFrom()
go func() {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// NOTE: we could also share a single session between
// all requests but that may have other consequences.
ses := bserv.NewSession(ctx, bs)
start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub")
// Use the block message to fetch the plain (BLS) messages
bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubsub: %s; source: %s", err, src)
return
}
// Use the block message to fetch the signed (secp) messages
smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubsub: %s; source: %s", err, src)
return
}
took := build.Clock.Since(start)
log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
// Warn if fetching the messages took more than 3 seconds
if took > 3*time.Second {
log.Warnw("Slow msg fetch", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
}
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Insert(metrics.MinerID, blk.Header.Miner.String())},
metrics.BlockDelay.M(delay),
)
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
}
// Tell the Syncer it can go ahead with a chain-head switch
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}) {
cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
}
}()
}
}
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) bool {
// TODO: search for other blocks that could form a tipset with this block
// and then send that tipset to InformNewHead
fts := &store.FullTipSet{Blocks: []*types.FullBlock{blk}}
return syncer.InformNewHead(from, fts)
}
// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
defer func() {
if err := recover(); err != nil {
log.Errorf("panic in InformNewHead: %s", err)
}
}()
ctx := context.Background()
if fts == nil {
log.Errorf("got nil tipset in InformNewHead")
return false
}
// Height sanity check
if syncer.consensus.IsEpochBeyondCurrMax(fts.TipSet().Height()) {
log.Errorf("Received block with impossibly large height %d", fts.TipSet().Height())
return false
}
for _, b := range fts.Blocks {
// syncer.bad is the BadBlockCache: blocks already known to be bad
if reason, ok := syncer.bad.Has(b.Cid()); ok {
log.Warnf("InformNewHead called on block marked as bad: %s (reason: %s)", b.Cid(), reason)
return false
}
// Message validity check
if err := syncer.ValidateMsgMeta(b); err != nil {
log.Warnf("invalid block received: %s", err)
return false
}
}
syncer.incoming.Pub(fts.TipSet().Blocks(), LocalIncoming)
// TODO: IMPORTANT(GARBAGE) this needs to be put in the 'temporary' side of
// the blockstore
if err := syncer.store.PersistBlockHeaders(fts.TipSet().Blocks()...); err != nil {
log.Warn("failed to persist incoming block header: ", err)
return false
}
syncer.Exchange.AddPeer(from)
hts := syncer.store.GetHeaviestTipSet()
bestPweight := hts.ParentWeight()
targetWeight := fts.TipSet().ParentWeight()
if targetWeight.LessThan(bestPweight) {
var miners []string
for _, blk := range fts.TipSet().Blocks() {
miners = append(miners, blk.Miner.String())
}
log.Debugw("incoming tipset does not appear to be better than our best chain, ignoring for now", "miners", miners, "bestPweight", bestPweight, "bestTS", hts.Cids(), "incomingWeight", targetWeight, "incomingTS", fts.TipSet().Cids())
return false
}
// SetPeerHead packs the peer ID and the tipset into a peerHead and hands it to sm.workq
syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
return true
}
func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
select {
case sm.workq <- peerHead{p: p, ts: ts}:
case <-sm.ctx.Done():
case <-ctx.Done():
}
}
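That closes the loop. Stitched together from everything above, the full path of a block through sync is:
pubsub blocks topic -> HandleIncomingBlocks (fetch messages) -> InformNewBlock -> InformNewHead (validate, persist headers, weight pre-check) -> SetPeerHead -> sm.workq -> handlePeerHead -> spawnWorker -> worker -> doSync = Syncer.Sync -> collectChain -> PutTipSet -> MaybeTakeHeavierTipSet -> takeHeaviestTipSet -> cs.reorgCh -> reorgWorker (journal head_change, notify observers)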
lotus sync-related issues
Tracing lotus logs for a given cid
# The normal case
# The first tipset received from some peer (just one cid at this point)
2021-11-26T09:44:30.657+0800 INFO chain chain/sync_manager.go:232 selected sync target: [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg]
# A new worker is spawned and runs Syncer.Sync; after this point a head_change event is recorded in the journal
2021-11-26T09:44:30.657+0800 INFO chain chain/sync_manager.go:314 worker 261121 syncing in [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg]
# By here it is confirmed as the current heaviest chain and has been written to local storage
2021-11-26T09:44:31.213+0800 INFO chainstore store/store.go:646 New heaviest tipset! [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg bafy2bzaceddzpy3fz3esn6wkuf2pb4mr6mbjihu7owm4qtldt5fcjkgb4avdu bafy2bzacecljlo2drgyte4wbnbspv54naf3helyzzbtsxtohuetb2erogbx2s bafy2bzacebxs3q4jky6uuresywstvbskuolbof7zc5rw7u73kaiu5i5klrymu] (height=1319489)
# Another tipset arrives (this one with 4 cids)
2021-11-26T09:44:36.391+0800 INFO chain chain/sync_manager.go:232 selected sync target: [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg bafy2bzaceddzpy3fz3esn6wkuf2pb4mr6mbjihu7owm4qtldt5fcjkgb4avdu bafy2bzacecljlo2drgyte4wbnbspv54naf3helyzzbtsxtohuetb2erogbx2s bafy2bzacebxs3q4jky6uuresywstvbskuolbof7zc5rw7u73kaiu5i5klrymu]
2021-11-26T09:44:36.391+0800 INFO chain chain/sync_manager.go:314 worker 261122 syncing in [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg bafy2bzaceddzpy3fz3esn6wkuf2pb4mr6mbjihu7owm4qtldt5fcjkgb4avdu bafy2bzacecljlo2drgyte4wbnbspv54naf3helyzzbtsxtohuetb2erogbx2s bafy2bzacebxs3q4jky6uuresywstvbskuolbof7zc5rw7u73kaiu5i5klrymu]
# Common warnings
1.
2021-11-26T09:44:36.391+0800 WARN sub sub/incoming.go:88 Slow msg fetch {"cid": "<block cid>", "source": "<peer id>", "msgfetch": "4s"}
if took > 3*time.Second {
log.Warnw("Slow msg fetch", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
}
Emitted when fetching the block's messages took more than three seconds.
2.
2021-11-26T09:44:36.391+0800 WARN sub sub/incoming.go:95 received block with large delay from miner {"block": "bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg", "delay": 18, "miner": "f01234"}
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Insert(metrics.MinerID, blk.Header.Miner.String())},
metrics.BlockDelay.M(delay),
)
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
}
The logic here warns whenever the whole handling process finishes more than 5 seconds after the block header's timestamp, before handing the block off to Syncer.Sync. The timestamp normally falls on the previous half-minute boundary (one epoch is 30 seconds). If message fetching is very fast, the delay is essentially how late the block was published relative to its epoch.
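A quick worked example with hypothetical numbers (epochs are 30 seconds, so a header's Timestamp sits on a half-minute boundary):
// Hypothetical numbers illustrating the delay computation above.
headerTs := int64(1637891070) // header Timestamp: the block's epoch boundary, 09:44:30 +0800
now := int64(1637891088)      // message fetching finished at 09:44:48
if delay := now - headerTs; delay > 5 {
	// delay = 18 > 5, so the "large delay" warning above fires with "delay": 18
}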