682ddf6ffa
Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com>
198 lines
5.6 KiB
Go
198 lines
5.6 KiB
Go
package bcast
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
// log is the package-level logger, registered under the "sub-cb" name.
var log = logging.Logger("sub-cb")
|
|
|
|
// Tuning knobs for the garbage collection of the consistent broadcast
// cache. All values are expressed in epochs.
const (
	// GcSanityCheck is how many epochs, counting backwards from the
	// lookback boundary, a regular garbage collection round deletes.
	GcSanityCheck = 100

	// GcLookback is the distance, in epochs, between the current epoch
	// and the newest epoch that garbage collection removes.
	GcLookback = 5

	// GcDeepCheck is how many epochs in the past a deep garbage
	// collection round tries to clean: 24 hours of 30-second epochs.
	GcDeepCheck = (24 * 60 * 60) / 30

	// GcDeepInterval is the number of epochs compared against the last
	// deep round to decide when a deeper garbage collection round runs.
	GcDeepInterval = 1000
)
|
|
|
|
type blksInfo struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
blks []cid.Cid
|
|
}
|
|
|
|
type bcastDict struct {
|
|
m map[string]*blksInfo
|
|
}
|
|
|
|
func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
|
|
v, ok := bd.m[string(key)]
|
|
if !ok {
|
|
return nil, ok
|
|
}
|
|
return v, ok
|
|
}
|
|
|
|
func (bd *bcastDict) blkLen(key []byte) int {
|
|
return len(bd.m[string(key)].blks)
|
|
}
|
|
|
|
func (bd *bcastDict) store(key []byte, d *blksInfo) {
|
|
bd.m[string(key)] = d
|
|
}
|
|
|
|
// ConsistentBCast tracks recent information about the
|
|
// blocks and tickets received at different epochs
|
|
type ConsistentBCast struct {
|
|
lk sync.RWMutex
|
|
delay time.Duration
|
|
m map[abi.ChainEpoch]*bcastDict
|
|
lastDeepGc abi.ChainEpoch
|
|
}
|
|
|
|
func newBcastDict() *bcastDict {
|
|
return &bcastDict{m: make(map[string]*blksInfo)}
|
|
}
|
|
|
|
func BCastKey(bh *types.BlockHeader) []byte {
|
|
return bh.Ticket.VRFProof
|
|
}
|
|
|
|
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
|
|
return &ConsistentBCast{
|
|
delay: delay,
|
|
m: make(map[abi.ChainEpoch]*bcastDict),
|
|
}
|
|
}
|
|
|
|
func cidExists(cids []cid.Cid, c cid.Cid) bool {
|
|
for _, v := range cids {
|
|
if v == c {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (bInfo *blksInfo) eqErr() error {
|
|
bInfo.cancel()
|
|
return xerrors.Errorf("different blocks with the same ticket already seen")
|
|
}
|
|
|
|
func (cb *ConsistentBCast) Len() int {
|
|
cb.lk.RLock()
|
|
defer cb.lk.RUnlock()
|
|
return len(cb.m)
|
|
}
|
|
|
|
// RcvBlock is called every time a new block is received through the network.
|
|
//
|
|
// This function keeps track of all the blocks with a specific VRFProof received
|
|
// for the same height. Every time a new block with a VRFProof not seen at certain
|
|
// height is received, a new timer is triggered to wait for the delay time determined by
|
|
// the consistent broadcast before informing the syncer. During this time, if a new
|
|
// block with the same VRFProof for that height is received, it means a miner is
|
|
// trying to equivocate, and both blocks are discarded.
|
|
//
|
|
// The delay time should be set to a value high enough to allow any block sent for
|
|
// certain epoch to be propagated to a large amount of miners in the network.
|
|
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
|
|
cb.lk.Lock()
|
|
defer cb.lk.Unlock()
|
|
bcastDict, ok := cb.m[blk.Header.Height]
|
|
if !ok {
|
|
bcastDict = newBcastDict()
|
|
cb.m[blk.Header.Height] = bcastDict
|
|
}
|
|
|
|
key := BCastKey(blk.Header)
|
|
blkCid := blk.Cid()
|
|
|
|
bInfo, ok := bcastDict.load(key)
|
|
if ok {
|
|
if len(bInfo.blks) > 1 {
|
|
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
|
|
return
|
|
}
|
|
|
|
if !cidExists(bInfo.blks, blkCid) {
|
|
bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)})
|
|
// By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for
|
|
// the epoch-ticket combination making them to fail and not be sent to the syncer, as
|
|
// a potential equivocation is detected.
|
|
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, cb.delay)
|
|
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
|
|
}
|
|
|
|
// WaitForDelivery is called before informing the syncer about a new block
|
|
// to check if the consistent broadcast delay triggered or if the block should
|
|
// be held off for a bit more time.
|
|
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
|
|
cb.lk.RLock()
|
|
bcastDict := cb.m[bh.Height]
|
|
key := BCastKey(bh)
|
|
bInfo, ok := bcastDict.load(key)
|
|
cb.lk.RUnlock()
|
|
if !ok {
|
|
return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
|
|
}
|
|
|
|
// Wait for the timeout
|
|
<-bInfo.ctx.Done()
|
|
if bcastDict.blkLen(key) > 1 {
|
|
return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GarbageCollect cleans the consistent broadcast cache periodically.
|
|
//
|
|
// A light garbage collection is triggered before every block delivery
|
|
// while a deeper one is triggered once every GcDeepCheck to ensure
|
|
// that nothing was left behind.
|
|
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
|
|
cb.lk.Lock()
|
|
defer cb.lk.Unlock()
|
|
|
|
// perform a deeper sanity check every now and then
|
|
gcRange := GcSanityCheck
|
|
if cb.lastDeepGc+GcDeepInterval > currEpoch {
|
|
gcRange = GcDeepCheck
|
|
cb.lastDeepGc = currEpoch
|
|
}
|
|
|
|
// keep currEpoch-gcRange and delete a few more in the past
|
|
// as a sanity-check
|
|
// Garbage collection is triggered before block delivery,
|
|
// and we use the sanity-check in case there were a few rounds
|
|
// without delivery, and the garbage collection wasn't triggered
|
|
// for a few epochs.
|
|
for i := 0; i < gcRange; i++ {
|
|
if currEpoch > GcLookback {
|
|
delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
|
|
}
|
|
}
|
|
}
|