chainstore lifecycle: close via Close() instead of context.
This commit is contained in:
parent
b13a41a44d
commit
0c6072a1a0
@ -245,7 +245,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
|||||||
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, ds, sys, j)
|
cs := store.NewChainStore(bs, bs, ds, sys, j)
|
||||||
|
|
||||||
genfb := &types.FullBlock{Header: genb.Genesis}
|
genfb := &types.FullBlock{Header: genb.Genesis}
|
||||||
gents := store.NewFullTipSet([]*types.FullBlock{genfb})
|
gents := store.NewFullTipSet([]*types.FullBlock{genfb})
|
||||||
|
@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto
|
|||||||
}
|
}
|
||||||
|
|
||||||
// temp chainstore
|
// temp chainstore
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, datastore.NewMapDatastore(), sys, j)
|
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j)
|
||||||
|
|
||||||
// Verify PreSealed Data
|
// Verify PreSealed Data
|
||||||
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)
|
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)
|
||||||
|
@ -31,7 +31,8 @@ func TestIndexSeeks(t *testing.T) {
|
|||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
|
|
||||||
nbs := blockstore.NewTemporarySync()
|
nbs := blockstore.NewTemporarySync()
|
||||||
cs := store.NewChainStore(context.Background(), nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
|
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
_, err = cs.Import(bytes.NewReader(gencar))
|
_, err = cs.Import(bytes.NewReader(gencar))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -134,24 +134,30 @@ type ChainStore struct {
|
|||||||
|
|
||||||
evtTypes [1]journal.EventType
|
evtTypes [1]journal.EventType
|
||||||
journal journal.Journal
|
journal journal.Journal
|
||||||
|
|
||||||
|
cancelFn context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// localbs is guaranteed to fail Get* if requested block isn't stored locally
|
// localbs is guaranteed to fail Get* if requested block isn't stored locally
|
||||||
func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
||||||
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
||||||
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
tsCache, _ := lru.NewARC(DefaultTipSetCacheSize)
|
||||||
if j == nil {
|
if j == nil {
|
||||||
j = journal.NilJournal()
|
j = journal.NilJournal()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
cs := &ChainStore{
|
cs := &ChainStore{
|
||||||
bs: bs,
|
bs: bs,
|
||||||
localbs: localbs,
|
localbs: localbs,
|
||||||
ds: ds,
|
ds: ds,
|
||||||
bestTips: pubsub.New(64),
|
bestTips: pubsub.New(64),
|
||||||
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
|
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
|
||||||
mmCache: c,
|
mmCache: mmCache,
|
||||||
tsCache: tsc,
|
tsCache: tsCache,
|
||||||
vmcalls: vmcalls,
|
vmcalls: vmcalls,
|
||||||
|
cancelFn: cancel,
|
||||||
journal: j,
|
journal: j,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,7 +198,7 @@ func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blo
|
|||||||
|
|
||||||
hcmetric := func(rev, app []*types.TipSet) error {
|
hcmetric := func(rev, app []*types.TipSet) error {
|
||||||
for _, r := range app {
|
for _, r := range app {
|
||||||
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
|
stats.Record(context.Background(), metrics.ChainNodeHeight.M(int64(r.Height())))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -203,6 +209,12 @@ func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blo
|
|||||||
return cs
|
return cs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs *ChainStore) Close() error {
|
||||||
|
cs.cancelFn()
|
||||||
|
cs.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) Load() error {
|
func (cs *ChainStore) Load() error {
|
||||||
head, err := cs.ds.Get(chainHeadKey)
|
head, err := cs.ds.Get(chainHeadKey)
|
||||||
if err == dstore.ErrNotFound {
|
if err == dstore.ErrNotFound {
|
||||||
@ -405,7 +417,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
|
|||||||
notifees := make([]ReorgNotifee, len(initialNotifees))
|
notifees := make([]ReorgNotifee, len(initialNotifees))
|
||||||
copy(notifees, initialNotifees)
|
copy(notifees, initialNotifees)
|
||||||
|
|
||||||
|
cs.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer cs.wg.Done()
|
||||||
defer log.Warn("reorgWorker quit")
|
defer log.Warn("reorgWorker quit")
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -70,7 +70,8 @@ func BenchmarkGetRandomness(b *testing.B) {
|
|||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, mds, nil, nil)
|
cs := store.NewChainStore(bs, bs, mds, nil, nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
@ -104,7 +105,8 @@ func TestChainExportImport(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
nbs := blockstore.NewTemporary()
|
nbs := blockstore.NewTemporary()
|
||||||
cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), nil, nil)
|
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
root, err := cs.Import(buf)
|
root, err := cs.Import(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -138,7 +140,9 @@ func TestChainExportImportFull(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
nbs := blockstore.NewTemporary()
|
nbs := blockstore.NewTemporary()
|
||||||
cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), nil, nil)
|
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
root, err := cs.Import(buf)
|
root, err := cs.Import(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -262,7 +262,9 @@ var importBenchCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
metadataDs := datastore.NewMapDatastore()
|
metadataDs := datastore.NewMapDatastore()
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, metadataDs, vm.Syscalls(verifier), nil)
|
cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
stm := stmgr.NewStateManager(cs)
|
stm := stmgr.NewStateManager(cs)
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -188,7 +188,8 @@ var chainBalanceStateCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
cst := cbor.NewCborStore(bs)
|
cst := cbor.NewCborStore(bs)
|
||||||
store := adt.WrapStore(ctx, cst)
|
store := adt.WrapStore(ctx, cst)
|
||||||
@ -408,7 +409,8 @@ var chainPledgeCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
cst := cbor.NewCborStore(bs)
|
cst := cbor.NewCborStore(bs)
|
||||||
store := adt.WrapStore(ctx, cst)
|
store := adt.WrapStore(ctx, cst)
|
||||||
|
@ -90,7 +90,9 @@ var exportChainCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, mds, nil, nil)
|
cs := store.NewChainStore(bs, bs, mds, nil, nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
if err := cs.Load(); err != nil {
|
if err := cs.Load(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,8 @@ var genesisVerifyCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
|
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, datastore.NewMapDatastore(), nil, nil)
|
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
cf := cctx.Args().Get(0)
|
cf := cctx.Args().Get(0)
|
||||||
f, err := os.Open(cf)
|
f, err := os.Open(cf)
|
||||||
|
@ -169,7 +169,9 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
if err := cs.Load(); err != nil {
|
if err := cs.Load(); err != nil {
|
||||||
return fmt.Errorf("loading chainstore: %w", err)
|
return fmt.Errorf("loading chainstore: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -420,9 +420,8 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
return xerrors.Errorf("failed to open journal: %w", err)
|
return xerrors.Errorf("failed to open journal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
|
||||||
defer cancel()
|
defer cst.Close()
|
||||||
cst := store.NewChainStore(ctx, bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
|
|
||||||
|
|
||||||
log.Infof("importing chain from %s...", fname)
|
log.Infof("importing chain from %s...", fname)
|
||||||
|
|
||||||
@ -467,7 +466,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("accepting %s as new head", ts.Cids())
|
log.Infof("accepting %s as new head", ts.Cids())
|
||||||
if err := cst.ForceHeadSilent(ctx, ts); err != nil {
|
if err := cst.ForceHeadSilent(context.Background(), ts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,10 +87,12 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
|
|||||||
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
|
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
|
||||||
vmRand = NewFixedRand()
|
vmRand = NewFixedRand()
|
||||||
|
|
||||||
cs = store.NewChainStore(context.Background(), bs, bs, ds, syscalls, nil)
|
cs = store.NewChainStore(bs, bs, ds, syscalls, nil)
|
||||||
sm = stmgr.NewStateManager(cs)
|
sm = stmgr.NewStateManager(cs)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
defer cs.Close() //nolint:errcheck
|
||||||
|
|
||||||
blocks := make([]store.BlockMessages, 0, len(tipset.Blocks))
|
blocks := make([]store.BlockMessages, 0, len(tipset.Blocks))
|
||||||
for _, b := range tipset.Blocks {
|
for _, b := range tipset.Blocks {
|
||||||
sb := store.BlockMessages{
|
sb := store.BlockMessages{
|
||||||
|
@ -111,14 +111,19 @@ func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ChainStore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)
|
||||||
chain := store.NewChainStore(ctx, bs, lbs, ds, syscalls, j)
|
|
||||||
|
|
||||||
if err := chain.Load(); err != nil {
|
if err := chain.Load(); err != nil {
|
||||||
log.Warnf("loading chain state from disk: %s", err)
|
log.Warnf("loading chain state from disk: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStop: func(_ context.Context) error {
|
||||||
|
return chain.Close()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
return chain
|
return chain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user