lotus/node/modules/storageminer_dagstore.go
Raúl Kripalani · 270bad8144 · 2021-08-04 16:59:27 +01:00
pull dagstore migration into Lotus and simplify.

Migration registers all shards with lazy init. Shards are then initialized as
they are retrieved for the first time, or in bulk through a lotus-shed tool
that will be provided separately.
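
The lazy-init flow described in the commit message maps onto the dagstore's RegisterShard call: each shard is registered with lazy initialization, so fetching and indexing the CAR is deferred until the shard is first acquired. Below is a minimal sketch of a single registration, assuming the dagstore API around the time of this commit (RegisterShard, RegisterOpts.LazyInitialization, mount.FileMount); the package name and the helper registerShardLazily are hypothetical and not part of this file.

package sketch // hypothetical; not part of the modules package

import (
	"context"
	"fmt"

	"github.com/filecoin-project/dagstore"
	"github.com/filecoin-project/dagstore/mount"
	"github.com/filecoin-project/dagstore/shard"
)

// registerShardLazily registers one shard with lazy initialization, mirroring
// what the migration described above does in bulk. It assumes a
// *dagstore.DAGStore has already been constructed and started, e.g. by the
// DAGStore constructor in this file.
func registerShardLazily(ctx context.Context, dagst *dagstore.DAGStore, key shard.Key, carPath string) error {
	out := make(chan dagstore.ShardResult, 1)

	// LazyInitialization defers fetching and indexing the CAR until the shard
	// is first acquired, so registration itself stays cheap.
	opts := dagstore.RegisterOpts{LazyInitialization: true}
	if err := dagst.RegisterShard(ctx, key, &mount.FileMount{Path: carPath}, out, opts); err != nil {
		return fmt.Errorf("failed to queue shard registration: %w", err)
	}

	// wait for the registration to be acknowledged.
	select {
	case res := <-out:
		if res.Error != nil {
			return fmt.Errorf("shard registration failed: %w", res.Error)
		}
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}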


package modules

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strconv"

	"github.com/filecoin-project/dagstore"
	"go.uber.org/fx"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-fil-markets/retrievalmarket"

	mdagstore "github.com/filecoin-project/lotus/markets/dagstore"
	"github.com/filecoin-project/lotus/node/config"
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/repo"
)
const (
	EnvDAGStoreCopyConcurrency = "LOTUS_DAGSTORE_COPY_CONCURRENCY"
	DefaultDAGStoreDir         = "dagStore"
)
// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts.
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode) (mdagstore.MinerAPI, error) {
	cfg, err := extractDAGStoreConfig(r)
	if err != nil {
		return nil, err
	}

	// cap the number of concurrent calls to the storage, so that we don't
	// spam it during heavy processes like bulk migration.
	if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok {
		concurrency, err := strconv.Atoi(v)
		if err == nil {
			cfg.MaxConcurrencyStorageCalls = concurrency
		}
	}

	mountApi := mdagstore.NewMinerAPI(pieceStore, rpn, cfg.MaxConcurrencyStorageCalls)

	// the piece store signals readiness asynchronously; buffer the channel so
	// the callback never blocks.
	ready := make(chan error, 1)
	pieceStore.OnReady(func(err error) {
		ready <- err
	})

	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			if err := <-ready; err != nil {
				return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
			}
			return mountApi.Start(ctx)
		},
		OnStop: func(context.Context) error {
			return nil
		},
	})

	return mountApi, nil
}
// DAGStore constructs a DAG store using the supplied minerAPI, and the
// user configuration. It returns both the DAGStore and the Wrapper suitable for
// passing to markets.
func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) {
	cfg, err := extractDAGStoreConfig(r)
	if err != nil {
		return nil, nil, err
	}

	// fall back to the default root directory if not explicitly set in the config.
	if cfg.RootDir == "" {
		cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir)
	}

	// allow the transient copy concurrency to be overridden via the environment.
	v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)
	if ok {
		concurrency, err := strconv.Atoi(v)
		if err == nil {
			cfg.MaxConcurrentReadyFetches = concurrency
		}
	}

	dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI)
	if err != nil {
		return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
	}

	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			return w.Start(ctx)
		},
		OnStop: func(context.Context) error {
			return w.Close()
		},
	})

	return dagst, w, nil
}
func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) {
	cfg, err := r.Config()
	if err != nil {
		return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err)
	}
	mcfg, ok := cfg.(*config.StorageMiner)
	if !ok {
		return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg)
	}
	return mcfg.DAGStore, nil
}