add some more tracing to chain sync

This commit is contained in:
whyrusleeping 2019-10-10 20:13:26 +09:00
parent fef544f88e
commit 3f342d7ae2
5 changed files with 31 additions and 16 deletions

View File

@ -20,7 +20,7 @@ import (
)
func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wallet, miner address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, timestamp uint64) (*types.FullBlock, error) {
st, recpts, err := sm.TipSetState(parents)
st, recpts, err := sm.TipSetState(ctx, parents)
if err != nil {
return nil, errors.Wrap(err, "failed to load tipset state")
}

View File

@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-cid"
hamt "github.com/ipfs/go-hamt-ipld"
logging "github.com/ipfs/go-log"
"go.opencensus.io/trace"
)
var log = logging.Logger("statemgr")
@ -45,8 +46,7 @@ func cidsToKey(cids []cid.Cid) string {
return out
}
func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error) {
ctx := context.TODO()
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
ck := cidsToKey(ts.Cids())
sm.stlk.Lock()
@ -76,6 +76,9 @@ func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error)
}
func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.BlockHeader, cb func(cid.Cid, *types.Message, *vm.ApplyRet) error) (cid.Cid, cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "computeTipSetState")
defer span.End()
pstate := blks[0].ParentStateRoot
cids := make([]cid.Cid, len(blks))
@ -248,7 +251,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
ts = sm.cs.GetHeaviestTipSet()
}
st, _, err := sm.TipSetState(ts)
st, _, err := sm.TipSetState(ctx, ts)
if err != nil {
return address.Undef, xerrors.Errorf("resolve address failed to get tipset state: %w", err)
}

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"go.opencensus.io/trace"
amt "github.com/filecoin-project/go-amt-ipld"
"github.com/ipfs/go-cid"
@ -87,6 +88,7 @@ const BootstrapPeerThreshold = 1
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
ctx := context.Background()
if fts == nil {
panic("bad")
}
@ -102,7 +104,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
// TODO: this is kindof a hack...
log.Info("got block from ourselves")
if err := syncer.Sync(fts.TipSet()); err != nil {
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
log.Errorf("failed to sync our own block %s: %+v", fts.TipSet().Cids(), err)
}
@ -114,7 +116,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.Bsync.AddPeer(from)
go func() {
if err := syncer.Sync(fts.TipSet()); err != nil {
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
log.Errorf("sync error: %+v", err)
}
}()
@ -327,9 +329,10 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
return fts, nil
}
func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()
ctx := context.TODO()
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
@ -420,6 +423,7 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
// Should match up with 'Semantical Validation' in validation.md in the spec
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
h := b.Header
baseTs, err := syncer.store.LoadTipSet(h.Parents)
@ -427,7 +431,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
}
stateroot, precp, err := syncer.sm.TipSetState(baseTs)
stateroot, precp, err := syncer.sm.TipSetState(ctx, baseTs)
if err != nil {
return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err)
}
@ -720,11 +724,11 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
return nil, xerrors.Errorf("fork was longer than our threshold")
}
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
if err := syncer.ValidateTipSet(ctx, fts); err != nil {
log.Errorf("failed to validate tipset: %+v", err)
return xerrors.Errorf("message processing failed: %w", err)
}
@ -847,7 +851,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
syncer.syncState.SetStage(api.StageMessages)
if err := syncer.syncMessagesAndCheckState(headers); err != nil {
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
return xerrors.Errorf("collectChain syncMessages: %w", err)
}

View File

@ -430,6 +430,13 @@ func checkMessage(msg *types.Message) error {
func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("to", msg.To.String()),
trace.Int64Attribute("method", int64(msg.Method)),
trace.StringAttribute("value", msg.Value.String()),
)
}
if err := checkMessage(msg); err != nil {
return nil, err
@ -605,6 +612,7 @@ func (vm *VM) SetBlockHeight(h uint64) {
func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params []byte) ([]byte, aerrors.ActorError) {
ctx, span := trace.StartSpan(vmctx.ctx, "vm.Invoke")
defer span.End()
var oldCtx context.Context
oldCtx, vmctx.ctx = vmctx.ctx, ctx
defer func() {

View File

@ -129,12 +129,12 @@ func (a *StateAPI) StateReplay(ctx context.Context, ts *types.TipSet, mc cid.Cid
}, nil
}
func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) {
func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) {
if ts == nil {
ts = a.Chain.GetHeaviestTipSet()
}
st, _, err := a.StateManager.TipSetState(ts)
st, _, err := a.StateManager.TipSetState(ctx, ts)
if err != nil {
return nil, err
}
@ -145,7 +145,7 @@ func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) {
}
func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) {
state, err := a.stateForTs(ts)
state, err := a.stateForTs(ctx, ts)
if err != nil {
return nil, err
}
@ -154,7 +154,7 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts
}
func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) {
state, err := a.stateForTs(ts)
state, err := a.stateForTs(ctx, ts)
if err != nil {
return nil, err
}