Merge pull request #10777 from filecoin-project/steb/revert-cb
Revert #9858 (consistent broadcast changes)
This commit is contained in:
commit
76f231618a
@ -6,7 +6,6 @@ package build
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
@ -146,7 +145,3 @@ const BootstrapPeerThreshold = 1
|
|||||||
const Eip155ChainId = 31415926
|
const Eip155ChainId = 31415926
|
||||||
|
|
||||||
var WhitelistedBlock = cid.Undef
|
var WhitelistedBlock = cid.Undef
|
||||||
|
|
||||||
// Reducing the delivery delay for equivocation of
|
|
||||||
// consistent broadcast to just half a second.
|
|
||||||
var CBDeliveryDelay = 500 * time.Millisecond
|
|
||||||
|
@ -4,8 +4,6 @@
|
|||||||
package build
|
package build
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -93,8 +91,3 @@ const BootstrapPeerThreshold = 2
|
|||||||
const Eip155ChainId = 3141592
|
const Eip155ChainId = 3141592
|
||||||
|
|
||||||
var WhitelistedBlock = cid.Undef
|
var WhitelistedBlock = cid.Undef
|
||||||
|
|
||||||
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
|
|
||||||
// This determines the wait time for the detection of potential equivocations.
|
|
||||||
// It is a variable instead of a constant so it can be conveniently configured in tests
|
|
||||||
var CBDeliveryDelay = 2 * time.Second
|
|
||||||
|
@ -6,7 +6,6 @@ package build
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
@ -129,8 +128,3 @@ const BootstrapPeerThreshold = 4
|
|||||||
const Eip155ChainId = 314159
|
const Eip155ChainId = 314159
|
||||||
|
|
||||||
var WhitelistedBlock = cid.Undef
|
var WhitelistedBlock = cid.Undef
|
||||||
|
|
||||||
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
|
|
||||||
// This determines the wait time for the detection of potential equivocations.
|
|
||||||
// It is a variable instead of a constant so it can be conveniently configured in tests
|
|
||||||
var CBDeliveryDelay = 2 * time.Second
|
|
||||||
|
@ -6,7 +6,6 @@ package build
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
@ -135,8 +134,3 @@ const BootstrapPeerThreshold = 2
|
|||||||
const Eip155ChainId = 3141592
|
const Eip155ChainId = 3141592
|
||||||
|
|
||||||
var WhitelistedBlock = cid.Undef
|
var WhitelistedBlock = cid.Undef
|
||||||
|
|
||||||
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
|
|
||||||
// This determines the wait time for the detection of potential equivocations.
|
|
||||||
// It is a variable instead of a constant so it can be conveniently configured in tests
|
|
||||||
var CBDeliveryDelay = 2 * time.Second
|
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
@ -148,8 +147,3 @@ const Eip155ChainId = 314
|
|||||||
|
|
||||||
// we skip checks on message validity in this block to sidestep the zero-bls signature
|
// we skip checks on message validity in this block to sidestep the zero-bls signature
|
||||||
var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi")
|
var WhitelistedBlock = MustParseCid("bafy2bzaceapyg2uyzk7vueh3xccxkuwbz3nxewjyguoxvhx77malc2lzn2ybi")
|
||||||
|
|
||||||
// CBDeliveryDelay is the delay before deliver in the synchronous consistent broadcast.
|
|
||||||
// This determines the wait time for the detection of potential equivocations.
|
|
||||||
// It is a variable instead of a constant so it can be conveniently configured in tests
|
|
||||||
var CBDeliveryDelay = 2 * time.Second
|
|
||||||
|
@ -1,203 +0,0 @@
|
|||||||
package bcast
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
var log = logging.Logger("sub-cb")
|
|
||||||
|
|
||||||
const (
|
|
||||||
// GcSanityCheck determines the number of epochs in the past
|
|
||||||
// that will be garbage collected from the current epoch.
|
|
||||||
GcSanityCheck = 100
|
|
||||||
// GcLookback determines the number of epochs kept in the consistent
|
|
||||||
// broadcast cache.
|
|
||||||
GcLookback = 5
|
|
||||||
// GcDeepCheck determines the number of epochs in the past that we
|
|
||||||
// we try cleaning in the deep garbage collection round.
|
|
||||||
GcDeepCheck = 2880 // (24h*60m*60s)/30s per epoch
|
|
||||||
// GcDeepInterval determines after the number of epochs for which
|
|
||||||
// we are going to start a deeper garbage collection round.
|
|
||||||
GcDeepInterval = 1000
|
|
||||||
)
|
|
||||||
|
|
||||||
type blksInfo struct {
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
blks []cid.Cid
|
|
||||||
}
|
|
||||||
|
|
||||||
type bcastDict struct {
|
|
||||||
m map[string]*blksInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
|
|
||||||
v, ok := bd.m[string(key)]
|
|
||||||
if !ok {
|
|
||||||
return nil, ok
|
|
||||||
}
|
|
||||||
return v, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bd *bcastDict) blkLen(key []byte) int {
|
|
||||||
return len(bd.m[string(key)].blks)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bd *bcastDict) store(key []byte, d *blksInfo) {
|
|
||||||
bd.m[string(key)] = d
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConsistentBCast tracks recent information about the
|
|
||||||
// blocks and tickets received at different epochs
|
|
||||||
type ConsistentBCast struct {
|
|
||||||
lk sync.RWMutex
|
|
||||||
delay time.Duration
|
|
||||||
m map[abi.ChainEpoch]*bcastDict
|
|
||||||
lastDeepGc abi.ChainEpoch
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBcastDict() *bcastDict {
|
|
||||||
return &bcastDict{m: make(map[string]*blksInfo)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BCastKey(bh *types.BlockHeader) []byte {
|
|
||||||
return bh.Ticket.VRFProof
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
|
|
||||||
return &ConsistentBCast{
|
|
||||||
delay: delay,
|
|
||||||
m: make(map[abi.ChainEpoch]*bcastDict),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func cidExists(cids []cid.Cid, c cid.Cid) bool {
|
|
||||||
for _, v := range cids {
|
|
||||||
if v == c {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bInfo *blksInfo) eqErr() error {
|
|
||||||
bInfo.cancel()
|
|
||||||
return xerrors.Errorf("different blocks with the same ticket already seen")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cb *ConsistentBCast) Len() int {
|
|
||||||
cb.lk.RLock()
|
|
||||||
defer cb.lk.RUnlock()
|
|
||||||
return len(cb.m)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RcvBlock is called every time a new block is received through the network.
|
|
||||||
//
|
|
||||||
// This function keeps track of all the blocks with a specific VRFProof received
|
|
||||||
// for the same height. Every time a new block with a VRFProof not seen at certain
|
|
||||||
// height is received, a new timer is triggered to wait for the delay time determined by
|
|
||||||
// the consistent broadcast before informing the syncer. During this time, if a new
|
|
||||||
// block with the same VRFProof for that height is received, it means a miner is
|
|
||||||
// trying to equivocate, and both blocks are discarded.
|
|
||||||
//
|
|
||||||
// The delay time should be set to a value high enough to allow any block sent for
|
|
||||||
// certain epoch to be propagated to a large amount of miners in the network.
|
|
||||||
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
|
|
||||||
cb.lk.Lock()
|
|
||||||
defer cb.lk.Unlock()
|
|
||||||
bcastDict, ok := cb.m[blk.Header.Height]
|
|
||||||
if !ok {
|
|
||||||
bcastDict = newBcastDict()
|
|
||||||
cb.m[blk.Header.Height] = bcastDict
|
|
||||||
}
|
|
||||||
|
|
||||||
key := BCastKey(blk.Header)
|
|
||||||
blkCid := blk.Cid()
|
|
||||||
|
|
||||||
bInfo, ok := bcastDict.load(key)
|
|
||||||
if ok {
|
|
||||||
if len(bInfo.blks) > 1 {
|
|
||||||
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !cidExists(bInfo.blks, blkCid) {
|
|
||||||
bcastDict.store(key, &blksInfo{bInfo.ctx, bInfo.cancel, append(bInfo.blks, blkCid)})
|
|
||||||
// By calling bInfo.eqErr() inside this log we cancel the context for all blocks waiting for
|
|
||||||
// the epoch-ticket combination making them to fail and not be sent to the syncer, as
|
|
||||||
// a potential equivocation is detected.
|
|
||||||
log.Errorf("equivocation detected for height %d: %s", blk.Header.Height, bInfo.eqErr())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, cb.delay)
|
|
||||||
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
|
|
||||||
}
|
|
||||||
|
|
||||||
// WaitForDelivery is called before informing the syncer about a new block
|
|
||||||
// to check if the consistent broadcast delay triggered or if the block should
|
|
||||||
// be held off for a bit more time.
|
|
||||||
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
|
|
||||||
cb.lk.RLock()
|
|
||||||
defer cb.lk.RUnlock()
|
|
||||||
|
|
||||||
bcastDict, ok := cb.m[bh.Height]
|
|
||||||
if !ok {
|
|
||||||
return xerrors.Errorf("block at height %d garbage collected before it could be processed", bh.Height)
|
|
||||||
}
|
|
||||||
key := BCastKey(bh)
|
|
||||||
bInfo, ok := bcastDict.load(key)
|
|
||||||
if !ok {
|
|
||||||
return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the timeout
|
|
||||||
cb.lk.RUnlock()
|
|
||||||
<-bInfo.ctx.Done()
|
|
||||||
cb.lk.RLock()
|
|
||||||
if bcastDict.blkLen(key) > 1 {
|
|
||||||
return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GarbageCollect cleans the consistent broadcast cache periodically.
|
|
||||||
//
|
|
||||||
// A light garbage collection is triggered before every block delivery
|
|
||||||
// while a deeper one is triggered once every GcDeepCheck to ensure
|
|
||||||
// that nothing was left behind.
|
|
||||||
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
|
|
||||||
cb.lk.Lock()
|
|
||||||
defer cb.lk.Unlock()
|
|
||||||
|
|
||||||
// perform a deeper sanity check every now and then
|
|
||||||
gcRange := GcSanityCheck
|
|
||||||
if cb.lastDeepGc+GcDeepInterval > currEpoch {
|
|
||||||
gcRange = GcDeepCheck
|
|
||||||
cb.lastDeepGc = currEpoch
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep currEpoch-gcRange and delete a few more in the past
|
|
||||||
// as a sanity-check
|
|
||||||
// Garbage collection is triggered before block delivery,
|
|
||||||
// and we use the sanity-check in case there were a few rounds
|
|
||||||
// without delivery, and the garbage collection wasn't triggered
|
|
||||||
// for a few epochs.
|
|
||||||
for i := 0; i < gcRange; i++ {
|
|
||||||
if currEpoch > GcLookback {
|
|
||||||
delete(cb.m, currEpoch-abi.ChainEpoch(GcLookback+i))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,223 +0,0 @@
|
|||||||
package bcast_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/rand"
|
|
||||||
"fmt"
|
|
||||||
mrand "math/rand"
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"github.com/multiformats/go-multihash"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/sub/bcast"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
const TEST_DELAY = 1 * time.Second
|
|
||||||
|
|
||||||
func TestSimpleDelivery(t *testing.T) {
|
|
||||||
cb := bcast.NewConsistentBCast(TEST_DELAY)
|
|
||||||
// Check that we wait for delivery.
|
|
||||||
start := time.Now()
|
|
||||||
testSimpleDelivery(t, cb, 100, 5)
|
|
||||||
since := time.Since(start)
|
|
||||||
require.GreaterOrEqual(t, since, TEST_DELAY)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testSimpleDelivery(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
errs := make([]error, 0)
|
|
||||||
wg.Add(numBlocks)
|
|
||||||
for i := 0; i < numBlocks; i++ {
|
|
||||||
go func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
// Add a random delay in block reception
|
|
||||||
r := mrand.Intn(200)
|
|
||||||
time.Sleep(time.Duration(r) * time.Millisecond)
|
|
||||||
blk := newBlock(t, epoch, randomProof(t), []byte("test"+strconv.Itoa(i)))
|
|
||||||
cb.RcvBlock(ctx, blk)
|
|
||||||
err := cb.WaitForDelivery(blk.Header)
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
for _, v := range errs {
|
|
||||||
t.Fatalf("error in delivery: %s", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSeveralEpochs(t *testing.T) {
|
|
||||||
cb := bcast.NewConsistentBCast(TEST_DELAY)
|
|
||||||
numEpochs := 6
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
wg.Add(numEpochs)
|
|
||||||
for i := 0; i < numEpochs; i++ {
|
|
||||||
go func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
// Add a random delay between epochs
|
|
||||||
r := mrand.Intn(500)
|
|
||||||
time.Sleep(time.Duration(i)*TEST_DELAY + time.Duration(r)*time.Millisecond)
|
|
||||||
rNumBlocks := mrand.Intn(5)
|
|
||||||
flip, err := flipCoin(0.7)
|
|
||||||
require.NoError(t, err)
|
|
||||||
t.Logf("Running epoch %d with %d with equivocation=%v", i, rNumBlocks, !flip)
|
|
||||||
if flip {
|
|
||||||
testSimpleDelivery(t, cb, abi.ChainEpoch(i), rNumBlocks)
|
|
||||||
} else {
|
|
||||||
testEquivocation(t, cb, abi.ChainEpoch(i), rNumBlocks)
|
|
||||||
}
|
|
||||||
cb.GarbageCollect(abi.ChainEpoch(i))
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
require.Equal(t, cb.Len(), numEpochs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// bias is expected to be 0-1
|
|
||||||
func flipCoin(bias float32) (bool, error) {
|
|
||||||
if bias > 1 || bias < 0 {
|
|
||||||
return false, fmt.Errorf("wrong bias. expected (0,1)")
|
|
||||||
}
|
|
||||||
r := mrand.Intn(100)
|
|
||||||
return r < int(bias*100), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func testEquivocation(t *testing.T, cb *bcast.ConsistentBCast, epoch abi.ChainEpoch, numBlocks int) {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
errs := make([]error, 0)
|
|
||||||
wg.Add(numBlocks + 1)
|
|
||||||
for i := 0; i < numBlocks; i++ {
|
|
||||||
proof := randomProof(t)
|
|
||||||
// Valid blocks
|
|
||||||
go func(i int, proof []byte) {
|
|
||||||
defer wg.Done()
|
|
||||||
r := mrand.Intn(200)
|
|
||||||
time.Sleep(time.Duration(r) * time.Millisecond)
|
|
||||||
blk := newBlock(t, epoch, proof, []byte("valid"+strconv.Itoa(i)))
|
|
||||||
cb.RcvBlock(ctx, blk)
|
|
||||||
err := cb.WaitForDelivery(blk.Header)
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
}(i, proof)
|
|
||||||
|
|
||||||
// Equivocation for the last block
|
|
||||||
if i == numBlocks-1 {
|
|
||||||
// Attempting equivocation
|
|
||||||
go func(i int, proof []byte) {
|
|
||||||
defer wg.Done()
|
|
||||||
// Use the same proof and the same epoch
|
|
||||||
blk := newBlock(t, epoch, proof, []byte("invalid"+strconv.Itoa(i)))
|
|
||||||
cb.RcvBlock(ctx, blk)
|
|
||||||
err := cb.WaitForDelivery(blk.Header)
|
|
||||||
// Equivocation detected
|
|
||||||
require.Error(t, err)
|
|
||||||
}(i, proof)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// The equivocated block arrived too late, so
|
|
||||||
// we delivered all the valid blocks.
|
|
||||||
require.Len(t, errs, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEquivocation(t *testing.T) {
|
|
||||||
cb := bcast.NewConsistentBCast(TEST_DELAY)
|
|
||||||
testEquivocation(t, cb, 100, 5)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFailedEquivocation(t *testing.T) {
|
|
||||||
cb := bcast.NewConsistentBCast(TEST_DELAY)
|
|
||||||
ctx := context.Background()
|
|
||||||
numBlocks := 5
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
errs := make([]error, 0)
|
|
||||||
wg.Add(numBlocks + 1)
|
|
||||||
for i := 0; i < numBlocks; i++ {
|
|
||||||
proof := randomProof(t)
|
|
||||||
// Valid blocks
|
|
||||||
go func(i int, proof []byte) {
|
|
||||||
defer wg.Done()
|
|
||||||
r := mrand.Intn(200)
|
|
||||||
time.Sleep(time.Duration(r) * time.Millisecond)
|
|
||||||
blk := newBlock(t, 100, proof, []byte("valid"+strconv.Itoa(i)))
|
|
||||||
cb.RcvBlock(ctx, blk)
|
|
||||||
err := cb.WaitForDelivery(blk.Header)
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
}
|
|
||||||
}(i, proof)
|
|
||||||
|
|
||||||
// Equivocation for the last block
|
|
||||||
if i == numBlocks-1 {
|
|
||||||
// Attempting equivocation
|
|
||||||
go func(i int, proof []byte) {
|
|
||||||
defer wg.Done()
|
|
||||||
// The equivocated block arrives late
|
|
||||||
time.Sleep(2 * TEST_DELAY)
|
|
||||||
// Use the same proof and the same epoch
|
|
||||||
blk := newBlock(t, 100, proof, []byte("invalid"+strconv.Itoa(i)))
|
|
||||||
cb.RcvBlock(ctx, blk)
|
|
||||||
err := cb.WaitForDelivery(blk.Header)
|
|
||||||
// Equivocation detected
|
|
||||||
require.Error(t, err)
|
|
||||||
}(i, proof)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// The equivocated block arrived too late, so
|
|
||||||
// we delivered all the valid blocks.
|
|
||||||
require.Len(t, errs, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func randomProof(t *testing.T) []byte {
|
|
||||||
proof := make([]byte, 10)
|
|
||||||
_, err := rand.Read(proof)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return proof
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBlock(t *testing.T, epoch abi.ChainEpoch, proof []byte, mCidSeed []byte) *types.BlockMsg {
|
|
||||||
h, err := multihash.Sum(mCidSeed, multihash.SHA2_256, -1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
testCid := cid.NewCidV0(h)
|
|
||||||
addr, err := address.NewIDAddress(10)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
bh := &types.BlockHeader{
|
|
||||||
Miner: addr,
|
|
||||||
ParentStateRoot: testCid,
|
|
||||||
ParentMessageReceipts: testCid,
|
|
||||||
Ticket: &types.Ticket{
|
|
||||||
VRFProof: proof,
|
|
||||||
},
|
|
||||||
Height: epoch,
|
|
||||||
Messages: testCid,
|
|
||||||
}
|
|
||||||
return &types.BlockMsg{
|
|
||||||
Header: bh,
|
|
||||||
}
|
|
||||||
}
|
|
@ -27,7 +27,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/consensus"
|
"github.com/filecoin-project/lotus/chain/consensus"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/sub/bcast"
|
|
||||||
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
|
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
@ -44,11 +43,10 @@ var msgCidPrefix = cid.Prefix{
|
|||||||
MhLength: 32,
|
MhLength: 32,
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self peer.ID, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
|
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
|
||||||
// Timeout after (block time + propagation delay). This is useless at
|
// Timeout after (block time + propagation delay). This is useless at
|
||||||
// this point.
|
// this point.
|
||||||
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
|
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
|
||||||
cb := bcast.NewConsistentBCast(build.CBDeliveryDelay)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
msg, err := bsub.Next(ctx)
|
msg, err := bsub.Next(ctx)
|
||||||
@ -69,9 +67,6 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p
|
|||||||
|
|
||||||
src := msg.GetFrom()
|
src := msg.GetFrom()
|
||||||
|
|
||||||
// Notify consistent broadcast about a new block
|
|
||||||
cb.RcvBlock(ctx, blk)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -107,20 +102,6 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p
|
|||||||
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
|
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
|
||||||
}
|
}
|
||||||
|
|
||||||
// When we propose a new block ourselves, the proposed block also gets here through SyncSubmitBlock.
|
|
||||||
// If we are the block proposers we don't need to wait for delivery, we know the blocks are
|
|
||||||
// honest.
|
|
||||||
if src != self {
|
|
||||||
log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height)
|
|
||||||
if err := cb.WaitForDelivery(blk.Header); err != nil {
|
|
||||||
log.Errorf("not informing syncer about new block, potential equivocation detected (cid: %s, source: %s): %s; ", blk.Header.Cid(), src, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Garbage collect the broadcast state
|
|
||||||
cb.GarbageCollect(blk.Header.Height)
|
|
||||||
log.Debugf("Block in height %v delivered successfully (cid=%s)", blk.Header.Height, blk.Cid())
|
|
||||||
|
|
||||||
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
|
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
|
||||||
Header: blk.Header,
|
Header: blk.Header,
|
||||||
BlsMessages: bmsgs,
|
BlsMessages: bmsgs,
|
||||||
|
@ -45,11 +45,6 @@ func init() {
|
|||||||
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
|
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
|
||||||
policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
|
policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
|
||||||
policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
|
policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
|
||||||
|
|
||||||
// these tests assume really fast block times. disabling
|
|
||||||
// the consistent broadcast delay to avoid them from adding
|
|
||||||
// an unnecessary overhead.
|
|
||||||
build.CBDeliveryDelay = 2 * time.Millisecond
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const source = 0
|
const source = 0
|
||||||
|
@ -40,11 +40,6 @@ func init() {
|
|||||||
|
|
||||||
build.InsecurePoStValidation = true
|
build.InsecurePoStValidation = true
|
||||||
|
|
||||||
// Disabling consistent broadcast in itests. Many tests use really fast
|
|
||||||
// block times, and adding this additional delay for block delivery
|
|
||||||
// would make these tests to fail.
|
|
||||||
build.CBDeliveryDelay = 0
|
|
||||||
|
|
||||||
if err := os.Setenv("BELLMAN_NO_GPU", "1"); err != nil {
|
if err := os.Setenv("BELLMAN_NO_GPU", "1"); err != nil {
|
||||||
panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err))
|
panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err))
|
||||||
}
|
}
|
||||||
|
@ -167,7 +167,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx,
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go sub.HandleIncomingBlocks(ctx, blocksub, h.ID(), s, bserv, h.ConnManager())
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
||||||
|
Loading…
Reference in New Issue
Block a user