Merge pull request #4287 from filecoin-project/feat/flush_validation_cache_on_chain_import
Dump the block validation cache whenever we perform an import
This commit is contained in:
commit
151577f04e
@ -8,6 +8,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@ -38,7 +39,9 @@ import (
|
|||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
block "github.com/ipfs/go-block-format"
|
block "github.com/ipfs/go-block-format"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
dstore "github.com/ipfs/go-datastore"
|
dstore "github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/query"
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
car "github.com/ipld/go-car"
|
car "github.com/ipld/go-car"
|
||||||
@ -102,7 +105,7 @@ type HeadChangeEvt struct {
|
|||||||
// 2. a block => messages references cache.
|
// 2. a block => messages references cache.
|
||||||
type ChainStore struct {
|
type ChainStore struct {
|
||||||
bs bstore.Blockstore
|
bs bstore.Blockstore
|
||||||
ds dstore.Datastore
|
ds dstore.Batching
|
||||||
|
|
||||||
heaviestLk sync.Mutex
|
heaviestLk sync.Mutex
|
||||||
heaviest *types.TipSet
|
heaviest *types.TipSet
|
||||||
@ -446,6 +449,53 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FlushValidationCache removes all results of block validation from the
|
||||||
|
// chain metadata store. Usually the first step after a new chain import.
|
||||||
|
func (cs *ChainStore) FlushValidationCache() error {
|
||||||
|
log.Infof("clearing block validation cache...")
|
||||||
|
|
||||||
|
dsWalk, err := cs.ds.Query(query.Query{
|
||||||
|
// Potential TODO: the validation cache is not a namespace on its own
|
||||||
|
// but is rather constructed as prefixed-key `foo:bar` via .Instance(), which
|
||||||
|
// in turn does not work with the filter, which can match only on `foo/bar`
|
||||||
|
//
|
||||||
|
// If this is addressed (blockcache goes into its own sub-namespace) then
|
||||||
|
// strings.HasPrefix(...) below can be skipped
|
||||||
|
//
|
||||||
|
//Prefix: blockValidationCacheKeyPrefix.String()
|
||||||
|
KeysOnly: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to initialize key listing query: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
allKeys, err := dsWalk.Rest()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to run key listing query: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
batch, err := cs.ds.Batch()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to open a DS batch: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
delCnt := 0
|
||||||
|
for _, k := range allKeys {
|
||||||
|
if strings.HasPrefix(k.Key, blockValidationCacheKeyPrefix.String()) {
|
||||||
|
delCnt++
|
||||||
|
batch.Delete(datastore.RawKey(k.Key)) // nolint:errcheck
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := batch.Commit(); err != nil {
|
||||||
|
return xerrors.Errorf("failed to commit the DS batch: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("%d block validation entries cleared.", delCnt)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SetHead sets the chainstores current 'best' head node.
|
// SetHead sets the chainstores current 'best' head node.
|
||||||
// This should only be called if something is broken and needs fixing
|
// This should only be called if something is broken and needs fixing
|
||||||
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
|
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
|
||||||
|
@ -436,6 +436,10 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
return xerrors.Errorf("importing chain failed: %w", err)
|
return xerrors.Errorf("importing chain failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := cst.FlushValidationCache(); err != nil {
|
||||||
|
return xerrors.Errorf("flushing validation cache failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
gb, err := cst.GetTipsetByHeight(context.TODO(), 0, ts, true)
|
gb, err := cst.GetTipsetByHeight(context.TODO(), 0, ts, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -455,7 +459,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("accepting %s as new head", ts.Cids())
|
log.Infof("accepting %s as new head", ts.Cids())
|
||||||
if err := cst.SetHead(ts); err != nil {
|
if err := cst.SetHead(ts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user