diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index b69845476..cc600cb10 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -53,8 +53,16 @@ func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) { bd.m.Store(key.String(), d) } +func (bd *bcastDict) blkLen(key multihash.Multihash) int { + v, ok := bd.m.Load(key.String()) + if !ok { + return 0 + } + return len(v.(*blksInfo).blks) +} + type ConsistentBCast struct { - lk sync.Mutex + lk sync.RWMutex delay time.Duration // FIXME: Make this a slice??? Less storage but needs indexing logic. m map[abi.ChainEpoch]*bcastDict @@ -95,6 +103,8 @@ func (bInfo *blksInfo) eqErr() error { } func (cb *ConsistentBCast) Len() int { + cb.lk.RLock() + defer cb.lk.RUnlock() return len(cb.m) } @@ -121,7 +131,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { } if !cidExists(bInfo.blks, blkCid) { - bInfo.blks = append(bInfo.blks, blkCid) + bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)}) log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr()) return } @@ -133,7 +143,9 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { } func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { + cb.lk.RLock() bcastDict := cb.m[bh.Height] + cb.lk.RUnlock() key, err := BCastKey(bh) if err != nil { return err @@ -144,7 +156,7 @@ func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { } // Wait for the timeout <-bInfo.ctx.Done() - if len(bInfo.blks) > 1 { + if bcastDict.blkLen(key) > 1 { return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) } return nil