From 87d6daf7452a71d070e2afd2643609ea64aeb905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 16 Nov 2020 15:00:13 +0000 Subject: [PATCH 1/6] fix blockstore double open issue on --import-snapshot. Fixes https://github.com/filecoin-project/lotus/issues/4850. --- cmd/lotus/daemon.go | 8 -------- go.mod | 2 +- go.sum | 2 ++ 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index f49278a2b..3f833b969 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -410,14 +410,6 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { return xerrors.Errorf("failed to open blockstore: %w", err) } - defer func() { - if c, ok := bs.(io.Closer); ok { - if err := c.Close(); err != nil { - log.Warnf("failed to close blockstore: %s", err) - } - } - }() - mds, err := lr.Datastore("/metadata") if err != nil { return err diff --git a/go.mod b/go.mod index 8999e901c..b182cc007 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 github.com/ipfs/go-graphsync v0.5.0 - github.com/ipfs/go-ipfs-blockstore v1.0.2 + github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 diff --git a/go.sum b/go.sum index 5bd67d1cd..ca4843a51 100644 --- a/go.sum +++ b/go.sum @@ -569,6 +569,8 @@ github.com/ipfs/go-ipfs-blockstore v1.0.1 h1:fnuVj4XdZp4yExhd0CnUwAiMNJHiPnfInhi github.com/ipfs/go-ipfs-blockstore v1.0.1/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= github.com/ipfs/go-ipfs-blockstore v1.0.2 h1:Z8nUlBHK7wVKPKliQCQR9tLgUtz4J2QRbqFcJrqzM+E= github.com/ipfs/go-ipfs-blockstore v1.0.2/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= +github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f h1:AQQb5zZj7KKTEFh9EaAUXc5Q+F7SbYkjfYogZnEzfUc= +github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw= From d7b4f92f1f632bf870e57490c32a048acc7d5a9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 16 Nov 2020 15:52:19 +0000 Subject: [PATCH 2/6] add context to ChainStore. --- chain/gen/gen.go | 2 +- chain/gen/genesis/genesis.go | 2 +- chain/store/index_test.go | 2 +- chain/store/store.go | 5 ++--- chain/store/store_test.go | 6 +++--- cmd/lotus-bench/import.go | 2 +- cmd/lotus-shed/balances.go | 4 ++-- cmd/lotus-shed/export.go | 2 +- cmd/lotus-shed/genesis-verify.go | 2 +- cmd/lotus-shed/pruning.go | 2 +- cmd/lotus/daemon.go | 5 ++++- conformance/driver.go | 2 +- node/modules/chain.go | 5 +++-- 13 files changed, 22 insertions(+), 19 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index ef717dc75..70f0e3c4b 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -245,7 +245,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("make genesis block failed: %w", err) } - cs := store.NewChainStore(bs, bs, ds, sys, j) + cs := store.NewChainStore(context.Background(), bs, bs, ds, sys, j) genfb := &types.FullBlock{Header: genb.Genesis} gents := store.NewFullTipSet([]*types.FullBlock{genfb}) diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index e441af7ae..f67bfc97d 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto } // temp chainstore - cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j) + cs := store.NewChainStore(context.Background(), bs, bs, datastore.NewMapDatastore(), sys, j) // Verify PreSealed Data stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs) diff --git a/chain/store/index_test.go b/chain/store/index_test.go index 11ff4371f..84736ca1d 100644 --- a/chain/store/index_test.go +++ b/chain/store/index_test.go @@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) { ctx := context.TODO() nbs := blockstore.NewTemporarySync() - cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil) _, err = cs.Import(bytes.NewReader(gencar)) if err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index bde179316..9ee8aedb6 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -137,7 +137,7 @@ type ChainStore struct { } // localbs is guaranteed to fail Get* if requested block isn't stored locally -func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { +func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { c, _ := lru.NewARC(DefaultMsgMetaCacheSize) tsc, _ := lru.NewARC(DefaultTipSetCacheSize) if j == nil { @@ -191,7 +191,6 @@ func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Ba } hcmetric := func(rev, app []*types.TipSet) error { - ctx := context.Background() for _, r := range app { stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height()))) } @@ -199,7 +198,7 @@ func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Ba } cs.reorgNotifeeCh = make(chan ReorgNotifee) - cs.reorgCh = cs.reorgWorker(context.TODO(), []ReorgNotifee{hcnf, hcmetric}) + cs.reorgCh = cs.reorgWorker(ctx, []ReorgNotifee{hcnf, hcmetric}) return cs } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index bef066024..c0c308009 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -70,7 +70,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - cs := store.NewChainStore(bs, bs, mds, nil, nil) + cs := store.NewChainStore(context.Background(), bs, bs, mds, nil, nil) b.ResetTimer() @@ -104,7 +104,7 @@ func TestChainExportImport(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), nil, nil) root, err := cs.Import(buf) if err != nil { @@ -138,7 +138,7 @@ func TestChainExportImportFull(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), nil, nil) root, err := cs.Import(buf) if err != nil { t.Fatal(err) diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index c99bed158..0356e0b2b 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -262,7 +262,7 @@ var importBenchCmd = &cli.Command{ } metadataDs := datastore.NewMapDatastore() - cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil) + cs := store.NewChainStore(context.Background(), bs, bs, metadataDs, vm.Syscalls(verifier), nil) stm := stmgr.NewStateManager(cs) startTime := time.Now() diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 474dfe685..4142e3bf1 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -188,7 +188,7 @@ var chainBalanceStateCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cst := cbor.NewCborStore(bs) store := adt.WrapStore(ctx, cst) @@ -408,7 +408,7 @@ var chainPledgeCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cst := cbor.NewCborStore(bs) store := adt.WrapStore(ctx, cst) diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 42434f3d2..009c8c29a 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -90,7 +90,7 @@ var exportChainCmd = &cli.Command{ return err } - cs := store.NewChainStore(bs, bs, mds, nil, nil) + cs := store.NewChainStore(context.Background(), bs, bs, mds, nil, nil) if err := cs.Load(); err != nil { return err } diff --git a/cmd/lotus-shed/genesis-verify.go b/cmd/lotus-shed/genesis-verify.go index e15a42374..2175989c0 100644 --- a/cmd/lotus-shed/genesis-verify.go +++ b/cmd/lotus-shed/genesis-verify.go @@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{ } bs := blockstore.NewBlockstore(datastore.NewMapDatastore()) - cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(context.Background(), bs, bs, datastore.NewMapDatastore(), nil, nil) cf := cctx.Args().Get(0) f, err := os.Open(cf) diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index 76d283f6b..31bf1c626 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -169,7 +169,7 @@ var stateTreePruneCmd = &cli.Command{ return nil } - cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) if err := cs.Load(); err != nil { return fmt.Errorf("loading chainstore: %w", err) } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 3f833b969..8407a9094 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -419,7 +419,10 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { if err != nil { return xerrors.Errorf("failed to open journal: %w", err) } - cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cst := store.NewChainStore(ctx, bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) log.Infof("importing chain from %s...", fname) diff --git a/conformance/driver.go b/conformance/driver.go index 91f461722..813d3319d 100644 --- a/conformance/driver.go +++ b/conformance/driver.go @@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot syscalls = vm.Syscalls(ffiwrapper.ProofVerifier) vmRand = NewFixedRand() - cs = store.NewChainStore(bs, bs, ds, syscalls, nil) + cs = store.NewChainStore(context.Background(), bs, bs, ds, syscalls, nil) sm = stmgr.NewStateManager(cs) ) diff --git a/node/modules/chain.go b/node/modules/chain.go index fe0ebdfbb..e120555d1 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -111,8 +111,9 @@ func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap return nil } -func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { - chain := store.NewChainStore(bs, lbs, ds, syscalls, j) +func ChainStore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { + ctx := helpers.LifecycleCtx(mctx, lc) + chain := store.NewChainStore(ctx, bs, lbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) From bac51dd4e3448b6b12fe7814d1b3eee0aa35879d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 16 Nov 2020 16:03:47 +0000 Subject: [PATCH 3/6] daemon --import-snapshot: force head silently, without triggering reorg. --- chain/store/store.go | 2 +- cmd/lotus/daemon.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 9ee8aedb6..235b5bf2e 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -382,7 +382,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS // particular tipset to carry out a benchmark, verification, etc. on a chain // segment. func (cs *ChainStore) ForceHeadSilent(_ context.Context, ts *types.TipSet) error { - log.Warnf("(!!!) forcing a new head silently; only use this only for testing; new head: %s", ts) + log.Warnf("(!!!) forcing a new head silently; new head: %s", ts) cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 8407a9094..a88ed751e 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -467,7 +467,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { } log.Infof("accepting %s as new head", ts.Cids()) - if err := cst.SetHead(ts); err != nil { + if err := cst.ForceHeadSilent(ctx, ts); err != nil { return err } From b13a41a44dbc2b66fb4e9e6b3ea4679e3093669e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 16 Nov 2020 16:30:17 +0000 Subject: [PATCH 4/6] go mod tidy. --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index ca4843a51..2953a7abf 100644 --- a/go.sum +++ b/go.sum @@ -567,8 +567,6 @@ github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86 github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU= github.com/ipfs/go-ipfs-blockstore v1.0.1 h1:fnuVj4XdZp4yExhd0CnUwAiMNJHiPnfInhiuwz4lW1w= github.com/ipfs/go-ipfs-blockstore v1.0.1/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= -github.com/ipfs/go-ipfs-blockstore v1.0.2 h1:Z8nUlBHK7wVKPKliQCQR9tLgUtz4J2QRbqFcJrqzM+E= -github.com/ipfs/go-ipfs-blockstore v1.0.2/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f h1:AQQb5zZj7KKTEFh9EaAUXc5Q+F7SbYkjfYogZnEzfUc= github.com/ipfs/go-ipfs-blockstore v1.0.3-0.20201116142306-a33814d3b08f/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= From 0c6072a1a032586809b0ab29b82324ec479e8be5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 16 Nov 2020 22:22:08 +0000 Subject: [PATCH 5/6] chainstore lifecycle: close via Close() instead of context. --- chain/gen/gen.go | 2 +- chain/gen/genesis/genesis.go | 2 +- chain/store/index_test.go | 3 ++- chain/store/store.go | 26 ++++++++++++++++++++------ chain/store/store_test.go | 10 +++++++--- cmd/lotus-bench/import.go | 4 +++- cmd/lotus-shed/balances.go | 6 ++++-- cmd/lotus-shed/export.go | 4 +++- cmd/lotus-shed/genesis-verify.go | 3 ++- cmd/lotus-shed/pruning.go | 4 +++- cmd/lotus/daemon.go | 7 +++---- conformance/driver.go | 4 +++- node/modules/chain.go | 11 ++++++++--- 13 files changed, 60 insertions(+), 26 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 70f0e3c4b..ef717dc75 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -245,7 +245,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("make genesis block failed: %w", err) } - cs := store.NewChainStore(context.Background(), bs, bs, ds, sys, j) + cs := store.NewChainStore(bs, bs, ds, sys, j) genfb := &types.FullBlock{Header: genb.Genesis} gents := store.NewFullTipSet([]*types.FullBlock{genfb}) diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index f67bfc97d..e441af7ae 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto } // temp chainstore - cs := store.NewChainStore(context.Background(), bs, bs, datastore.NewMapDatastore(), sys, j) + cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j) // Verify PreSealed Data stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs) diff --git a/chain/store/index_test.go b/chain/store/index_test.go index 84736ca1d..89756a252 100644 --- a/chain/store/index_test.go +++ b/chain/store/index_test.go @@ -31,7 +31,8 @@ func TestIndexSeeks(t *testing.T) { ctx := context.TODO() nbs := blockstore.NewTemporarySync() - cs := store.NewChainStore(context.Background(), nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil) + cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil) + defer cs.Close() //nolint:errcheck _, err = cs.Import(bytes.NewReader(gencar)) if err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index 235b5bf2e..f4ce8112b 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -134,24 +134,30 @@ type ChainStore struct { evtTypes [1]journal.EventType journal journal.Journal + + cancelFn context.CancelFunc + wg sync.WaitGroup } // localbs is guaranteed to fail Get* if requested block isn't stored locally -func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { - c, _ := lru.NewARC(DefaultMsgMetaCacheSize) - tsc, _ := lru.NewARC(DefaultTipSetCacheSize) +func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { + mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize) + tsCache, _ := lru.NewARC(DefaultTipSetCacheSize) if j == nil { j = journal.NilJournal() } + + ctx, cancel := context.WithCancel(context.Background()) cs := &ChainStore{ bs: bs, localbs: localbs, ds: ds, bestTips: pubsub.New(64), tipsets: make(map[abi.ChainEpoch][]cid.Cid), - mmCache: c, - tsCache: tsc, + mmCache: mmCache, + tsCache: tsCache, vmcalls: vmcalls, + cancelFn: cancel, journal: j, } @@ -192,7 +198,7 @@ func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blo hcmetric := func(rev, app []*types.TipSet) error { for _, r := range app { - stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height()))) + stats.Record(context.Background(), metrics.ChainNodeHeight.M(int64(r.Height()))) } return nil } @@ -203,6 +209,12 @@ func NewChainStore(ctx context.Context, bs bstore.Blockstore, localbs bstore.Blo return cs } +func (cs *ChainStore) Close() error { + cs.cancelFn() + cs.wg.Wait() + return nil +} + func (cs *ChainStore) Load() error { head, err := cs.ds.Get(chainHeadKey) if err == dstore.ErrNotFound { @@ -405,7 +417,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo notifees := make([]ReorgNotifee, len(initialNotifees)) copy(notifees, initialNotifees) + cs.wg.Add(1) go func() { + defer cs.wg.Done() defer log.Warn("reorgWorker quit") for { diff --git a/chain/store/store_test.go b/chain/store/store_test.go index c0c308009..5f1f336f3 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -70,7 +70,8 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - cs := store.NewChainStore(context.Background(), bs, bs, mds, nil, nil) + cs := store.NewChainStore(bs, bs, mds, nil, nil) + defer cs.Close() //nolint:errcheck b.ResetTimer() @@ -104,7 +105,8 @@ func TestChainExportImport(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil) + defer cs.Close() //nolint:errcheck root, err := cs.Import(buf) if err != nil { @@ -138,7 +140,9 @@ func TestChainExportImportFull(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(context.Background(), nbs, nbs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil) + defer cs.Close() //nolint:errcheck + root, err := cs.Import(buf) if err != nil { t.Fatal(err) diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 0356e0b2b..f8752fc99 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -262,7 +262,9 @@ var importBenchCmd = &cli.Command{ } metadataDs := datastore.NewMapDatastore() - cs := store.NewChainStore(context.Background(), bs, bs, metadataDs, vm.Syscalls(verifier), nil) + cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil) + defer cs.Close() //nolint:errcheck + stm := stmgr.NewStateManager(cs) startTime := time.Now() diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 4142e3bf1..da1263408 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -188,7 +188,8 @@ var chainBalanceStateCmd = &cli.Command{ return err } - cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) store := adt.WrapStore(ctx, cst) @@ -408,7 +409,8 @@ var chainPledgeCmd = &cli.Command{ return err } - cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + defer cs.Close() //nolint:errcheck cst := cbor.NewCborStore(bs) store := adt.WrapStore(ctx, cst) diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 009c8c29a..9b0703445 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -90,7 +90,9 @@ var exportChainCmd = &cli.Command{ return err } - cs := store.NewChainStore(context.Background(), bs, bs, mds, nil, nil) + cs := store.NewChainStore(bs, bs, mds, nil, nil) + defer cs.Close() //nolint:errcheck + if err := cs.Load(); err != nil { return err } diff --git a/cmd/lotus-shed/genesis-verify.go b/cmd/lotus-shed/genesis-verify.go index 2175989c0..20561eb5a 100644 --- a/cmd/lotus-shed/genesis-verify.go +++ b/cmd/lotus-shed/genesis-verify.go @@ -52,7 +52,8 @@ var genesisVerifyCmd = &cli.Command{ } bs := blockstore.NewBlockstore(datastore.NewMapDatastore()) - cs := store.NewChainStore(context.Background(), bs, bs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil) + defer cs.Close() //nolint:errcheck cf := cctx.Args().Get(0) f, err := os.Open(cf) diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index 31bf1c626..8728163c9 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -169,7 +169,9 @@ var stateTreePruneCmd = &cli.Command{ return nil } - cs := store.NewChainStore(context.Background(), bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + defer cs.Close() //nolint:errcheck + if err := cs.Load(); err != nil { return fmt.Errorf("loading chainstore: %w", err) } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index a88ed751e..fcbac3f08 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -420,9 +420,8 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { return xerrors.Errorf("failed to open journal: %w", err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cst := store.NewChainStore(ctx, bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) + cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) + defer cst.Close() log.Infof("importing chain from %s...", fname) @@ -467,7 +466,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { } log.Infof("accepting %s as new head", ts.Cids()) - if err := cst.ForceHeadSilent(ctx, ts); err != nil { + if err := cst.ForceHeadSilent(context.Background(), ts); err != nil { return err } diff --git a/conformance/driver.go b/conformance/driver.go index 813d3319d..833d50d7b 100644 --- a/conformance/driver.go +++ b/conformance/driver.go @@ -87,10 +87,12 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot syscalls = vm.Syscalls(ffiwrapper.ProofVerifier) vmRand = NewFixedRand() - cs = store.NewChainStore(context.Background(), bs, bs, ds, syscalls, nil) + cs = store.NewChainStore(bs, bs, ds, syscalls, nil) sm = stmgr.NewStateManager(cs) ) + defer cs.Close() //nolint:errcheck + blocks := make([]store.BlockMessages, 0, len(tipset.Blocks)) for _, b := range tipset.Blocks { sb := store.BlockMessages{ diff --git a/node/modules/chain.go b/node/modules/chain.go index e120555d1..095bb501c 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -111,14 +111,19 @@ func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap return nil } -func ChainStore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { - ctx := helpers.LifecycleCtx(mctx, lc) - chain := store.NewChainStore(ctx, bs, lbs, ds, syscalls, j) +func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { + chain := store.NewChainStore(bs, lbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) } + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return chain.Close() + }, + }) + return chain } From 27c0ce4a3a8ec0264d0e4fa952844bb649c68dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 16 Nov 2020 22:39:08 +0000 Subject: [PATCH 6/6] fix lint. --- cmd/lotus/daemon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index fcbac3f08..1d13b4082 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -421,7 +421,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { } cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) - defer cst.Close() + defer cst.Close() //nolint:errcheck log.Infof("importing chain from %s...", fname)