fixed bugs in consistent broadcast integration

This commit is contained in:
Alfonso de la Rocha 2022-09-14 19:59:29 +02:00
parent 1dadff303c
commit d1a4f1dc50
No known key found for this signature in database
GPG Key ID: 97DE48FDFC756FBD
3 changed files with 19 additions and 12 deletions

View File

@ -53,15 +53,16 @@ type ConsistentBCast struct {
m map[abi.ChainEpoch]*bcastDict m map[abi.ChainEpoch]*bcastDict
} }
func newBcastDict(delay time.Duration) *bcastDict { func newBcastDict() *bcastDict {
return &bcastDict{new(sync.Map)} return &bcastDict{new(sync.Map)}
} }
// TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation. // TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation.
func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) {
proof := bh.Ticket.VRFProof k := make([]byte, len(bh.Ticket.VRFProof))
binary.PutVarint(proof, int64(bh.Height)) copy(k, bh.Ticket.VRFProof)
return multihash.Sum(proof, multihash.SHA2_256, -1) binary.PutVarint(k, int64(bh.Height))
return multihash.Sum(k, multihash.SHA2_256, -1)
} }
func NewConsistentBCast(delay time.Duration) *ConsistentBCast { func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
@ -93,7 +94,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er
cb.lk.Lock() cb.lk.Lock()
bcastDict, ok := cb.m[blk.Header.Height] bcastDict, ok := cb.m[blk.Header.Height]
if !ok { if !ok {
bcastDict = newBcastDict(cb.delay) bcastDict = newBcastDict()
cb.m[blk.Header.Height] = bcastDict cb.m[blk.Header.Height] = bcastDict
} }
cb.lk.Unlock() cb.lk.Unlock()
@ -116,7 +117,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er
return nil return nil
} }
ctx, cancel := context.WithTimeout(ctx, cb.delay*time.Second) ctx, cancel := context.WithTimeout(ctx, cb.delay)
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
return nil return nil
} }

View File

@ -44,7 +44,7 @@ var msgCidPrefix = cid.Prefix{
MhLength: 32, MhLength: 32,
} }
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) { func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self peer.ID, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at // Timeout after (block time + propagation delay). This is useless at
// this point. // this point.
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
@ -107,13 +107,19 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner) log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
} }
// When we propose a new block ourselves, the proposed block also gets here through SyncSubmitBlock.
// If we are the block proposers we don't need to wait for delivery, we know the blocks are
// honest.
if src != self {
log.Infof("Waiting for consistent broadcast of block in height: %v", blk.Header.Height)
if err := cb.WaitForDelivery(blk.Header); err != nil { if err := cb.WaitForDelivery(blk.Header); err != nil {
log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src)
return return
} }
}
// Garbage collect the broadcast state // Garbage collect the broadcast state
cb.GarbageCollect(blk.Header.Height) cb.GarbageCollect(blk.Header.Height)
log.Infof("Block in height %v delivered successfully", blk.Header.Height)
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
Header: blk.Header, Header: blk.Header,

View File

@ -166,7 +166,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx,
panic(err) panic(err)
} }
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) go sub.HandleIncomingBlocks(ctx, blocksub, h.ID(), s, bserv, h.ConnManager())
} }
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {