add more tracing spans

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
whyrusleeping 2019-10-12 18:44:56 +09:00 committed by Jakub Sztandera
parent b90274bd0c
commit 45737f8a51
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
4 changed files with 62 additions and 8 deletions

View File

@ -10,6 +10,7 @@ import (
bserv "github.com/ipfs/go-blockservice"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/store"
@ -86,6 +87,9 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
}
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
defer span.End()
defer s.Close()
var req BlockSyncRequest
@ -95,7 +99,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
}
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
resp, err := bss.processRequest(&req)
resp, err := bss.processRequest(ctx, &req)
if err != nil {
log.Error("failed to process block sync request: ", err)
return
@ -107,7 +111,10 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
}
}
func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncResponse, error) {
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
defer span.End()
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
@ -116,6 +123,13 @@ func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncRe
}, nil
}
if span.IsRecordingEvents() {
span.AddAttributes(
trace.BoolAttribute("blocks", opts.IncludeBlocks),
trace.BoolAttribute("messages", opts.IncludeMessages),
)
}
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
if err != nil {
log.Error("encountered error while responding to block sync request: ", err)
@ -253,6 +267,15 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
}
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(tipset)),
trace.Int64Attribute("count", int64(count)),
)
}
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
@ -321,6 +344,9 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid)
}
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
defer span.End()
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error

View File

@ -47,12 +47,17 @@ func cidsToKey(cids []cid.Cid) string {
}
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "tipSetState")
defer span.End()
ck := cidsToKey(ts.Cids())
sm.stlk.Lock()
cached, ok := sm.stCache[ck]
sm.stlk.Unlock()
if ok {
if span.IsRecordingEvents() {
span.AddAttributes(trace.BoolAttribute("cache", true))
}
return cached[0], cached[1], nil
}

View File

@ -352,6 +352,9 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
}
func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) error {
ctx, span := trace.StartSpan(ctx, "validateTipSet")
defer span.End()
ts := fts.TipSet()
if ts.Equals(syncer.Genesis) {
return nil
@ -423,6 +426,8 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
// Should match up with 'Semantical Validation' in validation.md in the spec
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
ctx, span := trace.StartSpan(ctx, "validateBlock")
defer span.End()
h := b.Header
@ -606,6 +611,15 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu
}
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "collectHeaders")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.Int64Attribute("fromHeight", int64(from.Height())),
trace.Int64Attribute("toHeight", int64(to.Height())),
)
}
blockSet := []*types.TipSet{from}
at := from.Parents()
@ -647,7 +661,7 @@ loop:
if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window {
window = gap
}
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, window)
blks, err := syncer.Bsync.GetBlocks(ctx, at, window)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
@ -726,7 +740,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
if err := syncer.ValidateTipSet(ctx, fts); err != nil {
log.Errorf("failed to validate tipset: %+v", err)
@ -740,7 +754,10 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
}
// fills out each of the given tipsets with messages and calls the callback with it
func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error {
func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet, cb func(context.Context, *store.FullTipSet) error) error {
ctx, span := trace.StartSpan(ctx, "iterFullTipsets")
defer span.End()
beg := len(headers) - 1
// handle case where we have a prefix of these locally
for ; beg >= 0; beg-- {
@ -751,7 +768,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
if fts == nil {
break
}
if err := cb(fts); err != nil {
if err := cb(ctx, fts); err != nil {
return err
}
}
@ -767,7 +784,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
}
next := headers[i-batchSize]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1))
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1))
if err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
@ -788,7 +805,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
return xerrors.Errorf("message processing failed: %w", err)
}
if err := cb(fts); err != nil {
if err := cb(ctx, fts); err != nil {
return err
}
@ -828,6 +845,9 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
}
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "collectChain")
defer span.End()
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())

View File

@ -538,6 +538,9 @@ func (vm *VM) ActorBalance(addr address.Address) (types.BigInt, aerrors.ActorErr
}
func (vm *VM) Flush(ctx context.Context) (cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "vm.Flush")
defer span.End()
from := dag.NewDAGService(bserv.New(vm.buf, nil))
to := dag.NewDAGService(bserv.New(vm.buf.Read(), nil))