give syncer ability to deal with forked chains

This commit is contained in:
whyrusleeping 2019-10-05 18:51:48 -06:00
parent 5cb819b322
commit 0182b804a2
4 changed files with 79 additions and 23 deletions

View File

@ -361,6 +361,10 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
for _, mi := range bts.BlsMsgIncludes[i] { for _, mi := range bts.BlsMsgIncludes[i] {
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi]) fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
} }
for _, mi := range bts.SecpkMsgIncludes[i] {
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
}
fts.Blocks = append(fts.Blocks, fb) fts.Blocks = append(fts.Blocks, fb)
} }

View File

@ -31,7 +31,7 @@ import (
var log = logging.Logger("gen") var log = logging.Logger("gen")
const msgsPerBlock = 0 const msgsPerBlock = 5
type ChainGen struct { type ChainGen struct {
accounts []address.Address accounts []address.Address
@ -308,6 +308,18 @@ func (cg *ChainGen) makeBlock(parents *types.TipSet, m address.Address, eproof t
return fblk, err return fblk, err
} }
// This function is awkward. It's used to deal with messages made when
// simulating forks
func (cg *ChainGen) ResyncBankerNonce(ts *types.TipSet) error {
act, err := cg.sm.GetActor(cg.banker, ts)
if err != nil {
return err
}
cg.bankerNonce = act.Nonce
return nil
}
func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) { func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, cg.msgsPerBlock) msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
for m := range msgs { for m := range msgs {

View File

@ -23,7 +23,6 @@ import (
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
@ -333,6 +332,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
func (syncer *Syncer) Sync(maybeHead *types.TipSet) error { func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
ctx := context.TODO()
syncer.syncLock.Lock() syncer.syncLock.Lock()
defer syncer.syncLock.Unlock() defer syncer.syncLock.Unlock()
@ -340,12 +340,12 @@ func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
return nil return nil
} }
if err := syncer.collectChain(maybeHead); err != nil { if err := syncer.collectChain(ctx, maybeHead); err != nil {
return xerrors.Errorf("collectChain failed: %w", err) return xerrors.Errorf("collectChain failed: %w", err)
} }
if err := syncer.store.PutTipSet(maybeHead); err != nil { if err := syncer.store.PutTipSet(maybeHead); err != nil {
return errors.Wrap(err, "failed to put synced tipset to chainstore") return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
} }
return nil return nil
@ -506,7 +506,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
} }
if nonces[m.From] != m.Nonce { if nonces[m.From] != m.Nonce {
return xerrors.Errorf("wrong nonce") return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce)
} }
nonces[m.From]++ nonces[m.From]++
@ -560,7 +560,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return nil return nil
} }
func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
blockSet := []*types.TipSet{from} blockSet := []*types.TipSet{from}
at := from.Parents() at := from.Parents()
@ -629,13 +629,48 @@ loop:
// TODO: handle the case where we are on a fork and its not a simple fast forward // TODO: handle the case where we are on a fork and its not a simple fast forward
// need to walk back to either a common ancestor, or until we hit the fork length threshold. // need to walk back to either a common ancestor, or until we hit the fork length threshold.
return nil, xerrors.Errorf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
fork, err := syncer.syncFork(ctx, last, to)
if err != nil {
return nil, xerrors.Errorf("failed to sync fork: %w", err)
}
blockSet = append(blockSet, fork...)
} }
return blockSet, nil return blockSet, nil
} }
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), 100)
if err != nil {
return nil, err
}
nts, err := syncer.store.LoadTipSet(to.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
for cur := 0; cur < len(tips); {
if nts.Equals(tips[cur]) {
return tips[:cur+1], nil
}
if nts.Height() < tips[cur].Height() {
cur++
} else {
nts, err = syncer.store.LoadTipSet(nts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading next local tipset: %w", err)
}
}
}
return nil, xerrors.Errorf("fork was longer than our threshold")
}
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
syncer.syncState.SetHeight(0) syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error { return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
@ -661,7 +696,6 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
return err return err
} }
if fts == nil { if fts == nil {
fmt.Println("Failed to fill tipset for: ", beg, headers[beg].Cids(), headers[beg].Height())
break break
} }
if err := cb(fts); err != nil { if err := cb(fts); err != nil {
@ -741,10 +775,10 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
return nil return nil
} }
func (syncer *Syncer) collectChain(ts *types.TipSet) error { func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts) syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
headers, err := syncer.collectHeaders(ts, syncer.store.GetHeaviestTipSet()) headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
if err != nil { if err != nil {
return err return err
} }

View File

@ -288,14 +288,18 @@ func (tu *syncTestUtil) compareSourceState(with int) {
} }
func (tu *syncTestUtil) waitUntilSync(from, to int) { func (tu *syncTestUtil) waitUntilSync(from, to int) {
ctx, cancel := context.WithCancel(context.Background()) target, err := tu.nds[from].ChainHead(tu.ctx)
defer cancel()
target, err := tu.nds[from].ChainHead(ctx)
if err != nil { if err != nil {
tu.t.Fatal(err) tu.t.Fatal(err)
} }
tu.waitUntilSyncTarget(to, target)
}
func (tu *syncTestUtil) waitUntilSyncTarget(to int, target *types.TipSet) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hc, err := tu.nds[to].ChainNotify(ctx) hc, err := tu.nds[to].ChainNotify(ctx)
if err != nil { if err != nil {
tu.t.Fatal(err) tu.t.Fatal(err)
@ -389,17 +393,18 @@ func TestSyncFork(t *testing.T) {
base := tu.g.CurTipset base := tu.g.CurTipset
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height()) fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
a := tu.mineOnBlock(base, p1, []int{0})
// The two nodes fork at this point into 'a' and 'b'
a1 := tu.mineOnBlock(base, p1, []int{0})
a := tu.mineOnBlock(a1, p1, []int{0})
a = tu.mineOnBlock(a, p1, []int{0})
tu.g.ResyncBankerNonce(a1.TipSet())
// chain B will now be heaviest
b := tu.mineOnBlock(base, p2, []int{1}) b := tu.mineOnBlock(base, p2, []int{1})
phead()
a = tu.mineOnBlock(a, p1, []int{0})
b = tu.mineOnBlock(b, p2, []int{1}) b = tu.mineOnBlock(b, p2, []int{1})
phead()
a = tu.mineOnBlock(a, p1, []int{0})
b = tu.mineOnBlock(b, p2, []int{1}) b = tu.mineOnBlock(b, p2, []int{1})
phead() b = tu.mineOnBlock(b, p2, []int{1})
fmt.Println("A: ", a.Cids(), a.TipSet().Height()) fmt.Println("A: ", a.Cids(), a.TipSet().Height())
fmt.Println("B: ", b.Cids(), b.TipSet().Height()) fmt.Println("B: ", b.Cids(), b.TipSet().Height())
@ -408,7 +413,8 @@ func TestSyncFork(t *testing.T) {
require.NoError(t, tu.mn.LinkAll()) require.NoError(t, tu.mn.LinkAll())
tu.connect(p1, p2) tu.connect(p1, p2)
tu.waitUntilSync(p1, p2) tu.waitUntilSyncTarget(p1, b.TipSet())
tu.waitUntilSyncTarget(p2, b.TipSet())
phead() phead()
} }