Merge pull request #3991 from filecoin-project/feat/nicer-syncwait
Make sync wait nicer
This commit is contained in:
commit
85caa48814
@ -725,6 +725,8 @@ type ActiveSync struct {
|
|||||||
|
|
||||||
type SyncState struct {
|
type SyncState struct {
|
||||||
ActiveSyncs []ActiveSync
|
ActiveSyncs []ActiveSync
|
||||||
|
|
||||||
|
VMApplied uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncStateStage int
|
type SyncStateStage int
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
block "github.com/ipfs/go-block-format"
|
block "github.com/ipfs/go-block-format"
|
||||||
@ -40,6 +41,12 @@ var log = logging.Logger("vm")
|
|||||||
var actorLog = logging.Logger("actors")
|
var actorLog = logging.Logger("actors")
|
||||||
var gasOnActorExec = newGasCharge("OnActorExec", 0, 0)
|
var gasOnActorExec = newGasCharge("OnActorExec", 0, 0)
|
||||||
|
|
||||||
|
// stat counters
|
||||||
|
var (
|
||||||
|
StatSends uint64
|
||||||
|
StatApplied uint64
|
||||||
|
)
|
||||||
|
|
||||||
// ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`.
|
// ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`.
|
||||||
func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) {
|
func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) {
|
||||||
if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 {
|
if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 {
|
||||||
@ -204,6 +211,8 @@ type ApplyRet struct {
|
|||||||
func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime,
|
func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime,
|
||||||
gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) {
|
gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) {
|
||||||
|
|
||||||
|
defer atomic.AddUint64(&StatSends, 1)
|
||||||
|
|
||||||
st := vm.cstate
|
st := vm.cstate
|
||||||
|
|
||||||
origin := msg.From
|
origin := msg.From
|
||||||
@ -312,6 +321,7 @@ func checkMessage(msg *types.Message) error {
|
|||||||
|
|
||||||
func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
|
func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
|
||||||
start := build.Clock.Now()
|
start := build.Clock.Now()
|
||||||
|
defer atomic.AddUint64(&StatApplied, 1)
|
||||||
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
|
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
|
||||||
rt.finilizeGasTracing()
|
rt.finilizeGasTracing()
|
||||||
return &ApplyRet{
|
return &ApplyRet{
|
||||||
@ -331,6 +341,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet,
|
|||||||
start := build.Clock.Now()
|
start := build.Clock.Now()
|
||||||
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
|
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
defer atomic.AddUint64(&StatApplied, 1)
|
||||||
msg := cmsg.VMMessage()
|
msg := cmsg.VMMessage()
|
||||||
if span.IsRecordingEvents() {
|
if span.IsRecordingEvents() {
|
||||||
span.AddAttributes(
|
span.AddAttributes(
|
||||||
|
35
cli/sync.go
35
cli/sync.go
@ -225,6 +225,16 @@ var syncCheckpointCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SyncWait(ctx context.Context, napi api.FullNode) error {
|
func SyncWait(ctx context.Context, napi api.FullNode) error {
|
||||||
|
tick := time.Second / 4
|
||||||
|
|
||||||
|
lastLines := 0
|
||||||
|
ticker := time.NewTicker(tick)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
samples := 8
|
||||||
|
i := 0
|
||||||
|
var app, lastApp uint64
|
||||||
|
|
||||||
for {
|
for {
|
||||||
state, err := napi.SyncState(ctx)
|
state, err := napi.SyncState(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -266,7 +276,24 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
|
|||||||
heightDiff = 0
|
heightDiff = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("\r\x1b[2KWorker %d: Base Height: %d\tTarget Height: %d\t Height diff: %d\tTarget: %s\tState: %s\tHeight: %d", working, baseHeight, theight, heightDiff, target, ss.Stage, ss.Height)
|
for i := 0; i < lastLines; i++ {
|
||||||
|
fmt.Print("\r\x1b[2K\x1b[A")
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff)
|
||||||
|
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height)
|
||||||
|
lastLines = 2
|
||||||
|
|
||||||
|
if i%samples == 0 {
|
||||||
|
lastApp = app
|
||||||
|
app = state.VMApplied
|
||||||
|
}
|
||||||
|
if i > 0 {
|
||||||
|
fmt.Printf("Validated %d messages (%d per second)\n", state.VMApplied, (app-lastApp)*uint64(time.Second/tick)/uint64(samples))
|
||||||
|
lastLines++
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = target // todo: maybe print? (creates a bunch of line wrapping issues with most tipsets)
|
||||||
|
|
||||||
if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) {
|
if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) {
|
||||||
fmt.Println("\nDone!")
|
fmt.Println("\nDone!")
|
||||||
@ -277,7 +304,9 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
fmt.Println("\nExit by user")
|
fmt.Println("\nExit by user")
|
||||||
return nil
|
return nil
|
||||||
case <-build.Clock.After(1 * time.Second):
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4303,7 +4303,8 @@ Inputs: `null`
|
|||||||
Response:
|
Response:
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"ActiveSyncs": null
|
"ActiveSyncs": null,
|
||||||
|
"VMApplied": 42
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package full
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
@ -13,6 +14,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,7 +30,9 @@ type SyncAPI struct {
|
|||||||
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
|
||||||
states := a.Syncer.State()
|
states := a.Syncer.State()
|
||||||
|
|
||||||
out := &api.SyncState{}
|
out := &api.SyncState{
|
||||||
|
VMApplied: atomic.LoadUint64(&vm.StatApplied),
|
||||||
|
}
|
||||||
|
|
||||||
for i := range states {
|
for i := range states {
|
||||||
ss := &states[i]
|
ss := &states[i]
|
||||||
|
Loading…
Reference in New Issue
Block a user