Merge pull request #323 from filecoin-project/feat/tracing-1
add some more tracing to chain sync
This commit is contained in:
commit
177a876cf2
@ -20,7 +20,7 @@ import (
|
||||
)
|
||||
|
||||
func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wallet, miner address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, timestamp uint64) (*types.FullBlock, error) {
|
||||
st, recpts, err := sm.TipSetState(parents)
|
||||
st, recpts, err := sm.TipSetState(ctx, parents)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to load tipset state")
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
hamt "github.com/ipfs/go-hamt-ipld"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
var log = logging.Logger("statemgr")
|
||||
@ -45,8 +46,7 @@ func cidsToKey(cids []cid.Cid) string {
|
||||
return out
|
||||
}
|
||||
|
||||
func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error) {
|
||||
ctx := context.TODO()
|
||||
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
|
||||
|
||||
ck := cidsToKey(ts.Cids())
|
||||
sm.stlk.Lock()
|
||||
@ -76,6 +76,9 @@ func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error)
|
||||
}
|
||||
|
||||
func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.BlockHeader, cb func(cid.Cid, *types.Message, *vm.ApplyRet) error) (cid.Cid, cid.Cid, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "computeTipSetState")
|
||||
defer span.End()
|
||||
|
||||
pstate := blks[0].ParentStateRoot
|
||||
|
||||
cids := make([]cid.Cid, len(blks))
|
||||
@ -248,7 +251,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
|
||||
ts = sm.cs.GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
st, _, err := sm.TipSetState(ts)
|
||||
st, _, err := sm.TipSetState(ctx, ts)
|
||||
if err != nil {
|
||||
return address.Undef, xerrors.Errorf("resolve address failed to get tipset state: %w", err)
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
@ -87,6 +88,7 @@ const BootstrapPeerThreshold = 1
|
||||
// This should be called when connecting to new peers, and additionally
|
||||
// when receiving new blocks from the network
|
||||
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
||||
ctx := context.Background()
|
||||
if fts == nil {
|
||||
panic("bad")
|
||||
}
|
||||
@ -102,7 +104,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
||||
// TODO: this is kindof a hack...
|
||||
log.Info("got block from ourselves")
|
||||
|
||||
if err := syncer.Sync(fts.TipSet()); err != nil {
|
||||
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
|
||||
log.Errorf("failed to sync our own block %s: %+v", fts.TipSet().Cids(), err)
|
||||
}
|
||||
|
||||
@ -114,7 +116,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
||||
syncer.Bsync.AddPeer(from)
|
||||
|
||||
go func() {
|
||||
if err := syncer.Sync(fts.TipSet()); err != nil {
|
||||
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
|
||||
log.Errorf("sync error: %+v", err)
|
||||
}
|
||||
}()
|
||||
@ -327,9 +329,10 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
|
||||
return fts, nil
|
||||
}
|
||||
|
||||
func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
|
||||
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
||||
ctx, span := trace.StartSpan(ctx, "chain.Sync")
|
||||
defer span.End()
|
||||
|
||||
ctx := context.TODO()
|
||||
syncer.syncLock.Lock()
|
||||
defer syncer.syncLock.Unlock()
|
||||
|
||||
@ -420,6 +423,7 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
|
||||
|
||||
// Should match up with 'Semantical Validation' in validation.md in the spec
|
||||
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
|
||||
|
||||
h := b.Header
|
||||
|
||||
baseTs, err := syncer.store.LoadTipSet(h.Parents)
|
||||
@ -427,7 +431,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
|
||||
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
|
||||
}
|
||||
|
||||
stateroot, precp, err := syncer.sm.TipSetState(baseTs)
|
||||
stateroot, precp, err := syncer.sm.TipSetState(ctx, baseTs)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err)
|
||||
}
|
||||
@ -720,11 +724,11 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
|
||||
return nil, xerrors.Errorf("fork was longer than our threshold")
|
||||
}
|
||||
|
||||
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
|
||||
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
|
||||
syncer.syncState.SetHeight(0)
|
||||
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
|
||||
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
|
||||
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
|
||||
if err := syncer.ValidateTipSet(ctx, fts); err != nil {
|
||||
log.Errorf("failed to validate tipset: %+v", err)
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
@ -847,7 +851,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
|
||||
|
||||
syncer.syncState.SetStage(api.StageMessages)
|
||||
|
||||
if err := syncer.syncMessagesAndCheckState(headers); err != nil {
|
||||
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
|
||||
return xerrors.Errorf("collectChain syncMessages: %w", err)
|
||||
}
|
||||
|
||||
|
@ -430,6 +430,13 @@ func checkMessage(msg *types.Message) error {
|
||||
func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
|
||||
defer span.End()
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("to", msg.To.String()),
|
||||
trace.Int64Attribute("method", int64(msg.Method)),
|
||||
trace.StringAttribute("value", msg.Value.String()),
|
||||
)
|
||||
}
|
||||
|
||||
if err := checkMessage(msg); err != nil {
|
||||
return nil, err
|
||||
@ -605,6 +612,7 @@ func (vm *VM) SetBlockHeight(h uint64) {
|
||||
func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params []byte) ([]byte, aerrors.ActorError) {
|
||||
ctx, span := trace.StartSpan(vmctx.ctx, "vm.Invoke")
|
||||
defer span.End()
|
||||
|
||||
var oldCtx context.Context
|
||||
oldCtx, vmctx.ctx = vmctx.ctx, ctx
|
||||
defer func() {
|
||||
|
@ -129,12 +129,12 @@ func (a *StateAPI) StateReplay(ctx context.Context, ts *types.TipSet, mc cid.Cid
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) {
|
||||
func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) {
|
||||
if ts == nil {
|
||||
ts = a.Chain.GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
st, _, err := a.StateManager.TipSetState(ts)
|
||||
st, _, err := a.StateManager.TipSetState(ctx, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -145,7 +145,7 @@ func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) {
|
||||
}
|
||||
|
||||
func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||
state, err := a.stateForTs(ts)
|
||||
state, err := a.stateForTs(ctx, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -154,7 +154,7 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts
|
||||
}
|
||||
|
||||
func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) {
|
||||
state, err := a.stateForTs(ts)
|
||||
state, err := a.stateForTs(ctx, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user