Dump the block validation cache whenever we perform an import

This solves a problem with folks resurrecting long-out-of-sync nodes via
snapshot imports.

The interface switch to Batching is necessary: startup is too long otherwise
( 8 minutes just to clear everything on a relatively old node )
This commit is contained in:
Peter Rabbitson 2020-10-09 22:41:34 +02:00
parent 83ba7ec8cc
commit 6610a247cf
2 changed files with 46 additions and 2 deletions

View File

@ -8,6 +8,7 @@ import (
"io"
"os"
"strconv"
"strings"
"sync"
"golang.org/x/sync/errgroup"
@ -38,7 +39,9 @@ import (
lru "github.com/hashicorp/golang-lru"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dstore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
car "github.com/ipld/go-car"
@ -102,7 +105,7 @@ type HeadChangeEvt struct {
// 2. a block => messages references cache.
type ChainStore struct {
bs bstore.Blockstore
ds dstore.Datastore
ds dstore.Batching
heaviestLk sync.Mutex
heaviest *types.TipSet
@ -446,6 +449,43 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet)
return nil
}
// FlushValidationCache removes all results of block validation from the
// chain metadata store. Usually the first step after a new chain import.
func (cs *ChainStore) FlushValidationCache() error {
log.Infof("clearing block validation cache...")
dsWalk, err := cs.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("failed to initialize key listing query: %w", err)
}
allKeys, err := dsWalk.Rest()
if err != nil {
return xerrors.Errorf("failed to run key listing query: %w", err)
}
batch, err := cs.ds.Batch()
if err != nil {
return xerrors.Errorf("failed to open a DS batch: %w", err)
}
delCnt := 0
for _, k := range allKeys {
if strings.HasPrefix(k.Key, blockValidationCacheKeyPrefix.String()) {
delCnt++
batch.Delete(datastore.RawKey(k.Key))
}
}
if err := batch.Commit(); err != nil {
return xerrors.Errorf("failed to commit the DS batch: %w", err)
}
log.Infof("%d block validation entries cleared.", delCnt)
return nil
}
// SetHead sets the chainstores current 'best' head node.
// This should only be called if something is broken and needs fixing
func (cs *ChainStore) SetHead(ts *types.TipSet) error {

View File

@ -436,6 +436,10 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
return xerrors.Errorf("importing chain failed: %w", err)
}
if err := cst.FlushValidationCache(); err != nil {
return xerrors.Errorf("flushing validation cache failed: %w", err)
}
gb, err := cst.GetTipsetByHeight(context.TODO(), 0, ts, true)
if err != nil {
return err
@ -455,7 +459,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
}
}
log.Info("accepting %s as new head", ts.Cids())
log.Infof("accepting %s as new head", ts.Cids())
if err := cst.SetHead(ts); err != nil {
return err
}