call sync directly during submitblock

This commit is contained in:
whyrusleeping 2019-10-15 21:19:10 +09:00
parent 7c26e3c35f
commit aa644fcbbb
3 changed files with 73 additions and 33 deletions

View File

@ -94,7 +94,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
}
for _, b := range fts.Blocks {
if err := syncer.validateMsgMeta(b); err != nil {
if err := syncer.ValidateMsgMeta(b); err != nil {
log.Warnf("invalid block received: %s", err)
return
}
@ -122,7 +122,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
}()
}
func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error {
func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
var bcids, scids []cbg.CBORMarshaler
for _, m := range fblk.BlsMessages {
c := cbg.CborCid(m.Cid())

View File

@ -32,9 +32,6 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [
require.NoError(t, err)
blks[i] = mts.TipSet
ts := mts.TipSet.TipSet()
fmt.Printf("tipset at H:%d: %s\n", ts.Height(), ts.Cids())
}
r, err := tu.g.YieldRepo()
@ -94,8 +91,38 @@ func (tu *syncTestUtil) Shutdown() {
tu.cancel()
}
func (tu *syncTestUtil) printHeads() {
for i, n := range tu.nds {
head, err := n.ChainHead(tu.ctx)
if err != nil {
tu.t.Fatal(err)
}
fmt.Printf("Node %d: %s\n", i, head.Cids())
}
}
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool) {
// TODO: would be great if we could pass a whole tipset here...
tu.pushTsExpectErr(to, fts, false)
if wait {
start := time.Now()
h, err := tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
for !h.Equals(fts.TipSet()) {
time.Sleep(time.Millisecond * 50)
h, err = tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
if time.Since(start) > time.Second*10 {
tu.t.Fatal("took too long waiting for block to be accepted")
}
}
}
}
func (tu *syncTestUtil) pushTsExpectErr(to int, fts *store.FullTipSet, experr bool) {
for _, fb := range fts.Blocks {
var b types.BlockMsg
@ -115,26 +142,17 @@ func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool)
b.BlsMessages = append(b.BlsMessages, c)
}
require.NoError(tu.t, tu.nds[to].SyncSubmitBlock(tu.ctx, &b))
}
if wait {
start := time.Now()
h, err := tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
for !h.Equals(fts.TipSet()) {
time.Sleep(time.Millisecond * 50)
h, err = tu.nds[to].ChainHead(tu.ctx)
err := tu.nds[to].SyncSubmitBlock(tu.ctx, &b)
if experr {
require.Error(tu.t, err, "expected submit block to fail")
} else {
require.NoError(tu.t, err)
if time.Since(start) > time.Second*10 {
tu.t.Fatal("took too long waiting for block to be accepted")
}
}
}
}
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int, wait bool) *store.FullTipSet {
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int, wait, fail bool) *store.FullTipSet {
if miners == nil {
for i := range tu.g.Miners {
miners = append(miners, i)
@ -151,13 +169,17 @@ func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int
mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs)
require.NoError(tu.t, err)
tu.pushFtsAndWait(src, mts.TipSet, wait)
if fail {
tu.pushTsExpectErr(src, mts.TipSet, true)
} else {
tu.pushFtsAndWait(src, mts.TipSet, wait)
}
return mts.TipSet
}
func (tu *syncTestUtil) mineNewBlock(src int, miners []int) {
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners, true)
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners, true, false)
tu.g.CurTipset = mts
}
@ -196,7 +218,9 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
lastTs := blocks[len(blocks)-1].Blocks
for _, lastB := range lastTs {
err = out.(*impl.FullNodeAPI).ChainAPI.Chain.AddBlock(lastB.Header)
cs := out.(*impl.FullNodeAPI).ChainAPI.Chain
require.NoError(tu.t, cs.AddToTipSetTracker(lastB.Header))
err = cs.AddBlock(lastB.Header)
require.NoError(tu.t, err)
}
@ -367,12 +391,17 @@ func TestSyncBadTimestamp(t *testing.T) {
return pts.MinTimestamp() + 2
}
a1 := tu.mineOnBlock(base, 0, nil, false)
fmt.Println("BASE: ", base.Cids())
tu.printHeads()
a1 := tu.mineOnBlock(base, 0, nil, false, true)
tu.g.Timestamper = nil
tu.g.ResyncBankerNonce(a1.TipSet())
a2 := tu.mineOnBlock(base, 0, nil, true)
fmt.Println("After mine bad block!")
tu.printHeads()
a2 := tu.mineOnBlock(base, 0, nil, true, false)
tu.waitUntilSync(0, client)
@ -426,16 +455,16 @@ func TestSyncFork(t *testing.T) {
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
// The two nodes fork at this point into 'a' and 'b'
a1 := tu.mineOnBlock(base, p1, []int{0}, true)
a := tu.mineOnBlock(a1, p1, []int{0}, true)
a = tu.mineOnBlock(a, p1, []int{0}, true)
a1 := tu.mineOnBlock(base, p1, []int{0}, true, false)
a := tu.mineOnBlock(a1, p1, []int{0}, true, false)
a = tu.mineOnBlock(a, p1, []int{0}, true, false)
tu.g.ResyncBankerNonce(a1.TipSet())
// chain B will now be heaviest
b := tu.mineOnBlock(base, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b := tu.mineOnBlock(base, p2, []int{1}, true, false)
b = tu.mineOnBlock(b, p2, []int{1}, true, false)
b = tu.mineOnBlock(b, p2, []int{1}, true, false)
b = tu.mineOnBlock(b, p2, []int{1}, true, false)
fmt.Println("A: ", a.Cids(), a.TipSet().Height())
fmt.Println("B: ", b.Cids(), b.TipSet().Height())

View File

@ -46,7 +46,18 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}
a.Syncer.InformNewBlock(a.Syncer.LocalPeer(), fb)
if err := a.Syncer.ValidateMsgMeta(fb); err != nil {
xerrors.Errorf("provided messages did not match block: %w", err)
}
ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header})
if err != nil {
return xerrors.Errorf("somehow failed to make a tipset out of a single block: %w", err)
}
if err := a.Syncer.Sync(ctx, ts); err != nil {
return xerrors.Errorf("sync to submitted block failed: %w", err)
}
b, err := blk.Serialize()
if err != nil {