Revert "Merge pull request #9858 from adlrocha/adlrocha/consistent-bcast"

This reverts commit 8b2208fd9a, reversing
changes made to 2db6b12b78.

Unfortunately, this is rather tricky code. We've found several issues so
far and, while we've fixed a few, there are outstanding issues that
would require complex fixes we don't have time to tackle right now.

Luckily, this code isn't actually needed by the main Filecoin chain
which relies on consensus fault reporting to handle equivocation. So we
can just try again later.
This commit is contained in:
Steven Allen 2023-04-27 12:18:02 -07:00
parent 727a71186b
commit bb5ba64cca
11 changed files with 2 additions and 487 deletions

View File

@ -6,7 +6,6 @@ package build
import (
"os"
"strconv"
"time"
"github.com/ipfs/go-cid"
@ -146,7 +145,3 @@ const BootstrapPeerThreshold = 1
const Eip155ChainId = 31415926
var WhitelistedBlock = cid.Undef
// Reducing the delivery delay for equivocation of
// consistent broadcast to just half a second.
var CBDeliveryDelay = 500 * time.Millisecond

View File

@ -4,8 +4,6 @@
package build
import (
"time"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
@ -93,8 +91,3 @@ const BootstrapPeerThreshold = 2
const Eip155ChainId = 3141592
var WhitelistedBlock = cid.Undef
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second

View File

@ -6,7 +6,6 @@ package build
import (
"os"
"strconv"
"time"
"github.com/ipfs/go-cid"
@ -129,8 +128,3 @@ const BootstrapPeerThreshold = 4
const Eip155ChainId = 314159
var WhitelistedBlock = cid.Undef
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second

View File

@ -6,7 +6,6 @@ package build
import (
"os"
"strconv"
"time"
"github.com/ipfs/go-cid"
@ -135,8 +134,3 @@ const BootstrapPeerThreshold = 2
const Eip155ChainId = 3141592
var WhitelistedBlock = cid.Undef
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second

View File

@ -7,7 +7,6 @@ import (
"math"
"os"
"strconv"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
@ -148,8 +147,3 @@ const Eip155ChainId = 314
// we skip checks on message validity in this block to sidestep the zero-bls signature
var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi")
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
// This determines the wait time for the detection of potential equivocations.
// It is a variable instead of a constant so it can be conveniently configured in tests
var CBDeliveryDelay = 2 * time.Second

View File

@ -1,203 +0,0 @@
package bcast
import (
"context"
"sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("sub-cb")
const (
// GcSanityCheck determines the number of epochs in the past
// that will be garbage collected from the current epoch.
GcSanityCheck = 100
// GcLookback determines the number of epochs kept in the consistent
// broadcast cache.
GcLookback = 5
// GcDeepCheck determines the number of epochs in the past that we
// we try cleaning in the deep garbage collection round.
GcDeepCheck = 2880 // (24h*60m*60s)/30s per epoch
// GcDeepInterval determines after the number of epochs for which
// we are going to start a deeper garbage collection round.
GcDeepInterval = 1000
)
type blksInfo struct {
ctx context.Context
cancel context.CancelFunc
blks []cid.Cid
}
type bcastDict struct {
m map[string]*blksInfo
}
func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
v, ok := bd.m[string(key)]
if !ok {
return nil, ok
}
return v, ok
}
func (bd *bcastDict) blkLen(key []byte) int {
return len(bd.m[string(key)].blks)
}
func (bd *bcastDict) store(key []byte, d *blksInfo) {
bd.m[string(key)] = d
}
// ConsistentBCast tracks recent information about the
// blocks and tickets received at different epochs
type ConsistentBCast struct {
lk sync.RWMutex
delay time.Duration
m map[abi.ChainEpoch]*bcastDict
lastDeepGc abi.ChainEpoch
}
func newBcastDict() *bcastDict {
return &bcastDict{m: make(map[string]*blksInfo)}
}
func BCastKey(bh *types.BlockHeader) []byte {
return bh.Ticket.VRFProof
}
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
return &ConsistentBCast{
delay: delay,
m: make(map[abi.ChainEpoch]*bcastDict),
}
}
func cidExists(cids []cid.Cid, c cid.Cid) bool {
for _, v := range cids {
if v == c {
return true
}
}
return false
}
func (bInfo *blksInfo) eqErr() error {
bInfo.cancel()
return xerrors.Errorf("different blocks with the same ticket already seen")
}
func (cb *ConsistentBCast) Len() int {
cb.lk.RLock()
defer cb.lk.RUnlock()
return len(cb.m)
}
// RcvBlock is called every time a new block is received through the network.
//
// This function keeps track of all the blocks with a specific VRFProof received
// for the same height. Every time a new block with a VRFProof not seen at certain
// height is received, a new timer is triggered to wait for the delay time determined by
// the consistent broadcast before informing the syncer. During this time, if a new
// block with the same VRFProof for that height is received, it means a miner is
// trying to equivocate, and both blocks are discarded.
//
// The delay time should be set to a value high enough to allow any block sent for
// certain epoch to be propagated to a large amount of miners in the network.
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
cb.lk.Lock()
defer cb.lk.Unlock()
bcastDict, ok := cb.m[blk.Header.Height]
if !ok {
bcastDict = newBcastDict()
cb.m[blk.Header.Height] = bcastDict
}
key := BCastKey(blk.Header)
blkCid := blk.Cid()
bInfo, ok := bcastDict.load(key)
if ok {
if len(bInfo.blks) > 1 {
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
return
}
if !cidExists(bInfo.blks, blkCid) {
bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)})
// By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for
// the epoch-ticket combination making them to fail and not be sent to the syncer, as
// a potential equivocation is detected.
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
return
}
return
}
ctx, cancel := context.WithTimeout(ctx, cb.delay)
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
}
// WaitForDelivery is called before informing the syncer about a new block
// to check if the consistent broadcast delay triggered or if the block should
// be held off for a bit more time.
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
cb.lk.RLock()
defer cb.lk.RUnlock()
bcastDict, ok := cb.m[bh.Height]
if !ok {
return xerrors.Errorf("block at height %d garbage collected before it could be processed", bh.Height)
}
key := BCastKey(bh)
bInfo, ok := bcastDict.load(key)
if !ok {
return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
}
// Wait for the timeout
cb.lk.RUnlock()
<-bInfo.ctx.Done()
cb.lk.RLock()
if bcastDict.blkLen(key) > 1 {
return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
}
return nil
}
// GarbageCollect cleans the consistent broadcast cache periodically.
//
// A light garbage collection is triggered before every block delivery
// while a deeper one is triggered once every GcDeepCheck to ensure
// that nothing was left behind.
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
cb.lk.Lock()
defer cb.lk.Unlock()
// perform a deeper sanity check every now and then
gcRange := GcSanityCheck
if cb.lastDeepGc+GcDeepInterval > currEpoch {
gcRange = GcDeepCheck
cb.lastDeepGc = currEpoch
}
// keep currEpoch-gcRange and delete a few more in the past
// as a sanity-check
// Garbage collection is triggered before block delivery,
// and we use the sanity-check in case there were a few rounds
// without delivery, and the garbage collection wasn't triggered
// for a few epochs.
for i := 0; i < gcRange; i++ {
if currEpoch > GcLookback {
delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
}
}
}

View File

@ -1,223 +0,0 @@
package bcast_test
import (
"context"
"crypto/rand"
"fmt"
mrand "math/rand"
"strconv"
"sync"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/sub/bcast"
"github.com/filecoin-project/lotus/chain/types"
)
const TEST_DELAY = 1 * time.Second
func TestSimpleDelivery(t *testing.T) {
cb := bcast.NewConsistentBCast(TEST_DELAY)
// Check that we wait for delivery.
start := time.Now()
testSimpleDelivery(t, cb, 100, 5)
since := time.Since(start)
require.GreaterOrEqual(t, since, TEST_DELAY)
}
func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) {
ctx := context.Background()
wg := new(sync.WaitGroup)
errs := make([]error, 0)
wg.Add(numBlocks)
for i := 0; i < numBlocks; i++ {
go func(i int) {
defer wg.Done()
// Add a random delay in block reception
r := mrand.Intn(200)
time.Sleep(time.Duration(r) * time.Millisecond)
blk := newBlock(t, epoch, randomProof(t), []byte("test"+strconv.Itoa(i)))
cb.RcvBlock(ctx, blk)
err := cb.WaitForDelivery(blk.Header)
if err != nil {
errs = append(errs, err)
}
}(i)
}
wg.Wait()
for _, v := range errs {
t.Fatalf("error in delivery: %s", v)
}
}
func TestSeveralEpochs(t *testing.T) {
cb := bcast.NewConsistentBCast(TEST_DELAY)
numEpochs := 6
wg := new(sync.WaitGroup)
wg.Add(numEpochs)
for i := 0; i < numEpochs; i++ {
go func(i int) {
defer wg.Done()
// Add a random delay between epochs
r := mrand.Intn(500)
time.Sleep(time.Duration(i)*TEST_DELAY + time.Duration(r)*time.Millisecond)
rNumBlocks := mrand.Intn(5)
flip, err := flipCoin(0.7)
require.NoError(t, err)
t.Logf("Running epoch %d with %d with equivocation=%v", i, rNumBlocks, !flip)
if flip {
testSimpleDelivery(t, cb, abi.ChainEpoch(i), rNumBlocks)
} else {
testEquivocation(t, cb, abi.ChainEpoch(i), rNumBlocks)
}
cb.GarbageCollect(abi.ChainEpoch(i))
}(i)
}
wg.Wait()
require.Equal(t, cb.Len(), numEpochs)
}
// bias is expected to be 0-1
func flipCoin(bias float32) (bool, error) {
if bias > 1 || bias < 0 {
return false, fmt.Errorf("wrong bias. expected (0,1)")
}
r := mrand.Intn(100)
return r < int(bias*100), nil
}
func testEquivocation(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) {
ctx := context.Background()
wg := new(sync.WaitGroup)
errs := make([]error, 0)
wg.Add(numBlocks + 1)
for i := 0; i < numBlocks; i++ {
proof := randomProof(t)
// Valid blocks
go func(i int, proof []byte) {
defer wg.Done()
r := mrand.Intn(200)
time.Sleep(time.Duration(r) * time.Millisecond)
blk := newBlock(t, epoch, proof, []byte("valid"+strconv.Itoa(i)))
cb.RcvBlock(ctx, blk)
err := cb.WaitForDelivery(blk.Header)
if err != nil {
errs = append(errs, err)
}
}(i, proof)
// Equivocation for the last block
if i == numBlocks-1 {
// Attempting equivocation
go func(i int, proof []byte) {
defer wg.Done()
// Use the same proof and the same epoch
blk := newBlock(t, epoch, proof, []byte("invalid"+strconv.Itoa(i)))
cb.RcvBlock(ctx, blk)
err := cb.WaitForDelivery(blk.Header)
// Equivocation detected
require.Error(t, err)
}(i, proof)
}
}
wg.Wait()
// The equivocated block arrived too late, so
// we delivered all the valid blocks.
require.Len(t, errs, 1)
}
func TestEquivocation(t *testing.T) {
cb := bcast.NewConsistentBCast(TEST_DELAY)
testEquivocation(t, cb, 100, 5)
}
func TestFailedEquivocation(t *testing.T) {
cb := bcast.NewConsistentBCast(TEST_DELAY)
ctx := context.Background()
numBlocks := 5
wg := new(sync.WaitGroup)
errs := make([]error, 0)
wg.Add(numBlocks + 1)
for i := 0; i < numBlocks; i++ {
proof := randomProof(t)
// Valid blocks
go func(i int, proof []byte) {
defer wg.Done()
r := mrand.Intn(200)
time.Sleep(time.Duration(r) * time.Millisecond)
blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i)))
cb.RcvBlock(ctx, blk)
err := cb.WaitForDelivery(blk.Header)
if err != nil {
errs = append(errs, err)
}
}(i, proof)
// Equivocation for the last block
if i == numBlocks-1 {
// Attempting equivocation
go func(i int, proof []byte) {
defer wg.Done()
// The equivocated block arrives late
time.Sleep(2 * TEST_DELAY)
// Use the same proof and the same epoch
blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i)))
cb.RcvBlock(ctx, blk)
err := cb.WaitForDelivery(blk.Header)
// Equivocation detected
require.Error(t, err)
}(i, proof)
}
}
wg.Wait()
// The equivocated block arrived too late, so
// we delivered all the valid blocks.
require.Len(t, errs, 0)
}
func randomProof(t *testing.T) []byte {
proof := make([]byte, 10)
_, err := rand.Read(proof)
if err != nil {
t.Fatal(err)
}
return proof
}
func newBlock(t *testing.T, epoch abi.ChainEpoch, proof []byte, mCidSeed []byte) *types.BlockMsg {
h, err := multihash.Sum(mCidSeed, multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
testCid := cid.NewCidV0(h)
addr, err := address.NewIDAddress(10)
if err != nil {
t.Fatal(err)
}
bh := &types.BlockHeader{
Miner: addr,
ParentStateRoot: testCid,
ParentMessageReceipts: testCid,
Ticket: &types.Ticket{
VRFProof: proof,
},
Height: epoch,
Messages: testCid,
}
return &types.BlockMsg{
Header: bh,
}
}

View File

@ -27,7 +27,6 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub/bcast"
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
@ -44,11 +43,10 @@ var msgCidPrefix = cid.Prefix{
MhLength: 32,
}
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self peer.ID, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
cb := bcast.NewConsistentBCast(build.CBDeliveryDelay)
for {
msg, err := bsub.Next(ctx)
@ -69,9 +67,6 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p
src := msg.GetFrom()
// Notify consistent broadcast about a new block
cb.RcvBlock(ctx, blk)
go func() {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
@ -107,20 +102,6 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
}
// When we propose a new block ourselves, the proposed block also gets here through SyncSubmitBlock.
// If we are the block proposers we don't need to wait for delivery, we know the blocks are
// honest.
if src != self {
log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height)
if err := cb.WaitForDelivery(blk.Header); err != nil {
log.Errorf("not informing syncer about new block, potential equivocation detected (cid: %s, source: %s): %s; ", blk.Header.Cid(), src, err)
return
}
}
// Garbage collect the broadcast state
cb.GarbageCollect(blk.Header.Height)
log.Debugf("Block in height %v delivered successfully (cid=%s)", blk.Header.Height, blk.Cid())
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,

View File

@ -45,11 +45,6 @@ func init() {
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
// these tests assume really fast block times. disabling
// the consistent broadcast delay to avoid them from adding
// an unnecessary overhead.
build.CBDeliveryDelay = 2 * time.Millisecond
}
const source = 0

View File

@ -40,11 +40,6 @@ func init() {
build.InsecurePoStValidation = true
// Disabling consistent broadcast in itests. Many tests use really fast
// block times, and adding this additional delay for block delivery
// would make these tests to fail.
build.CBDeliveryDelay = 0
if err := os.Setenv("BELLMAN_NO_GPU", "1"); err != nil {
panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err))
}

View File

@ -167,7 +167,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx,
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, h.ID(), s, bserv, h.ConnManager())
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {