The Lotus Message Sync Process

2021/06/30


What services does lotus start during node construction?

Message sync requires the chain component, and the chain component in turn depends on the libp2p components. lotus/node/builder_chain.go lays out how the chain component is constructed:

  • Check the configuration
  • Network bootstrapping
  • Check the crypto dependencies
  • VM creation
  • Start the chain store and the chain state manager
  • Start the chain sync service
  • Hook the chain up to p2p
  • Wire up the API dependencies between the chain and the miner
  • Start the message pool service
  • Start the wallet service
  • Start the payment channel service
  • Start the three kinds of market services
  • Enable or skip some methods of the above services depending on the lotus node type

The sync-related logic mainly lives in "start the chain sync service" and in "enable or skip some methods of the above services depending on the lotus node type" (the Data ingress section below). Chain sync only happens when the lotus node is a FULLNODE.
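For reference, builder_chain.go expresses this construction as dependency-injection overrides. A heavily trimmed sketch of what that wiring looks like (illustrative only; the exact list and ordering differ between lotus versions):

	// Trimmed illustration of the override style used in node/builder_chain.go;
	// this is not the complete list, and names may vary slightly by version.
	Override(new(*store.ChainStore), modules.ChainStore),         // chain store
	Override(new(*stmgr.StateManager), modules.StateManager),     // chain state manager
	Override(new(*chain.Syncer), modules.NewSyncer),              // chain sync service
	Override(new(*messagepool.MessagePool), modules.MessagePool), // message pool
	Override(new(*wallet.LocalWallet), wallet.NewWallet),         // wallet service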

lotus sync

  • Start the libp2p services
  • Create a Sync.SyncManager, passing Syncer.Sync in as its sync function
  • Create a Syncer via NewSyncer together with the Sync.SyncManager constructor
  • Start the Syncer, which in turn starts the Sync.SyncManager
  • Subscribe to messages for the current network (as mentioned here, if a node skips a mandatory upgrade the network name is not updated, so it can no longer receive new block announcements; see the sketch after this list)
  • Run Sync on the messages that come in
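The network-name caveat above comes from how the gossipsub topics are named: block announcements go out on a per-network topic (built by build.BlocksTopic, seen later in HandleIncomingBlocks). A minimal sketch of the idea, assuming the usual "/fil/blocks/<network name>" convention:

// Sketch only: blocks are published on a topic derived from the network name.
// A node that missed a mandatory upgrade keeps the old network name, keeps
// subscribing to the old topic, and therefore never sees new block messages.
func blocksTopic(netName string) string {
	return "/fil/blocks/" + netName
}

// e.g. blocksTopic("testnetnet") == "/fil/blocks/testnetnet"
// ("testnetnet" happens to be mainnet's network name)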

NewSyncer

Before the new syncer is created, a SyncManagerCtor is created; its type is shown below and it is used during Syncer construction. Its job is to return a SyncManager, but building that SyncManager requires the Syncer's Sync implementation, so it works a bit like a callback.

type SyncFunc func(context.Context, *types.TipSet) error
type SyncManagerCtor func(syncFn SyncFunc) SyncManager
// NewSyncer creates a new Syncer object.
func NewSyncer(ds dtypes.MetadataDS,
	sm *stmgr.StateManager,
	exchange exchange.Client,
	syncMgrCtor SyncManagerCtor,
	connmgr connmgr.ConnManager,
	self peer.ID,
	beacon beacon.Schedule,
	gent Genesis,
	consensus consensus.Consensus) (*Syncer, error) {

	s := &Syncer{
		ds:             ds,  // metadata store
		beacon:         beacon,  // beacon schedule (slice of beacon points)
		bad:            NewBadBlockCache(),  // cache of blocks already known to be bad
		Genesis:        gent,  // genesis tipset
		consensus:      consensus,  // consensus interface
		Exchange:       exchange,  // libp2p-based exchange.Client
		store:          sm.ChainStore(),  // chain store
		sm:             sm,  // state manager
		self:           self,  // our own peer ID
		receiptTracker: newBlockReceiptTracker(), // tracks which peers reported which tipsets
		connmgr:        connmgr,  // the connection manager held by our own host

		incoming: pubsub.New(50),  // an in-process pubsub with a buffer of 50
	}

	s.syncmgr = syncMgrCtor(s.Sync)  // s.Sync is passed in to obtain a SyncManager; s.Sync is the chain-head switching logic, and the actual switching is driven through s.syncmgr.doSync
	return s, nil
}

Syncer.Sync

Syncer.Sync is the crucial piece.

func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
	ctx, span := trace.StartSpan(ctx, "chain.Sync")
	defer span.End()

	if span.IsRecordingEvents() {
		span.AddAttributes(
			trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
			trace.Int64Attribute("height", int64(maybeHead.Height())),
		)
	}

	hts := syncer.store.GetHeaviestTipSet()

	if hts.ParentWeight().GreaterThan(maybeHead.ParentWeight()) {
		return nil
	}
	if syncer.Genesis.Equals(maybeHead) || hts.Equals(maybeHead) {
		return nil
	}

	if err := syncer.collectChain(ctx, maybeHead, hts, false); err != nil {
		span.AddAttributes(trace.StringAttribute("col_error", err.Error()))
		span.SetStatus(trace.Status{
			Code:    13,
			Message: err.Error(),
		})
		return xerrors.Errorf("collectChain failed: %w", err)
	}

	// At this point we have accepted and synced to the new `maybeHead`
	// (`StageSyncComplete`).
	if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
		span.AddAttributes(trace.StringAttribute("put_error", err.Error()))
		span.SetStatus(trace.Status{
			Code:    13,
			Message: err.Error(),
		})
		return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
	}

	peers := syncer.receiptTracker.GetPeers(maybeHead)
	if len(peers) > 0 {
		syncer.connmgr.TagPeer(peers[0], "new-block", 40)

		for _, p := range peers[1:] {
			syncer.connmgr.TagPeer(p, "new-block", 25)
		}
	}

	return nil
}

The logic above is fairly clear:

  • If the current tipset's parent weight is greater than the parent weight of the incoming tipset, return nil immediately and do nothing
  • Likewise, if the current tipset equals the incoming tipset, or the incoming tipset equals the genesis tipset, return nil and do nothing
  • syncer.collectChain(ctx, maybeHead, hts, false) performs the chain-head switch
  • syncer.store.PutTipSet(ctx, maybeHead) takes care of the bookkeeping after the switch
  • Tag the peers that delivered the block (reflected in lotus net score)

The most important part in all of this is the chain-head switching logic.

// collectChain tries to advance our view of the chain to the purported head.
//
// It goes through various stages:
//
//  1. StageHeaders: we proceed in the sync process by requesting block headers
//     from our peers, moving back from their heads, until we reach a tipset
//     that we have in common (such a common tipset must exist, though it may
//     simply be the genesis block).
//
//     If the common tipset is our head, we treat the sync as a "fast-forward",
//     else we must drop part of our chain to connect to the peer's head
//     (referred to as "forking").
//
//	2. StagePersistHeaders: now that we've collected the missing headers,
//     augmented by those on the other side of a fork, we persist them to the
//     BlockStore.
//
//  3. StageMessages: having acquired the headers and found a common tipset,
//     we then move forward, requesting the full blocks, including the messages.
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet, hts *types.TipSet, ignoreCheckpoint bool) error {
    // ts: the candidate head tipset; hts: our current head tipset
	ctx, span := trace.StartSpan(ctx, "collectChain")
	defer span.End()
	ss := extractSyncState(ctx)

    // create a syncState, with base set to the current head and target set to the candidate head
	ss.Init(hts, ts)

    // this is where we actually determine which head's chain is heavier (tip: skip the internals for now, the implementation is fairly involved; essentially it keeps walking backwards until it reaches a tipset both chains have in common, then compares chain weights. The walk is usually short, and the target head is usually the heavier one, otherwise we would never have gotten here)
	headers, err := syncer.collectHeaders(ctx, ts, hts, ignoreCheckpoint)
	if err != nil {
		ss.Error(err)
		return err
	}

	span.AddAttributes(trace.Int64Attribute("syncChainLength", int64(len(headers))))

	if !headers[0].Equals(ts) {
		log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
	}

    // the next few lines persist the tipsets
	ss.SetStage(api.StagePersistHeaders)

	toPersist := make([]*types.BlockHeader, 0, len(headers)*int(build.BlocksPerEpoch))
	for _, ts := range headers {
		toPersist = append(toPersist, ts.Blocks()...)
	}
	if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
		err = xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
		ss.Error(err)
		return err
	}
	toPersist = nil

	ss.SetStage(api.StageMessages)

    // the chain has switched, so the messages need to be re-validated and persisted
	if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
		err = xerrors.Errorf("collectChain syncMessages: %w", err)
		ss.Error(err)
		return err
	}

	ss.SetStage(api.StageSyncComplete)
	log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids()))

	return nil
}

Once the chain head has been switched, the internal state needs to be updated, which is what syncer.store.PutTipSet(ctx, maybeHead) does; the important part is MaybeTakeHeavierTipSet inside it:

func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
	for _, b := range ts.Blocks() {
		if err := cs.PersistBlockHeaders(b); err != nil {
			return err
		}
	}

	expanded, err := cs.expandTipset(ts.Blocks()[0])
	if err != nil {
		return xerrors.Errorf("errored while expanding tipset: %w", err)
	}
	log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids())

	if err := cs.MaybeTakeHeavierTipSet(ctx, expanded); err != nil {
		return xerrors.Errorf("MaybeTakeHeavierTipSet failed in PutTipSet: %w", err)
	}
	return nil
}
// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our
// internal state as our new head, if and only if it is heavier than the current
// head and does not exceed the maximum fork length.
func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error {
	for {
		cs.heaviestLk.Lock()
        // reorgChBuf is a constant 32; cs.reorgCh is a channel with capacity 32 whose element struct carries two fields: the old tipset and the new tipset
		if len(cs.reorgCh) < reorgChBuf/2 {
			break
		}
		cs.heaviestLk.Unlock()
        // this is a common trouble spot: essentially cs.reorgCh is not being drained fast enough; how it gets consumed is covered below (see the reorgWorker section)
		log.Errorf("reorg channel is heavily backlogged, waiting a bit before trying to take process new tipsets")
		select {
		case <-time.After(time.Second / 2):
		case <-ctx.Done():
			return ctx.Err()
		}
	}

    // the code below basically does a weight comparison: if the target tipset's chain is heavier, cs.takeHeaviestTipSet(ctx, ts) is executed, and that is where the cs.reorgCh entries mentioned above are produced
	defer cs.heaviestLk.Unlock()
	w, err := cs.weight(ctx, cs.StateBlockstore(), ts)
	if err != nil {
		return err
	}
	heaviestW, err := cs.weight(ctx, cs.StateBlockstore(), cs.heaviest)
	if err != nil {
		return err
	}

	heavier := w.GreaterThan(heaviestW)
	if w.Equals(heaviestW) && !ts.Equals(cs.heaviest) {
		log.Errorw("weight draw", "currTs", cs.heaviest, "ts", ts)
		heavier = breakWeightTie(ts, cs.heaviest)
	}

	if heavier {
		// TODO: don't do this for initial sync. Now that we don't have a
		// difference between 'bootstrap sync' and 'caught up' sync, we need
		// some other heuristic.

		exceeds, err := cs.exceedsForkLength(cs.heaviest, ts)
		if err != nil {
			return err
		}
		if exceeds {
			return nil
		}

		return cs.takeHeaviestTipSet(ctx, ts)
	}

	return nil
}

How does cs.reorgCh get consumed?

A consumer goroutine is set up when the ChainStore is created; the implementation is at lotus/chain/store/store.go:548:

func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNotifee) chan<- reorg {
	out := make(chan reorg, reorgChBuf)
	notifees := make([]ReorgNotifee, len(initialNotifees))
	copy(notifees, initialNotifees)

	cs.wg.Add(1)
	go func() {
		defer cs.wg.Done()
		defer log.Warn("reorgWorker quit")

		for {
			select {
			case n := <-cs.reorgNotifeeCh:
				notifees = append(notifees, n)

			case r := <-out:
                // the method that actually consumes the channel
				revert, apply, err := cs.ReorgOps(r.old, r.new)
				if err != nil {
					log.Error("computing reorg ops failed: ", err)
					continue
				}
                // a head_change record in the journal; a simple example: {"System":"sync","Event":"head_change","Timestamp":"2021-11-11T16:13:30.689264594+08:00","Data":{"From":[{"/":"bafy2bzacec5bq4rrn4eegxmd7o4sazskzoff5jv7vtzgv5dlvvj6zizt3v77k"},{"/":"bafy2bzacebsbatugdhaolromri7u66y4ddfgpbvo46s6l2eyjbsqnbeijzes6"},{"/":"bafy2bzaceb4rbcdkguslably3xhtnr2y24zo5d5anviswcpbt3l6fcxv3mrjm"},{"/":"bafy2bzacedc2s354eavxed5njvfpr76amo2xrw67wmp33mu672fwxc62scr5y"},{"/":"bafy2bzacedflehntxhkghi6kuhwvwsfk5e3a3lmpcuwrk2jbb7g6kthjaoy2y"},{"/":"bafy2bzacedg7icplqjg2vcyd4nye4zdmt2djp37bieqsga4alb3i4pylvb2ei"},{"/":"bafy2bzacecx54ljem5274tdh5qld7gkurk25eyokposiaacvg5cy4mqfai4wu"}],"FromHeight":1277067,"To":[{"/":"bafy2bzacec5bq4rrn4eegxmd7o4sazskzoff5jv7vtzgv5dlvvj6zizt3v77k"},{"/":"bafy2bzacebsbatugdhaolromri7u66y4ddfgpbvo46s6l2eyjbsqnbeijzes6"},{"/":"bafy2bzaceb4rbcdkguslably3xhtnr2y24zo5d5anviswcpbt3l6fcxv3mrjm"},{"/":"bafy2bzacedc2s354eavxed5njvfpr76amo2xrw67wmp33mu672fwxc62scr5y"},{"/":"bafy2bzacedflehntxhkghi6kuhwvwsfk5e3a3lmpcuwrk2jbb7g6kthjaoy2y"},{"/":"bafy2bzaceddhdwajwq7xl2c55l6scrimd7uzqgncwgmgslouefqdicyha6oiu"},{"/":"bafy2bzacedg7icplqjg2vcyd4nye4zdmt2djp37bieqsga4alb3i4pylvb2ei"},{"/":"bafy2bzacecx54ljem5274tdh5qld7gkurk25eyokposiaacvg5cy4mqfai4wu"}],"ToHeight":1277067,"RevertCount":1,"ApplyCount":1}}
                // it can be queried with: jq -r 'select(.Event == "head_change" and .Data.ApplyCount > 1 and .Data.RevertCount > 0) | "\(.Timestamp), \(.Data.ApplyCount), \(.Data.RevertCount), \(.Data.FromHeight)"' repo/journal/latest.ndjson
				cs.journal.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
					return HeadChangeEvt{
						From:        r.old.Key(),
						FromHeight:  r.old.Height(),
						To:          r.new.Key(),
						ToHeight:    r.new.Height(),
						RevertCount: len(revert),
						ApplyCount:  len(apply),
					}
				})
                ...
			}
		}
	}()
	return out
}

Sync.SyncManager

The previous sections covered Syncer.Sync in detail, but that is maybe only a third of the sync story: it describes what happens once a target tipset is already in hand. How are new target tipsets discovered in the first place? For that we go one level up, to Sync.SyncManager.

// sync manager interface
func NewSyncManager(sync SyncFunc) SyncManager {
	ctx, cancel := context.WithCancel(context.Background())
	return &syncManager{
		ctx:    ctx,
		cancel: cancel,

		workq:   make(chan peerHead),
		statusq: make(chan workerStatus),

		heads:   make(map[peer.ID]*types.TipSet),
		state:   make(map[uint64]*workerState),
		recent:  newSyncBuffer(RecentSyncBufferSize),
		history: make([]*workerState, SyncWorkerHistory),

		doSync: sync,
	}
}

type peerHead struct {
	p  peer.ID
	ts *types.TipSet
}

type workerStatus struct {
	id  uint64
	err error
}

// the function that actually performs a sync
type SyncFunc func(context.Context, *types.TipSet) error
// constructor for the sync manager: takes a sync function and returns a SyncManager
type SyncManagerCtor func(syncFn SyncFunc) SyncManager
// the sync manager
type SyncManager interface {
	// Start starts the SyncManager.
	Start()

	// Stop stops the SyncManager.
	Stop()

	// SetPeerHead informs the SyncManager that the supplied peer reported the
	// supplied tipset.
	SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)

	// State retrieves the state of the sync workers.
	State() []SyncerStateSnapshot
}

// creating the sync manager (the s.Sync here is the function discussed at length above)
s.syncmgr = syncMgrCtor(s.Sync)

At this point the syncmgr has been created; all that remains is for the Syncer to be started.

func (syncer *Syncer) Start() {
	tickerCtx, tickerCtxCancel := context.WithCancel(context.Background())
    // start the syncmgr
	syncer.syncmgr.Start()

	syncer.tickerCtxCancel = tickerCtxCancel

	go syncer.runMetricsTricker(tickerCtx)
}
func (sm *syncManager) Start() {
	go sm.scheduler()
}

func (sm *syncManager) scheduler() {
	ticker := time.NewTicker(time.Minute)
	tickerC := ticker.C
	for {
		select {
		case head := <-sm.workq:
            // spawn a worker for this peer head and run the sync
			sm.handlePeerHead(head)
		case status := <-sm.statusq:
            // react to a worker's completion status: remove its state entry, etc.
			sm.handleWorkerStatus(status)
		case <-tickerC:
            // in handleWorkerStatus below, if a worker's sync took less than 15 minutes, initial sync is considered complete, and the following logic runs (only once)
			if sm.initialSyncDone {
				ticker.Stop()
				tickerC = nil
				sm.handleInitialSyncDone()
			}
		case <-sm.ctx.Done():
			return
		}
	}
}

In the logic above, the important pieces are the three handle functions.

handlePeerHead

func (sm *syncManager) handlePeerHead(head peerHead) {
	log.Debugf("new peer head: %s %s", head.p, head.ts)

	// have we started syncing yet?
    // sm.nextWorker only changes when a worker is spawned, so at startup we always take this branch
	if sm.nextWorker == 0 {
		// track the peer head until we start syncing
		sm.heads[head.p] = head.ts

		// not yet; do we have enough peers?
        // BootstrapPeerThreshold = 4: we must have collected 4 peer heads before proceeding
		if len(sm.heads) < BootstrapPeerThreshold {
			log.Debugw("not tracking enough peers to start sync worker", "have", len(sm.heads), "need", BootstrapPeerThreshold)
			// not enough peers; track it and wait
			return
		}

		// we are ready to start syncing; select the sync target and spawn a worker
        // pick the best sync target; again this boils down to comparing chain weights (see the sketch after this function)
		target, err := sm.selectInitialSyncTarget()
		if err != nil {
			log.Errorf("failed to select initial sync target: %s", err)
			return
		}

		log.Infof("selected initial sync target: %s", target)
        // this is what actually spawns the worker, and inside it Syncer.Sync is called to switch the chain head
		sm.spawnWorker(target)
		return
	}

	// we have started syncing, add peer head to the queue if applicable and maybe spawn a worker
	// if there is work to do (possibly in a fork)
    // once initial sync is done we come through here instead; the main job is to check whether the peer head is already covered by the buffered targets and, based on that, decide whether to spawn a new worker
	target, work, err := sm.addSyncTarget(head.ts)
	if err != nil {
		log.Warnf("failed to add sync target: %s", err)
		return
	}

	if work {
		log.Infof("selected sync target: %s", target)
		sm.spawnWorker(target)
	}
}
func (sm *syncManager) spawnWorker(target *types.TipSet) {
    // the first line of lotus sync status output shows this worker id
	id := sm.nextWorker
	sm.nextWorker++
	ws := &workerState{
		id: id,
		ts: target,
		ss: new(SyncerState),
	}
	ws.ss.data.WorkerID = id

	sm.mx.Lock()
	sm.state[id] = ws
	sm.mx.Unlock()

	go sm.worker(ws)
}

func (sm *syncManager) worker(ws *workerState) {
	log.Infof("worker %d syncing in %s", ws.id, ws.ts)

	start := build.Clock.Now()

	ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss)
    // doSync is the Syncer.Sync function
	err := sm.doSync(ctx, ws.ts)

	ws.dt = build.Clock.Since(start)
	log.Infof("worker %d done; took %s", ws.id, ws.dt)
	select {
        // report the worker's status back on sm.statusq, which brings us to handleWorkerStatus
	case sm.statusq <- workerStatus{id: ws.id, err: err}:
	case <-sm.ctx.Done():
	}
}
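selectInitialSyncTarget and addSyncTarget are not reproduced here; at their core they again compare chain weights across the tracked heads (the real sync_manager.go additionally groups compatible tipsets into sync buckets). A simplified sketch of just that core idea, not the actual implementation:

import (
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/libp2p/go-libp2p-core/peer"
)

// pickHeaviestHead is an illustrative stand-in for target selection: among
// the heads reported by peers, prefer the tipset whose chain carries the
// greatest parent weight.
func pickHeaviestHead(heads map[peer.ID]*types.TipSet) *types.TipSet {
	var best *types.TipSet
	for _, ts := range heads {
		if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
			best = ts
		}
	}
	return best
}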

handleWorkerStatus

func (sm *syncManager) handleWorkerStatus(status workerStatus) {
	log.Debugf("worker %d done; status error: %s", status.id, status.err)

    // clean up the state map
	sm.mx.Lock()
	ws := sm.state[status.id]
	delete(sm.state, status.id)

	// we track the last few workers for debug purposes
	sm.history[sm.historyI] = ws
	sm.historyI++
	sm.historyI %= len(sm.history)
	sm.mx.Unlock()

	if status.err != nil {
		// we failed to sync this target -- log it and try to work on an extended chain
		// if there is nothing related to be worked on, we stop working on this chain.
		log.Errorf("error during sync in %s: %s", ws.ts, status.err)
	} else {
		// add to the recently synced buffer
		sm.recent.Push(ws.ts)
		// if we are still in initial sync and this was fast enough, mark the end of the initial sync
        // this is the only place where sm.initialSyncDone can be set to true: some worker has finished, and it took less than 15 minutes
		if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold {
			sm.initialSyncDone = true
		}
	}

	// we are done with this target, select the next sync target and spawn a worker if there is work
	// to do, because of an extension of this chain.
    // this handler runs fairly often, so on the way out it also spawns the next worker if there is more work to do
	target, work, err := sm.selectSyncTarget(ws.ts)
	if err != nil {
		log.Warnf("failed to select sync target: %s", err)
		return
	}

	if work {
		log.Infof("selected sync target: %s", target)
		sm.spawnWorker(target)
	}
}

handleInitialSyncDone

// entered from the ticker (so at most a minute after initial sync completes); the logic is straightforward: spawn additional workers, up to MaxSyncWorkers (5), to keep the whole sync pipeline flowing
func (sm *syncManager) handleInitialSyncDone() {
	// we have just finished the initial sync; spawn some additional workers in deferred syncs
	// as needed (and up to MaxSyncWorkers) to ramp up chain sync
	for len(sm.state) < MaxSyncWorkers {
		target, work, err := sm.selectDeferredSyncTarget()
		if err != nil {
			log.Errorf("error selecting deferred sync target: %s", err)
			return
		}

		if !work {
			return
		}

		log.Infof("selected deferred sync target: %s", target)
		sm.spawnWorker(target)
	}
}
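The behaviour of the three handlers is governed by a few package-level knobs in chain/sync_manager.go. For quick reference, using only the values already mentioned above (other knobs such as RecentSyncBufferSize and SyncWorkerHistory exist as well; check the source of your lotus version for exact values):

import "time"

// Approximate defaults as referenced in this walkthrough; verify them against
// the chain/sync_manager.go of the lotus version you are actually running.
var (
	BootstrapPeerThreshold   = 4                // peer heads required before the first worker is spawned
	MaxSyncWorkers           = 5                // upper bound on concurrent sync workers
	InitialSyncTimeThreshold = 15 * time.Minute // a worker finishing faster than this marks initial sync as done
)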

Data ingress

Everything above depends on data arriving on sm.workq, i.e. peerHead values coming in. Once sm.workq has data, workers get spawned, and a worker calls Syncer.Sync to switch the chain head and sync. The data enters in the last step of the node build:

// Full node API / service startup
	ApplyIf(isFullNode,
		Override(new(messagepool.Provider), messagepool.NewProvider),
		Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))),
		Override(new(full.ChainModuleAPI), From(new(full.ChainModule))),
		Override(new(full.GasModuleAPI), From(new(full.GasModule))),
		Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))),
		Override(new(full.StateModuleAPI), From(new(full.StateModule))),
		Override(new(stmgr.StateManagerAPI), From(new(*stmgr.StateManager))),

		Override(RunHelloKey, modules.RunHello),
		Override(RunChainExchangeKey, modules.RunChainExchange),
		Override(RunPeerMgrKey, modules.RunPeerMgr),
		Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),  // handling of incoming messages
		Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),  // handling of incoming blocks
	),

HandleIncomingMessages mainly just sets up a subscription for messages; the one relevant to sync is HandleIncomingBlocks.
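For comparison, HandleIncomingMessages follows the same pattern as the block handler shown next: register a validator for the per-network messages topic, subscribe, and feed validated messages into the message pool. Roughly (a simplified fragment in the same style; exact helper names and signatures may differ between lotus versions):

// Simplified sketch of the message-side handler: same shape as the block
// handler below, but targeting build.MessagesTopic and the message pool.
v := sub.NewMessageValidator(h.ID(), mpool)
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
	panic(err)
}

msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
if err != nil {
	panic(err)
}
go sub.HandleIncomingMessages(ctx, mpool, msgsub)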

HandleIncomingBlocks

func HandleIncomingBlocks(mctx helpers.MetricsCtx,
	lc fx.Lifecycle,
	ps *pubsub.PubSub,
	s *chain.Syncer,
	bserv dtypes.ChainBlockService,
	chain *store.ChainStore,
	cns consensus.Consensus,
	h host.Host,
	nn dtypes.NetworkName) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	v := sub.NewBlockValidator(
		h.ID(), chain, cns,
		func(p peer.ID) {
			ps.BlacklistPeer(p)
			h.ConnManager().TagPeer(p, "badblock", -1000)
		})

	if err := ps.RegisterTopicValidator(build.BlocksTopic(nn), v.Validate); err != nil {
		panic(err)
	}

	log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))

	blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
	if err != nil {
		panic(err)
	}

    // the actual logic
	go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
	// Timeout after (block time + propagation delay). This is useless at
	// this point.
    // BlockDelaySecs=30 PropagationDelaySecs=18
	timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second

	for {
        // fetch the next pubsub message
		msg, err := bsub.Next(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Warn("quitting HandleIncomingBlocks loop")
				return
			}
			log.Error("error from block subscription: ", err)
			continue
		}

        // the topic validator has already run; here we just unwrap the validated BlockMsg
		blk, ok := msg.ValidatorData.(*types.BlockMsg)
		if !ok {
			log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
			return
		}

        // the peer ID of the sender
		src := msg.GetFrom()

		go func() {
			ctx, cancel := context.WithTimeout(ctx, timeout)
			defer cancel()

			// NOTE: we could also share a single session between
			// all requests but that may have other consequences.
			ses := bserv.NewSession(ctx, bs)

			start := build.Clock.Now()
			log.Debug("about to fetch messages for block from pubsub")
            // using the block message, fetch the BLS messages it references
			bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
			if err != nil {
				log.Errorf("failed to fetch all bls messages for block received over pubsub: %s; source: %s", err, src)
				return
			}
            // using the block message, fetch the signed (secp) messages it references
			smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
			if err != nil {
				log.Errorf("failed to fetch all secpk messages for block received over pubsub: %s; source: %s", err, src)
				return
			}

			took := build.Clock.Since(start)
			log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
            // warn if fetching the messages took more than 3 seconds
			if took > 3*time.Second {
				log.Warnw("Slow msg fetch", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
			}
			if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
				_ = stats.RecordWithTags(ctx,
					[]tag.Mutator{tag.Insert(metrics.MinerID, blk.Header.Miner.String())},
					metrics.BlockDelay.M(delay),
				)
				log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
			}
            
            // tell the Syncer it can now consider switching the chain head
			if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
				Header:        blk.Header,
				BlsMessages:   bmsgs,
				SecpkMessages: smsgs,
			}) {
				cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
			}
		}()
	}
}
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) bool {
	// TODO: search for other blocks that could form a tipset with this block
	// and then send that tipset to InformNewHead

	fts := &store.FullTipSet{Blocks: []*types.FullBlock{blk}}
	return syncer.InformNewHead(from, fts)
}

// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
	defer func() {
		if err := recover(); err != nil {
			log.Errorf("panic in InformNewHead: %s", err)
		}
	}()

	ctx := context.Background()
	if fts == nil {
		log.Errorf("got nil tipset in InformNewHead")
		return false
	}
    // height sanity check
	if syncer.consensus.IsEpochBeyondCurrMax(fts.TipSet().Height()) {
		log.Errorf("Received block with impossibly large height %d", fts.TipSet().Height())
		return false
	}

	for _, b := range fts.Blocks {
        // syncer.bad is the BadBlockCache of blocks already known to be bad
		if reason, ok := syncer.bad.Has(b.Cid()); ok {
			log.Warnf("InformNewHead called on block marked as bad: %s (reason: %s)", b.Cid(), reason)
			return false
		}
        // validate the block's message metadata
		if err := syncer.ValidateMsgMeta(b); err != nil {
			log.Warnf("invalid block received: %s", err)
			return false
		}
	}

	syncer.incoming.Pub(fts.TipSet().Blocks(), LocalIncoming)

	// TODO: IMPORTANT(GARBAGE) this needs to be put in the 'temporary' side of
	// the blockstore
	if err := syncer.store.PersistBlockHeaders(fts.TipSet().Blocks()...); err != nil {
		log.Warn("failed to persist incoming block header: ", err)
		return false
	}

	syncer.Exchange.AddPeer(from)

	hts := syncer.store.GetHeaviestTipSet()
	bestPweight := hts.ParentWeight()
	targetWeight := fts.TipSet().ParentWeight()
	if targetWeight.LessThan(bestPweight) {
		var miners []string
		for _, blk := range fts.TipSet().Blocks() {
			miners = append(miners, blk.Miner.String())
		}
		log.Debugw("incoming tipset does not appear to be better than our best chain, ignoring for now", "miners", miners, "bestPweight", bestPweight, "bestTS", hts.Cids(), "incomingWeight", targetWeight, "incomingTS", fts.TipSet().Cids())
		return false
	}

    // via SetPeerHead, the peer ID and the tipset are combined into a peerHead and passed to sm.workq
	syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
	return true
}

func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
	select {
	case sm.workq <- peerHead{p: p, ts: ts}:
	case <-sm.ctx.Done():
	case <-ctx.Done():
	}
}

Common lotus sync questions

Tracing a cid through the lotus logs

# The normal case
# The first time a tipset (here containing just one cid) arrives from some peer
2021-11-26T09:44:30.657+0800	INFO	chain	chain/sync_manager.go:232	selected sync target: [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg]
# A new worker is spawned and runs Syncer.Sync; right after this point a head_change event is recorded in the journal
2021-11-26T09:44:30.657+0800	INFO	chain	chain/sync_manager.go:314	worker 261121 syncing in [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg]
# By now it has been confirmed as the current heaviest chain and written to local storage
2021-11-26T09:44:31.213+0800	INFO	chainstore	store/store.go:646	New heaviest tipset! [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg bafy2bzaceddzpy3fz3esn6wkuf2pb4mr6mbjihu7owm4qtldt5fcjkgb4avdu bafy2bzacecljlo2drgyte4wbnbspv54naf3helyzzbtsxtohuetb2erogbx2s bafy2bzacebxs3q4jky6uuresywstvbskuolbof7zc5rw7u73kaiu5i5klrymu] (height=1319489)
# Here another tipset arrives (this one with 4 cids)
2021-11-26T09:44:36.391+0800	INFO	chain	chain/sync_manager.go:232	selected sync target: [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg bafy2bzaceddzpy3fz3esn6wkuf2pb4mr6mbjihu7owm4qtldt5fcjkgb4avdu bafy2bzacecljlo2drgyte4wbnbspv54naf3helyzzbtsxtohuetb2erogbx2s bafy2bzacebxs3q4jky6uuresywstvbskuolbof7zc5rw7u73kaiu5i5klrymu]
2021-11-26T09:44:36.391+0800	INFO	chain	chain/sync_manager.go:314	worker 261122 syncing in [bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg bafy2bzaceddzpy3fz3esn6wkuf2pb4mr6mbjihu7owm4qtldt5fcjkgb4avdu bafy2bzacecljlo2drgyte4wbnbspv54naf3helyzzbtsxtohuetb2erogbx2s bafy2bzacebxs3q4jky6uuresywstvbskuolbof7zc5rw7u73kaiu5i5klrymu]
# Common warnings
1.
2021-11-26T09:44:36.391+0800	WARN	sub	sub/incoming.go:88	Slow msg fetch {"cid": "<some cid>", "delay": 4, "miner": "f01234"}
if took > 3*time.Second {
				log.Warnw("Slow msg fetch", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
}
This fires when fetching a block's messages takes more than 3 seconds.

2.
2021-11-26T09:44:36.391+0800	WARN	sub	sub/incoming.go:95	received block with large delay from miner {"block": "bafy2bzacecd6hraitmcvlwdhw4otqqkyun7x3wq6ssf6gbt56ty5373qz55sg", "delay": 18, "miner": "f01234"}
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
				_ = stats.RecordWithTags(ctx,
					[]tag.Mutator{tag.Insert(metrics.MinerID, blk.Header.Miner.String())},
					metrics.BlockDelay.M(delay),
				)
				log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
			}
The warning fires when the wall-clock time at which message fetching completes is more than 5 seconds past the block header's timestamp; the block is then handed on towards Syncer.Sync as usual. The header timestamp normally marks the start of the current 30-second epoch, so if the message fetch itself was fast, the delay is essentially how late the block reached us relative to when it was nominally published. In the log above, for instance, a delay of 18 means processing finished 18 seconds after the header timestamp.
