lotus/tools/stats/rpc.go
Steven Allen 5733c71c50 Lint everything
We were ignoring quite a few error cases, and had one case where we weren't
actually updating state where we wanted to. Unfortunately, if the linter doesn't
pass, nobody has any reason to actually check lint failures in CI.

There are three remaining XXXs marked in the code for lint.
2020-08-20 20:46:36 -07:00

226 lines
4.9 KiB
Go

package stats
import (
"context"
"net/http"
"time"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/specs-actors/actors/abi"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/repo"
)
// getAPI works out how to reach the node whose repo lives at path.
//
// It returns the websocket URL of the node's "/rpc/v0" endpoint together with
// the HTTP headers to present when connecting. When the repo's API token can be
// read, the headers carry it as a Bearer Authorization value. When it cannot,
// a warning is logged and nil headers are returned without an error, so the
// caller can still connect.
func getAPI(path string) (string, http.Header, error) {
	fsRepo, err := repo.NewFS(path)
	if err != nil {
		return "", nil, err
	}

	endpoint, err := fsRepo.APIEndpoint()
	if err != nil {
		return "", nil, xerrors.Errorf("failed to get api endpoint: %w", err)
	}

	// Only the address half of the dial arguments is needed to build the URL.
	_, dialAddr, err := manet.DialArgs(endpoint)
	if err != nil {
		return "", nil, err
	}
	url := "ws://" + dialAddr + "/rpc/v0"

	token, err := fsRepo.APIToken()
	if err != nil {
		// A missing token is not fatal: connect without credentials.
		log.Warnw("Couldn't load CLI token, capabilities may be limited", "error", err)
		return url, nil, nil
	}

	headers := http.Header{}
	headers.Add("Authorization", "Bearer "+string(token))
	return url, headers, nil
}
// WaitForSyncComplete blocks until the node behind napi looks caught up with
// the chain, or until ctx is cancelled.
//
// It works in two phases, each polling every 5 seconds:
//
//  1. Query napi.SyncState and log every active sync that has a target, until
//     one of them reports api.StageSyncComplete.
//  2. Query napi.ChainHead until the head's minimum timestamp is less than
//     20 block delays behind the current clock time.
//
// It returns nil once phase 2 is satisfied, ctx.Err() on cancellation, or the
// first error returned by either API call.
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
	const pollInterval = 5 * time.Second

	// Phase 1: wait for any sync worker to report completion.
	for synced := false; !synced; {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-build.Clock.After(pollInterval):
			state, err := napi.SyncState(ctx)
			if err != nil {
				return err
			}

			for idx, worker := range state.ActiveSyncs {
				// Workers without a target have nothing to report.
				if worker.Target == nil {
					continue
				}

				errored := worker.Stage == api.StageSyncErrored

				// Assemble the log fields once; the error message, when
				// present, sits between "height" and "stage".
				fields := []interface{}{
					"worker", idx,
					"base", worker.Base.Key(),
					"target", worker.Target.Key(),
					"target_height", worker.Target.Height(),
					"height", worker.Height,
				}
				if errored {
					fields = append(fields, "error", worker.Message)
				}
				fields = append(fields, "stage", chain.SyncStageString(worker.Stage))

				if errored {
					log.Errorw("Syncing", fields...)
				} else {
					log.Infow("Syncing", fields...)
				}

				if worker.Stage == api.StageSyncComplete {
					// Stop inspecting further workers and leave phase 1.
					synced = true
					break
				}
			}
		}
	}

	// Phase 2: wait for the head to be recent enough.
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-build.Clock.After(pollInterval):
			head, err := napi.ChainHead(ctx)
			if err != nil {
				return err
			}

			lag := build.Clock.Now().Unix() - int64(head.MinTimestamp())

			log.Infow(
				"Waiting for reasonable head height",
				"height", head.Height(),
				"timestamp_delta", lag,
			)

			// If we get within 20 blocks of the current expected block height we
			// consider sync complete. Block propagation is not always great but we
			// still want to be recording stats as soon as we can.
			if lag < int64(build.BlockDelaySecs)*20 {
				return nil
			}
		}
	}
}
// GetTips subscribes to the node's head-change notifications and returns a
// channel on which tipsets are delivered.
//
// Head changes are handled by type:
//   - store.HCCurrent: loadTipsets is called with the notified tipset and
//     lastHeight, and every tipset it returns is sent on the channel.
//   - store.HCApply: the change is pushed into a head buffer created with
//     headlag; whenever push hands back a non-nil change, its tipset is sent.
//   - store.HCRevert: the head buffer is popped.
//
// Every 30 seconds the goroutine also runs a health check by calling api.ID
// with a 5 second timeout.
//
// The returned channel is closed when the background goroutine exits, which
// happens when ctx is cancelled, the notification channel is closed,
// loadTipsets fails, or a health check fails.
//
// An error is returned only if the initial ChainNotify subscription fails.
func GetTips(ctx context.Context, api api.FullNode, lastHeight abi.ChainEpoch, headlag int) (<-chan *types.TipSet, error) {
	chmain := make(chan *types.TipSet)

	hb := newHeadBuffer(headlag)

	notif, err := api.ChainNotify(ctx)
	if err != nil {
		return nil, err
	}

	go func() {
		defer close(chmain)

		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		// send delivers ts to the consumer but gives up when ctx is cancelled,
		// so the goroutine cannot block forever on a reader that has gone away.
		// It reports whether the tipset was delivered.
		send := func(ts *types.TipSet) bool {
			select {
			case chmain <- ts:
				return true
			case <-ctx.Done():
				return false
			}
		}

		for {
			select {
			case changes, ok := <-notif:
				if !ok {
					// A closed channel would otherwise yield nil forever and
					// turn this loop into a busy spin.
					log.Error("Head change notification channel closed")
					return
				}

				for _, change := range changes {
					log.Infow("Head event", "height", change.Val.Height(), "type", change.Type)

					switch change.Type {
					case store.HCCurrent:
						tipsets, err := loadTipsets(ctx, api, change.Val, lastHeight)
						if err != nil {
							log.Info(err)
							return
						}

						for _, tipset := range tipsets {
							if !send(tipset) {
								return
							}
						}
					case store.HCApply:
						if out := hb.push(change); out != nil {
							if !send(out.Val) {
								return
							}
						}
					case store.HCRevert:
						hb.pop()
					}
				}
			case <-ticker.C:
				log.Info("Running health check")

				cctx, cancel := context.WithTimeout(ctx, 5*time.Second)

				if _, err := api.ID(cctx); err != nil {
					log.Error("Health check failed")
					cancel()
					return
				}

				cancel()

				log.Info("Node online")
			case <-ctx.Done():
				return
			}
		}
	}()

	return chmain, nil
}
// loadTipsets walks the chain backwards from curr, following parent links via
// api.ChainGetTipSet. It collects each visited tipset until it reaches one
// whose height is 0 or is not above lowestHeight; that tipset is not included.
//
// On success the collected tipsets are returned in the reverse of the order
// they were walked, so curr (when collected) is the last element. If fetching
// a parent fails, the tipsets gathered so far are returned in walk order
// (curr first) along with the error.
func loadTipsets(ctx context.Context, api api.FullNode, curr *types.TipSet, lowestHeight abi.ChainEpoch) ([]*types.TipSet, error) {
	walked := make([]*types.TipSet, 0)

	for ts := curr; ts.Height() != 0 && ts.Height() > lowestHeight; {
		log.Infow("Walking back", "height", ts.Height())
		walked = append(walked, ts)

		parent, err := api.ChainGetTipSet(ctx, ts.Parents())
		if err != nil {
			return walked, err
		}
		ts = parent
	}

	// Flip the slice in place so the most recently walked tipset comes first.
	n := len(walked)
	for lo := 0; lo < n/2; lo++ {
		hi := n - 1 - lo
		walked[lo], walked[hi] = walked[hi], walked[lo]
	}

	return walked, nil
}
// GetFullNodeAPI builds a full-node RPC client for the node whose repo is at
// the given path. The endpoint address and request headers come from getAPI;
// the client, its closer, and any error are whatever client.NewFullNodeRPC
// returns. If getAPI fails, its error is returned and no client is created.
func GetFullNodeAPI(repo string) (api.FullNode, jsonrpc.ClientCloser, error) {
	endpoint, authHeaders, err := getAPI(repo)
	if err != nil {
		return nil, nil, err
	}

	return client.NewFullNodeRPC(endpoint, authHeaders)
}