Merge pull request #290 from filecoin-project/fix/sync-forked-chain
Fix/sync forked chain
This commit is contained in:
commit
d71b8bd8ba
@ -36,7 +36,7 @@ const BlockDelay = 6
|
|||||||
const AllowableClockDrift = BlockDelay * 2
|
const AllowableClockDrift = BlockDelay * 2
|
||||||
|
|
||||||
// Blocks
|
// Blocks
|
||||||
const ForkLengthThreshold = 20
|
const ForkLengthThreshold = 100
|
||||||
|
|
||||||
// /////
|
// /////
|
||||||
// Proofs / Mining
|
// Proofs / Mining
|
||||||
|
@ -361,6 +361,10 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
|
|||||||
for _, mi := range bts.BlsMsgIncludes[i] {
|
for _, mi := range bts.BlsMsgIncludes[i] {
|
||||||
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
|
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
|
||||||
}
|
}
|
||||||
|
for _, mi := range bts.SecpkMsgIncludes[i] {
|
||||||
|
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
|
||||||
|
}
|
||||||
|
|
||||||
fts.Blocks = append(fts.Blocks, fb)
|
fts.Blocks = append(fts.Blocks, fb)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,11 +45,11 @@ type ChainGen struct {
|
|||||||
sm *stmgr.StateManager
|
sm *stmgr.StateManager
|
||||||
|
|
||||||
genesis *types.BlockHeader
|
genesis *types.BlockHeader
|
||||||
curTipset *store.FullTipSet
|
CurTipset *store.FullTipSet
|
||||||
|
|
||||||
w *wallet.Wallet
|
w *wallet.Wallet
|
||||||
|
|
||||||
miners []address.Address
|
Miners []address.Address
|
||||||
mworkers []address.Address
|
mworkers []address.Address
|
||||||
receivers []address.Address
|
receivers []address.Address
|
||||||
banker address.Address
|
banker address.Address
|
||||||
@ -104,7 +104,12 @@ func NewGenerator() (*ChainGen, error) {
|
|||||||
return nil, xerrors.Errorf("creating memrepo wallet failed: %w", err)
|
return nil, xerrors.Errorf("creating memrepo wallet failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, err := w.GenerateKey(types.KTBLS)
|
worker1, err := w.GenerateKey(types.KTBLS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to generate worker key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
worker2, err := w.GenerateKey(types.KTBLS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to generate worker key: %w", err)
|
return nil, xerrors.Errorf("failed to generate worker key: %w", err)
|
||||||
}
|
}
|
||||||
@ -123,13 +128,14 @@ func NewGenerator() (*ChainGen, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
minercfg := &GenMinerCfg{
|
minercfg := &GenMinerCfg{
|
||||||
Workers: []address.Address{worker, worker},
|
Workers: []address.Address{worker1, worker2},
|
||||||
Owners: []address.Address{worker, worker},
|
Owners: []address.Address{worker1, worker2},
|
||||||
PeerIDs: []peer.ID{"peerID1", "peerID2"},
|
PeerIDs: []peer.ID{"peerID1", "peerID2"},
|
||||||
}
|
}
|
||||||
|
|
||||||
genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{
|
genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{
|
||||||
worker: types.FromFil(40000),
|
worker1: types.FromFil(40000),
|
||||||
|
worker2: types.FromFil(40000),
|
||||||
banker: types.FromFil(50000),
|
banker: types.FromFil(50000),
|
||||||
}, minercfg, 100000)
|
}, minercfg, 100000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -159,12 +165,12 @@ func NewGenerator() (*ChainGen, error) {
|
|||||||
genesis: genb.Genesis,
|
genesis: genb.Genesis,
|
||||||
w: w,
|
w: w,
|
||||||
|
|
||||||
miners: minercfg.MinerAddrs,
|
Miners: minercfg.MinerAddrs,
|
||||||
mworkers: minercfg.Workers,
|
mworkers: minercfg.Workers,
|
||||||
banker: banker,
|
banker: banker,
|
||||||
receivers: receievers,
|
receivers: receievers,
|
||||||
|
|
||||||
curTipset: gents,
|
CurTipset: gents,
|
||||||
|
|
||||||
r: mr,
|
r: mr,
|
||||||
lr: lr,
|
lr: lr,
|
||||||
@ -191,8 +197,7 @@ func (cg *ChainGen) GenesisCar() ([]byte, error) {
|
|||||||
return out.Bytes(), nil
|
return out.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) {
|
func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) {
|
||||||
pts := cg.curTipset.TipSet()
|
|
||||||
|
|
||||||
var lastTicket *types.Ticket
|
var lastTicket *types.Ticket
|
||||||
if len(ticks) == 0 {
|
if len(ticks) == 0 {
|
||||||
@ -201,7 +206,7 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks
|
|||||||
lastTicket = ticks[len(ticks)-1]
|
lastTicket = ticks[len(ticks)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
st := cg.curTipset.TipSet().ParentState()
|
st := pts.ParentState()
|
||||||
|
|
||||||
worker, err := stmgr.GetMinerWorker(ctx, cg.sm, st, m)
|
worker, err := stmgr.GetMinerWorker(ctx, cg.sm, st, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -241,19 +246,27 @@ type MinedTipSet struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
|
func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
|
||||||
|
mts, err := cg.NextTipSetFromMiners(cg.CurTipset.TipSet(), cg.Miners)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cg.CurTipset = mts.TipSet
|
||||||
|
return mts, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Address) (*MinedTipSet, error) {
|
||||||
var blks []*types.FullBlock
|
var blks []*types.FullBlock
|
||||||
ticketSets := make([][]*types.Ticket, len(cg.miners))
|
ticketSets := make([][]*types.Ticket, len(miners))
|
||||||
|
|
||||||
msgs, err := cg.getRandomMessages()
|
msgs, err := cg.getRandomMessages()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
base := cg.curTipset.TipSet()
|
|
||||||
|
|
||||||
for len(blks) == 0 {
|
for len(blks) == 0 {
|
||||||
for i, m := range cg.miners {
|
for i, m := range miners {
|
||||||
proof, t, err := cg.nextBlockProof(context.TODO(), m, ticketSets[i])
|
proof, t, err := cg.nextBlockProof(context.TODO(), base, m, ticketSets[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("next block proof: %w", err)
|
return nil, xerrors.Errorf("next block proof: %w", err)
|
||||||
}
|
}
|
||||||
@ -265,7 +278,7 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
|
|||||||
return nil, xerrors.Errorf("making a block for next tipset failed: %w", err)
|
return nil, xerrors.Errorf("making a block for next tipset failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cg.cs.AddBlock(fblk.Header); err != nil {
|
if err := cg.cs.PersistBlockHeader(fblk.Header); err != nil {
|
||||||
return nil, xerrors.Errorf("chainstore AddBlock: %w", err)
|
return nil, xerrors.Errorf("chainstore AddBlock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,10 +287,10 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cg.curTipset = store.NewFullTipSet(blks)
|
fts := store.NewFullTipSet(blks)
|
||||||
|
|
||||||
return &MinedTipSet{
|
return &MinedTipSet{
|
||||||
TipSet: cg.curTipset,
|
TipSet: fts,
|
||||||
Messages: msgs,
|
Messages: msgs,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -294,6 +307,18 @@ func (cg *ChainGen) makeBlock(parents *types.TipSet, m address.Address, eproof t
|
|||||||
return fblk, err
|
return fblk, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function is awkward. It's used to deal with messages made when
|
||||||
|
// simulating forks
|
||||||
|
func (cg *ChainGen) ResyncBankerNonce(ts *types.TipSet) error {
|
||||||
|
act, err := cg.sm.GetActor(cg.banker, ts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cg.bankerNonce = act.Nonce
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
|
func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
|
||||||
msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
|
msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
|
||||||
for m := range msgs {
|
for m := range msgs {
|
||||||
@ -325,10 +350,6 @@ func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
|
|||||||
Message: msg,
|
Message: msg,
|
||||||
Signature: *sig,
|
Signature: *sig,
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := cg.cs.PutMessage(msgs[m]); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return msgs, nil
|
return msgs, nil
|
||||||
|
@ -60,8 +60,14 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wal
|
|||||||
|
|
||||||
blsMsgCids = append(blsMsgCids, c)
|
blsMsgCids = append(blsMsgCids, c)
|
||||||
} else {
|
} else {
|
||||||
secpkMsgCids = append(secpkMsgCids, msg.Cid())
|
c, err := sm.ChainStore().PutMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
secpkMsgCids = append(secpkMsgCids, c)
|
||||||
secpkMessages = append(secpkMessages, msg)
|
secpkMessages = append(secpkMessages, msg)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,7 +202,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
|
|||||||
|
|
||||||
// TODO: hardcoding 7000000 here is a little fragile, it changes any
|
// TODO: hardcoding 7000000 here is a little fragile, it changes any
|
||||||
// time anyone changes the initial account allocations
|
// time anyone changes the initial account allocations
|
||||||
rval, err := doExecValue(ctx, vm, actors.StorageMarketAddress, owner, types.FromFil(6000), actors.SMAMethods.CreateStorageMiner, params)
|
rval, err := doExecValue(ctx, vm, actors.StorageMarketAddress, owner, types.FromFil(6500), actors.SMAMethods.CreateStorageMiner, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("failed to create genesis miner: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to create genesis miner: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data))
|
log.Debugf("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data))
|
||||||
|
|
||||||
if err := m.Signature.Verify(m.Message.From, data); err != nil {
|
if err := m.Signature.Verify(m.Message.From, data); err != nil {
|
||||||
log.Warnf("mpooladd signature verification failed: %s", err)
|
log.Warnf("mpooladd signature verification failed: %s", err)
|
||||||
|
@ -174,31 +174,30 @@ func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
|
func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
|
||||||
fts := &FullTipSet{
|
ts, err := types.NewTipSet([]*types.BlockHeader{b})
|
||||||
Blocks: []*types.FullBlock{
|
if err != nil {
|
||||||
{Header: b},
|
return err
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cs.PutTipSet(fts); err != nil {
|
if err := cs.PutTipSet(ts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes())
|
return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) PutTipSet(ts *FullTipSet) error {
|
func (cs *ChainStore) PutTipSet(ts *types.TipSet) error {
|
||||||
for _, b := range ts.Blocks {
|
for _, b := range ts.Blocks() {
|
||||||
if err := cs.persistBlock(b); err != nil {
|
if err := cs.PersistBlockHeader(b); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
expanded, err := cs.expandTipset(ts.TipSet().Blocks()[0])
|
expanded, err := cs.expandTipset(ts.Blocks()[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("errored while expanding tipset: %w", err)
|
return xerrors.Errorf("errored while expanding tipset: %w", err)
|
||||||
}
|
}
|
||||||
log.Debugf("expanded %s into %s\n", ts.TipSet().Cids(), expanded.Cids())
|
log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids())
|
||||||
|
|
||||||
if err := cs.MaybeTakeHeavierTipSet(expanded); err != nil {
|
if err := cs.MaybeTakeHeavierTipSet(expanded); err != nil {
|
||||||
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet")
|
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet")
|
||||||
@ -374,24 +373,6 @@ func (cs *ChainStore) PersistBlockHeader(b *types.BlockHeader) error {
|
|||||||
return cs.bs.Put(sb)
|
return cs.bs.Put(sb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) persistBlock(b *types.FullBlock) error {
|
|
||||||
if err := cs.PersistBlockHeader(b.Header); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range b.BlsMessages {
|
|
||||||
if _, err := cs.PutMessage(m); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, m := range b.SecpkMessages {
|
|
||||||
if _, err := cs.PutMessage(m); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type storable interface {
|
type storable interface {
|
||||||
ToStorageBlock() (block.Block, error)
|
ToStorageBlock() (block.Block, error)
|
||||||
}
|
}
|
||||||
|
169
chain/sync.go
169
chain/sync.go
@ -23,7 +23,6 @@ import (
|
|||||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/pkg/errors"
|
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
@ -94,11 +93,19 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
if fts == nil {
|
if fts == nil {
|
||||||
panic("bad")
|
panic("bad")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, b := range fts.Blocks {
|
||||||
|
if err := syncer.validateMsgMeta(b); err != nil {
|
||||||
|
log.Warnf("invalid block received: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if from == syncer.self {
|
if from == syncer.self {
|
||||||
// TODO: this is kindof a hack...
|
// TODO: this is kindof a hack...
|
||||||
log.Debug("got block from ourselves")
|
log.Info("got block from ourselves")
|
||||||
|
|
||||||
if err := syncer.Sync(fts); err != nil {
|
if err := syncer.Sync(fts.TipSet()); err != nil {
|
||||||
log.Errorf("failed to sync our own block: %+v", err)
|
log.Errorf("failed to sync our own block: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,12 +117,37 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
syncer.Bsync.AddPeer(from)
|
syncer.Bsync.AddPeer(from)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := syncer.Sync(fts); err != nil {
|
if err := syncer.Sync(fts.TipSet()); err != nil {
|
||||||
log.Errorf("sync error: %+v", err)
|
log.Errorf("sync error: %+v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error {
|
||||||
|
var bcids, scids []cbg.CBORMarshaler
|
||||||
|
for _, m := range fblk.BlsMessages {
|
||||||
|
c := cbg.CborCid(m.Cid())
|
||||||
|
bcids = append(bcids, &c)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range fblk.SecpkMessages {
|
||||||
|
c := cbg.CborCid(m.Cid())
|
||||||
|
scids = append(scids, &c)
|
||||||
|
}
|
||||||
|
|
||||||
|
bs := amt.WrapBlockstore(syncer.store.Blockstore())
|
||||||
|
smroot, err := computeMsgMeta(bs, bcids, scids)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if fblk.Header.Messages != smroot {
|
||||||
|
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
|
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
|
||||||
// TODO: search for other blocks that could form a tipset with this block
|
// TODO: search for other blocks that could form a tipset with this block
|
||||||
// and then send that tipset to InformNewHead
|
// and then send that tipset to InformNewHead
|
||||||
@ -171,11 +203,6 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes
|
|||||||
smsgCids = append(smsgCids, &c)
|
smsgCids = append(smsgCids, &c)
|
||||||
}
|
}
|
||||||
|
|
||||||
smroot, err := amt.FromArray(bs, smsgCids)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var bmsgs []*types.Message
|
var bmsgs []*types.Message
|
||||||
var bmsgCids []cbg.CBORMarshaler
|
var bmsgCids []cbg.CBORMarshaler
|
||||||
for _, m := range bmi[bi] {
|
for _, m := range bmi[bi] {
|
||||||
@ -184,15 +211,7 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes
|
|||||||
bmsgCids = append(bmsgCids, &c)
|
bmsgCids = append(bmsgCids, &c)
|
||||||
}
|
}
|
||||||
|
|
||||||
bmroot, err := amt.FromArray(bs, bmsgCids)
|
mrcid, err := computeMsgMeta(bs, bmsgCids, smsgCids)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
mrcid, err := bs.Put(&types.MsgMeta{
|
|
||||||
BlsMessages: bmroot,
|
|
||||||
SecpkMessages: smroot,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -213,6 +232,28 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes
|
|||||||
return fts, nil
|
return fts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func computeMsgMeta(bs amt.Blocks, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) {
|
||||||
|
bmroot, err := amt.FromArray(bs, bmsgCids)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
smroot, err := amt.FromArray(bs, smsgCids)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mrcid, err := bs.Put(&types.MsgMeta{
|
||||||
|
BlsMessages: bmroot,
|
||||||
|
SecpkMessages: smroot,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("failed to put msgmeta: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return mrcid, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) selectHead(heads map[peer.ID]*types.TipSet) (*types.TipSet, error) {
|
func (syncer *Syncer) selectHead(heads map[peer.ID]*types.TipSet) (*types.TipSet, error) {
|
||||||
var headsArr []*types.TipSet
|
var headsArr []*types.TipSet
|
||||||
for _, ts := range heads {
|
for _, ts := range heads {
|
||||||
@ -289,21 +330,22 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
|
|||||||
return fts, nil
|
return fts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error {
|
func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
|
||||||
|
|
||||||
|
ctx := context.TODO()
|
||||||
syncer.syncLock.Lock()
|
syncer.syncLock.Lock()
|
||||||
defer syncer.syncLock.Unlock()
|
defer syncer.syncLock.Unlock()
|
||||||
|
|
||||||
ts := maybeHead.TipSet()
|
if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) {
|
||||||
if syncer.Genesis.Equals(ts) || syncer.store.GetHeaviestTipSet().Equals(ts) {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := syncer.collectChain(maybeHead); err != nil {
|
if err := syncer.collectChain(ctx, maybeHead); err != nil {
|
||||||
return xerrors.Errorf("collectChain failed: %w", err)
|
return xerrors.Errorf("collectChain failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := syncer.store.PutTipSet(maybeHead); err != nil {
|
if err := syncer.store.PutTipSet(maybeHead); err != nil {
|
||||||
return errors.Wrap(err, "failed to put synced tipset to chainstore")
|
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -384,7 +426,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
|
|||||||
|
|
||||||
baseTs, err := syncer.store.LoadTipSet(h.Parents)
|
baseTs, err := syncer.store.LoadTipSet(h.Parents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("load tipset failed: %w", err)
|
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stateroot, precp, err := syncer.sm.TipSetState(baseTs)
|
stateroot, precp, err := syncer.sm.TipSetState(baseTs)
|
||||||
@ -464,7 +506,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
if nonces[m.From] != m.Nonce {
|
if nonces[m.From] != m.Nonce {
|
||||||
return xerrors.Errorf("wrong nonce")
|
return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce)
|
||||||
}
|
}
|
||||||
nonces[m.From]++
|
nonces[m.From]++
|
||||||
|
|
||||||
@ -476,22 +518,49 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bs := amt.WrapBlockstore(syncer.store.Blockstore())
|
||||||
|
var blsCids []cbg.CBORMarshaler
|
||||||
for i, m := range b.BlsMessages {
|
for i, m := range b.BlsMessages {
|
||||||
if err := checkMsg(m); err != nil {
|
if err := checkMsg(m); err != nil {
|
||||||
xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
|
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
|
||||||
}
|
}
|
||||||
|
c := cbg.CborCid(m.Cid())
|
||||||
|
blsCids = append(blsCids, &c)
|
||||||
|
}
|
||||||
|
bmroot, err := amt.FromArray(bs, blsCids)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to build amt from bls msg cids: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var secpkCids []cbg.CBORMarshaler
|
||||||
for i, m := range b.SecpkMessages {
|
for i, m := range b.SecpkMessages {
|
||||||
if err := checkMsg(&m.Message); err != nil {
|
if err := checkMsg(&m.Message); err != nil {
|
||||||
xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
|
return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
|
||||||
}
|
}
|
||||||
|
c := cbg.CborCid(m.Cid())
|
||||||
|
secpkCids = append(secpkCids, &c)
|
||||||
|
}
|
||||||
|
smroot, err := amt.FromArray(bs, secpkCids)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to build amt from bls msg cids: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mrcid, err := bs.Put(&types.MsgMeta{
|
||||||
|
BlsMessages: bmroot,
|
||||||
|
SecpkMessages: smroot,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.Messages != mrcid {
|
||||||
|
return fmt.Errorf("messages didnt match message root in header")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
|
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
|
||||||
blockSet := []*types.TipSet{from}
|
blockSet := []*types.TipSet{from}
|
||||||
|
|
||||||
at := from.Parents()
|
at := from.Parents()
|
||||||
@ -558,15 +627,47 @@ loop:
|
|||||||
return blockSet, nil
|
return blockSet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle the case where we are on a fork and its not a simple fast forward
|
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
|
||||||
// need to walk back to either a common ancestor, or until we hit the fork length threshold.
|
fork, err := syncer.syncFork(ctx, last, to)
|
||||||
return nil, xerrors.Errorf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to sync fork: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blockSet = append(blockSet, fork...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return blockSet, nil
|
return blockSet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
|
||||||
|
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
nts, err := syncer.store.LoadTipSet(to.Parents())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for cur := 0; cur < len(tips); {
|
||||||
|
|
||||||
|
if nts.Equals(tips[cur]) {
|
||||||
|
return tips[:cur+1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if nts.Height() < tips[cur].Height() {
|
||||||
|
cur++
|
||||||
|
} else {
|
||||||
|
nts, err = syncer.store.LoadTipSet(nts.Parents())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("loading next local tipset: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, xerrors.Errorf("fork was longer than our threshold")
|
||||||
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
|
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
|
||||||
syncer.syncState.SetHeight(0)
|
syncer.syncState.SetHeight(0)
|
||||||
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
|
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
|
||||||
@ -671,10 +772,10 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
|
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
|
||||||
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), fts.TipSet())
|
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
|
||||||
|
|
||||||
headers, err := syncer.collectHeaders(fts.TipSet(), syncer.store.GetHeaviestTipSet())
|
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -4,16 +4,20 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
|
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
"github.com/filecoin-project/go-lotus/api"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
"github.com/filecoin-project/go-lotus/chain/gen"
|
"github.com/filecoin-project/go-lotus/chain/gen"
|
||||||
"github.com/filecoin-project/go-lotus/chain/store"
|
"github.com/filecoin-project/go-lotus/chain/store"
|
||||||
"github.com/filecoin-project/go-lotus/chain/types"
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
"github.com/filecoin-project/go-lotus/node"
|
"github.com/filecoin-project/go-lotus/node"
|
||||||
|
"github.com/filecoin-project/go-lotus/node/impl"
|
||||||
"github.com/filecoin-project/go-lotus/node/modules"
|
"github.com/filecoin-project/go-lotus/node/modules"
|
||||||
"github.com/filecoin-project/go-lotus/node/repo"
|
"github.com/filecoin-project/go-lotus/node/repo"
|
||||||
)
|
)
|
||||||
@ -27,11 +31,12 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [
|
|||||||
mts, err := tu.g.NextTipSet()
|
mts, err := tu.g.NextTipSet()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fmt.Println("REPO WITH CHAIN NEXT TIPSET: ", mts.TipSet.TipSet().Height())
|
||||||
|
|
||||||
blks[i] = mts.TipSet
|
blks[i] = mts.TipSet
|
||||||
|
|
||||||
ts := mts.TipSet.TipSet()
|
ts := mts.TipSet.TipSet()
|
||||||
fmt.Printf("tipset at H:%d: %s\n", ts.Height(), ts.Cids())
|
fmt.Printf("tipset at H:%d: %s\n", ts.Height(), ts.Cids())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := tu.g.YieldRepo()
|
r, err := tu.g.YieldRepo()
|
||||||
@ -91,17 +96,70 @@ func (tu *syncTestUtil) Shutdown() {
|
|||||||
tu.cancel()
|
tu.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) mineNewBlock(src int) {
|
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet) {
|
||||||
mts, err := tu.g.NextTipSet()
|
// TODO: would be great if we could pass a whole tipset here...
|
||||||
|
for _, fb := range fts.Blocks {
|
||||||
|
var b types.BlockMsg
|
||||||
|
|
||||||
|
// -1 to match block.Height
|
||||||
|
b.Header = fb.Header
|
||||||
|
for _, msg := range fb.SecpkMessages {
|
||||||
|
c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg)
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
for _, msg := range mts.Messages {
|
b.SecpkMessages = append(b.SecpkMessages, c)
|
||||||
require.NoError(tu.t, tu.nds[src].MpoolPush(context.TODO(), msg))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, fblk := range mts.TipSet.Blocks {
|
for _, msg := range fb.BlsMessages {
|
||||||
require.NoError(tu.t, tu.nds[src].ChainSubmitBlock(context.TODO(), fblkToBlkMsg(fblk)))
|
c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
b.BlsMessages = append(b.BlsMessages, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
h, err := tu.nds[to].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
for !h.Equals(fts.TipSet()) {
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
h, err = tu.nds[to].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
if time.Since(start) > time.Second*10 {
|
||||||
|
tu.t.Fatal("took too long waiting for block to be accepted")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int) *store.FullTipSet {
|
||||||
|
if miners == nil {
|
||||||
|
for i := range tu.g.Miners {
|
||||||
|
miners = append(miners, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var maddrs []address.Address
|
||||||
|
for _, i := range miners {
|
||||||
|
maddrs = append(maddrs, tu.g.Miners[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Miner mining block: ", maddrs)
|
||||||
|
|
||||||
|
mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
tu.pushFtsAndWait(src, mts.TipSet)
|
||||||
|
|
||||||
|
return mts.TipSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) mineNewBlock(src int, miners []int) {
|
||||||
|
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners)
|
||||||
|
tu.g.CurTipset = mts
|
||||||
}
|
}
|
||||||
|
|
||||||
func fblkToBlkMsg(fb *types.FullBlock) *types.BlockMsg {
|
func fblkToBlkMsg(fb *types.FullBlock) *types.BlockMsg {
|
||||||
@ -137,6 +195,12 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
|
|||||||
)
|
)
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
lastTs := blocks[len(blocks)-1].Blocks
|
||||||
|
for _, lastB := range lastTs {
|
||||||
|
err = out.(*impl.FullNodeAPI).ChainAPI.Chain.AddBlock(lastB.Header)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
}
|
||||||
|
|
||||||
tu.genesis = genesis
|
tu.genesis = genesis
|
||||||
tu.blocks = blocks
|
tu.blocks = blocks
|
||||||
tu.nds = append(tu.nds, out) // always at 0
|
tu.nds = append(tu.nds, out) // always at 0
|
||||||
@ -164,6 +228,13 @@ func (tu *syncTestUtil) addClientNode() int {
|
|||||||
return len(tu.nds) - 1
|
return len(tu.nds) - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) pid(n int) peer.ID {
|
||||||
|
nal, err := tu.nds[n].NetAddrsListen(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
return nal.ID
|
||||||
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) connect(from, to int) {
|
func (tu *syncTestUtil) connect(from, to int) {
|
||||||
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
|
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
@ -172,6 +243,14 @@ func (tu *syncTestUtil) connect(from, to int) {
|
|||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) disconnect(from, to int) {
|
||||||
|
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
err = tu.nds[from].NetDisconnect(tu.ctx, toPI.ID)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) checkHeight(name string, n int, h int) {
|
func (tu *syncTestUtil) checkHeight(name string, n int, h int) {
|
||||||
b, err := tu.nds[n].ChainHead(tu.ctx)
|
b, err := tu.nds[n].ChainHead(tu.ctx)
|
||||||
require.NoError(tu.t, err)
|
require.NoError(tu.t, err)
|
||||||
@ -209,14 +288,18 @@ func (tu *syncTestUtil) compareSourceState(with int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tu *syncTestUtil) waitUntilSync(from, to int) {
|
func (tu *syncTestUtil) waitUntilSync(from, to int) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
target, err := tu.nds[from].ChainHead(tu.ctx)
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
target, err := tu.nds[from].ChainHead(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tu.t.Fatal(err)
|
tu.t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tu.waitUntilSyncTarget(to, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) waitUntilSyncTarget(to int, target *types.TipSet) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
hc, err := tu.nds[to].ChainNotify(ctx)
|
hc, err := tu.nds[to].ChainNotify(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tu.t.Fatal(err)
|
tu.t.Fatal(err)
|
||||||
@ -232,32 +315,6 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
|
|
||||||
// utility to simulate incoming blocks without miner process
|
|
||||||
// TODO: should call syncer directly, this won't work correctly in all cases
|
|
||||||
|
|
||||||
var b chain.BlockMsg
|
|
||||||
|
|
||||||
// -1 to match block.Height
|
|
||||||
b.Header = tu.blocks[h-1].Header
|
|
||||||
for _, msg := range tu.blocks[h-1].SecpkMessages {
|
|
||||||
c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg)
|
|
||||||
require.NoError(tu.t, err)
|
|
||||||
|
|
||||||
b.SecpkMessages = append(b.SecpkMessages, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) {
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
tu.submitSourceBlock(to, h+i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
func TestSyncSimple(t *testing.T) {
|
func TestSyncSimple(t *testing.T) {
|
||||||
H := 50
|
H := 50
|
||||||
tu := prepSyncTest(t, H)
|
tu := prepSyncTest(t, H)
|
||||||
@ -290,12 +347,78 @@ func TestSyncMining(t *testing.T) {
|
|||||||
tu.compareSourceState(client)
|
tu.compareSourceState(client)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
tu.mineNewBlock(0)
|
tu.mineNewBlock(0, nil)
|
||||||
tu.waitUntilSync(0, client)
|
tu.waitUntilSync(0, client)
|
||||||
tu.compareSourceState(client)
|
tu.compareSourceState(client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tu *syncTestUtil) loadChainToNode(to int) {
|
||||||
|
// utility to simulate incoming blocks without miner process
|
||||||
|
// TODO: should call syncer directly, this won't work correctly in all cases
|
||||||
|
|
||||||
|
for i := 0; i < len(tu.blocks); i++ {
|
||||||
|
tu.pushFtsAndWait(to, tu.blocks[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSyncFork(t *testing.T) {
|
||||||
|
H := 10
|
||||||
|
tu := prepSyncTest(t, H)
|
||||||
|
|
||||||
|
p1 := tu.addClientNode()
|
||||||
|
p2 := tu.addClientNode()
|
||||||
|
|
||||||
|
fmt.Println("GENESIS: ", tu.g.Genesis().Cid())
|
||||||
|
tu.loadChainToNode(p1)
|
||||||
|
tu.loadChainToNode(p2)
|
||||||
|
|
||||||
|
phead := func() {
|
||||||
|
h1, err := tu.nds[1].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
h2, err := tu.nds[2].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
fmt.Println("Node 1: ", h1.Cids(), h1.Parents(), h1.Height())
|
||||||
|
fmt.Println("Node 2: ", h2.Cids(), h1.Parents(), h2.Height())
|
||||||
|
//time.Sleep(time.Second * 2)
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println()
|
||||||
|
}
|
||||||
|
|
||||||
|
phead()
|
||||||
|
|
||||||
|
base := tu.g.CurTipset
|
||||||
|
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
|
||||||
|
|
||||||
|
// The two nodes fork at this point into 'a' and 'b'
|
||||||
|
a1 := tu.mineOnBlock(base, p1, []int{0})
|
||||||
|
a := tu.mineOnBlock(a1, p1, []int{0})
|
||||||
|
a = tu.mineOnBlock(a, p1, []int{0})
|
||||||
|
|
||||||
|
tu.g.ResyncBankerNonce(a1.TipSet())
|
||||||
|
// chain B will now be heaviest
|
||||||
|
b := tu.mineOnBlock(base, p2, []int{1})
|
||||||
|
b = tu.mineOnBlock(b, p2, []int{1})
|
||||||
|
b = tu.mineOnBlock(b, p2, []int{1})
|
||||||
|
b = tu.mineOnBlock(b, p2, []int{1})
|
||||||
|
|
||||||
|
fmt.Println("A: ", a.Cids(), a.TipSet().Height())
|
||||||
|
fmt.Println("B: ", b.Cids(), b.TipSet().Height())
|
||||||
|
|
||||||
|
// Now for the fun part!!
|
||||||
|
|
||||||
|
require.NoError(t, tu.mn.LinkAll())
|
||||||
|
tu.connect(p1, p2)
|
||||||
|
tu.waitUntilSyncTarget(p1, b.TipSet())
|
||||||
|
tu.waitUntilSyncTarget(p2, b.TipSet())
|
||||||
|
|
||||||
|
phead()
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkSyncBasic(b *testing.B) {
|
func BenchmarkSyncBasic(b *testing.B) {
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
runSyncBenchLength(b, 100)
|
runSyncBenchLength(b, 100)
|
||||||
@ -315,44 +438,3 @@ func runSyncBenchLength(b *testing.B, l int) {
|
|||||||
|
|
||||||
tu.waitUntilSync(0, client)
|
tu.waitUntilSync(0, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
TODO: this is broken because of how tu.submitSourceBlock works now
|
|
||||||
func TestSyncManual(t *testing.T) {
|
|
||||||
H := 20
|
|
||||||
tu := prepSyncTest(t, H)
|
|
||||||
|
|
||||||
client := tu.addClientNode()
|
|
||||||
tu.checkHeight("client", client, 0)
|
|
||||||
|
|
||||||
tu.submitSourceBlocks(client, 1, H)
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 1)
|
|
||||||
|
|
||||||
tu.checkHeight("client", client, H)
|
|
||||||
|
|
||||||
tu.compareSourceState(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSyncIncoming(t *testing.T) {
|
|
||||||
H := 1
|
|
||||||
tu := prepSyncTest(t, H)
|
|
||||||
|
|
||||||
producer := tu.addClientNode()
|
|
||||||
client := tu.addClientNode()
|
|
||||||
|
|
||||||
tu.mn.LinkAll()
|
|
||||||
tu.connect(client, producer)
|
|
||||||
|
|
||||||
for h := 0; h < H; h++ {
|
|
||||||
tu.submitSourceBlock(producer, h + 1)
|
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 200)
|
|
||||||
|
|
||||||
}
|
|
||||||
tu.checkHeight("client", client, H)
|
|
||||||
tu.checkHeight("producer", producer, H)
|
|
||||||
|
|
||||||
tu.compareSourceState(client)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
@ -56,7 +56,7 @@ func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool {
|
|||||||
tj := blks[j].LastTicket()
|
tj := blks[j].LastTicket()
|
||||||
|
|
||||||
if ti.Equals(tj) {
|
if ti.Equals(tj) {
|
||||||
log.Warn("blocks have same ticket")
|
//log.Warn("blocks have same ticket")
|
||||||
return blks[i].Cid().KeyString() < blks[j].Cid().KeyString()
|
return blks[i].Cid().KeyString() < blks[j].Cid().KeyString()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,9 +27,7 @@ func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
|
func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
|
||||||
if err := a.Chain.AddBlock(blk.Header); err != nil {
|
// TODO: should we have some sort of fast path to adding a local block?
|
||||||
return xerrors.Errorf("AddBlock failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := blk.Serialize()
|
b, err := blk.Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -99,6 +97,11 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// genesis block has no parent messages...
|
||||||
|
if b.Height == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: need to get the number of messages better than this
|
// TODO: need to get the number of messages better than this
|
||||||
pts, err := a.Chain.LoadTipSet(b.Parents)
|
pts, err := a.Chain.LoadTipSet(b.Parents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -124,6 +127,10 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.Height == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: need to get the number of messages better than this
|
// TODO: need to get the number of messages better than this
|
||||||
pts, err := a.Chain.LoadTipSet(b.Parents)
|
pts, err := a.Chain.LoadTipSet(b.Parents)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user