From c03ad9dcfd28b33f8673d74a2f9ddf140f773bf5 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Mon, 12 Sep 2022 19:03:06 +0200 Subject: [PATCH 01/27] wip: draft impl of consistent bcast --- chain/sub/bcast/consistent.go | 132 +++++++++++++++++++++++++++++ chain/sub/bcast/consistent_test.go | 29 +++++++ chain/sub/incoming.go | 13 +++ 3 files changed, 174 insertions(+) create mode 100644 chain/sub/bcast/consistent.go create mode 100644 chain/sub/bcast/consistent_test.go diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go new file mode 100644 index 000000000..918a6819f --- /dev/null +++ b/chain/sub/bcast/consistent.go @@ -0,0 +1,132 @@ +package bcast + +import ( + "context" + "encoding/binary" + "fmt" + "sync" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" +) + +// TODO: Take const out of here and make them build params. +const ( + DELAY = 6 * time.Second + GC_SANITY_CHECK = 5 + GC_LOOKBACK = 2 +) + +type blksInfo struct { + ctx context.Context + cancel context.CancelFunc + blks []cid.Cid +} + +type bcastDict struct { + // TODO: Consider making this a KeyMutexed map + lk sync.RWMutex + blks map[cid.Cid]*blksInfo // map[epoch + VRFProof]blksInfo +} + +type ConsistentBCast struct { + lk sync.Mutex + delay time.Duration + // FIXME: Make this a slice??? Less storage but needs indexing logic. + m map[abi.ChainEpoch]*bcastDict +} + +func newBcastDict(delay time.Duration) *bcastDict { + return &bcastDict{ + blks: make(map[cid.Cid]*blksInfo), + } +} + +// TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation. +func BCastKey(bh *types.BlockHeader) cid.Cid { + proof := bh.Ticket.VRFProof + binary.PutVarint(proof, int64(bh.Height)) + return cid.NewCidV0(multihash.Multihash(proof)) +} + +func NewConsistentBCast(delay time.Duration) *ConsistentBCast { + return &ConsistentBCast{ + delay: delay, + m: make(map[abi.ChainEpoch]*bcastDict), + } +} + +func cidExists(cids []cid.Cid, c cid.Cid) bool { + for _, v := range cids { + if v == c { + return true + } + } + return false +} + +func (bInfo *blksInfo) eqErr() error { + bInfo.cancel() + return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen") +} + +func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error { + cb.lk.Lock() + bcastDict, ok := cb.m[blk.Header.Height] + if !ok { + bcastDict = newBcastDict(cb.delay) + } + cb.lk.Unlock() + key := BCastKey(blk.Header) + blkCid := blk.Cid() + + bcastDict.lk.Lock() + defer bcastDict.lk.Unlock() + bInfo, ok := bcastDict.blks[key] + if ok { + if len(bInfo.blks) > 1 { + return bInfo.eqErr() + } + + if !cidExists(bInfo.blks, blkCid) { + bInfo.blks = append(bInfo.blks, blkCid) + return bInfo.eqErr() + } + return nil + } + + ctx, cancel := context.WithTimeout(ctx, cb.delay) + bcastDict.blks[key] = &blksInfo{ctx, cancel, []cid.Cid{blkCid}} + return nil +} + +func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { + bcastDict := cb.m[bh.Height] + key := BCastKey(bh) + bcastDict.lk.RLock() + defer bcastDict.lk.RUnlock() + bInfo, ok := bcastDict.blks[key] + if !ok { + return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) + } + // Wait for the timeout + <-bInfo.ctx.Done() + if len(bInfo.blks) > 1 { + return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) + } + return nil +} + +func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { + cb.lk.Lock() + defer cb.lk.Unlock() + + // keep currEpoch-2 and delete a few more in the past + // as a sanity-check + for i := 0; i < GC_SANITY_CHECK; i++ { + delete(cb.m, currEpoch-abi.ChainEpoch(2-i)) + } +} diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go new file mode 100644 index 000000000..2f3a9e4de --- /dev/null +++ b/chain/sub/bcast/consistent_test.go @@ -0,0 +1,29 @@ +package bcast_test + +import ( + "crypto/rand" + "testing" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" +) + +func TestSimpleDelivery(t *testing.T) { +} + +func newBlock(t *testing.T, epoch abi.ChainEpoch) *types.BlockMsg { + proof := make([]byte, 10) + _, err := rand.Read(proof) + if err != err { + t.Fatal(err) + } + bh := &types.BlockHeader{ + Ticket: &types.Ticket{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + Height: 85919298723, + } + return &types.BlockMsg{ + Header: bh, + } +} diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index b8427e036..e03241c23 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/sub/bcast" "github.com/filecoin-project/lotus/chain/sub/ratelimit" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" @@ -47,6 +48,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha // Timeout after (block time + propagation delay). This is useless at // this point. timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second + cb := bcast.NewConsistentBCast(bcast.DELAY) for { msg, err := bsub.Next(ctx) @@ -67,6 +69,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha src := msg.GetFrom() + // Notify consistent broadcast about a new block + cb.RcvBlock(ctx, blk) + go func() { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -102,6 +107,14 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner) } + if err := cb.WaitForDelivery(blk.Header); err != nil { + log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) + return + } + + // Garbage collect the broadcast state + cb.GarbageCollect(blk.Header.Height) + if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, BlsMessages: bmsgs, From 76031d72ad955af9f4a04a74b606e959a02f2c77 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Sep 2022 12:33:28 +0200 Subject: [PATCH 02/27] minor fixes. unit tests added --- chain/sub/bcast/consistent.go | 55 +++++--- chain/sub/bcast/consistent_test.go | 198 ++++++++++++++++++++++++++++- 2 files changed, 231 insertions(+), 22 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 918a6819f..af31389ee 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -27,9 +27,23 @@ type blksInfo struct { } type bcastDict struct { - // TODO: Consider making this a KeyMutexed map - lk sync.RWMutex - blks map[cid.Cid]*blksInfo // map[epoch + VRFProof]blksInfo + // thread-safe map impl for the dictionary + // sync.Map accepts `any` as keys and values. + // To make it type safe and only support the right + // types we use this auxiliary type. + m *sync.Map +} + +func (bd *bcastDict) load(key multihash.Multihash) (*blksInfo, bool) { + v, ok := bd.m.Load(key.String()) + if !ok { + return nil, ok + } + return v.(*blksInfo), ok +} + +func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) { + bd.m.Store(key.String(), d) } type ConsistentBCast struct { @@ -40,16 +54,14 @@ type ConsistentBCast struct { } func newBcastDict(delay time.Duration) *bcastDict { - return &bcastDict{ - blks: make(map[cid.Cid]*blksInfo), - } + return &bcastDict{new(sync.Map)} } // TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation. -func BCastKey(bh *types.BlockHeader) cid.Cid { +func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { proof := bh.Ticket.VRFProof binary.PutVarint(proof, int64(bh.Height)) - return cid.NewCidV0(multihash.Multihash(proof)) + return multihash.Sum(proof, multihash.SHA2_256, -1) } func NewConsistentBCast(delay time.Duration) *ConsistentBCast { @@ -78,14 +90,16 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er bcastDict, ok := cb.m[blk.Header.Height] if !ok { bcastDict = newBcastDict(cb.delay) + cb.m[blk.Header.Height] = bcastDict } cb.lk.Unlock() - key := BCastKey(blk.Header) + key, err := BCastKey(blk.Header) + if err != nil { + return err + } blkCid := blk.Cid() - bcastDict.lk.Lock() - defer bcastDict.lk.Unlock() - bInfo, ok := bcastDict.blks[key] + bInfo, ok := bcastDict.load(key) if ok { if len(bInfo.blks) > 1 { return bInfo.eqErr() @@ -98,17 +112,18 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er return nil } - ctx, cancel := context.WithTimeout(ctx, cb.delay) - bcastDict.blks[key] = &blksInfo{ctx, cancel, []cid.Cid{blkCid}} + ctx, cancel := context.WithTimeout(ctx, cb.delay*time.Second) + bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) return nil } func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { bcastDict := cb.m[bh.Height] - key := BCastKey(bh) - bcastDict.lk.RLock() - defer bcastDict.lk.RUnlock() - bInfo, ok := bcastDict.blks[key] + key, err := BCastKey(bh) + if err != nil { + return err + } + bInfo, ok := bcastDict.load(key) if !ok { return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) } @@ -126,6 +141,10 @@ func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { // keep currEpoch-2 and delete a few more in the past // as a sanity-check + // Garbage collection is triggered before block delivery, + // and we use the sanity-check in case there were a few rounds + // without delivery, and the garbage collection wasn't triggered + // for a few epochs. for i := 0; i < GC_SANITY_CHECK; i++ { delete(cb.m, currEpoch-abi.ChainEpoch(2-i)) } diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go index 2f3a9e4de..a2945b46b 100644 --- a/chain/sub/bcast/consistent_test.go +++ b/chain/sub/bcast/consistent_test.go @@ -1,27 +1,217 @@ package bcast_test import ( + "context" "crypto/rand" + "fmt" + mrand "math/rand" + "strconv" + "sync" "testing" + "time" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/sub/bcast" "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" ) +const TEST_DELAY = 1 + func TestSimpleDelivery(t *testing.T) { + cb := bcast.NewConsistentBCast(TEST_DELAY) + // Check that we wait for delivery. + start := time.Now() + testSimpleDelivery(t, cb, 100, 5) + since := time.Since(start) + require.GreaterOrEqual(t, since, TEST_DELAY*time.Second) } -func newBlock(t *testing.T, epoch abi.ChainEpoch) *types.BlockMsg { +func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) { + ctx := context.Background() + + wg := new(sync.WaitGroup) + errs := make([]error, 0) + wg.Add(numBlocks) + for i := 0; i < numBlocks; i++ { + go func(i int) { + // Add a random delay in block reception + r := mrand.Intn(200) + time.Sleep(time.Duration(r) * time.Millisecond) + blk := newBlock(t, epoch, randomProof(t), []byte("test"+strconv.Itoa(i))) + cb.RcvBlock(ctx, blk) + err := cb.WaitForDelivery(blk.Header) + if err != nil { + errs = append(errs, err) + } + wg.Done() + }(i) + } + wg.Wait() + + for _, v := range errs { + t.Fatalf("error in delivery: %s", v) + } +} + +func TestSeveralEpochs(t *testing.T) { + cb := bcast.NewConsistentBCast(TEST_DELAY) + numEpochs := 5 + wg := new(sync.WaitGroup) + wg.Add(numEpochs) + for i := 0; i < numEpochs; i++ { + go func(i int) { + // Add a random delay between epochs + r := mrand.Intn(500) + time.Sleep(time.Duration(i*TEST_DELAY)*time.Second + time.Duration(r)*time.Millisecond) + rNumBlocks := mrand.Intn(5) + flip, err := flipCoin(0.7) + require.NoError(t, err) + t.Logf("Running epoch %d with %d with equivocation=%v", i, rNumBlocks, !flip) + if flip { + testSimpleDelivery(t, cb, abi.ChainEpoch(i), rNumBlocks) + } else { + testEquivocation(t, cb, abi.ChainEpoch(i), rNumBlocks) + } + wg.Done() + }(i) + } + wg.Wait() +} + +// bias is expected to be 0-1 +func flipCoin(bias float32) (bool, error) { + if bias > 1 || bias < 0 { + return false, fmt.Errorf("wrong bias. expected (0,1)") + } + r := mrand.Intn(100) + return r < int(bias*100), nil +} + +func testEquivocation(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) { + ctx := context.Background() + + wg := new(sync.WaitGroup) + errs := make([]error, 0) + wg.Add(numBlocks + 1) + for i := 0; i < numBlocks; i++ { + proof := randomProof(t) + // Valid blocks + go func(i int, proof []byte) { + r := mrand.Intn(200) + time.Sleep(time.Duration(r) * time.Millisecond) + blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i))) + cb.RcvBlock(ctx, blk) + err := cb.WaitForDelivery(blk.Header) + if err != nil { + errs = append(errs, err) + } + wg.Done() + }(i, proof) + + // Equivocation for the last block + if i == numBlocks-1 { + // Attempting equivocation + go func(i int, proof []byte) { + // Use the same proof and the same epoch + blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i))) + cb.RcvBlock(ctx, blk) + err := cb.WaitForDelivery(blk.Header) + // Equivocation detected + require.Error(t, err) + wg.Done() + }(i, proof) + } + } + wg.Wait() + + // The equivocated block arrived too late, so + // we delivered all the valid blocks. + require.Len(t, errs, 1) +} + +func TestEquivocation(t *testing.T) { + cb := bcast.NewConsistentBCast(TEST_DELAY) + testEquivocation(t, cb, 100, 5) +} + +func TestFailedEquivocation(t *testing.T) { + cb := bcast.NewConsistentBCast(TEST_DELAY) + ctx := context.Background() + numBlocks := 5 + + wg := new(sync.WaitGroup) + errs := make([]error, 0) + wg.Add(numBlocks + 1) + for i := 0; i < numBlocks; i++ { + proof := randomProof(t) + // Valid blocks + go func(i int, proof []byte) { + r := mrand.Intn(200) + time.Sleep(time.Duration(r) * time.Millisecond) + blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i))) + cb.RcvBlock(ctx, blk) + err := cb.WaitForDelivery(blk.Header) + if err != nil { + errs = append(errs, err) + } + wg.Done() + }(i, proof) + + // Equivocation for the last block + if i == numBlocks-1 { + // Attempting equivocation + go func(i int, proof []byte) { + // The equivocated block arrives late + time.Sleep(2 * TEST_DELAY * time.Second) + // Use the same proof and the same epoch + blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i))) + cb.RcvBlock(ctx, blk) + err := cb.WaitForDelivery(blk.Header) + // Equivocation detected + require.Error(t, err) + wg.Done() + }(i, proof) + } + } + wg.Wait() + + // The equivocated block arrived too late, so + // we delivered all the valid blocks. + require.Len(t, errs, 0) +} + +func randomProof(t *testing.T) []byte { proof := make([]byte, 10) _, err := rand.Read(proof) - if err != err { + if err != nil { + t.Fatal(err) + } + return proof +} + +func newBlock(t *testing.T, epoch abi.ChainEpoch, proof []byte, mCidSeed []byte) *types.BlockMsg { + h, err := multihash.Sum(mCidSeed, multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + testCid := cid.NewCidV0(h) + addr, err := address.NewIDAddress(10) + if err != nil { t.Fatal(err) } bh := &types.BlockHeader{ + Miner: addr, + ParentStateRoot: testCid, + ParentMessageReceipts: testCid, Ticket: &types.Ticket{ - VRFProof: []byte("vrf proof0000000vrf proof0000000"), + VRFProof: proof, }, - Height: 85919298723, + Height: epoch, + Messages: testCid, } return &types.BlockMsg{ Header: bh, From 1dadff303c2138d9fd9a467acf41a60cb080641b Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Sep 2022 16:06:18 +0200 Subject: [PATCH 03/27] added garbage collection test. minor fix --- chain/sub/bcast/consistent.go | 8 +++++++- chain/sub/bcast/consistent_test.go | 18 ++++++++++-------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index af31389ee..3ee1997e8 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -85,6 +85,10 @@ func (bInfo *blksInfo) eqErr() error { return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen") } +func (cb *ConsistentBCast) Len() int { + return len(cb.m) +} + func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error { cb.lk.Lock() bcastDict, ok := cb.m[blk.Header.Height] @@ -146,6 +150,8 @@ func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { // without delivery, and the garbage collection wasn't triggered // for a few epochs. for i := 0; i < GC_SANITY_CHECK; i++ { - delete(cb.m, currEpoch-abi.ChainEpoch(2-i)) + if currEpoch > GC_LOOKBACK { + delete(cb.m, currEpoch-abi.ChainEpoch(GC_LOOKBACK+i)) + } } } diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go index a2945b46b..1b1f4f34a 100644 --- a/chain/sub/bcast/consistent_test.go +++ b/chain/sub/bcast/consistent_test.go @@ -38,6 +38,7 @@ func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.Chain wg.Add(numBlocks) for i := 0; i < numBlocks; i++ { go func(i int) { + defer wg.Done() // Add a random delay in block reception r := mrand.Intn(200) time.Sleep(time.Duration(r) * time.Millisecond) @@ -47,7 +48,6 @@ func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.Chain if err != nil { errs = append(errs, err) } - wg.Done() }(i) } wg.Wait() @@ -64,6 +64,7 @@ func TestSeveralEpochs(t *testing.T) { wg.Add(numEpochs) for i := 0; i < numEpochs; i++ { go func(i int) { + defer wg.Done() // Add a random delay between epochs r := mrand.Intn(500) time.Sleep(time.Duration(i*TEST_DELAY)*time.Second + time.Duration(r)*time.Millisecond) @@ -76,10 +77,11 @@ func TestSeveralEpochs(t *testing.T) { } else { testEquivocation(t, cb, abi.ChainEpoch(i), rNumBlocks) } - wg.Done() + cb.GarbageCollect(abi.ChainEpoch(i)) }(i) } wg.Wait() + require.Equal(t, cb.Len(), bcast.GC_LOOKBACK) } // bias is expected to be 0-1 @@ -101,28 +103,28 @@ func testEquivocation(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEp proof := randomProof(t) // Valid blocks go func(i int, proof []byte) { + defer wg.Done() r := mrand.Intn(200) time.Sleep(time.Duration(r) * time.Millisecond) - blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i))) + blk := newBlock(t, epoch, proof, []byte("valid"+strconv.Itoa(i))) cb.RcvBlock(ctx, blk) err := cb.WaitForDelivery(blk.Header) if err != nil { errs = append(errs, err) } - wg.Done() }(i, proof) // Equivocation for the last block if i == numBlocks-1 { // Attempting equivocation go func(i int, proof []byte) { + defer wg.Done() // Use the same proof and the same epoch - blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i))) + blk := newBlock(t, epoch, proof, []byte("invalid"+strconv.Itoa(i))) cb.RcvBlock(ctx, blk) err := cb.WaitForDelivery(blk.Header) // Equivocation detected require.Error(t, err) - wg.Done() }(i, proof) } } @@ -150,6 +152,7 @@ func TestFailedEquivocation(t *testing.T) { proof := randomProof(t) // Valid blocks go func(i int, proof []byte) { + defer wg.Done() r := mrand.Intn(200) time.Sleep(time.Duration(r) * time.Millisecond) blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i))) @@ -158,13 +161,13 @@ func TestFailedEquivocation(t *testing.T) { if err != nil { errs = append(errs, err) } - wg.Done() }(i, proof) // Equivocation for the last block if i == numBlocks-1 { // Attempting equivocation go func(i int, proof []byte) { + defer wg.Done() // The equivocated block arrives late time.Sleep(2 * TEST_DELAY * time.Second) // Use the same proof and the same epoch @@ -173,7 +176,6 @@ func TestFailedEquivocation(t *testing.T) { err := cb.WaitForDelivery(blk.Header) // Equivocation detected require.Error(t, err) - wg.Done() }(i, proof) } } From d1a4f1dc50bb88ada311ba0b06f69a74703ab624 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Wed, 14 Sep 2022 19:59:29 +0200 Subject: [PATCH 04/27] fixed bugs in consistent broadcast integration --- chain/sub/bcast/consistent.go | 13 +++++++------ chain/sub/incoming.go | 16 +++++++++++----- node/modules/services.go | 2 +- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 3ee1997e8..5b6079cf5 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -53,15 +53,16 @@ type ConsistentBCast struct { m map[abi.ChainEpoch]*bcastDict } -func newBcastDict(delay time.Duration) *bcastDict { +func newBcastDict() *bcastDict { return &bcastDict{new(sync.Map)} } // TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation. func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { - proof := bh.Ticket.VRFProof - binary.PutVarint(proof, int64(bh.Height)) - return multihash.Sum(proof, multihash.SHA2_256, -1) + k := make([]byte, len(bh.Ticket.VRFProof)) + copy(k, bh.Ticket.VRFProof) + binary.PutVarint(k, int64(bh.Height)) + return multihash.Sum(k, multihash.SHA2_256, -1) } func NewConsistentBCast(delay time.Duration) *ConsistentBCast { @@ -93,7 +94,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er cb.lk.Lock() bcastDict, ok := cb.m[blk.Header.Height] if !ok { - bcastDict = newBcastDict(cb.delay) + bcastDict = newBcastDict() cb.m[blk.Header.Height] = bcastDict } cb.lk.Unlock() @@ -116,7 +117,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er return nil } - ctx, cancel := context.WithTimeout(ctx, cb.delay*time.Second) + ctx, cancel := context.WithTimeout(ctx, cb.delay) bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) return nil } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index e03241c23..6e7cf75ff 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -44,7 +44,7 @@ var msgCidPrefix = cid.Prefix{ MhLength: 32, } -func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) { +func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self peer.ID, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) { // Timeout after (block time + propagation delay). This is useless at // this point. timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second @@ -107,13 +107,19 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner) } - if err := cb.WaitForDelivery(blk.Header); err != nil { - log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) - return + // When we propose a new block ourselves, the proposed block also gets here through SyncSubmitBlock. + // If we are the block proposers we don't need to wait for delivery, we know the blocks are + // honest. + if src != self { + log.Infof("Waiting for consistent broadcast of block in height: %v", blk.Header.Height) + if err := cb.WaitForDelivery(blk.Header); err != nil { + log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) + return + } } - // Garbage collect the broadcast state cb.GarbageCollect(blk.Header.Height) + log.Infof("Block in height %v delivered successfully", blk.Header.Height) if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, diff --git a/node/modules/services.go b/node/modules/services.go index 18c0116aa..36fcf189e 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -166,7 +166,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, panic(err) } - go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) + go sub.HandleIncomingBlocks(ctx, blocksub, h.ID(), s, bserv, h.ConnManager()) } func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { From 72c80a3461bee4479d260f41a66fd3ad312d9d0e Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Fri, 16 Sep 2022 09:22:37 +0200 Subject: [PATCH 05/27] fixed unit tests --- chain/sub/bcast/consistent_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go index 1b1f4f34a..5dc2b198c 100644 --- a/chain/sub/bcast/consistent_test.go +++ b/chain/sub/bcast/consistent_test.go @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/require" ) -const TEST_DELAY = 1 +const TEST_DELAY = 1 * time.Second func TestSimpleDelivery(t *testing.T) { cb := bcast.NewConsistentBCast(TEST_DELAY) @@ -27,7 +27,7 @@ func TestSimpleDelivery(t *testing.T) { start := time.Now() testSimpleDelivery(t, cb, 100, 5) since := time.Since(start) - require.GreaterOrEqual(t, since, TEST_DELAY*time.Second) + require.GreaterOrEqual(t, since, TEST_DELAY) } func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) { @@ -67,7 +67,7 @@ func TestSeveralEpochs(t *testing.T) { defer wg.Done() // Add a random delay between epochs r := mrand.Intn(500) - time.Sleep(time.Duration(i*TEST_DELAY)*time.Second + time.Duration(r)*time.Millisecond) + time.Sleep(time.Duration(i)*TEST_DELAY + time.Duration(r)*time.Millisecond) rNumBlocks := mrand.Intn(5) flip, err := flipCoin(0.7) require.NoError(t, err) @@ -169,7 +169,7 @@ func TestFailedEquivocation(t *testing.T) { go func(i int, proof []byte) { defer wg.Done() // The equivocated block arrives late - time.Sleep(2 * TEST_DELAY * time.Second) + time.Sleep(2 * TEST_DELAY) // Use the same proof and the same epoch blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i))) cb.RcvBlock(ctx, blk) From 0209052821620a100099cae81eea303a771ddb0d Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Fri, 16 Sep 2022 13:09:27 +0200 Subject: [PATCH 06/27] minor changes --- chain/sub/incoming.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 6e7cf75ff..02358068d 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -119,7 +119,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p } // Garbage collect the broadcast state cb.GarbageCollect(blk.Header.Height) - log.Infof("Block in height %v delivered successfully", blk.Header.Height) + log.Infof("Block in height %v delivered successfully (cid=)", blk.Header.Height, blk.Cid()) if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, From 749d13359f330d0c5f62db9315de314d428dc1c2 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Mon, 19 Sep 2022 09:11:30 +0200 Subject: [PATCH 07/27] added testground test case for epoch boundary attack --- .../_compositions/epoch_boundary.toml | 71 +++++++++++++++++++ testplans/lotus-soup/epoch_boundary.go | 43 +++++++++++ testplans/lotus-soup/manifest.toml | 38 +++++++++- 3 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 testplans/lotus-soup/_compositions/epoch_boundary.toml create mode 100644 testplans/lotus-soup/epoch_boundary.go diff --git a/testplans/lotus-soup/_compositions/epoch_boundary.toml b/testplans/lotus-soup/_compositions/epoch_boundary.toml new file mode 100644 index 000000000..70bdb795d --- /dev/null +++ b/testplans/lotus-soup/_compositions/epoch_boundary.toml @@ -0,0 +1,71 @@ +[metadata] + name = "lotus-soup" + author = "" + +[global] + plan = "lotus-soup" + case = "epoch-boundary" + total_instances = 4 + builder = "exec:go" + runner = "local:exec" + +[global.build] + selectors = ["testground"] + +[global.run_config] + exposed_ports = { pprof = "6060", node_rpc = "1234", miner_rpc = "2345" } + +[global.build_config] + enable_go_build_cache = true + +[global.run.test_params] + clients = "0" + miners = "3" + genesis_timestamp_offset = "0" + balance = "20000000.5" # These balances will work for maximum 100 nodes, as TotalFilecoin is 2B + sectors = "10" + random_beacon_type = "mock" + mining_mode = "natural" + +[[groups]] + id = "bootstrapper" + [groups.instances] + count = 1 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "bootstrapper" + +[[groups]] + id = "miners" + [groups.instances] + count = 2 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "miner" + +[[groups]] + id = "attacker" + [groups.build] + + # TODO: Configure different versions for the different nodes. + # [[groups.build.dependencies]] + # module = "github.com/filecoin-project/lotus" + # version = "master" + + [groups.instances] + count = 1 + percentage = 0.0 + [groups.run] + [groups.run.test_params] + role = "miner" + +# [[groups]] +# id = "clients" +# [groups.instances] +# count = 0 +# percentage = 0.0 +# [groups.run] +# [groups.run.test_params] +# role = "client" diff --git a/testplans/lotus-soup/epoch_boundary.go b/testplans/lotus-soup/epoch_boundary.go new file mode 100644 index 000000000..9f30627ab --- /dev/null +++ b/testplans/lotus-soup/epoch_boundary.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "time" + + "github.com/filecoin-project/lotus/testplans/lotus-soup/testkit" +) + +// This test runs a set of miners and let them mine for some time. +// Each miner tracks the different blocks they are mining so we can +// process a posteriori the different chains they are mining. +// TODO: Include the attacker. +func epochBoundary(t *testkit.TestEnvironment) error { + t.RecordMessage("running node with role '%s'", t.Role) + + ctx := context.Background() + // Dispatch/forward non-client roles to defaults. + if t.Role != "miner" { + return testkit.HandleDefaultRole(t) + } + m, err := testkit.PrepareMiner(t) + if err != nil { + return err + } + go func() { + miner := m.FullApi + ch, _ := miner.ChainNotify(ctx) + for { + curr := <-ch + // We collect new blocks seen by the node along with its cid. + // We can process the results a posteriori to determine the number of equivocations. + t.RecordMessage("New Block: height=%v, cid=%v", curr[0].Val.Height(), curr[0].Val.Cids()) + } + }() + err = m.RunDefault() + if err != nil { + return err + } + time.Sleep(120 * time.Second) + t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount) + return nil +} diff --git a/testplans/lotus-soup/manifest.toml b/testplans/lotus-soup/manifest.toml index 9f5a57444..3162b5372 100644 --- a/testplans/lotus-soup/manifest.toml +++ b/testplans/lotus-soup/manifest.toml @@ -9,8 +9,8 @@ enabled = true [builders."docker:go"] enabled = true -build_base_image = "iptestground/oni-buildbase:v15-lotus" -runtime_image = "iptestground/oni-runtime:v10-debug" +# build_base_image = "iptestground/oni-buildbase:v15-lotus" +# runtime_image = "iptestground/oni-runtime:v10-debug" [runners."local:exec"] enabled = true @@ -61,6 +61,40 @@ instances = { min = 1, max = 100, default = 5 } # Bounce connection during push and pull data transfers bounce_conn_data_transfers = { type = "bool", default = false } +[[testcases]] +name = "epoch-boundary" +instances = { min = 1, max = 100, default = 5 } + + [testcases.params] + clients = { type = "int", default = 1 } + miners = { type = "int", default = 1 } + attackers = { type = "int", default = 1 } + balance = { type = "float", default = 1 } + sectors = { type = "int", default = 1 } + role = { type = "string" } + + genesis_timestamp_offset = { type = "int", default = 0 } + + random_beacon_type = { type = "enum", default = "mock", options = ["mock", "local-drand", "external-drand"] } + + # Params relevant to drand nodes. drand nodes should have role="drand", and must all be + # in the same composition group. There must be at least threshold drand nodes. + # To get lotus nodes to actually use the drand nodes, you must set random_beacon_type="local-drand" + # for the lotus node groups. + drand_period = { type = "duration", default="10s" } + drand_threshold = { type = "int", default = 2 } + drand_gossip_relay = { type = "bool", default = true } + drand_log_level = { type = "string", default="info" } + + # Params relevant to pubsub tracing + enable_pubsub_tracer = { type = "bool", default = false } + mining_mode = { type = "enum", default = "synchronized", options = ["synchronized", "natural"] } + + # Fast retrieval + fast_retrieval = { type = "bool", default = false } + + # Bounce connection during push and pull data transfers + bounce_conn_data_transfers = { type = "bool", default = false } [[testcases]] name = "drand-halting" From 8f48631fc48773821fd853b6d9612380b9d74cb4 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Mon, 19 Sep 2022 09:12:13 +0200 Subject: [PATCH 08/27] index new testcase --- testplans/lotus-soup/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testplans/lotus-soup/main.go b/testplans/lotus-soup/main.go index b1d17f018..406dffdd2 100644 --- a/testplans/lotus-soup/main.go +++ b/testplans/lotus-soup/main.go @@ -15,6 +15,7 @@ var cases = map[string]interface{}{ "drand-halting": testkit.WrapTestEnvironment(dealsE2E), "drand-outage": testkit.WrapTestEnvironment(dealsE2E), "paych-stress": testkit.WrapTestEnvironment(paych.Stress), + "epoch-boundary": testkit.WrapTestEnvironment(epochBoundary), } func main() { From ebafb6584eb57a4651a0b7b1db3a2c4e9029f229 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Thu, 22 Sep 2022 19:46:25 +0200 Subject: [PATCH 09/27] minor fixes. sync mining. --- .../_compositions/epoch_boundary.toml | 11 ++++++----- testplans/lotus-soup/epoch_boundary.go | 17 +++++++++++++---- testplans/lotus-soup/manifest.toml | 4 ++-- testplans/lotus-soup/testkit/role_miner.go | 1 + 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/testplans/lotus-soup/_compositions/epoch_boundary.toml b/testplans/lotus-soup/_compositions/epoch_boundary.toml index 70bdb795d..20709589f 100644 --- a/testplans/lotus-soup/_compositions/epoch_boundary.toml +++ b/testplans/lotus-soup/_compositions/epoch_boundary.toml @@ -25,7 +25,7 @@ balance = "20000000.5" # These balances will work for maximum 100 nodes, as TotalFilecoin is 2B sectors = "10" random_beacon_type = "mock" - mining_mode = "natural" + mining_mode = "synchronized" [[groups]] id = "bootstrapper" @@ -49,10 +49,11 @@ id = "attacker" [groups.build] - # TODO: Configure different versions for the different nodes. - # [[groups.build.dependencies]] - # module = "github.com/filecoin-project/lotus" - # version = "master" + # Implementation of the attacker. + [[groups.build.dependencies]] + module = "github.com/filecoin-project/lotus" + module = "github.com/adlrocha/lotus" + version = "b9ca4b71bf613ef9a1f21f302238c0d1fc55e65d" [groups.instances] count = 1 diff --git a/testplans/lotus-soup/epoch_boundary.go b/testplans/lotus-soup/epoch_boundary.go index 9f30627ab..9662676c5 100644 --- a/testplans/lotus-soup/epoch_boundary.go +++ b/testplans/lotus-soup/epoch_boundary.go @@ -10,7 +10,6 @@ import ( // This test runs a set of miners and let them mine for some time. // Each miner tracks the different blocks they are mining so we can // process a posteriori the different chains they are mining. -// TODO: Include the attacker. func epochBoundary(t *testkit.TestEnvironment) error { t.RecordMessage("running node with role '%s'", t.Role) @@ -19,6 +18,7 @@ func epochBoundary(t *testkit.TestEnvironment) error { if t.Role != "miner" { return testkit.HandleDefaultRole(t) } + // prepare miners to run. m, err := testkit.PrepareMiner(t) if err != nil { return err @@ -28,15 +28,24 @@ func epochBoundary(t *testkit.TestEnvironment) error { ch, _ := miner.ChainNotify(ctx) for { curr := <-ch - // We collect new blocks seen by the node along with its cid. - // We can process the results a posteriori to determine the number of equivocations. - t.RecordMessage("New Block: height=%v, cid=%v", curr[0].Val.Height(), curr[0].Val.Cids()) + for _, c := range curr { + if c.Type != "apply" { + continue + } + // We collect new blocks seen by the node along with its cid. + // We can process the results a posteriori to determine the number of equivocations. + ts := c.Val + t.RecordMessage("New Block: height=%v, cid=%v", ts.Height(), ts.Cids()) + } } }() err = m.RunDefault() if err != nil { return err } + + // time to mine in the experiment. + // TODO: Make this configurable and optionally make it a number of epochs. time.Sleep(120 * time.Second) t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount) return nil diff --git a/testplans/lotus-soup/manifest.toml b/testplans/lotus-soup/manifest.toml index 3162b5372..56d20aefd 100644 --- a/testplans/lotus-soup/manifest.toml +++ b/testplans/lotus-soup/manifest.toml @@ -9,8 +9,8 @@ enabled = true [builders."docker:go"] enabled = true -# build_base_image = "iptestground/oni-buildbase:v15-lotus" -# runtime_image = "iptestground/oni-runtime:v10-debug" +build_base_image = "iptestground/oni-buildbase:v15-lotus" +runtime_image = "iptestground/oni-runtime:v10-debug" [runners."local:exec"] enabled = true diff --git a/testplans/lotus-soup/testkit/role_miner.go b/testplans/lotus-soup/testkit/role_miner.go index 1a3319add..291b1f063 100644 --- a/testplans/lotus-soup/testkit/role_miner.go +++ b/testplans/lotus-soup/testkit/role_miner.go @@ -547,6 +547,7 @@ func (m *LotusMiner) RunDefault() error { defer t.RecordMessage("shutting down mining") defer close(done) + mine := true var i int for i = 0; mine; i++ { // synchronize all miners to mine the next block From 35f8abc72a8ab447771db0b7546ff59c9e4b19fb Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Fri, 23 Sep 2022 12:44:55 +0200 Subject: [PATCH 10/27] minor fix in composition --- testplans/lotus-soup/_compositions/epoch_boundary.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testplans/lotus-soup/_compositions/epoch_boundary.toml b/testplans/lotus-soup/_compositions/epoch_boundary.toml index 20709589f..65511d061 100644 --- a/testplans/lotus-soup/_compositions/epoch_boundary.toml +++ b/testplans/lotus-soup/_compositions/epoch_boundary.toml @@ -52,7 +52,7 @@ # Implementation of the attacker. [[groups.build.dependencies]] module = "github.com/filecoin-project/lotus" - module = "github.com/adlrocha/lotus" + target = "github.com/adlrocha/lotus" version = "b9ca4b71bf613ef9a1f21f302238c0d1fc55e65d" [groups.instances] From bf6541857ea83de3c3b9b2029215f41b52353ff8 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Mon, 26 Sep 2022 10:57:56 +0200 Subject: [PATCH 11/27] sync mining and gossipsub attacker depdendency --- testplans/lotus-soup/_compositions/epoch_boundary.toml | 6 ++++++ testplans/lotus-soup/testkit/role_miner.go | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/testplans/lotus-soup/_compositions/epoch_boundary.toml b/testplans/lotus-soup/_compositions/epoch_boundary.toml index 65511d061..2bcf9449c 100644 --- a/testplans/lotus-soup/_compositions/epoch_boundary.toml +++ b/testplans/lotus-soup/_compositions/epoch_boundary.toml @@ -55,6 +55,12 @@ target = "github.com/adlrocha/lotus" version = "b9ca4b71bf613ef9a1f21f302238c0d1fc55e65d" + # Modification of gossipsub + [[groups.build.dependencies]] + module = "github.com/libp2p/go-libp2p-pubsub" + target = "github.com/adlrocha/go-libp2p-pubsub" + version = "781f60ae88e4bcb79226232832e0a9b5a08dd3ed" + [groups.instances] count = 1 percentage = 0.0 diff --git a/testplans/lotus-soup/testkit/role_miner.go b/testplans/lotus-soup/testkit/role_miner.go index 291b1f063..3f67d056b 100644 --- a/testplans/lotus-soup/testkit/role_miner.go +++ b/testplans/lotus-soup/testkit/role_miner.go @@ -547,7 +547,7 @@ func (m *LotusMiner) RunDefault() error { defer t.RecordMessage("shutting down mining") defer close(done) - mine := true + mine = true var i int for i = 0; mine; i++ { // synchronize all miners to mine the next block From 91bd679d1e3e97fab1a63799c69969c569e55067 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Dec 2022 11:13:07 +0100 Subject: [PATCH 12/27] consistent broadcast delay as build param --- build/params_2k.go | 5 ++ build/params_mainnet.go | 5 ++ chain/sub/bcast/consistent.go | 19 +++-- chain/sub/bcast/consistent_test.go | 2 +- chain/sub/incoming.go | 2 +- .../_compositions/epoch_boundary.toml | 78 ------------------- testplans/lotus-soup/epoch_boundary.go | 52 ------------- 7 files changed, 23 insertions(+), 140 deletions(-) delete mode 100644 testplans/lotus-soup/_compositions/epoch_boundary.toml delete mode 100644 testplans/lotus-soup/epoch_boundary.go diff --git a/build/params_2k.go b/build/params_2k.go index f822d701e..48948ea50 100644 --- a/build/params_2k.go +++ b/build/params_2k.go @@ -6,6 +6,7 @@ package build import ( "os" "strconv" + "time" "github.com/ipfs/go-cid" @@ -131,3 +132,7 @@ const InteractivePoRepConfidence = 6 const BootstrapPeerThreshold = 1 var WhitelistedBlock = cid.Undef + +// Reducing the delivery delay for equivocation of +// consistent broadcast to just one second. +const CBDeliveryDelay = 1 * time.Second diff --git a/build/params_mainnet.go b/build/params_mainnet.go index 296793131..93cdb66bd 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -7,6 +7,7 @@ import ( "math" "os" "strconv" + "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -126,3 +127,7 @@ const BootstrapPeerThreshold = 4 // we skip checks on message validity in this block to sidestep the zero-bls signature var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi") + +// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. +// This determines the wait time for the detection of potential equivocations. +const CBDeliveryDelay = 6 * time.Second diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 5b6079cf5..360476561 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -13,11 +13,13 @@ import ( "github.com/multiformats/go-multihash" ) -// TODO: Take const out of here and make them build params. const ( - DELAY = 6 * time.Second - GC_SANITY_CHECK = 5 - GC_LOOKBACK = 2 + // GcSanityCheck determines the number of epochs that in the past + // that will be garbage collected from the current epoch. + GcSanityCheck = 5 + // GcLookback determines the number of epochs kept in the consistent + // broadcast cache. + GcLookback = 2 ) type blksInfo struct { @@ -57,7 +59,8 @@ func newBcastDict() *bcastDict { return &bcastDict{new(sync.Map)} } -// TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation. +// TODO: the VRFProof may already be small enough so we may not need to use a hash here. +// we can maybe bypass the useless computation. func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { k := make([]byte, len(bh.Ticket.VRFProof)) copy(k, bh.Ticket.VRFProof) @@ -150,9 +153,9 @@ func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { // and we use the sanity-check in case there were a few rounds // without delivery, and the garbage collection wasn't triggered // for a few epochs. - for i := 0; i < GC_SANITY_CHECK; i++ { - if currEpoch > GC_LOOKBACK { - delete(cb.m, currEpoch-abi.ChainEpoch(GC_LOOKBACK+i)) + for i := 0; i < GcSanityCheck; i++ { + if currEpoch > GcLookback { + delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i)) } } } diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go index 5dc2b198c..d06293876 100644 --- a/chain/sub/bcast/consistent_test.go +++ b/chain/sub/bcast/consistent_test.go @@ -81,7 +81,7 @@ func TestSeveralEpochs(t *testing.T) { }(i) } wg.Wait() - require.Equal(t, cb.Len(), bcast.GC_LOOKBACK) + require.Equal(t, cb.Len(), bcast.GcLookback) } // bias is expected to be 0-1 diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 02358068d..cee5a75a6 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -48,7 +48,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p // Timeout after (block time + propagation delay). This is useless at // this point. timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second - cb := bcast.NewConsistentBCast(bcast.DELAY) + cb := bcast.NewConsistentBCast(build.CBDeliveryDelay) for { msg, err := bsub.Next(ctx) diff --git a/testplans/lotus-soup/_compositions/epoch_boundary.toml b/testplans/lotus-soup/_compositions/epoch_boundary.toml deleted file mode 100644 index 2bcf9449c..000000000 --- a/testplans/lotus-soup/_compositions/epoch_boundary.toml +++ /dev/null @@ -1,78 +0,0 @@ -[metadata] - name = "lotus-soup" - author = "" - -[global] - plan = "lotus-soup" - case = "epoch-boundary" - total_instances = 4 - builder = "exec:go" - runner = "local:exec" - -[global.build] - selectors = ["testground"] - -[global.run_config] - exposed_ports = { pprof = "6060", node_rpc = "1234", miner_rpc = "2345" } - -[global.build_config] - enable_go_build_cache = true - -[global.run.test_params] - clients = "0" - miners = "3" - genesis_timestamp_offset = "0" - balance = "20000000.5" # These balances will work for maximum 100 nodes, as TotalFilecoin is 2B - sectors = "10" - random_beacon_type = "mock" - mining_mode = "synchronized" - -[[groups]] - id = "bootstrapper" - [groups.instances] - count = 1 - percentage = 0.0 - [groups.run] - [groups.run.test_params] - role = "bootstrapper" - -[[groups]] - id = "miners" - [groups.instances] - count = 2 - percentage = 0.0 - [groups.run] - [groups.run.test_params] - role = "miner" - -[[groups]] - id = "attacker" - [groups.build] - - # Implementation of the attacker. - [[groups.build.dependencies]] - module = "github.com/filecoin-project/lotus" - target = "github.com/adlrocha/lotus" - version = "b9ca4b71bf613ef9a1f21f302238c0d1fc55e65d" - - # Modification of gossipsub - [[groups.build.dependencies]] - module = "github.com/libp2p/go-libp2p-pubsub" - target = "github.com/adlrocha/go-libp2p-pubsub" - version = "781f60ae88e4bcb79226232832e0a9b5a08dd3ed" - - [groups.instances] - count = 1 - percentage = 0.0 - [groups.run] - [groups.run.test_params] - role = "miner" - -# [[groups]] -# id = "clients" -# [groups.instances] -# count = 0 -# percentage = 0.0 -# [groups.run] -# [groups.run.test_params] -# role = "client" diff --git a/testplans/lotus-soup/epoch_boundary.go b/testplans/lotus-soup/epoch_boundary.go deleted file mode 100644 index 9662676c5..000000000 --- a/testplans/lotus-soup/epoch_boundary.go +++ /dev/null @@ -1,52 +0,0 @@ -package main - -import ( - "context" - "time" - - "github.com/filecoin-project/lotus/testplans/lotus-soup/testkit" -) - -// This test runs a set of miners and let them mine for some time. -// Each miner tracks the different blocks they are mining so we can -// process a posteriori the different chains they are mining. -func epochBoundary(t *testkit.TestEnvironment) error { - t.RecordMessage("running node with role '%s'", t.Role) - - ctx := context.Background() - // Dispatch/forward non-client roles to defaults. - if t.Role != "miner" { - return testkit.HandleDefaultRole(t) - } - // prepare miners to run. - m, err := testkit.PrepareMiner(t) - if err != nil { - return err - } - go func() { - miner := m.FullApi - ch, _ := miner.ChainNotify(ctx) - for { - curr := <-ch - for _, c := range curr { - if c.Type != "apply" { - continue - } - // We collect new blocks seen by the node along with its cid. - // We can process the results a posteriori to determine the number of equivocations. - ts := c.Val - t.RecordMessage("New Block: height=%v, cid=%v", ts.Height(), ts.Cids()) - } - } - }() - err = m.RunDefault() - if err != nil { - return err - } - - // time to mine in the experiment. - // TODO: Make this configurable and optionally make it a number of epochs. - time.Sleep(120 * time.Second) - t.SyncClient.MustSignalAndWait(ctx, testkit.StateDone, t.TestInstanceCount) - return nil -} From f2cc452d4cb1b2f5f2f5c89ced93c222a6a530a4 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Dec 2022 12:13:20 +0100 Subject: [PATCH 13/27] remove error from rcvBlock. type/docs gen --- build/params_2k.go | 4 ++-- chain/sub/bcast/consistent.go | 25 ++++++++++++++++--------- chain/sub/bcast/consistent_test.go | 10 ++++++---- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/build/params_2k.go b/build/params_2k.go index 48948ea50..390357bf2 100644 --- a/build/params_2k.go +++ b/build/params_2k.go @@ -134,5 +134,5 @@ const BootstrapPeerThreshold = 1 var WhitelistedBlock = cid.Undef // Reducing the delivery delay for equivocation of -// consistent broadcast to just one second. -const CBDeliveryDelay = 1 * time.Second +// consistent broadcast to just half a second. +const CBDeliveryDelay = 500 * time.Milisecond diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 360476561..b69845476 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -7,12 +7,17 @@ import ( "sync" "time" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multihash" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" ) +var log = logging.Logger("sub-cb") + const ( // GcSanityCheck determines the number of epochs that in the past // that will be garbage collected from the current epoch. @@ -86,14 +91,14 @@ func cidExists(cids []cid.Cid, c cid.Cid) bool { func (bInfo *blksInfo) eqErr() error { bInfo.cancel() - return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen") + return fmt.Errorf("different blocks with the same ticket already seen") } func (cb *ConsistentBCast) Len() int { return len(cb.m) } -func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error { +func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { cb.lk.Lock() bcastDict, ok := cb.m[blk.Header.Height] if !ok { @@ -103,26 +108,28 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) er cb.lk.Unlock() key, err := BCastKey(blk.Header) if err != nil { - return err + log.Errorf("couldn't hash blk info for height %d: %s", blk.Header.Height, err) + return } blkCid := blk.Cid() bInfo, ok := bcastDict.load(key) if ok { if len(bInfo.blks) > 1 { - return bInfo.eqErr() + log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr()) + return } if !cidExists(bInfo.blks, blkCid) { bInfo.blks = append(bInfo.blks, blkCid) - return bInfo.eqErr() + log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr()) + return } - return nil + return } ctx, cancel := context.WithTimeout(ctx, cb.delay) bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) - return nil } func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go index d06293876..ca84ab4b1 100644 --- a/chain/sub/bcast/consistent_test.go +++ b/chain/sub/bcast/consistent_test.go @@ -10,13 +10,15 @@ import ( "testing" "time" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/chain/sub/bcast" - "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/sub/bcast" + "github.com/filecoin-project/lotus/chain/types" ) const TEST_DELAY = 1 * time.Second From 939e515d23a2d40b24732fe40a7e3f0a17998636 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Dec 2022 12:45:01 +0100 Subject: [PATCH 14/27] fix race in cb cache --- chain/sub/bcast/consistent.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index b69845476..cc600cb10 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -53,8 +53,16 @@ func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) { bd.m.Store(key.String(), d) } +func (bd *bcastDict) blkLen(key multihash.Multihash) int { + v, ok := bd.m.Load(key.String()) + if !ok { + return 0 + } + return len(v.(*blksInfo).blks) +} + type ConsistentBCast struct { - lk sync.Mutex + lk sync.RWMutex delay time.Duration // FIXME: Make this a slice??? Less storage but needs indexing logic. m map[abi.ChainEpoch]*bcastDict @@ -95,6 +103,8 @@ func (bInfo *blksInfo) eqErr() error { } func (cb *ConsistentBCast) Len() int { + cb.lk.RLock() + defer cb.lk.RUnlock() return len(cb.m) } @@ -121,7 +131,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { } if !cidExists(bInfo.blks, blkCid) { - bInfo.blks = append(bInfo.blks, blkCid) + bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)}) log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr()) return } @@ -133,7 +143,9 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { } func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { + cb.lk.RLock() bcastDict := cb.m[bh.Height] + cb.lk.RUnlock() key, err := BCastKey(bh) if err != nil { return err @@ -144,7 +156,7 @@ func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { } // Wait for the timeout <-bInfo.ctx.Done() - if len(bInfo.blks) > 1 { + if bcastDict.blkLen(key) > 1 { return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) } return nil From d574d04075a2e57885e5bb5f84be2e708d7dba46 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Dec 2022 13:03:42 +0100 Subject: [PATCH 15/27] set small cb delivery delay for paych itests --- build/params_mainnet.go | 2 +- itests/kit/init.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/build/params_mainnet.go b/build/params_mainnet.go index 93cdb66bd..1612f4ab9 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -130,4 +130,4 @@ var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjygu // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -const CBDeliveryDelay = 6 * time.Second +var CBDeliveryDelay = 6 * time.Second diff --git a/itests/kit/init.go b/itests/kit/init.go index 9397c53a2..c6545edda 100644 --- a/itests/kit/init.go +++ b/itests/kit/init.go @@ -3,6 +3,7 @@ package kit import ( "fmt" "os" + "time" logging "github.com/ipfs/go-log/v2" @@ -40,6 +41,12 @@ func init() { build.InsecurePoStValidation = true + // NOTE: If we want the payment channel itests to pass that use a + // block time of 5*millisecond, we need to set the consistent broadcast + // delay to something lower than that block time. + // todo: configure such a low delay only for paych tests. + build.CBDeliveryDelay = 2 * time.Millisecond + if err := os.Setenv("BELLMAN_NO_GPU", "1"); err != nil { panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err)) } From 3988cc9b2cfbb903336433df3321e2a6c0245741 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 13 Dec 2022 14:50:02 +0100 Subject: [PATCH 16/27] disabling cb delivery delay for sync tests --- chain/sync_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/chain/sync_test.go b/chain/sync_test.go index 18520a07f..f4f553df3 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -45,6 +45,11 @@ func init() { policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1) policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048)) policy.SetMinVerifiedDealSize(abi.NewStoragePower(256)) + + // these tests assume really fast block times. disabling + // the consistent broadcast delay to avoid them from adding + // an unnecessary overhead. + build.CBDeliveryDelay = 2 * time.Millisecond } const source = 0 From c11ffa58a8283778ee51383bcafe46995b4f2060 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Fri, 10 Mar 2023 09:27:30 +0100 Subject: [PATCH 17/27] address review --- chain/sub/incoming.go | 4 ++-- itests/kit/init.go | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index cee5a75a6..6226e45d8 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -111,7 +111,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p // If we are the block proposers we don't need to wait for delivery, we know the blocks are // honest. if src != self { - log.Infof("Waiting for consistent broadcast of block in height: %v", blk.Header.Height) + log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height) if err := cb.WaitForDelivery(blk.Header); err != nil { log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) return @@ -119,7 +119,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p } // Garbage collect the broadcast state cb.GarbageCollect(blk.Header.Height) - log.Infof("Block in height %v delivered successfully (cid=)", blk.Header.Height, blk.Cid()) + log.Debugf("Block in height %v delivered successfully (cid=)", blk.Header.Height, blk.Cid()) if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, diff --git a/itests/kit/init.go b/itests/kit/init.go index c6545edda..dbcb49aae 100644 --- a/itests/kit/init.go +++ b/itests/kit/init.go @@ -3,7 +3,6 @@ package kit import ( "fmt" "os" - "time" logging "github.com/ipfs/go-log/v2" @@ -41,11 +40,10 @@ func init() { build.InsecurePoStValidation = true - // NOTE: If we want the payment channel itests to pass that use a - // block time of 5*millisecond, we need to set the consistent broadcast - // delay to something lower than that block time. - // todo: configure such a low delay only for paych tests. - build.CBDeliveryDelay = 2 * time.Millisecond + // Disabling consistent broadcast in itests. Many tests use really fast + // block times, and adding this additional delay for block delivery + // would make these tests to fail. + build.CBDeliveryDelay = 0 if err := os.Setenv("BELLMAN_NO_GPU", "1"); err != nil { panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err)) From f59c246c7a539896b4e5dfc084399a2d353d0c79 Mon Sep 17 00:00:00 2001 From: adlrocha Date: Thu, 16 Mar 2023 16:09:45 +0100 Subject: [PATCH 18/27] Update chain/sub/bcast/consistent.go Co-authored-by: Aayush Rajasekaran --- chain/sub/bcast/consistent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index cc600cb10..22a0db160 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -19,7 +19,7 @@ import ( var log = logging.Logger("sub-cb") const ( - // GcSanityCheck determines the number of epochs that in the past + // GcSanityCheck determines the number of epochs in the past // that will be garbage collected from the current epoch. GcSanityCheck = 5 // GcLookback determines the number of epochs kept in the consistent From 8d260d7478a35ee4457b210d55a327074ff8b7a1 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Thu, 16 Mar 2023 17:03:25 +0100 Subject: [PATCH 19/27] address review --- chain/sub/bcast/consistent.go | 61 +++++++++++++++++------------------ chain/sub/incoming.go | 4 +-- 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 22a0db160..89708432c 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -2,14 +2,12 @@ package bcast import ( "context" - "encoding/binary" - "fmt" "sync" "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-multihash" + "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" @@ -24,7 +22,7 @@ const ( GcSanityCheck = 5 // GcLookback determines the number of epochs kept in the consistent // broadcast cache. - GcLookback = 2 + GcLookback = 1000 ) type blksInfo struct { @@ -41,20 +39,20 @@ type bcastDict struct { m *sync.Map } -func (bd *bcastDict) load(key multihash.Multihash) (*blksInfo, bool) { - v, ok := bd.m.Load(key.String()) +func (bd *bcastDict) load(key []byte) (*blksInfo, bool) { + v, ok := bd.m.Load(key) if !ok { return nil, ok } return v.(*blksInfo), ok } -func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) { - bd.m.Store(key.String(), d) +func (bd *bcastDict) store(key []byte, d *blksInfo) { + bd.m.Store(key, d) } -func (bd *bcastDict) blkLen(key multihash.Multihash) int { - v, ok := bd.m.Load(key.String()) +func (bd *bcastDict) blkLen(key []byte) int { + v, ok := bd.m.Load(key) if !ok { return 0 } @@ -64,21 +62,15 @@ func (bd *bcastDict) blkLen(key multihash.Multihash) int { type ConsistentBCast struct { lk sync.RWMutex delay time.Duration - // FIXME: Make this a slice??? Less storage but needs indexing logic. - m map[abi.ChainEpoch]*bcastDict + m map[abi.ChainEpoch]*bcastDict } func newBcastDict() *bcastDict { return &bcastDict{new(sync.Map)} } -// TODO: the VRFProof may already be small enough so we may not need to use a hash here. -// we can maybe bypass the useless computation. -func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) { - k := make([]byte, len(bh.Ticket.VRFProof)) - copy(k, bh.Ticket.VRFProof) - binary.PutVarint(k, int64(bh.Height)) - return multihash.Sum(k, multihash.SHA2_256, -1) +func BCastKey(bh *types.BlockHeader) []byte { + return bh.Ticket.VRFProof } func NewConsistentBCast(delay time.Duration) *ConsistentBCast { @@ -99,7 +91,7 @@ func cidExists(cids []cid.Cid, c cid.Cid) bool { func (bInfo *blksInfo) eqErr() error { bInfo.cancel() - return fmt.Errorf("different blocks with the same ticket already seen") + return xerrors.Errorf("different blocks with the same ticket already seen") } func (cb *ConsistentBCast) Len() int { @@ -108,6 +100,17 @@ func (cb *ConsistentBCast) Len() int { return len(cb.m) } +// RcvBlock is called every time a new block is received through the network. +// +// This function keeps track of all the blocks with a specific VRFProof received +// for the same height. Every time a new block with a VRFProof not seen at certain +// height is received, a new timer is triggered to wait for the delay time determined by +// the consistent broadcast before informing the syncer. During this time, if a new +// block with the same VRFProof for that height is received, it means a miner is +// trying to equivocate, and both blocks are discarded. +// +// The delay time should be set to a value high enough to allow any block sent for +// certain epoch to be propagated to a large amount of miners in the network. func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { cb.lk.Lock() bcastDict, ok := cb.m[blk.Header.Height] @@ -116,11 +119,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { cb.m[blk.Header.Height] = bcastDict } cb.lk.Unlock() - key, err := BCastKey(blk.Header) - if err != nil { - log.Errorf("couldn't hash blk info for height %d: %s", blk.Header.Height, err) - return - } + key := BCastKey(blk.Header) blkCid := blk.Cid() bInfo, ok := bcastDict.load(key) @@ -142,22 +141,22 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}}) } +// WaitForDelivery is called before informing the syncer about a new block +// to check if the consistent broadcast delay triggered or if the block should +// be held off for a bit more time. func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { cb.lk.RLock() bcastDict := cb.m[bh.Height] cb.lk.RUnlock() - key, err := BCastKey(bh) - if err != nil { - return err - } + key := BCastKey(bh) bInfo, ok := bcastDict.load(key) if !ok { - return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) + return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) } // Wait for the timeout <-bInfo.ctx.Done() if bcastDict.blkLen(key) > 1 { - return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) + return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height) } return nil } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 6226e45d8..6436cc27d 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -113,13 +113,13 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p if src != self { log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height) if err := cb.WaitForDelivery(blk.Header); err != nil { - log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src) + log.Errorf("not informing syncer about new block, potential equivocation detected (cid: %s, source: %s): %s; ", blk.Header.Cid(), src, err) return } } // Garbage collect the broadcast state cb.GarbageCollect(blk.Header.Height) - log.Debugf("Block in height %v delivered successfully (cid=)", blk.Header.Height, blk.Cid()) + log.Debugf("Block in height %v delivered successfully (cid=%s)", blk.Header.Height, blk.Cid()) if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, From 90c2f9dbe234426eec8045cea66048e50248215d Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Thu, 16 Mar 2023 17:17:59 +0100 Subject: [PATCH 20/27] minor fix --- chain/sub/bcast/consistent.go | 6 +++--- chain/sub/bcast/consistent_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 89708432c..165476ffb 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -40,7 +40,7 @@ type bcastDict struct { } func (bd *bcastDict) load(key []byte) (*blksInfo, bool) { - v, ok := bd.m.Load(key) + v, ok := bd.m.Load(string(key)) if !ok { return nil, ok } @@ -48,11 +48,11 @@ func (bd *bcastDict) load(key []byte) (*blksInfo, bool) { } func (bd *bcastDict) store(key []byte, d *blksInfo) { - bd.m.Store(key, d) + bd.m.Store(string(key), d) } func (bd *bcastDict) blkLen(key []byte) int { - v, ok := bd.m.Load(key) + v, ok := bd.m.Load(string(key)) if !ok { return 0 } diff --git a/chain/sub/bcast/consistent_test.go b/chain/sub/bcast/consistent_test.go index ca84ab4b1..8beb0574f 100644 --- a/chain/sub/bcast/consistent_test.go +++ b/chain/sub/bcast/consistent_test.go @@ -61,7 +61,7 @@ func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.Chain func TestSeveralEpochs(t *testing.T) { cb := bcast.NewConsistentBCast(TEST_DELAY) - numEpochs := 5 + numEpochs := 6 wg := new(sync.WaitGroup) wg.Add(numEpochs) for i := 0; i < numEpochs; i++ { @@ -83,7 +83,7 @@ func TestSeveralEpochs(t *testing.T) { }(i) } wg.Wait() - require.Equal(t, cb.Len(), bcast.GcLookback) + require.Equal(t, cb.Len(), numEpochs) } // bias is expected to be 0-1 From fa7e1ef78eb505dd01ba94d6e3987e13a1f1f119 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Mon, 20 Mar 2023 18:10:34 +0100 Subject: [PATCH 21/27] set CB delay to 2 secs --- build/params_mainnet.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/params_mainnet.go b/build/params_mainnet.go index 1612f4ab9..e2e2b8c8e 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -130,4 +130,4 @@ var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjygu // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -var CBDeliveryDelay = 6 * time.Second +var CBDeliveryDelay = 2 * time.Second From 92f6d3e468ed9a77092442e71203e1850fa5320f Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 28 Mar 2023 10:01:43 +0200 Subject: [PATCH 22/27] global locking strategy for blockInfo map --- chain/sub/bcast/consistent.go | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 165476ffb..93621d513 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -32,33 +32,27 @@ type blksInfo struct { } type bcastDict struct { - // thread-safe map impl for the dictionary - // sync.Map accepts `any` as keys and values. - // To make it type safe and only support the right - // types we use this auxiliary type. - m *sync.Map + m map[string]*blksInfo } func (bd *bcastDict) load(key []byte) (*blksInfo, bool) { - v, ok := bd.m.Load(string(key)) + v, ok := bd.m[string(key)] if !ok { return nil, ok } - return v.(*blksInfo), ok -} - -func (bd *bcastDict) store(key []byte, d *blksInfo) { - bd.m.Store(string(key), d) + return v, ok } func (bd *bcastDict) blkLen(key []byte) int { - v, ok := bd.m.Load(string(key)) - if !ok { - return 0 - } - return len(v.(*blksInfo).blks) + return len(bd.m[string(key)].blks) } +func (bd *bcastDict) store(key []byte, d *blksInfo) { + bd.m[string(key)] = d +} + +// ConsistentBCast tracks recent information about the +// blocks and tickets received at different epochs type ConsistentBCast struct { lk sync.RWMutex delay time.Duration @@ -66,7 +60,7 @@ type ConsistentBCast struct { } func newBcastDict() *bcastDict { - return &bcastDict{new(sync.Map)} + return &bcastDict{m: make(map[string]*blksInfo)} } func BCastKey(bh *types.BlockHeader) []byte { @@ -113,12 +107,13 @@ func (cb *ConsistentBCast) Len() int { // certain epoch to be propagated to a large amount of miners in the network. func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { cb.lk.Lock() + defer cb.lk.Unlock() bcastDict, ok := cb.m[blk.Header.Height] if !ok { bcastDict = newBcastDict() cb.m[blk.Header.Height] = bcastDict } - cb.lk.Unlock() + key := BCastKey(blk.Header) blkCid := blk.Cid() @@ -147,12 +142,13 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { cb.lk.RLock() bcastDict := cb.m[bh.Height] - cb.lk.RUnlock() key := BCastKey(bh) bInfo, ok := bcastDict.load(key) + cb.lk.RUnlock() if !ok { return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key) } + // Wait for the timeout <-bInfo.ctx.Done() if bcastDict.blkLen(key) > 1 { From df82a8240ebc1ba15a15e5cd65dfacb1a36e72c7 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 28 Mar 2023 10:33:26 +0200 Subject: [PATCH 23/27] add comments --- chain/sub/bcast/consistent.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 93621d513..75209431d 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -126,6 +126,9 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) { if !cidExists(bInfo.blks, blkCid) { bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)}) + // By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for + // the epoch-ticket combination making them to fail and not be sent to the syncer, as + // a potential equivocation is detected. log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr()) return } From 1a771e4310062ee47b7202ba6fb3fe93747ab061 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 28 Mar 2023 16:52:32 +0200 Subject: [PATCH 24/27] include a deeper gc round --- chain/sub/bcast/consistent.go | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index 75209431d..c8c91e71a 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -19,10 +19,16 @@ var log = logging.Logger("sub-cb") const ( // GcSanityCheck determines the number of epochs in the past // that will be garbage collected from the current epoch. - GcSanityCheck = 5 + GcSanityCheck = 100 // GcLookback determines the number of epochs kept in the consistent // broadcast cache. - GcLookback = 1000 + GcLookback = 5 + // GcDeepCheck determines the number of epochs in the past that we + // we try cleaning in the deep garbage collection round. + GcDeepCheck = 2880 // (24h*60m*60s)/30s per block + // GcDeepInterval determines after the number of epochs for which + // we are going to start a deeper garbage collection round. + GcDeepInterval = 1000 ) type blksInfo struct { @@ -54,9 +60,10 @@ func (bd *bcastDict) store(key []byte, d *blksInfo) { // ConsistentBCast tracks recent information about the // blocks and tickets received at different epochs type ConsistentBCast struct { - lk sync.RWMutex - delay time.Duration - m map[abi.ChainEpoch]*bcastDict + lk sync.RWMutex + delay time.Duration + m map[abi.ChainEpoch]*bcastDict + lastDeepGc abi.ChainEpoch } func newBcastDict() *bcastDict { @@ -160,17 +167,29 @@ func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error { return nil } +// GarbageCollect cleans the consistent broadcast cache periodically. +// +// A light garbage collection is triggered before every block delivery +// while a deeper one is triggered once every GcDeepCheck to ensure +// that nothing was left behind. func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) { cb.lk.Lock() defer cb.lk.Unlock() - // keep currEpoch-2 and delete a few more in the past + // perform a deeper sanity check every now and then + gcRange := GcSanityCheck + if cb.lastDeepGc+GcDeepInterval > currEpoch { + gcRange = GcDeepCheck + cb.lastDeepGc = currEpoch + } + + // keep currEpoch-gcRange and delete a few more in the past // as a sanity-check // Garbage collection is triggered before block delivery, // and we use the sanity-check in case there were a few rounds // without delivery, and the garbage collection wasn't triggered // for a few epochs. - for i := 0; i < GcSanityCheck; i++ { + for i := 0; i < gcRange; i++ { if currEpoch > GcLookback { delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i)) } From f24fc836b3cec75eb674590f5e6457c770f2af0d Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Tue, 28 Mar 2023 18:22:28 +0200 Subject: [PATCH 25/27] add CB param to all testnet builds --- build/params_butterfly.go | 6 ++++++ build/params_calibnet.go | 5 +++++ build/params_interop.go | 5 +++++ build/params_mainnet.go | 2 +- 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/build/params_butterfly.go b/build/params_butterfly.go index b00381b44..137ab7dee 100644 --- a/build/params_butterfly.go +++ b/build/params_butterfly.go @@ -4,6 +4,8 @@ package build import ( + "time" + "github.com/ipfs/go-cid" "github.com/filecoin-project/go-address" @@ -87,3 +89,7 @@ const BootstrapPeerThreshold = 2 const Eip155ChainId = 3141592 var WhitelistedBlock = cid.Undef + +// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. +// This determines the wait time for the detection of potential equivocations. +const CBDeliveryDelay = 2 * time.Second diff --git a/build/params_calibnet.go b/build/params_calibnet.go index 32923f7a8..7bfca2a42 100644 --- a/build/params_calibnet.go +++ b/build/params_calibnet.go @@ -6,6 +6,7 @@ package build import ( "os" "strconv" + "time" "github.com/ipfs/go-cid" @@ -122,3 +123,7 @@ const BootstrapPeerThreshold = 4 const Eip155ChainId = 314159 var WhitelistedBlock = cid.Undef + +// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. +// This determines the wait time for the detection of potential equivocations. +const CBDeliveryDelay = 2 * time.Second diff --git a/build/params_interop.go b/build/params_interop.go index 4d94de049..0fb865248 100644 --- a/build/params_interop.go +++ b/build/params_interop.go @@ -6,6 +6,7 @@ package build import ( "os" "strconv" + "time" "github.com/ipfs/go-cid" @@ -128,3 +129,7 @@ const BootstrapPeerThreshold = 2 const Eip155ChainId = 3141592 var WhitelistedBlock = cid.Undef + +// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. +// This determines the wait time for the detection of potential equivocations. +const CBDeliveryDelay = 2 * time.Second diff --git a/build/params_mainnet.go b/build/params_mainnet.go index 5fe4a6202..bb205a827 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -141,4 +141,4 @@ var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjygu // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -var CBDeliveryDelay = 2 * time.Second +const CBDeliveryDelay = 2 * time.Second From 103d786c720afe125b1063c552cd5533d1c39919 Mon Sep 17 00:00:00 2001 From: Alfonso de la Rocha Date: Wed, 29 Mar 2023 17:43:10 +0200 Subject: [PATCH 26/27] return CBDeliveryDelay into a var --- build/params_2k.go | 2 +- build/params_butterfly.go | 3 ++- build/params_calibnet.go | 3 ++- build/params_interop.go | 3 ++- build/params_mainnet.go | 3 ++- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/build/params_2k.go b/build/params_2k.go index fb8b1beea..8220ce8aa 100644 --- a/build/params_2k.go +++ b/build/params_2k.go @@ -143,4 +143,4 @@ var WhitelistedBlock = cid.Undef // Reducing the delivery delay for equivocation of // consistent broadcast to just half a second. -const CBDeliveryDelay = 500 * time.Milisecond +var CBDeliveryDelay = 500 * time.Milisecond diff --git a/build/params_butterfly.go b/build/params_butterfly.go index 137ab7dee..4fdac1ec8 100644 --- a/build/params_butterfly.go +++ b/build/params_butterfly.go @@ -92,4 +92,5 @@ var WhitelistedBlock = cid.Undef // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -const CBDeliveryDelay = 2 * time.Second +// It is a variable instead of a constant so it can be conveniently configured in tests +var CBDeliveryDelay = 2 * time.Second diff --git a/build/params_calibnet.go b/build/params_calibnet.go index 7bfca2a42..35ae1796c 100644 --- a/build/params_calibnet.go +++ b/build/params_calibnet.go @@ -126,4 +126,5 @@ var WhitelistedBlock = cid.Undef // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -const CBDeliveryDelay = 2 * time.Second +// It is a variable instead of a constant so it can be conveniently configured in tests +var CBDeliveryDelay = 2 * time.Second diff --git a/build/params_interop.go b/build/params_interop.go index 0fb865248..72cfdca35 100644 --- a/build/params_interop.go +++ b/build/params_interop.go @@ -132,4 +132,5 @@ var WhitelistedBlock = cid.Undef // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -const CBDeliveryDelay = 2 * time.Second +// It is a variable instead of a constant so it can be conveniently configured in tests +var CBDeliveryDelay = 2 * time.Second diff --git a/build/params_mainnet.go b/build/params_mainnet.go index bb205a827..d4cf6ff4b 100644 --- a/build/params_mainnet.go +++ b/build/params_mainnet.go @@ -141,4 +141,5 @@ var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjygu // CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast. // This determines the wait time for the detection of potential equivocations. -const CBDeliveryDelay = 2 * time.Second +// It is a variable instead of a constant so it can be conveniently configured in tests +var CBDeliveryDelay = 2 * time.Second From 682ddf6ffa7ffd430c9e843a50428d46790f2daa Mon Sep 17 00:00:00 2001 From: adlrocha Date: Thu, 30 Mar 2023 12:38:00 +0200 Subject: [PATCH 27/27] Update chain/sub/bcast/consistent.go Co-authored-by: Aayush Rajasekaran --- chain/sub/bcast/consistent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sub/bcast/consistent.go b/chain/sub/bcast/consistent.go index c8c91e71a..58e8bc98f 100644 --- a/chain/sub/bcast/consistent.go +++ b/chain/sub/bcast/consistent.go @@ -25,7 +25,7 @@ const ( GcLookback = 5 // GcDeepCheck determines the number of epochs in the past that we // we try cleaning in the deep garbage collection round. - GcDeepCheck = 2880 // (24h*60m*60s)/30s per block + GcDeepCheck = 2880 // (24h*60m*60s)/30s per epoch // GcDeepInterval determines after the number of epochs for which // we are going to start a deeper garbage collection round. GcDeepInterval = 1000