Merge pull request #349 from filecoin-project/feat/tracing-2
add more tracing spans
This commit is contained in:
commit
068f788d7a
@ -10,6 +10,7 @@ import (
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/store"
|
||||
@ -86,6 +87,9 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
|
||||
}
|
||||
|
||||
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
||||
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
|
||||
defer span.End()
|
||||
|
||||
defer s.Close()
|
||||
|
||||
var req BlockSyncRequest
|
||||
@ -95,7 +99,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
||||
}
|
||||
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
|
||||
|
||||
resp, err := bss.processRequest(&req)
|
||||
resp, err := bss.processRequest(ctx, &req)
|
||||
if err != nil {
|
||||
log.Error("failed to process block sync request: ", err)
|
||||
return
|
||||
@ -107,7 +111,10 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
|
||||
defer span.End()
|
||||
|
||||
opts := ParseBSOptions(req.Options)
|
||||
if len(req.Start) == 0 {
|
||||
return &BlockSyncResponse{
|
||||
@ -116,6 +123,11 @@ func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncRe
|
||||
}, nil
|
||||
}
|
||||
|
||||
span.AddAttributes(
|
||||
trace.BoolAttribute("blocks", opts.IncludeBlocks),
|
||||
trace.BoolAttribute("messages", opts.IncludeMessages),
|
||||
)
|
||||
|
||||
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
|
||||
if err != nil {
|
||||
log.Error("encountered error while responding to block sync request: ", err)
|
||||
@ -253,6 +265,15 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
|
||||
defer span.End()
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute("tipset", fmt.Sprint(tipset)),
|
||||
trace.Int64Attribute("count", int64(count)),
|
||||
)
|
||||
}
|
||||
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
@ -321,6 +342,9 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid)
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
|
||||
defer span.End()
|
||||
|
||||
peers := bs.getPeers()
|
||||
perm := rand.Perm(len(peers))
|
||||
// TODO: round robin through these peers on error
|
||||
|
@ -47,12 +47,15 @@ func cidsToKey(cids []cid.Cid) string {
|
||||
}
|
||||
|
||||
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "tipSetState")
|
||||
defer span.End()
|
||||
|
||||
ck := cidsToKey(ts.Cids())
|
||||
sm.stlk.Lock()
|
||||
cached, ok := sm.stCache[ck]
|
||||
sm.stlk.Unlock()
|
||||
if ok {
|
||||
span.AddAttributes(trace.BoolAttribute("cache", true))
|
||||
return cached[0], cached[1], nil
|
||||
}
|
||||
|
||||
|
@ -352,6 +352,9 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
||||
}
|
||||
|
||||
func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) error {
|
||||
ctx, span := trace.StartSpan(ctx, "validateTipSet")
|
||||
defer span.End()
|
||||
|
||||
ts := fts.TipSet()
|
||||
if ts.Equals(syncer.Genesis) {
|
||||
return nil
|
||||
@ -423,6 +426,8 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
|
||||
|
||||
// Should match up with 'Semantical Validation' in validation.md in the spec
|
||||
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
|
||||
ctx, span := trace.StartSpan(ctx, "validateBlock")
|
||||
defer span.End()
|
||||
|
||||
h := b.Header
|
||||
|
||||
@ -606,6 +611,14 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu
|
||||
}
|
||||
|
||||
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "collectHeaders")
|
||||
defer span.End()
|
||||
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute("fromHeight", int64(from.Height())),
|
||||
trace.Int64Attribute("toHeight", int64(to.Height())),
|
||||
)
|
||||
|
||||
blockSet := []*types.TipSet{from}
|
||||
|
||||
at := from.Parents()
|
||||
@ -647,7 +660,7 @@ loop:
|
||||
if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window {
|
||||
window = gap
|
||||
}
|
||||
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, window)
|
||||
blks, err := syncer.Bsync.GetBlocks(ctx, at, window)
|
||||
if err != nil {
|
||||
// Most likely our peers aren't fully synced yet, but forwarded
|
||||
// new block message (ideally we'd find better peers)
|
||||
@ -726,7 +739,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
|
||||
|
||||
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
|
||||
syncer.syncState.SetHeight(0)
|
||||
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
|
||||
return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
|
||||
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
|
||||
if err := syncer.ValidateTipSet(ctx, fts); err != nil {
|
||||
log.Errorf("failed to validate tipset: %+v", err)
|
||||
@ -740,7 +753,10 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
|
||||
}
|
||||
|
||||
// fills out each of the given tipsets with messages and calls the callback with it
|
||||
func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error {
|
||||
func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet, cb func(context.Context, *store.FullTipSet) error) error {
|
||||
ctx, span := trace.StartSpan(ctx, "iterFullTipsets")
|
||||
defer span.End()
|
||||
|
||||
beg := len(headers) - 1
|
||||
// handle case where we have a prefix of these locally
|
||||
for ; beg >= 0; beg-- {
|
||||
@ -751,7 +767,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
|
||||
if fts == nil {
|
||||
break
|
||||
}
|
||||
if err := cb(fts); err != nil {
|
||||
if err := cb(ctx, fts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -767,7 +783,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
|
||||
}
|
||||
|
||||
next := headers[i-batchSize]
|
||||
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1))
|
||||
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
@ -788,7 +804,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
|
||||
if err := cb(fts); err != nil {
|
||||
if err := cb(ctx, fts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -828,6 +844,9 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
|
||||
}
|
||||
|
||||
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
|
||||
ctx, span := trace.StartSpan(ctx, "collectChain")
|
||||
defer span.End()
|
||||
|
||||
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
|
||||
|
||||
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
|
||||
|
@ -538,6 +538,9 @@ func (vm *VM) ActorBalance(addr address.Address) (types.BigInt, aerrors.ActorErr
|
||||
}
|
||||
|
||||
func (vm *VM) Flush(ctx context.Context) (cid.Cid, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "vm.Flush")
|
||||
defer span.End()
|
||||
|
||||
from := dag.NewDAGService(bserv.New(vm.buf, nil))
|
||||
to := dag.NewDAGService(bserv.New(vm.buf.Read(), nil))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user