diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index c6e0c8b80..0c72b4d80 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -43,7 +43,11 @@ var log = logging.Logger("sub") var ErrSoftFailure = errors.New("soft validation failure") var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") -func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) { +func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) { + // Timeout after (block time + propagation delay). This is useless at + // this point. + timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second + for { msg, err := bsub.Next(ctx) if err != nil { @@ -64,15 +68,22 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha src := msg.GetFrom() go func() { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // NOTE: we could also share a single session between + // all requests but that may have other consequences. + ses := bserv.NewSession(ctx, bs) + start := build.Clock.Now() log.Debug("about to fetch messages for block from pubsub") - bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages) + bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages) if err != nil { log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src) return } - smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages) + smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages) if err != nil { log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src) return @@ -97,7 +108,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha func FetchMessagesByCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, ) ([]*types.Message, error) { out := make([]*types.Message, len(cids)) @@ -126,7 +137,7 @@ func FetchMessagesByCids( // FIXME: Duplicate of above. func FetchSignedMessagesByCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, ) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, len(cids)) @@ -156,12 +167,11 @@ func FetchSignedMessagesByCids( // blocks we did not request. func fetchCids( ctx context.Context, - bserv bserv.BlockService, + bserv bserv.BlockGetter, cids []cid.Cid, cb func(int, blocks.Block) error, ) error { - // FIXME: Why don't we use the context here? - fetchedBlocks := bserv.GetBlocks(context.TODO(), cids) + fetchedBlocks := bserv.GetBlocks(ctx, cids) cidIndex := make(map[cid.Cid]int) for i, c := range cids {