lotus/lotus-soup/stats/rpc.go
Anton Evangelatov 84bc071179 wip
2020-07-03 15:38:19 +02:00

106 lines
2.1 KiB
Go

package stats
import (
"context"
"time"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
func GetTips(ctx context.Context, api api.FullNode, lastHeight abi.ChainEpoch, headlag int) (<-chan *types.TipSet, error) {
chmain := make(chan *types.TipSet)
hb := NewHeadBuffer(headlag)
notif, err := api.ChainNotify(ctx)
if err != nil {
return nil, err
}
go func() {
defer close(chmain)
ping := time.Tick(30 * time.Second)
for {
select {
case changes := <-notif:
for _, change := range changes {
log.Infow("Head event", "height", change.Val.Height(), "type", change.Type)
switch change.Type {
case store.HCCurrent:
tipsets, err := loadTipsets(ctx, api, change.Val, lastHeight)
if err != nil {
log.Info(err)
return
}
for _, tipset := range tipsets {
chmain <- tipset
}
case store.HCApply:
if out := hb.Push(change); out != nil {
chmain <- out.Val
}
case store.HCRevert:
hb.Pop()
}
}
case <-ping:
log.Info("Running health check")
cctx, cancel := context.WithTimeout(ctx, 5*time.Second)
if _, err := api.ID(cctx); err != nil {
log.Error("Health check failed")
cancel()
return
}
cancel()
log.Info("Node online")
case <-ctx.Done():
return
}
}
}()
return chmain, nil
}
func loadTipsets(ctx context.Context, api api.FullNode, curr *types.TipSet, lowestHeight abi.ChainEpoch) ([]*types.TipSet, error) {
tipsets := []*types.TipSet{}
for {
if curr.Height() == 0 {
break
}
if curr.Height() <= lowestHeight {
break
}
log.Infow("Walking back", "height", curr.Height())
tipsets = append(tipsets, curr)
tsk := curr.Parents()
prev, err := api.ChainGetTipSet(ctx, tsk)
if err != nil {
return tipsets, err
}
curr = prev
}
for i, j := 0, len(tipsets)-1; i < j; i, j = i+1, j-1 {
tipsets[i], tipsets[j] = tipsets[j], tipsets[i]
}
return tipsets, nil
}