Merge pull request #5282 from filecoin-project/chore/snake_context_through_blockstore_init
Snake a context through the Chain-blockstore creation
This commit is contained in:
commit
a541a2500a
@ -120,12 +120,12 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
|||||||
return nil, xerrors.Errorf("taking mem-repo lock failed: %w", err)
|
return nil, xerrors.Errorf("taking mem-repo lock failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ds, err := lr.Datastore("/metadata")
|
ds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
|
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := lr.Blockstore(repo.BlockstoreChain)
|
bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ func BenchmarkGetRandomness(b *testing.B) {
|
|||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := lr.Blockstore(repo.BlockstoreChain)
|
bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -65,7 +65,7 @@ func BenchmarkGetRandomness(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ func BackupCmd(repoFlag string, rt repo.RepoType, getApi BackupApiFn) *cli.Comma
|
|||||||
}
|
}
|
||||||
defer lr.Close() // nolint:errcheck
|
defer lr.Close() // nolint:errcheck
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting metadata datastore: %w", err)
|
return xerrors.Errorf("getting metadata datastore: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -308,7 +308,7 @@ var runCmd = &cli.Command{
|
|||||||
|
|
||||||
{
|
{
|
||||||
// init datastore for r.Exists
|
// init datastore for r.Exists
|
||||||
_, err := lr.Datastore("/metadata")
|
_, err := lr.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -327,7 +327,7 @@ var runCmd = &cli.Command{
|
|||||||
log.Error("closing repo", err)
|
log.Error("closing repo", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
ds, err := lr.Datastore("/metadata")
|
ds, err := lr.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -175,7 +175,7 @@ var chainBalanceStateCmd = &cli.Command{
|
|||||||
|
|
||||||
defer lkrepo.Close() //nolint:errcheck
|
defer lkrepo.Close() //nolint:errcheck
|
||||||
|
|
||||||
bs, err := lkrepo.Blockstore(repo.BlockstoreChain)
|
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
@ -188,7 +188,7 @@ var chainBalanceStateCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mds, err := lkrepo.Datastore("/metadata")
|
mds, err := lkrepo.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -396,7 +396,7 @@ var chainPledgeCmd = &cli.Command{
|
|||||||
|
|
||||||
defer lkrepo.Close() //nolint:errcheck
|
defer lkrepo.Close() //nolint:errcheck
|
||||||
|
|
||||||
bs, err := lkrepo.Blockstore(repo.BlockstoreChain)
|
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to open blockstore: %w", err)
|
return xerrors.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
@ -409,7 +409,7 @@ var chainPledgeCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mds, err := lkrepo.Datastore("/metadata")
|
mds, err := lkrepo.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -75,7 +76,7 @@ var datastoreListCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
defer lr.Close() //nolint:errcheck
|
defer lr.Close() //nolint:errcheck
|
||||||
|
|
||||||
ds, err := lr.Datastore(datastore.NewKey(cctx.Args().First()).String())
|
ds, err := lr.Datastore(context.Background(), datastore.NewKey(cctx.Args().First()).String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -141,7 +142,7 @@ var datastoreGetCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
defer lr.Close() //nolint:errcheck
|
defer lr.Close() //nolint:errcheck
|
||||||
|
|
||||||
ds, err := lr.Datastore(datastore.NewKey(cctx.Args().First()).String())
|
ds, err := lr.Datastore(context.Background(), datastore.NewKey(cctx.Args().First()).String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ var exportChainCmd = &cli.Command{
|
|||||||
|
|
||||||
defer fi.Close() //nolint:errcheck
|
defer fi.Close() //nolint:errcheck
|
||||||
|
|
||||||
bs, err := lr.Blockstore(repo.BlockstoreChain)
|
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
@ -85,7 +85,7 @@ var exportChainCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -24,6 +25,8 @@ var importCarCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("opening fs repo: %w", err)
|
return xerrors.Errorf("opening fs repo: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
exists, err := r.Exists()
|
exists, err := r.Exists()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -44,7 +47,7 @@ var importCarCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("opening the car file: %w", err)
|
return xerrors.Errorf("opening the car file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := lr.Blockstore(repo.BlockstoreChain)
|
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -99,6 +102,8 @@ var importObjectCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("opening fs repo: %w", err)
|
return xerrors.Errorf("opening fs repo: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
exists, err := r.Exists()
|
exists, err := r.Exists()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -113,7 +118,7 @@ var importObjectCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
defer lr.Close() //nolint:errcheck
|
defer lr.Close() //nolint:errcheck
|
||||||
|
|
||||||
bs, err := lr.Blockstore(repo.BlockstoreChain)
|
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
|
|
||||||
defer lkrepo.Close() //nolint:errcheck
|
defer lkrepo.Close() //nolint:errcheck
|
||||||
|
|
||||||
bs, err := lkrepo.Blockstore(repo.BlockstoreChain)
|
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
@ -151,7 +151,7 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
return fmt.Errorf("only badger blockstores are supported")
|
return fmt.Errorf("only badger blockstores are supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
mds, err := lkrepo.Datastore("/metadata")
|
mds, err := lkrepo.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -417,7 +417,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
|||||||
return xerrors.Errorf("peer ID from private key: %w", err)
|
return xerrors.Errorf("peer ID from private key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
@ -190,7 +191,7 @@ var initRestoreCmd = &cli.Command{
|
|||||||
|
|
||||||
log.Info("Restoring metadata backup")
|
log.Info("Restoring metadata backup")
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -118,7 +118,7 @@ var runCmd = &cli.Command{
|
|||||||
|
|
||||||
var w api.WalletAPI = lw
|
var w api.WalletAPI = lw
|
||||||
if cctx.Bool("ledger") {
|
if cctx.Bool("ledger") {
|
||||||
ds, err := lr.Datastore("/metadata")
|
ds, err := lr.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
dstore "github.com/ipfs/go-datastore"
|
dstore "github.com/ipfs/go-datastore"
|
||||||
@ -87,7 +88,7 @@ func restore(cctx *cli.Context, r repo.Repo) error {
|
|||||||
|
|
||||||
log.Info("Restoring metadata backup")
|
log.Info("Restoring metadata backup")
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,7 @@ var DaemonCmd = &cli.Command{
|
|||||||
issnapshot = true
|
issnapshot = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ImportChain(r, chainfile, issnapshot); err != nil {
|
if err := ImportChain(ctx, r, chainfile, issnapshot); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if cctx.Bool("halt-after-import") {
|
if cctx.Bool("halt-after-import") {
|
||||||
@ -389,7 +389,7 @@ func importKey(ctx context.Context, api api.FullNode, f string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) (err error) {
|
||||||
var rd io.Reader
|
var rd io.Reader
|
||||||
var l int64
|
var l int64
|
||||||
if strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://") {
|
if strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://") {
|
||||||
@ -432,12 +432,12 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
}
|
}
|
||||||
defer lr.Close() //nolint:errcheck
|
defer lr.Close() //nolint:errcheck
|
||||||
|
|
||||||
bs, err := lr.Blockstore(repo.BlockstoreChain)
|
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to open blockstore: %w", err)
|
return xerrors.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mds, err := lr.Datastore("/metadata")
|
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -473,7 +473,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
return xerrors.Errorf("flushing validation cache failed: %w", err)
|
return xerrors.Errorf("flushing validation cache failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gb, err := cst.GetTipsetByHeight(context.TODO(), 0, ts, true)
|
gb, err := cst.GetTipsetByHeight(ctx, 0, ts, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -487,13 +487,13 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
|||||||
|
|
||||||
if !snapshot {
|
if !snapshot {
|
||||||
log.Infof("validating imported chain...")
|
log.Infof("validating imported chain...")
|
||||||
if err := stm.ValidateChain(context.TODO(), ts); err != nil {
|
if err := stm.ValidateChain(ctx, ts); err != nil {
|
||||||
return xerrors.Errorf("chain validation failed: %w", err)
|
return xerrors.Errorf("chain validation failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("accepting %s as new head", ts.Cids())
|
log.Infof("accepting %s as new head", ts.Cids())
|
||||||
if err := cst.ForceHeadSilent(context.Background(), ts); err != nil {
|
if err := cst.ForceHeadSilent(ctx, ts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
|
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
|
||||||
bs, err := r.Blockstore(repo.BlockstoreChain)
|
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.BlockstoreChain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -39,6 +39,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
||||||
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
||||||
@ -78,8 +79,9 @@ func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
|
func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
|
||||||
ds, err := r.Datastore("/client")
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
ds, err := r.Datastore(ctx, "/client")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
|
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/backupds"
|
"github.com/filecoin-project/lotus/lib/backupds"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -27,8 +28,9 @@ func KeyStore(lr repo.LockedRepo) (types.KeyStore, error) {
|
|||||||
return lr.KeyStore()
|
return lr.KeyStore()
|
||||||
}
|
}
|
||||||
|
|
||||||
func Datastore(r repo.LockedRepo) (dtypes.MetadataDS, error) {
|
func Datastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.MetadataDS, error) {
|
||||||
mds, err := r.Datastore("/metadata")
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
mds, err := r.Datastore(ctx, "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -362,8 +362,9 @@ func NewProviderPieceStore(lc fx.Lifecycle, ds dtypes.MetadataDS) (dtypes.Provid
|
|||||||
return ps, nil
|
return ps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func StagingMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) {
|
func StagingMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) {
|
||||||
ds, err := r.Datastore("/staging")
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
ds, err := r.Datastore(ctx, "/staging")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
|
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
|
||||||
}
|
}
|
||||||
@ -384,8 +385,9 @@ func StagingMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.StagingMu
|
|||||||
|
|
||||||
// StagingBlockstore creates a blockstore for staging blocks for a miner
|
// StagingBlockstore creates a blockstore for staging blocks for a miner
|
||||||
// in a storage deal, prior to sealing
|
// in a storage deal, prior to sealing
|
||||||
func StagingBlockstore(r repo.LockedRepo) (dtypes.StagingBlockstore, error) {
|
func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.StagingBlockstore, error) {
|
||||||
stagingds, err := r.Datastore("/staging")
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
stagingds, err := r.Datastore(ctx, "/staging")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package repo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -299,7 +300,7 @@ func (fsr *fsLockedRepo) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Blockstore returns a blockstore for the provided data domain.
|
// Blockstore returns a blockstore for the provided data domain.
|
||||||
func (fsr *fsLockedRepo) Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
||||||
if domain != BlockstoreChain {
|
if domain != BlockstoreChain {
|
||||||
return nil, ErrInvalidBlockstoreDomain
|
return nil, ErrInvalidBlockstoreDomain
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package repo
|
package repo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
@ -67,7 +68,7 @@ func (fsr *fsLockedRepo) openDatastores(readonly bool) (map[string]datastore.Bat
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
|
func (fsr *fsLockedRepo) Datastore(_ context.Context, ns string) (datastore.Batching, error) {
|
||||||
fsr.dsOnce.Do(func() {
|
fsr.dsOnce.Do(func() {
|
||||||
fsr.ds, fsr.dsErr = fsr.openDatastores(fsr.readonly)
|
fsr.ds, fsr.dsErr = fsr.openDatastores(fsr.readonly)
|
||||||
})
|
})
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package repo
|
package repo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
@ -51,10 +52,16 @@ type LockedRepo interface {
|
|||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
// Returns datastore defined in this repo.
|
// Returns datastore defined in this repo.
|
||||||
Datastore(namespace string) (datastore.Batching, error)
|
// The supplied context must only be used to initialize the datastore.
|
||||||
|
// The implementation should not retain the context for usage throughout
|
||||||
|
// the lifecycle.
|
||||||
|
Datastore(ctx context.Context, namespace string) (datastore.Batching, error)
|
||||||
|
|
||||||
// Blockstore returns an IPLD blockstore for the requested domain.
|
// Blockstore returns an IPLD blockstore for the requested domain.
|
||||||
Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error)
|
// The supplied context must only be used to initialize the blockstore.
|
||||||
|
// The implementation should not retain the context for usage throughout
|
||||||
|
// the lifecycle.
|
||||||
|
Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error)
|
||||||
|
|
||||||
// Returns config in this repo
|
// Returns config in this repo
|
||||||
Config() (interface{}, error)
|
Config() (interface{}, error)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package repo
|
package repo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
@ -236,7 +237,7 @@ func (lmem *lockedMemRepo) Close() error {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) {
|
func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Batching, error) {
|
||||||
if err := lmem.checkToken(); err != nil {
|
if err := lmem.checkToken(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -244,7 +245,7 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) {
|
|||||||
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
|
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lmem *lockedMemRepo) Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
||||||
if domain != BlockstoreChain {
|
if domain != BlockstoreChain {
|
||||||
return nil, ErrInvalidBlockstoreDomain
|
return nil, ErrInvalidBlockstoreDomain
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,7 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
ds, err := lr.Datastore("/metadata")
|
ds, err := lr.Datastore(context.TODO(), "/metadata")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = ds.Put(datastore.NewKey("miner-address"), act.Bytes())
|
err = ds.Put(datastore.NewKey("miner-address"), act.Bytes())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -165,7 +165,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ds, err := lr.Datastore("/metadata")
|
ds, err := lr.Datastore(context.Background(), "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user