From 15eddf0c96313e53913e768bb7e20d27b401be25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 24 Sep 2020 13:35:45 +0200 Subject: [PATCH] Make sync wait nicer --- api/api_full.go | 2 ++ chain/vm/vm.go | 11 +++++++++++ cli/sync.go | 33 +++++++++++++++++++++++++++++++-- documentation/en/api-methods.md | 3 ++- node/impl/full/sync.go | 6 +++++- 5 files changed, 51 insertions(+), 4 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 772ce9250..6d2d0c7b5 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -725,6 +725,8 @@ type ActiveSync struct { type SyncState struct { ActiveSyncs []ActiveSync + + VMApplied uint64 } type SyncStateStage int diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 3bafbe090..54ea47698 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "reflect" + "sync/atomic" "time" block "github.com/ipfs/go-block-format" @@ -40,6 +41,12 @@ var log = logging.Logger("vm") var actorLog = logging.Logger("actors") var gasOnActorExec = newGasCharge("OnActorExec", 0, 0) +// stat counters +var ( + StatSends uint64 + StatApplied uint64 +) + // ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`. func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) { if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 { @@ -204,6 +211,8 @@ type ApplyRet struct { func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime, gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) { + defer atomic.AddUint64(&StatSends, 1) + st := vm.cstate origin := msg.From @@ -312,6 +321,7 @@ func checkMessage(msg *types.Message) error { func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { start := build.Clock.Now() + defer atomic.AddUint64(&StatApplied, 1) ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start) rt.finilizeGasTracing() return &ApplyRet{ @@ -331,6 +341,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, start := build.Clock.Now() ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage") defer span.End() + defer atomic.AddUint64(&StatApplied, 1) msg := cmsg.VMMessage() if span.IsRecordingEvents() { span.AddAttributes( diff --git a/cli/sync.go b/cli/sync.go index 3b4e2e9fb..bee87cf70 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -225,6 +225,16 @@ var syncCheckpointCmd = &cli.Command{ } func SyncWait(ctx context.Context, napi api.FullNode) error { + tick := time.Second / 4 + + lastLines := 0 + ticker := time.NewTicker(tick) + defer ticker.Stop() + + samples := 8 + i := 0 + var app, lastApp uint64 + for { state, err := napi.SyncState(ctx) if err != nil { @@ -266,7 +276,24 @@ func SyncWait(ctx context.Context, napi api.FullNode) error { heightDiff = 0 } - fmt.Printf("\r\x1b[2KWorker %d: Base Height: %d\tTarget Height: %d\t Height diff: %d\tTarget: %s\tState: %s\tHeight: %d", working, baseHeight, theight, heightDiff, target, ss.Stage, ss.Height) + for i := 0; i < lastLines; i++ { + fmt.Print("\r\x1b[2K\x1b[A") + } + + fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff) + fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height) + lastLines = 2 + + if i%samples == 0 { + lastApp = app + app = state.VMApplied + } + if i > 0 { + fmt.Printf("Validated %d messages (%d per second)\n", state.VMApplied, (app-lastApp)*uint64(time.Second/tick)/uint64(samples)) + lastLines++ + } + + _ = target // todo: maybe print? (creates a bunch of line wrapping issues with most tipsets) if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) { fmt.Println("\nDone!") @@ -277,7 +304,9 @@ func SyncWait(ctx context.Context, napi api.FullNode) error { case <-ctx.Done(): fmt.Println("\nExit by user") return nil - case <-build.Clock.After(1 * time.Second): + case <-ticker.C: } + + i++ } } diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index 22aa8e474..e489fcb0f 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -4303,7 +4303,8 @@ Inputs: `null` Response: ```json { - "ActiveSyncs": null + "ActiveSyncs": null, + "VMApplied": 42 } ``` diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index 31a707b90..dc3bfe230 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -2,6 +2,7 @@ package full import ( "context" + "sync/atomic" cid "github.com/ipfs/go-cid" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -13,6 +14,7 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/node/modules/dtypes" ) @@ -28,7 +30,9 @@ type SyncAPI struct { func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { states := a.Syncer.State() - out := &api.SyncState{} + out := &api.SyncState{ + VMApplied: atomic.LoadUint64(&vm.StatApplied), + } for i := range states { ss := &states[i]