lotus/node/modules/storageminer_dagstore.go

119 lines
3.3 KiB
Go
Raw Normal View History

2021-08-03 11:17:50 +00:00
package modules
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"github.com/filecoin-project/dagstore"
2021-08-03 11:17:50 +00:00
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
2021-08-03 22:22:46 +00:00
"go.uber.org/fx"
"golang.org/x/xerrors"
mdagstore "github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/config"
2021-08-03 11:17:50 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
// EnvDAGStoreCopyConcurrency names the environment variable that, when set to
// a valid integer, overrides the MaxConcurrentReadyFetches setting of the DAG
// store configuration.
const EnvDAGStoreCopyConcurrency = "LOTUS_DAGSTORE_COPY_CONCURRENCY"

// DefaultDAGStoreDir is the directory name, relative to the repo path, under
// which the DAG store keeps its data when the config does not set explicit
// directories.
const DefaultDAGStoreDir = "dagStore"
// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts.
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode) (mdagstore.MinerAPI, error) {
cfg, err := extractDAGStoreConfig(r)
if err != nil {
return nil, err
}
// caps the amount of concurrent calls to the storage, so that we don't
// spam it during heavy processes like bulk migration.
if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrencyStorageCalls = concurrency
}
}
mountApi := mdagstore.NewMinerAPI(pieceStore, rpn, cfg.MaxConcurrencyStorageCalls)
2021-08-03 11:17:50 +00:00
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
})
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
if err := <-ready; err != nil {
return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
}
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {
return nil
},
})
return mountApi, nil
}
// DAGStore constructs a DAG store using the supplied minerAPI, and the
// user configuration. It returns both the DAGStore and the Wrapper suitable for
// passing to markets.
func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) {
cfg, err := extractDAGStoreConfig(r)
2021-08-03 11:17:50 +00:00
if err != nil {
return nil, nil, err
2021-08-03 11:17:50 +00:00
}
// populate default directories if not explicitly set in the config.
defaultDir := filepath.Join(r.Path(), DefaultDAGStoreDir)
if cfg.TransientsDir == "" {
cfg.TransientsDir = filepath.Join(defaultDir, "transients")
}
if cfg.IndexDir == "" {
cfg.IndexDir = filepath.Join(defaultDir, "index")
}
if cfg.DatastoreDir == "" {
cfg.DatastoreDir = filepath.Join(defaultDir, "datastore")
}
v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)
2021-08-03 11:17:50 +00:00
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrentReadyFetches = concurrency
2021-08-03 11:17:50 +00:00
}
}
dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI)
2021-08-03 11:17:50 +00:00
if err != nil {
return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
2021-08-03 11:17:50 +00:00
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return w.Start(ctx)
2021-08-03 11:17:50 +00:00
},
OnStop: func(context.Context) error {
return w.Close()
2021-08-03 11:17:50 +00:00
},
})
return dagst, w, nil
}
2021-08-03 11:17:50 +00:00
func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) {
cfg, err := r.Config()
2021-08-03 11:17:50 +00:00
if err != nil {
return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err)
}
2021-08-03 22:22:46 +00:00
mcfg, ok := cfg.(*config.StorageMiner)
if !ok {
return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg)
2021-08-03 11:17:50 +00:00
}
return mcfg.DAGStore, nil
2021-08-03 11:17:50 +00:00
}