diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go new file mode 100644 index 000000000..918a6819f --- /dev/null +++ b/chain/sub/bcast/consistent.go @@ -0,0 +1,132 @@ +package bcast + +import ( + "context" + "encoding/binary" + "fmt" + "sync" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" +) + +// TODO: Take const out of here and make them build params. +const ( + DELAY = 6 * time.Second + GC_SANITY_CHECK = 5 + GC_LOOKBACK = 2 +) + +type blksInfo struct { + ctx context.Context + cancel context.CancelFunc + blks []cid.Cid +} + +type bcastDict struct { + // TODO: Consider making this a KeyMutexed map + lk sync.RWMutex + blks map[cid.Cid]*blksInfo // map[epoch + VRFProof]blksInfo +} + +type ConsistentBCast struct { + lk sync.Mutex + delay time.Duration + // FIXME: Make this a slice??? Less storage but needs indexing logic. + m map[abi.ChainEpoch]*bcastDict +} + +func newBcastDict(delay time.Duration) *bcastDict { + return &bcastDict{ + blks: make(map[cid.Cid]*blksInfo), + } +} + +// TODO: What if the VRFProof is already small?? We donĀ“t need the CID. Useless computation. +func BCastKey(bh *types.BlockHeader) cid.Cid { + proof := bh.Ticket.VRFProof + binary.PutVarint(proof, int64(bh.Height)) + return cid.NewCidV0(multihash.Multihash(proof)) +} + +func NewConsistentBCast(delay time.Duration) *ConsistentBCast { + return &ConsistentBCast{ + delay: delay, + m: make(map[abi.ChainEpoch]*bcastDict), + } +} + +func cidExists(cids []cid.Cid, c cid.Cid) bool { + for _, v := range cids { + if v == c { + return true + } + } + return false +} + +func (bInfo *blksInfo) eqErr() error { + bInfo.cancel() + return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen") +} + +func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error { + cb.lk.Lock() + bcastDict, ok := cb.m[blk.Header.Height] + if !ok { + bcastDict = newBcastDict(cb.delay) + } + cb.lk.Unlock() + key := BCastKey(blk.Header) + blkCid := blk.Cid() + + bcastDict.lk.Lock() + defer bcastDict.lk.Unlock() + bInfo, ok := bcastDict.blks[key] + if ok { + if len(bInfo.blks) > 1 { + return bInfo.eqErr() + } + + if !cidExists(bInfo.blks, blkCid) { + bInfo.blks = append(bInfo.blks, blkCid) + return bInfo.eqErr() + } + return nil + } + + ctx, cancel := context.WithTimeout(ctx, cb.delay) + bcastDict.blks[key] = &blksInfo{ctx, cancel, []cid.Cid{blkCid}} + return nil +} + +func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { + bcastDict := cb.m[bh.Height] + key := BCastKey(bh) + bcastDict.lk.RLock() + defer bcastDict.lk.RUnlock() + bInfo, ok := bcastDict.blks[key] + if !ok { + return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) + } + // Wait for the timeout + <-bInfo.ctx.Done() + if len(bInfo.blks) > 1 { + return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) + } + return nil +} + +func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { + cb.lk.Lock() + defer cb.lk.Unlock() + + // keep currEpoch-2 and delete a few more in the past + // as a sanity-check + for i := 0; i < GC_SANITY_CHECK; i++ { + delete(cb.m, currEpoch-abi.ChainEpoch(2-i)) + } +} diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go new file mode 100644 index 000000000..2f3a9e4de --- /dev/null +++ b/chain/sub/bcast/consistent_test.go @@ -0,0 +1,29 @@ +package bcast_test + +import ( + "crypto/rand" + "testing" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" +) + +func TestSimpleDelivery(t *testing.T) { +} + +func newBlock(t *testing.T, epoch abi.ChainEpoch) *types.BlockMsg { + proof := make([]byte, 10) + _, err := rand.Read(proof) + if err != err { + t.Fatal(err) + } + bh := &types.BlockHeader{ + Ticket: &types.Ticket{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + Height: 85919298723, + } + return &types.BlockMsg{ + Header: bh, + } +} diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index b8427e036..e03241c23 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/sub/bcast" "github.com/filecoin-project/lotus/chain/sub/ratelimit" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" @@ -47,6 +48,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha // Timeout after (block time + propagation delay). This is useless at // this point. timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second + cb := bcast.NewConsistentBCast(bcast.DELAY) for { msg, err := bsub.Next(ctx) @@ -67,6 +69,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha src := msg.GetFrom() + // Notify consistent broadcast about a new block + cb.RcvBlock(ctx, blk) + go func() { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -102,6 +107,14 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner) } + if err := cb.WaitForDelivery(blk.Header); err != nil { + log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) + return + } + + // Garbage collect the broadcast state + cb.GarbageCollect(blk.Header.Height) + if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, BlsMessages: bmsgs,