lotus/tools/stats/sync/sync.go
2023-11-15 13:06:51 +01:00

193 lines
4.3 KiB
Go

package sync
import (
"context"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/tools/stats/headbuffer"
)
type SyncWaitApi interface {
SyncState(context.Context) (*api.SyncState, error)
ChainHead(context.Context) (*types.TipSet, error)
}
// SyncWait returns when ChainHead is within 20 epochs of the expected height
func SyncWait(ctx context.Context, napi SyncWaitApi) error {
for {
state, err := napi.SyncState(ctx)
if err != nil {
return err
}
if len(state.ActiveSyncs) == 0 {
build.Clock.Sleep(time.Second)
continue
}
head, err := napi.ChainHead(ctx)
if err != nil {
return err
}
working := -1
for i, ss := range state.ActiveSyncs {
switch ss.Stage {
case api.StageSyncComplete:
default:
working = i
case api.StageIdle:
// not complete, not actively working
}
}
if working == -1 {
working = len(state.ActiveSyncs) - 1
}
ss := state.ActiveSyncs[working]
if ss.Base == nil || ss.Target == nil {
log.Infow(
"syncing",
"height", ss.Height,
"stage", ss.Stage.String(),
)
} else {
log.Infow(
"syncing",
"base", ss.Base.Key(),
"target", ss.Target.Key(),
"target_height", ss.Target.Height(),
"height", ss.Height,
"stage", ss.Stage.String(),
)
}
if build.Clock.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs)*30 {
break
}
select {
case <-ctx.Done():
return ctx.Err()
case <-build.Clock.After(time.Duration(int64(build.BlockDelaySecs) * int64(time.Second))):
}
}
return nil
}
type BufferedTipsetChannelApi interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
Version(context.Context) (api.APIVersion, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
}
// BufferedTipsetChannel returns an unbuffered channel of tipsets. Buffering occurs internally to handle revert
// ChainNotify changes. The returned channel can output tipsets at the same height twice if a reorg larger the the
// provided `size` occurs.
func BufferedTipsetChannel(ctx context.Context, api BufferedTipsetChannelApi, lastHeight abi.ChainEpoch, size int) (<-chan *types.TipSet, error) {
chmain := make(chan *types.TipSet)
hb := headbuffer.NewHeadChangeStackBuffer(size)
notif, err := api.ChainNotify(ctx)
if err != nil {
return nil, err
}
go func() {
defer close(chmain)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case changes, ok := <-notif:
if !ok {
return
}
for _, change := range changes {
log.Debugw("head event", "height", change.Val.Height(), "type", change.Type)
switch change.Type {
case store.HCCurrent:
tipsets, err := loadTipsets(ctx, api, change.Val, lastHeight)
if err != nil {
log.Info(err)
return
}
for _, tipset := range tipsets {
chmain <- tipset
}
case store.HCApply:
if out := hb.Push(change); out != nil {
chmain <- out.Val
}
case store.HCRevert:
hb.Pop()
}
}
case <-ticker.C:
log.Debug("running health check")
cctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if _, err := api.Version(cctx); err != nil {
log.Error("health check failed")
cancel()
return
}
cancel()
log.Debug("node online")
case <-ctx.Done():
return
}
}
}()
return chmain, nil
}
func loadTipsets(ctx context.Context, api BufferedTipsetChannelApi, curr *types.TipSet, lowestHeight abi.ChainEpoch) ([]*types.TipSet, error) {
log.Infow("loading tipsets", "to_height", lowestHeight, "from_height", curr.Height())
tipsets := []*types.TipSet{}
for {
if curr.Height() == 0 {
break
}
if curr.Height() <= lowestHeight {
break
}
log.Debugw("walking back", "height", curr.Height())
tipsets = append(tipsets, curr)
tsk := curr.Parents()
prev, err := api.ChainGetTipSet(ctx, tsk)
if err != nil {
return tipsets, err
}
curr = prev
}
for i, j := 0, len(tipsets)-1; i < j; i, j = i+1, j-1 {
tipsets[i], tipsets[j] = tipsets[j], tipsets[i]
}
return tipsets, nil
}