Merge pull request #573 from filecoin-project/feat/mark-bad-chains

sync: Mark chains with bad blocks as bad too
This commit is contained in:
Whyrusleeping 2019-11-13 05:12:06 +09:00 committed by GitHub
commit aa26935533
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 17 deletions

View File

@ -122,7 +122,7 @@ func init() {
} }
// Sync // Sync
const BadBlockCacheSize = 8192 const BadBlockCacheSize = 1 << 15
// assuming 4000 blocks per round, this lets us not lose any messages across a // assuming 4000 blocks per round, this lets us not lose any messages across a
// 10 block reorg. // 10 block reorg.

View File

@ -129,6 +129,7 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR
log.Warn("encountered error while responding to block sync request: ", err) log.Warn("encountered error while responding to block sync request: ", err)
return &BlockSyncResponse{ return &BlockSyncResponse{
Status: 203, Status: 203,
Message: err.Error(),
}, nil }, nil
} }

View File

@ -269,7 +269,7 @@ func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Ad
return nil, xerrors.Errorf("making a block for next tipset failed: %w", err) return nil, xerrors.Errorf("making a block for next tipset failed: %w", err)
} }
if err := cg.cs.PersistBlockHeader(fblk.Header); err != nil { if err := cg.cs.PersistBlockHeaders(fblk.Header); err != nil {
return nil, xerrors.Errorf("chainstore AddBlock: %w", err) return nil, xerrors.Errorf("chainstore AddBlock: %w", err)
} }

View File

@ -322,7 +322,7 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
if !ok { if !ok {
return nil return nil
} }
sig, ok := val.(*types.Signature) sig, ok := val.(types.Signature)
if !ok { if !ok {
log.Warnf("value in signature cache was not a signature (got %T)", val) log.Warnf("value in signature cache was not a signature (got %T)", val)
return nil return nil
@ -330,6 +330,6 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
return &types.SignedMessage{ return &types.SignedMessage{
Message: *msg, Message: *msg,
Signature: *sig, Signature: sig,
} }
} }

View File

@ -198,7 +198,7 @@ func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error { func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
if err := cs.PersistBlockHeader(b); err != nil { if err := cs.PersistBlockHeaders(b); err != nil {
return err return err
} }
} }
@ -423,13 +423,17 @@ func (cs *ChainStore) AddToTipSetTracker(b *types.BlockHeader) error {
return nil return nil
} }
func (cs *ChainStore) PersistBlockHeader(b *types.BlockHeader) error { func (cs *ChainStore) PersistBlockHeaders(b ...*types.BlockHeader) (err error) {
sb, err := b.ToStorageBlock() sbs := make([]block.Block, len(b))
if err != nil {
return err for i, header := range b {
sbs[i], err = header.ToStorageBlock()
if err != nil {
return err
}
} }
return cs.bs.Put(sb) return cs.bs.PutMany(sbs)
} }
type storable interface { type storable interface {
@ -487,7 +491,7 @@ func (cs *ChainStore) expandTipset(b *types.BlockHeader) (*types.TipSet, error)
} }
func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error { func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error {
if err := cs.PersistBlockHeader(b); err != nil { if err := cs.PersistBlockHeaders(b); err != nil {
return err return err
} }

View File

@ -755,10 +755,16 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height()) syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height())
var acceptedBlocks []cid.Cid
loop: loop:
for blockSet[len(blockSet)-1].Height() > untilHeight { for blockSet[len(blockSet)-1].Height() > untilHeight {
for _, bc := range at { for _, bc := range at {
if syncer.bad.Has(bc) { if syncer.bad.Has(bc) {
for _, b := range acceptedBlocks {
syncer.bad.Add(b)
}
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc) return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc)
} }
} }
@ -766,6 +772,8 @@ loop:
// If, for some reason, we have a suffix of the chain locally, handle that here // If, for some reason, we have a suffix of the chain locally, handle that here
ts, err := syncer.store.LoadTipSet(at) ts, err := syncer.store.LoadTipSet(at)
if err == nil { if err == nil {
acceptedBlocks = append(acceptedBlocks, at...)
blockSet = append(blockSet, ts) blockSet = append(blockSet, ts)
at = ts.Parents() at = ts.Parents()
continue continue
@ -801,12 +809,18 @@ loop:
} }
for _, bc := range b.Cids() { for _, bc := range b.Cids() {
if syncer.bad.Has(bc) { if syncer.bad.Has(bc) {
for _, b := range acceptedBlocks {
syncer.bad.Add(b)
}
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc) return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc)
} }
} }
blockSet = append(blockSet, b) blockSet = append(blockSet, b)
} }
acceptedBlocks = append(acceptedBlocks, at...)
syncer.syncState.SetHeight(blks[len(blks)-1].Height()) syncer.syncState.SetHeight(blks[len(blks)-1].Height())
at = blks[len(blks)-1].Parents() at = blks[len(blks)-1].Parents()
} }
@ -990,13 +1004,14 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
syncer.syncState.SetStage(api.StagePersistHeaders) syncer.syncState.SetStage(api.StagePersistHeaders)
toPersist := make([]*types.BlockHeader, 0, len(headers)*build.BlocksPerEpoch)
for _, ts := range headers { for _, ts := range headers {
for _, b := range ts.Blocks() { toPersist = append(toPersist, ts.Blocks()...)
if err := syncer.store.PersistBlockHeader(b); err != nil {
return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
}
}
} }
if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
}
toPersist = nil
syncer.syncState.SetStage(api.StageMessages) syncer.syncState.SetStage(api.StageMessages)

View File

@ -16,7 +16,8 @@ import (
func main() { func main() {
logging.SetLogLevel("*", "INFO") logging.SetLogLevel("*", "INFO")
logging.SetLogLevel("dht", "ERROR") logging.SetLogLevel("dht", "ERROR")
logging.SetLogLevel("swarm", "WARN") logging.SetLogLevel("swarm2", "WARN")
logging.SetLogLevel("bitswap", "WARN")
local := []*cli.Command{ local := []*cli.Command{
DaemonCmd, DaemonCmd,