Make sync wait nicer

This commit is contained in:
Łukasz Magiera 2020-09-24 13:35:45 +02:00
parent bbcad52ce0
commit 15eddf0c96
5 changed files with 51 additions and 4 deletions

View File

@ -725,6 +725,8 @@ type ActiveSync struct {
type SyncState struct { type SyncState struct {
ActiveSyncs []ActiveSync ActiveSyncs []ActiveSync
VMApplied uint64
} }
type SyncStateStage int type SyncStateStage int

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"reflect" "reflect"
"sync/atomic"
"time" "time"
block "github.com/ipfs/go-block-format" block "github.com/ipfs/go-block-format"
@ -40,6 +41,12 @@ var log = logging.Logger("vm")
var actorLog = logging.Logger("actors") var actorLog = logging.Logger("actors")
var gasOnActorExec = newGasCharge("OnActorExec", 0, 0) var gasOnActorExec = newGasCharge("OnActorExec", 0, 0)
// stat counters
var (
StatSends uint64
StatApplied uint64
)
// ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`. // ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`.
func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) { func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) {
if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 { if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 {
@ -204,6 +211,8 @@ type ApplyRet struct {
func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime, func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime,
gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) { gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) {
defer atomic.AddUint64(&StatSends, 1)
st := vm.cstate st := vm.cstate
origin := msg.From origin := msg.From
@ -312,6 +321,7 @@ func checkMessage(msg *types.Message) error {
func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
start := build.Clock.Now() start := build.Clock.Now()
defer atomic.AddUint64(&StatApplied, 1)
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start) ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
rt.finilizeGasTracing() rt.finilizeGasTracing()
return &ApplyRet{ return &ApplyRet{
@ -331,6 +341,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet,
start := build.Clock.Now() start := build.Clock.Now()
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage") ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End() defer span.End()
defer atomic.AddUint64(&StatApplied, 1)
msg := cmsg.VMMessage() msg := cmsg.VMMessage()
if span.IsRecordingEvents() { if span.IsRecordingEvents() {
span.AddAttributes( span.AddAttributes(

View File

@ -225,6 +225,16 @@ var syncCheckpointCmd = &cli.Command{
} }
func SyncWait(ctx context.Context, napi api.FullNode) error { func SyncWait(ctx context.Context, napi api.FullNode) error {
tick := time.Second / 4
lastLines := 0
ticker := time.NewTicker(tick)
defer ticker.Stop()
samples := 8
i := 0
var app, lastApp uint64
for { for {
state, err := napi.SyncState(ctx) state, err := napi.SyncState(ctx)
if err != nil { if err != nil {
@ -266,7 +276,24 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
heightDiff = 0 heightDiff = 0
} }
fmt.Printf("\r\x1b[2KWorker %d: Base Height: %d\tTarget Height: %d\t Height diff: %d\tTarget: %s\tState: %s\tHeight: %d", working, baseHeight, theight, heightDiff, target, ss.Stage, ss.Height) for i := 0; i < lastLines; i++ {
fmt.Print("\r\x1b[2K\x1b[A")
}
fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff)
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height)
lastLines = 2
if i%samples == 0 {
lastApp = app
app = state.VMApplied
}
if i > 0 {
fmt.Printf("Validated %d messages (%d per second)\n", state.VMApplied, (app-lastApp)*uint64(time.Second/tick)/uint64(samples))
lastLines++
}
_ = target // todo: maybe print? (creates a bunch of line wrapping issues with most tipsets)
if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) { if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) {
fmt.Println("\nDone!") fmt.Println("\nDone!")
@ -277,7 +304,9 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
case <-ctx.Done(): case <-ctx.Done():
fmt.Println("\nExit by user") fmt.Println("\nExit by user")
return nil return nil
case <-build.Clock.After(1 * time.Second): case <-ticker.C:
} }
i++
} }
} }

View File

@ -4303,7 +4303,8 @@ Inputs: `null`
Response: Response:
```json ```json
{ {
"ActiveSyncs": null "ActiveSyncs": null,
"VMApplied": 42
} }
``` ```

View File

@ -2,6 +2,7 @@ package full
import ( import (
"context" "context"
"sync/atomic"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -13,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
@ -28,7 +30,9 @@ type SyncAPI struct {
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) { func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
states := a.Syncer.State() states := a.Syncer.State()
out := &api.SyncState{} out := &api.SyncState{
VMApplied: atomic.LoadUint64(&vm.StatApplied),
}
for i := range states { for i := range states {
ss := &states[i] ss := &states[i]