wip: draft impl of consistent bcast
This commit is contained in:
parent
29fff4fb06
commit
c03ad9dcfd
132
chain/sub/bcast/consistent.go
Normal file
132
chain/sub/bcast/consistent.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
package bcast
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/multiformats/go-multihash"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: Take const out of here and make them build params.
|
||||||
|
const (
|
||||||
|
DELAY = 6 * time.Second
|
||||||
|
GC_SANITY_CHECK = 5
|
||||||
|
GC_LOOKBACK = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
type blksInfo struct {
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
blks []cid.Cid
|
||||||
|
}
|
||||||
|
|
||||||
|
type bcastDict struct {
|
||||||
|
// TODO: Consider making this a KeyMutexed map
|
||||||
|
lk sync.RWMutex
|
||||||
|
blks map[cid.Cid]*blksInfo // map[epoch + VRFProof]blksInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
type ConsistentBCast struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
delay time.Duration
|
||||||
|
// FIXME: Make this a slice??? Less storage but needs indexing logic.
|
||||||
|
m map[abi.ChainEpoch]*bcastDict
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBcastDict(delay time.Duration) *bcastDict {
|
||||||
|
return &bcastDict{
|
||||||
|
blks: make(map[cid.Cid]*blksInfo),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: What if the VRFProof is already small?? We don´t need the CID. Useless computation.
|
||||||
|
func BCastKey(bh *types.BlockHeader) cid.Cid {
|
||||||
|
proof := bh.Ticket.VRFProof
|
||||||
|
binary.PutVarint(proof, int64(bh.Height))
|
||||||
|
return cid.NewCidV0(multihash.Multihash(proof))
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConsistentBCast(delay time.Duration) *ConsistentBCast {
|
||||||
|
return &ConsistentBCast{
|
||||||
|
delay: delay,
|
||||||
|
m: make(map[abi.ChainEpoch]*bcastDict),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func cidExists(cids []cid.Cid, c cid.Cid) bool {
|
||||||
|
for _, v := range cids {
|
||||||
|
if v == c {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bInfo *blksInfo) eqErr() error {
|
||||||
|
bInfo.cancel()
|
||||||
|
return fmt.Errorf("equivocation error detected. Different block with the same ticket already seen")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cb *ConsistentBCast) RcvBlock(ctx context.Context, blk *types.BlockMsg) error {
|
||||||
|
cb.lk.Lock()
|
||||||
|
bcastDict, ok := cb.m[blk.Header.Height]
|
||||||
|
if !ok {
|
||||||
|
bcastDict = newBcastDict(cb.delay)
|
||||||
|
}
|
||||||
|
cb.lk.Unlock()
|
||||||
|
key := BCastKey(blk.Header)
|
||||||
|
blkCid := blk.Cid()
|
||||||
|
|
||||||
|
bcastDict.lk.Lock()
|
||||||
|
defer bcastDict.lk.Unlock()
|
||||||
|
bInfo, ok := bcastDict.blks[key]
|
||||||
|
if ok {
|
||||||
|
if len(bInfo.blks) > 1 {
|
||||||
|
return bInfo.eqErr()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !cidExists(bInfo.blks, blkCid) {
|
||||||
|
bInfo.blks = append(bInfo.blks, blkCid)
|
||||||
|
return bInfo.eqErr()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, cb.delay)
|
||||||
|
bcastDict.blks[key] = &blksInfo{ctx, cancel, []cid.Cid{blkCid}}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cb *ConsistentBCast) WaitForDelivery(bh *types.BlockHeader) error {
|
||||||
|
bcastDict := cb.m[bh.Height]
|
||||||
|
key := BCastKey(bh)
|
||||||
|
bcastDict.lk.RLock()
|
||||||
|
defer bcastDict.lk.RUnlock()
|
||||||
|
bInfo, ok := bcastDict.blks[key]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("something went wrong, unknown block with Epoch + VRFProof (cid=%s) in consistent broadcast storage", key)
|
||||||
|
}
|
||||||
|
// Wait for the timeout
|
||||||
|
<-bInfo.ctx.Done()
|
||||||
|
if len(bInfo.blks) > 1 {
|
||||||
|
return fmt.Errorf("equivocation detected for epoch %d. Two blocks being broadcast with same VRFProof", bh.Height)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cb *ConsistentBCast) GarbageCollect(currEpoch abi.ChainEpoch) {
|
||||||
|
cb.lk.Lock()
|
||||||
|
defer cb.lk.Unlock()
|
||||||
|
|
||||||
|
// keep currEpoch-2 and delete a few more in the past
|
||||||
|
// as a sanity-check
|
||||||
|
for i := 0; i < GC_SANITY_CHECK; i++ {
|
||||||
|
delete(cb.m, currEpoch-abi.ChainEpoch(2-i))
|
||||||
|
}
|
||||||
|
}
|
29
chain/sub/bcast/consistent_test.go
Normal file
29
chain/sub/bcast/consistent_test.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package bcast_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSimpleDelivery(t *testing.T) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBlock(t *testing.T, epoch abi.ChainEpoch) *types.BlockMsg {
|
||||||
|
proof := make([]byte, 10)
|
||||||
|
_, err := rand.Read(proof)
|
||||||
|
if err != err {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
bh := &types.BlockHeader{
|
||||||
|
Ticket: &types.Ticket{
|
||||||
|
VRFProof: []byte("vrf proof0000000vrf proof0000000"),
|
||||||
|
},
|
||||||
|
Height: 85919298723,
|
||||||
|
}
|
||||||
|
return &types.BlockMsg{
|
||||||
|
Header: bh,
|
||||||
|
}
|
||||||
|
}
|
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/consensus"
|
"github.com/filecoin-project/lotus/chain/consensus"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/sub/bcast"
|
||||||
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
|
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/metrics"
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
@ -47,6 +48,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
// Timeout after (block time + propagation delay). This is useless at
|
// Timeout after (block time + propagation delay). This is useless at
|
||||||
// this point.
|
// this point.
|
||||||
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
|
timeout := time.Duration(build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
|
||||||
|
cb := bcast.NewConsistentBCast(bcast.DELAY)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
msg, err := bsub.Next(ctx)
|
msg, err := bsub.Next(ctx)
|
||||||
@ -67,6 +69,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
|
|
||||||
src := msg.GetFrom()
|
src := msg.GetFrom()
|
||||||
|
|
||||||
|
// Notify consistent broadcast about a new block
|
||||||
|
cb.RcvBlock(ctx, blk)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@ -102,6 +107,14 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
|
log.Warnw("received block with large delay from miner", "block", blk.Cid(), "delay", delay, "miner", blk.Header.Miner)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := cb.WaitForDelivery(blk.Header); err != nil {
|
||||||
|
log.Errorf("couldn't deliver block to syncer over pubsub: %s; source: %s", err, src)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Garbage collect the broadcast state
|
||||||
|
cb.GarbageCollect(blk.Header.Height)
|
||||||
|
|
||||||
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
|
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
|
||||||
Header: blk.Header,
|
Header: blk.Header,
|
||||||
BlsMessages: bmsgs,
|
BlsMessages: bmsgs,
|
||||||
|
Loading…
Reference in New Issue
Block a user