2022-09-12 17:03:06 +00:00
|
|
|
package bcast
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
|
|
"github.com/multiformats/go-multihash"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// GcSanityCheck is the number of consecutive past epochs that a
	// single GarbageCollect call removes from the cache, starting
	// GcLookback epochs behind the current epoch.
	GcSanityCheck = 5

	// GcLookback is how many epochs behind the current one garbage
	// collection starts, i.e. the number of most recent epochs that
	// are kept in the consistent broadcast cache.
	GcLookback = 2
)
|
|
|
|
|
|
|
|
type blksInfo struct {
|
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
|
|
|
blks []cid.Cid
|
|
|
|
}
|
|
|
|
|
|
|
|
// bcastDict is a typed wrapper around sync.Map used as the per-epoch
// dictionary of broadcast entries.
//
// sync.Map takes `any` for both keys and values, so all access goes
// through the load and store methods of this type, which only accept a
// multihash key and a *blksInfo value.
type bcastDict struct {
	m *sync.Map
}
|
|
|
|
|
|
|
|
func (bd *bcastDict) load(key multihash.Multihash) (*blksInfo, bool) {
|
|
|
|
v, ok := bd.m.Load(key.String())
|
|
|
|
if !ok {
|
|
|
|
return nil, ok
|
|
|
|
}
|
|
|
|
return v.(*blksInfo), ok
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) {
|
|
|
|
bd.m.Store(key.String(), d)
|
2022-09-12 17:03:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type ConsistentBCast struct {
|
|
|
|
lk sync.Mutex
|
|
|
|
delay time.Duration
|
|
|
|
// FIXME: Make this a slice??? Less storage but needs indexing logic.
|
|
|
|
m map[abi.ChainEpoch]*bcastDict
|
|
|
|
}
|
|
|
|
|
2022-09-14 17:59:29 +00:00
|
|
|
func newBcastDict() *bcastDict {
|
2022-09-13 10:33:28 +00:00
|
|
|
return &bcastDict{new(sync.Map)}
|
2022-09-12 17:03:06 +00:00
|
|
|
}
|
|
|
|
|
2022-12-13 10:13:07 +00:00
|
|
|
// TODO: the VRFProof may already be small enough so we may not need to use a hash here.
|
|
|
|
// we can maybe bypass the useless computation.
|
2022-09-13 10:33:28 +00:00
|
|
|
func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) {
|
2022-09-14 17:59:29 +00:00
|
|
|
k := make([]byte, len(bh.Ticket.VRFProof))
|
|
|
|
copy(k, bh.Ticket.VRFProof)
|
|
|
|
binary.PutVarint(k, int64(bh.Height))
|
|
|
|
return multihash.Sum(k, multihash.SHA2_256, -1)
|
2022-09-12 17:03:06 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
|
|
|
|
return &ConsistentBCast{
|
|
|
|
delay: delay,
|
|
|
|
m: make(map[abi.ChainEpoch]*bcastDict),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func cidExists(cids []cid.Cid, c cid.Cid) bool {
|
|
|
|
for _, v := range cids {
|
|
|
|
if v == c {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (bInfo *blksInfo) eqErr() error {
|
|
|
|
bInfo.cancel()
|
|
|
|
return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen")
|
|
|
|
}
|
|
|
|
|
2022-09-13 14:06:18 +00:00
|
|
|
func (cb *ConsistentBCast) Len() int {
|
|
|
|
return len(cb.m)
|
|
|
|
}
|
|
|
|
|
2022-09-12 17:03:06 +00:00
|
|
|
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error {
|
|
|
|
cb.lk.Lock()
|
|
|
|
bcastDict, ok := cb.m[blk.Header.Height]
|
|
|
|
if !ok {
|
2022-09-14 17:59:29 +00:00
|
|
|
bcastDict = newBcastDict()
|
2022-09-13 10:33:28 +00:00
|
|
|
cb.m[blk.Header.Height] = bcastDict
|
2022-09-12 17:03:06 +00:00
|
|
|
}
|
|
|
|
cb.lk.Unlock()
|
2022-09-13 10:33:28 +00:00
|
|
|
key, err := BCastKey(blk.Header)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-09-12 17:03:06 +00:00
|
|
|
blkCid := blk.Cid()
|
|
|
|
|
2022-09-13 10:33:28 +00:00
|
|
|
bInfo, ok := bcastDict.load(key)
|
2022-09-12 17:03:06 +00:00
|
|
|
if ok {
|
|
|
|
if len(bInfo.blks) > 1 {
|
|
|
|
return bInfo.eqErr()
|
|
|
|
}
|
|
|
|
|
|
|
|
if !cidExists(bInfo.blks, blkCid) {
|
|
|
|
bInfo.blks = append(bInfo.blks, blkCid)
|
|
|
|
return bInfo.eqErr()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-09-14 17:59:29 +00:00
|
|
|
ctx, cancel := context.WithTimeout(ctx, cb.delay)
|
2022-09-13 10:33:28 +00:00
|
|
|
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
|
2022-09-12 17:03:06 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
|
|
|
|
bcastDict := cb.m[bh.Height]
|
2022-09-13 10:33:28 +00:00
|
|
|
key, err := BCastKey(bh)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
bInfo, ok := bcastDict.load(key)
|
2022-09-12 17:03:06 +00:00
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
|
|
|
|
}
|
|
|
|
// Wait for the timeout
|
|
|
|
<-bInfo.ctx.Done()
|
|
|
|
if len(bInfo.blks) > 1 {
|
|
|
|
return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
|
|
|
|
cb.lk.Lock()
|
|
|
|
defer cb.lk.Unlock()
|
|
|
|
|
|
|
|
// keep currEpoch-2 and delete a few more in the past
|
|
|
|
// as a sanity-check
|
2022-09-13 10:33:28 +00:00
|
|
|
// Garbage collection is triggered before block delivery,
|
|
|
|
// and we use the sanity-check in case there were a few rounds
|
|
|
|
// without delivery, and the garbage collection wasn't triggered
|
|
|
|
// for a few epochs.
|
2022-12-13 10:13:07 +00:00
|
|
|
for i := 0; i < GcSanityCheck; i++ {
|
|
|
|
if currEpoch > GcLookback {
|
|
|
|
delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
|
2022-09-13 14:06:18 +00:00
|
|
|
}
|
2022-09-12 17:03:06 +00:00
|
|
|
}
|
|
|
|
}
|