don't add bad blocks to tipsetcache

This commit is contained in:
whyrusleeping 2019-10-10 12:04:10 +09:00
parent 0c6fcde9c4
commit 85684ba7e2
6 changed files with 74 additions and 31 deletions

View File

@ -46,6 +46,8 @@ type ChainGen struct {
genesis *types.BlockHeader
CurTipset *store.FullTipSet
Timestamper func(*types.TipSet, int) uint64
w *wallet.Wallet
Miners []address.Address
@ -289,7 +291,12 @@ func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Ad
func (cg *ChainGen) makeBlock(parents *types.TipSet, m address.Address, eproof types.ElectionProof, tickets []*types.Ticket, msgs []*types.SignedMessage) (*types.FullBlock, error) {
ts := parents.MinTimestamp() + (uint64(len(tickets)) * build.BlockDelay)
var ts uint64
if cg.Timestamper != nil {
ts = cg.Timestamper(parents, len(tickets))
} else {
ts = parents.MinTimestamp() + (uint64(len(tickets)) * build.BlockDelay)
}
fblk, err := MinerCreateBlock(context.TODO(), cg.sm, cg.w, m, parents, tickets, eproof, msgs, ts)
if err != nil {

View File

@ -341,7 +341,7 @@ func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet {
return cs.heaviest
}
func (cs *ChainStore) addToTipSetTracker(b *types.BlockHeader) error {
func (cs *ChainStore) AddToTipSetTracker(b *types.BlockHeader) error {
cs.tstLk.Lock()
defer cs.tstLk.Unlock()
@ -366,10 +366,6 @@ func (cs *ChainStore) PersistBlockHeader(b *types.BlockHeader) error {
return err
}
if err := cs.addToTipSetTracker(b); err != nil {
return xerrors.Errorf("failed to insert new block (%s) into tipset tracker: %w", b.Cid(), err)
}
return cs.bs.Put(sb)
}

View File

@ -44,7 +44,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return
}
log.Debug("inform new block over pubsub")
log.Infof("inform new block over pubsub: %s from %s", blk.Header.Cid(), msg.GetFrom())
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,

View File

@ -359,6 +359,10 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet)
syncer.bad.Add(b.Cid())
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}
if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil {
return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err)
}
}
return nil
}
@ -657,7 +661,7 @@ loop:
}
for _, bc := range b.Cids() {
if syncer.bad.Has(bc) {
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc)
return nil, xerrors.Errorf("(chain contained block marked previously as bad (%s, %s)", from.Cids(), bc)
}
}
blockSet = append(blockSet, b)
@ -792,7 +796,6 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
return xerrors.Errorf("message processing failed: %w", err)
}
}
}
return nil
@ -828,6 +831,10 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
return err
}
if !headers[0].Equals(ts) {
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
}
syncer.syncState.SetStage(api.StagePersistHeaders)
for _, ts := range headers {

View File

@ -31,8 +31,6 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [
mts, err := tu.g.NextTipSet()
require.NoError(t, err)
fmt.Println("REPO WITH CHAIN NEXT TIPSET: ", mts.TipSet.TipSet().Height())
blks[i] = mts.TipSet
ts := mts.TipSet.TipSet()
@ -96,7 +94,7 @@ func (tu *syncTestUtil) Shutdown() {
tu.cancel()
}
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet) {
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool) {
// TODO: would be great if we could pass a whole tipset here...
for _, fb := range fts.Blocks {
var b types.BlockMsg
@ -121,21 +119,23 @@ func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet) {
}
start := time.Now()
h, err := tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
for !h.Equals(fts.TipSet()) {
time.Sleep(time.Millisecond * 50)
h, err = tu.nds[to].ChainHead(tu.ctx)
if wait {
start := time.Now()
h, err := tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
for !h.Equals(fts.TipSet()) {
time.Sleep(time.Millisecond * 50)
h, err = tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
if time.Since(start) > time.Second*10 {
tu.t.Fatal("took too long waiting for block to be accepted")
if time.Since(start) > time.Second*10 {
tu.t.Fatal("took too long waiting for block to be accepted")
}
}
}
}
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int) *store.FullTipSet {
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int, wait bool) *store.FullTipSet {
if miners == nil {
for i := range tu.g.Miners {
miners = append(miners, i)
@ -152,13 +152,13 @@ func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int
mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs)
require.NoError(tu.t, err)
tu.pushFtsAndWait(src, mts.TipSet)
tu.pushFtsAndWait(src, mts.TipSet, wait)
return mts.TipSet
}
func (tu *syncTestUtil) mineNewBlock(src int, miners []int) {
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners)
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners, true)
tu.g.CurTipset = mts
}
@ -353,12 +353,44 @@ func TestSyncMining(t *testing.T) {
}
}
func TestSyncBadTimestamp(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)
client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())
tu.connect(client, 0)
tu.waitUntilSync(0, client)
base := tu.g.CurTipset
tu.g.Timestamper = func(pts *types.TipSet, tl int) uint64 {
return pts.MinTimestamp() + 2
}
a1 := tu.mineOnBlock(base, 0, nil, false)
tu.g.Timestamper = nil
tu.g.ResyncBankerNonce(a1.TipSet())
a2 := tu.mineOnBlock(base, 0, nil, true)
tu.waitUntilSync(0, client)
head, err := tu.nds[0].ChainHead(tu.ctx)
require.NoError(t, err)
if !head.Equals(a2.TipSet()) {
t.Fatalf("expected head to be %s, but got %s", a2.Cids(), head.Cids())
}
}
func (tu *syncTestUtil) loadChainToNode(to int) {
// utility to simulate incoming blocks without miner process
// TODO: should call syncer directly, this won't work correctly in all cases
for i := 0; i < len(tu.blocks); i++ {
tu.pushFtsAndWait(to, tu.blocks[i])
tu.pushFtsAndWait(to, tu.blocks[i], true)
}
}
@ -395,16 +427,16 @@ func TestSyncFork(t *testing.T) {
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
// The two nodes fork at this point into 'a' and 'b'
a1 := tu.mineOnBlock(base, p1, []int{0})
a := tu.mineOnBlock(a1, p1, []int{0})
a = tu.mineOnBlock(a, p1, []int{0})
a1 := tu.mineOnBlock(base, p1, []int{0}, true)
a := tu.mineOnBlock(a1, p1, []int{0}, true)
a = tu.mineOnBlock(a, p1, []int{0}, true)
tu.g.ResyncBankerNonce(a1.TipSet())
// chain B will now be heaviest
b := tu.mineOnBlock(base, p2, []int{1})
b = tu.mineOnBlock(b, p2, []int{1})
b = tu.mineOnBlock(b, p2, []int{1})
b = tu.mineOnBlock(b, p2, []int{1})
b := tu.mineOnBlock(base, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
fmt.Println("A: ", a.Cids(), a.TipSet().Height())
fmt.Println("B: ", b.Cids(), b.TipSet().Height())

View File

@ -71,6 +71,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
return
}
log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer())
hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)
}