address review
This commit is contained in:
parent
f59c246c7a
commit
8d260d7478
@ -2,14 +2,12 @@ package bcast
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
@ -24,7 +22,7 @@ const (
|
||||
GcSanityCheck = 5
|
||||
// GcLookback determines the number of epochs kept in the consistent
|
||||
// broadcast cache.
|
||||
GcLookback = 2
|
||||
GcLookback = 1000
|
||||
)
|
||||
|
||||
type blksInfo struct {
|
||||
@ -41,20 +39,20 @@ type bcastDict struct {
|
||||
m *sync.Map
|
||||
}
|
||||
|
||||
func (bd *bcastDict) load(key multihash.Multihash) (*blksInfo, bool) {
|
||||
v, ok := bd.m.Load(key.String())
|
||||
func (bd *bcastDict) load(key []byte) (*blksInfo, bool) {
|
||||
v, ok := bd.m.Load(key)
|
||||
if !ok {
|
||||
return nil, ok
|
||||
}
|
||||
return v.(*blksInfo), ok
|
||||
}
|
||||
|
||||
func (bd *bcastDict) store(key multihash.Multihash, d *blksInfo) {
|
||||
bd.m.Store(key.String(), d)
|
||||
func (bd *bcastDict) store(key []byte, d *blksInfo) {
|
||||
bd.m.Store(key, d)
|
||||
}
|
||||
|
||||
func (bd *bcastDict) blkLen(key multihash.Multihash) int {
|
||||
v, ok := bd.m.Load(key.String())
|
||||
func (bd *bcastDict) blkLen(key []byte) int {
|
||||
v, ok := bd.m.Load(key)
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
@ -64,21 +62,15 @@ func (bd *bcastDict) blkLen(key multihash.Multihash) int {
|
||||
type ConsistentBCast struct {
|
||||
lk sync.RWMutex
|
||||
delay time.Duration
|
||||
// FIXME: Make this a slice??? Less storage but needs indexing logic.
|
||||
m map[abi.ChainEpoch]*bcastDict
|
||||
m map[abi.ChainEpoch]*bcastDict
|
||||
}
|
||||
|
||||
func newBcastDict() *bcastDict {
|
||||
return &bcastDict{new(sync.Map)}
|
||||
}
|
||||
|
||||
// TODO: the VRFProof may already be small enough so we may not need to use a hash here.
|
||||
// we can maybe bypass the useless computation.
|
||||
func BCastKey(bh *types.BlockHeader) (multihash.Multihash, error) {
|
||||
k := make([]byte, len(bh.Ticket.VRFProof))
|
||||
copy(k, bh.Ticket.VRFProof)
|
||||
binary.PutVarint(k, int64(bh.Height))
|
||||
return multihash.Sum(k, multihash.SHA2_256, -1)
|
||||
func BCastKey(bh *types.BlockHeader) []byte {
|
||||
return bh.Ticket.VRFProof
|
||||
}
|
||||
|
||||
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
|
||||
@ -99,7 +91,7 @@ func cidExists(cids []cid.Cid, c cid.Cid) bool {
|
||||
|
||||
func (bInfo *blksInfo) eqErr() error {
|
||||
bInfo.cancel()
|
||||
return fmt.Errorf("different blocks with the same ticket already seen")
|
||||
return xerrors.Errorf("different blocks with the same ticket already seen")
|
||||
}
|
||||
|
||||
func (cb *ConsistentBCast) Len() int {
|
||||
@ -108,6 +100,17 @@ func (cb *ConsistentBCast) Len() int {
|
||||
return len(cb.m)
|
||||
}
|
||||
|
||||
// RcvBlock is called every time a new block is received through the network.
|
||||
//
|
||||
// This function keeps track of all the blocks with a specific VRFProof received
|
||||
// for the same height. Every time a new block with a VRFProof not seen at certain
|
||||
// height is received, a new timer is triggered to wait for the delay time determined by
|
||||
// the consistent broadcast before informing the syncer. During this time, if a new
|
||||
// block with the same VRFProof for that height is received, it means a miner is
|
||||
// trying to equivocate, and both blocks are discarded.
|
||||
//
|
||||
// The delay time should be set to a value high enough to allow any block sent for
|
||||
// certain epoch to be propagated to a large amount of miners in the network.
|
||||
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
|
||||
cb.lk.Lock()
|
||||
bcastDict, ok := cb.m[blk.Header.Height]
|
||||
@ -116,11 +119,7 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
|
||||
cb.m[blk.Header.Height] = bcastDict
|
||||
}
|
||||
cb.lk.Unlock()
|
||||
key, err := BCastKey(blk.Header)
|
||||
if err != nil {
|
||||
log.Errorf("couldn't hash blk info for height %d: %s", blk.Header.Height, err)
|
||||
return
|
||||
}
|
||||
key := BCastKey(blk.Header)
|
||||
blkCid := blk.Cid()
|
||||
|
||||
bInfo, ok := bcastDict.load(key)
|
||||
@ -142,22 +141,22 @@ func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) {
|
||||
bcastDict.store(key, &blksInfo{ctx, cancel, []cid.Cid{blkCid}})
|
||||
}
|
||||
|
||||
// WaitForDelivery is called before informing the syncer about a new block
|
||||
// to check if the consistent broadcast delay triggered or if the block should
|
||||
// be held off for a bit more time.
|
||||
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
|
||||
cb.lk.RLock()
|
||||
bcastDict := cb.m[bh.Height]
|
||||
cb.lk.RUnlock()
|
||||
key, err := BCastKey(bh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := BCastKey(bh)
|
||||
bInfo, ok := bcastDict.load(key)
|
||||
if !ok {
|
||||
return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
|
||||
return xerrors.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
|
||||
}
|
||||
// Wait for the timeout
|
||||
<-bInfo.ctx.Done()
|
||||
if bcastDict.blkLen(key) > 1 {
|
||||
return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
|
||||
return xerrors.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -113,13 +113,13 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, self p
|
||||
if src != self {
|
||||
log.Debugf("Waiting for consistent broadcast of block in height: %v", blk.Header.Height)
|
||||
if err := cb.WaitForDelivery(blk.Header); err != nil {
|
||||
log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src)
|
||||
log.Errorf("not informing syncer about new block, potential equivocation detected (cid: %s, source: %s): %s; ", blk.Header.Cid(), src, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
// Garbage collect the broadcast state
|
||||
cb.GarbageCollect(blk.Header.Height)
|
||||
log.Debugf("Block in height %v delivered successfully (cid=)", blk.Header.Height, blk.Cid())
|
||||
log.Debugf("Block in height %v delivered successfully (cid=%s)", blk.Header.Height, blk.Cid())
|
||||
|
||||
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
|
||||
Header: blk.Header,
|
||||
|
Loading…
Reference in New Issue
Block a user