diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 22a0db160..89708432c 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -2,14 +2,12 @@ package bcast import ( "context" - "encoding/binary" - "fmt" "sync" "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-multihash" + "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" @@ -24,7 +22,7 @@ const ( GcSanityCheck = 5 // GcLookback determines the number of epochs kept in the consistent // broadcast cache. - GcLookback = 2 + GcLookback = 1000 ) type blksInfo struct { @@ -41,20 +39,20 @@ type bcastDict struct { m *sync.Map } -func (bd *bcastDict) load(key multihash.Multihash) (*blksInfo, bool) { - v, ok := bd.m.Load(key.String()) +func (bd *bcastDict) load(key []byte) (*blksInfo, bool) { + v, ok := bd.m.Load(key) if !ok { return nil, ok } return v.(*blksInfo), ok } -func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) { - bd.m.Store(key.String(), d) +func (bd *bcastDict) store(key []byte, d *blksInfo) { + bd.m.Store(key, d) } -func (bd *bcastDict) blkLen(key multihash.Multihash) int { - v, ok := bd.m.Load(key.String()) +func (bd *bcastDict) blkLen(key []byte) int { + v, ok := bd.m.Load(key) if !ok { return 0 } @@ -64,21 +62,15 @@ func (bd *bcastDict) blkLen(key multihash.Multihash) int { type ConsistentBCast struct { lk sync.RWMutex delay time.Duration - // FIXME: Make this a slice??? Less storage but needs indexing logic. - m map[abi.ChainEpoch]*bcastDict + m map[abi.ChainEpoch]*bcastDict } func newBcastDict() *bcastDict { return &bcastDict{new(sync.Map)} } -// TODO: the VRFProof may already be small enough so we may not need to use a hash here. -// we can maybe bypass the useless computation. -func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { - k := make([]byte, len(bh.Ticket.VRFProof)) - copy(k, bh.Ticket.VRFProof) - binary.PutVarint(k, int64(bh.Height)) - return multihash.Sum(k, multihash.SHA2_256, -1) +func BCastKey(bh *types.BlockHeader) []byte { + return bh.Ticket.VRFProof } func NewConsistentBCast(delay time.Duration) *ConsistentBCast { @@ -99,7 +91,7 @@ func cidExists(cids []cid.Cid, c cid.Cid) bool { func (bInfo *blksInfo) eqErr() error { bInfo.cancel() - return fmt.Errorf("different blocks with the same ticket already seen") + return xerrors.Errorf("different blocks with the same ticket already seen") } func (cb *ConsistentBCast) Len() int { @@ -108,6 +100,17 @@ func (cb *ConsistentBCast) Len() int { return len(cb.m) } +// RcvBlock is called every time a new block is received through the network. +// +// This function keeps track of all the blocks with a specific VRFProof received +// for the same height. Every time a new block with a VRFProof not seen at certain +// height is received, a new timer is triggered to wait for the delay time determined by +// the consistent broadcast before informing the syncer. During this time, if a new +// block with the same VRFProof for that height is received, it means a miner is +// trying to equivocate, and both blocks are discarded. +// +// The delay time should be set to a value high enough to allow any block sent for +// certain epoch to be propagated to a large amount of miners in the network. func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { cb.lk.Lock() bcastDict, ok := cb.m[blk.Header.Height] @@ -116,11 +119,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { cb.m[blk.Header.Height] = bcastDict } cb.lk.Unlock() - key, err := BCastKey(blk.Header) - if err != nil { - log.Errorf("couldn't hash blk info for height %d: %s", blk.Header.Height, err) - return - } + key := BCastKey(blk.Header) blkCid := blk.Cid() bInfo, ok := bcastDict.load(key) @@ -142,22 +141,22 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) } +// WaitForDelivery is called before informing the syncer about a new block +// to check if the consistent broadcast delay triggered or if the block should +// be held off for a bit more time. func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { cb.lk.RLock() bcastDict := cb.m[bh.Height] cb.lk.RUnlock() - key, err := BCastKey(bh) - if err != nil { - return err - } + key := BCastKey(bh) bInfo, ok := bcastDict.load(key) if !ok { - return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) + return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) } // Wait for the timeout <-bInfo.ctx.Done() if bcastDict.blkLen(key) > 1 { - return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) + return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) } return nil } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 6226e45d8..6436cc27d 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -113,13 +113,13 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p if src != self { log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height) if err := cb.WaitForDelivery(blk.Header); err != nil { - log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) + log.Errorf("not informing syncer about new block, potential equivocation detected (cid: %s, source: %s): %s; ", blk.Header.Cid(), src, err) return } } // Garbage collect the broadcast state cb.GarbageCollect(blk.Header.Height) - log.Debugf("Block in height %v delivered successfully (cid=)", blk.Header.Height, blk.Cid()) + log.Debugf("Block in height %v delivered successfully (cid=%s)", blk.Header.Height, blk.Cid()) if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header,