Merge pull request #4142 from filecoin-project/steb/bs-sessions

use bitswap sessions when fetching messages, and cancel them
This commit is contained in:
Łukasz Magiera 2020-10-05 11:33:21 +02:00 committed by GitHub
commit 0162a485d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -43,7 +43,11 @@ var log = logging.Logger("sub")
var ErrSoftFailure = errors.New("soft validation failure") var ErrSoftFailure = errors.New("soft validation failure")
var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power")
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bserv bserv.BlockService, cmgr connmgr.ConnManager) { func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
for { for {
msg, err := bsub.Next(ctx) msg, err := bsub.Next(ctx)
if err != nil { if err != nil {
@ -64,15 +68,22 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
src := msg.GetFrom() src := msg.GetFrom()
go func() { go func() {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// NOTE: we could also share a single session between
// all requests but that may have other consequences.
ses := bserv.NewSession(ctx, bs)
start := build.Clock.Now() start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub") log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := FetchMessagesByCids(context.TODO(), bserv, blk.BlsMessages) bmsgs, err := FetchMessagesByCids(ctx, ses, blk.BlsMessages)
if err != nil { if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src) log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; source: %s", err, src)
return return
} }
smsgs, err := FetchSignedMessagesByCids(context.TODO(), bserv, blk.SecpkMessages) smsgs, err := FetchSignedMessagesByCids(ctx, ses, blk.SecpkMessages)
if err != nil { if err != nil {
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src) log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; source: %s", err, src)
return return
@ -97,7 +108,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
func FetchMessagesByCids( func FetchMessagesByCids(
ctx context.Context, ctx context.Context,
bserv bserv.BlockService, bserv bserv.BlockGetter,
cids []cid.Cid, cids []cid.Cid,
) ([]*types.Message, error) { ) ([]*types.Message, error) {
out := make([]*types.Message, len(cids)) out := make([]*types.Message, len(cids))
@ -126,7 +137,7 @@ func FetchMessagesByCids(
// FIXME: Duplicate of above. // FIXME: Duplicate of above.
func FetchSignedMessagesByCids( func FetchSignedMessagesByCids(
ctx context.Context, ctx context.Context,
bserv bserv.BlockService, bserv bserv.BlockGetter,
cids []cid.Cid, cids []cid.Cid,
) ([]*types.SignedMessage, error) { ) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids)) out := make([]*types.SignedMessage, len(cids))
@ -156,12 +167,11 @@ func FetchSignedMessagesByCids(
// blocks we did not request. // blocks we did not request.
func fetchCids( func fetchCids(
ctx context.Context, ctx context.Context,
bserv bserv.BlockService, bserv bserv.BlockGetter,
cids []cid.Cid, cids []cid.Cid,
cb func(int, blocks.Block) error, cb func(int, blocks.Block) error,
) error { ) error {
// FIXME: Why don't we use the context here? fetchedBlocks := bserv.GetBlocks(ctx, cids)
fetchedBlocks := bserv.GetBlocks(context.TODO(), cids)
cidIndex := make(map[cid.Cid]int) cidIndex := make(map[cid.Cid]int)
for i, c := range cids { for i, c := range cids {