package bcast import ( "context" "encoding/binary" "fmt" "sync" "time" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" ) // TODO: Take const out of here and make them build params. const ( DELAY = 6 * time.Second GC_SANITY_CHECK = 5 GC_LOOKBACK = 2 ) type blksInfo struct { ctx context.Context cancel context.CancelFunc blks []cid.Cid } type bcastDict struct { // thread-safe map impl for the dictionary // sync.Map accepts `any` as keys and values. // To make it type safe and only support the right // types we use this auxiliary type. m *sync.Map } func (bd *bcastDict) load(key multihash.Multihash) (*blksInfo, bool) { v, ok := bd.m.Load(key.String()) if !ok { return nil, ok } return v.(*blksInfo), ok } func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) { bd.m.Store(key.String(), d) } type ConsistentBCast struct { lk sync.Mutex delay time.Duration // FIXME: Make this a slice??? Less storage but needs indexing logic. m map[abi.ChainEpoch]*bcastDict } func newBcastDict() *bcastDict { return &bcastDict{new(sync.Map)} } // TODO: What if the VRFProof is already small?? We donĀ“t need the CID. Useless computation. func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { k := make([]byte, len(bh.Ticket.VRFProof)) copy(k, bh.Ticket.VRFProof) binary.PutVarint(k, int64(bh.Height)) return multihash.Sum(k, multihash.SHA2_256, -1) } func NewConsistentBCast(delay time.Duration) *ConsistentBCast { return &ConsistentBCast{ delay: delay, m: make(map[abi.ChainEpoch]*bcastDict), } } func cidExists(cids []cid.Cid, c cid.Cid) bool { for _, v := range cids { if v == c { return true } } return false } func (bInfo *blksInfo) eqErr() error { bInfo.cancel() return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen") } func (cb *ConsistentBCast) Len() int { return len(cb.m) } func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error { cb.lk.Lock() bcastDict, ok := cb.m[blk.Header.Height] if !ok { bcastDict = newBcastDict() cb.m[blk.Header.Height] = bcastDict } cb.lk.Unlock() key, err := BCastKey(blk.Header) if err != nil { return err } blkCid := blk.Cid() bInfo, ok := bcastDict.load(key) if ok { if len(bInfo.blks) > 1 { return bInfo.eqErr() } if !cidExists(bInfo.blks, blkCid) { bInfo.blks = append(bInfo.blks, blkCid) return bInfo.eqErr() } return nil } ctx, cancel := context.WithTimeout(ctx, cb.delay) bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) return nil } func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { bcastDict := cb.m[bh.Height] key, err := BCastKey(bh) if err != nil { return err } bInfo, ok := bcastDict.load(key) if !ok { return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) } // Wait for the timeout <-bInfo.ctx.Done() if len(bInfo.blks) > 1 { return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) } return nil } func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { cb.lk.Lock() defer cb.lk.Unlock() // keep currEpoch-2 and delete a few more in the past // as a sanity-check // Garbage collection is triggered before block delivery, // and we use the sanity-check in case there were a few rounds // without delivery, and the garbage collection wasn't triggered // for a few epochs. for i := 0; i < GC_SANITY_CHECK; i++ { if currEpoch > GC_LOOKBACK { delete(cb.m, currEpoch-abi.ChainEpoch(GC_LOOKBACK+i)) } } }