From 270bad8144d03d1fac42a124177fb84f105e807f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 4 Aug 2021 16:59:27 +0100 Subject: [PATCH] pull dagstore migration into Lotus and simplify. Migration registers all shards with lazy init. Shards are then initialized as they are retrieved for the first time, or in bulk through a lotus-shed tool that will be provided separately. --- go.mod | 2 +- go.sum | 4 +- markets/dagstore/wrapper.go | 166 +++++++++++++++++++-- markets/dagstore/wrapper_migration_test.go | 110 ++++++++++++++ markets/dagstore/wrapper_test.go | 12 +- node/config/def.go | 3 - node/config/types.go | 30 ++-- node/modules/storageminer.go | 38 +++-- node/modules/storageminer_dagstore.go | 16 +- 9 files changed, 315 insertions(+), 66 deletions(-) create mode 100644 markets/dagstore/wrapper_migration_test.go diff --git a/go.mod b/go.mod index 75a67af52..41ed12893 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/filecoin-project/go-data-transfer v1.7.2 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 + github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804153033-3621b28b87bb github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1 diff --git a/go.sum b/go.sum index 17eafbd23..9ecf00675 100644 --- a/go.sum +++ b/go.sum @@ -290,8 +290,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 h1:zubk4E8s5KLw5Y2Or39A3Ob8c7DAT6FL/mJBs1dMkrQ= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804153033-3621b28b87bb h1:vJky03MaywXx4KF3yNCiM8VO5zj9eW0P3n3E0xqrsZw= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804153033-3621b28b87bb/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 9a31d68c3..c05753baa 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -3,10 +3,13 @@ package dagstore import ( "context" "errors" + "math" "os" + "path/filepath" "sync" "time" + "github.com/filecoin-project/go-statemachine/fsm" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" levelds "github.com/ipfs/go-ds-leveldb" @@ -15,6 +18,8 @@ import ( ldbopts "github.com/syndtr/goleveldb/leveldb/opt" "golang.org/x/xerrors" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" 
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/dagstore" @@ -25,17 +30,21 @@ import ( "github.com/filecoin-project/go-fil-markets/stores" ) -const maxRecoverAttempts = 1 +const ( + maxRecoverAttempts = 1 + shardRegMarker = ".shard-registration-complete" +) -var log = logging.Logger("dagstore-wrapper") +var log = logging.Logger("dagstore") type Wrapper struct { ctx context.Context cancel context.CancelFunc backgroundWg sync.WaitGroup + cfg config.DAGStoreConfig dagst dagstore.Interface - mountApi MinerAPI + minerAPI MinerAPI failureCh chan dagstore.ShardResult traceCh chan dagstore.Trace gcInterval time.Duration @@ -56,18 +65,24 @@ func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGSto // The dagstore will write Trace events to the `traceCh` here. traceCh := make(chan dagstore.Trace, 32) - dstore, err := newDatastore(cfg.DatastoreDir) + var ( + transientsDir = filepath.Join(cfg.RootDir, "transients") + datastoreDir = filepath.Join(cfg.RootDir, "datastore") + indexDir = filepath.Join(cfg.RootDir, "index") + ) + + dstore, err := newDatastore(datastoreDir) if err != nil { - return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", cfg.DatastoreDir, err) + return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", datastoreDir, err) } - irepo, err := index.NewFSRepo(cfg.IndexDir) + irepo, err := index.NewFSRepo(indexDir) if err != nil { return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo") } dcfg := dagstore.Config{ - TransientsDir: cfg.TransientsDir, + TransientsDir: transientsDir, IndexRepo: irepo, Datastore: dstore, MountRegistry: registry, @@ -86,8 +101,9 @@ func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGSto } w := &Wrapper{ + cfg: cfg, dagst: dagst, - mountApi: mountApi, + minerAPI: mountApi, failureCh: failureCh, traceCh: traceCh, gcInterval: time.Duration(cfg.GCInterval), @@ -233,7 +249,7 @@ func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.Closa func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { // Create a lotus mount with the piece CID key := shard.KeyFromCID(pieceCid) - mt, err := NewLotusMount(pieceCid, w.mountApi) + mt, err := NewLotusMount(pieceCid, w.minerAPI) if err != nil { return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err) } @@ -252,6 +268,138 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s return nil } +func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) { + log := log.Named("migrator") + + // Check if all deals have already been registered as shards + isComplete, err := w.registrationComplete() + if err != nil { + return false, xerrors.Errorf("failed to get dagstore migration status: %w", err) + } + if isComplete { + // All deals have been registered as shards, bail out + log.Info("no shard migration necessary; already marked complete") + return false, nil + } + + log.Infow("registering shards for all active deals in sealing subsystem", "count", len(deals)) + + inSealingSubsystem := make(map[fsm.StateKey]struct{}, len(providerstates.StatesKnownBySealingSubsystem)) + for _, s := range providerstates.StatesKnownBySealingSubsystem { + inSealingSubsystem[s] = struct{}{} + } + + // channel where results will be received, and channel where the total + // number of registered shards will be sent. 
+ resch := make(chan dagstore.ShardResult, 32) + totalCh := make(chan int) + doneCh := make(chan struct{}) + + // Start making progress consuming results. We won't know how many to + // actually consume until we register all shards. + // + // If there are any problems registering shards, just log an error + go func() { + defer close(doneCh) + + var total = math.MaxInt64 + var res dagstore.ShardResult + for rcvd := 0; rcvd < total; { + select { + case total = <-totalCh: + // we now know the total number of registered shards + // nullify so that we no longer consume from it after closed. + close(totalCh) + totalCh = nil + case res = <-resch: + rcvd++ + if res.Error == nil { + log.Infow("async shard registration completed successfully", "shard_key", res.Key) + } else { + log.Warnw("async shard registration failed", "shard_key", res.Key, "error", res.Error) + } + } + } + }() + + // Filter for deals that are handed off. + // + // If the deal has not yet been handed off to the sealing subsystem, we + // don't need to call RegisterShard in this migration; RegisterShard will + // be called in the new code once the deal reaches the state where it's + // handed off to the sealing subsystem. + var registered int + for _, deal := range deals { + if deal.Ref.PieceCid == nil { + log.Warnw("deal has nil piece CID; skipping", "deal_id", deal.DealID) + continue + } + + // enrich log statements in this iteration with deal ID and piece CID. + log := log.With("deal_id", deal.DealID, "piece_cid", deal.Ref.PieceCid) + + // Filter for deals that have been handed off to the sealing subsystem + if _, ok := inSealingSubsystem[deal.State]; !ok { + log.Infow("deal not ready; skipping") + continue + } + + log.Infow("registering deal in dagstore with lazy init") + + // Register the deal as a shard with the DAG store with lazy initialization. + // The index will be populated the first time the deal is retrieved, or + // through the bulk initialization script. 
+ err = w.RegisterShard(ctx, *deal.Ref.PieceCid, "", false, resch) + if err != nil { + log.Warnw("failed to register shard", "error", err) + continue + } + registered++ + } + + log.Infow("finished registering all shards", "total", registered) + totalCh <- registered + <-doneCh + + log.Infow("confirmed registration of all shards") + + // Completed registering all shards, so mark the migration as complete + err = w.markRegistrationComplete() + if err != nil { + log.Errorf("failed to mark shards as registered: %s", err) + } else { + log.Info("successfully marked migration as complete") + } + + log.Infow("dagstore migration complete") + + return true, nil +} + +// Check for the existence of a "marker" file indicating that the migration +// has completed +func (w *Wrapper) registrationComplete() (bool, error) { + path := filepath.Join(w.cfg.RootDir, shardRegMarker) + _, err := os.Stat(path) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil +} + +// Create a "marker" file indicating that the migration has completed +func (w *Wrapper) markRegistrationComplete() error { + path := filepath.Join(w.cfg.RootDir, shardRegMarker) + file, err := os.Create(path) + if err != nil { + return err + } + return file.Close() +} + func (w *Wrapper) Close() error { // Cancel the context w.cancel() diff --git a/markets/dagstore/wrapper_migration_test.go b/markets/dagstore/wrapper_migration_test.go new file mode 100644 index 000000000..044489428 --- /dev/null +++ b/markets/dagstore/wrapper_migration_test.go @@ -0,0 +1,110 @@ +package dagstore + +import ( + "context" + "testing" + + "github.com/filecoin-project/dagstore" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes" + tut "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/lotus/node/config" +) + +func TestShardRegistration(t *testing.T) { + ps := tut.NewTestPieceStore() + providerNode := testnodes.NewTestRetrievalProviderNode() + + ctx := context.Background() + cids := tut.GenerateCids(4) + pieceCidUnsealed := cids[0] + pieceCidSealed := cids[1] + pieceCidUnsealed2 := cids[2] + pieceCidUnsealed3 := cids[3] + + sealedSector := abi.SectorNumber(1) + unsealedSector1 := abi.SectorNumber(2) + unsealedSector2 := abi.SectorNumber(3) + unsealedSector3 := abi.SectorNumber(4) + + // ps.ExpectPiece(pieceCidUnsealed, piecestore.PieceInfo{ + // PieceCID: pieceCidUnsealed, + // Deals: []piecestore.DealInfo{ + // { + // SectorID: unsealedSector1, + // }, + // }, + // }) + // + // ps.ExpectPiece(pieceCidSealed, piecestore.PieceInfo{ + // PieceCID: pieceCidSealed, + // Deals: []piecestore.DealInfo{ + // { + // SectorID: sealedSector, + // }, + // }, + // }) + + deals := []storagemarket.MinerDeal{{ + // Should be registered + State: storagemarket.StorageDealSealing, + SectorNumber: unsealedSector1, + Ref: &storagemarket.DataRef{ + PieceCid: &pieceCidUnsealed, + }, + }, { + // Should be registered with lazy registration (because sector is sealed) + State: storagemarket.StorageDealSealing, + SectorNumber: sealedSector, + Ref: &storagemarket.DataRef{ + PieceCid: &pieceCidSealed, + }, + }, { + // Should be ignored because deal is no longer active + State: storagemarket.StorageDealError, + SectorNumber: unsealedSector2, + Ref: &storagemarket.DataRef{ + PieceCid: &pieceCidUnsealed2, + }, + }, { + // Should 
be ignored because deal is not yet sealing + State: storagemarket.StorageDealFundsReserved, + SectorNumber: unsealedSector3, + Ref: &storagemarket.DataRef{ + PieceCid: &pieceCidUnsealed3, + }, + }} + + cfg := config.DefaultStorageMiner().DAGStore + cfg.RootDir = t.TempDir() + + mapi := NewMinerAPI(ps, providerNode, 10) + dagst, w, err := NewDAGStore(cfg, mapi) + require.NoError(t, err) + require.NotNil(t, dagst) + require.NotNil(t, w) + + err = dagst.Start(context.Background()) + require.NoError(t, err) + + migrated, err := w.MigrateDeals(ctx, deals) + require.True(t, migrated) + require.NoError(t, err) + + info := dagst.AllShardsInfo() + require.Len(t, info, 2) + for _, i := range info { + require.Equal(t, dagstore.ShardStateNew, i.ShardState) + } + + // Run register shard migration again + migrated, err = w.MigrateDeals(ctx, deals) + require.False(t, migrated) + require.NoError(t, err) + + // ps.VerifyExpectations(t) +} diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 8748e4dbf..9d3e6939e 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -29,10 +29,8 @@ func TestWrapperAcquireRecovery(t *testing.T) { // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ - TransientsDir: t.TempDir(), - IndexDir: t.TempDir(), - DatastoreDir: t.TempDir(), - GCInterval: config.Duration(1 * time.Millisecond), + RootDir: t.TempDir(), + GCInterval: config.Duration(1 * time.Millisecond), }, mockLotusMount{}) require.NoError(t, err) @@ -82,10 +80,8 @@ func TestWrapperBackground(t *testing.T) { // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ - TransientsDir: t.TempDir(), - IndexDir: t.TempDir(), - DatastoreDir: t.TempDir(), - GCInterval: config.Duration(1 * time.Millisecond), + RootDir: t.TempDir(), + GCInterval: config.Duration(1 * time.Millisecond), }, mockLotusMount{}) require.NoError(t, err) diff --git a/node/config/def.go b/node/config/def.go index bef8fa8cf..b236e19f0 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -189,9 +189,6 @@ func DefaultStorageMiner() *StorageMiner { DealPublishControl: []string{}, }, - // The default DAGStoreConfig doesn't define any paths for transients, - // indices and the datastore. Empty values will lead to these being - // placed under /dagStore. DAGStore: DAGStoreConfig{ MaxConcurrentIndex: 5, MaxConcurrentReadyFetches: 2, diff --git a/node/config/types.go b/node/config/types.go index 9f77ee7a4..2b07ad28b 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -54,23 +54,19 @@ type StorageMiner struct { } type DAGStoreConfig struct { - // Path to the transients directory. The transients directory caches - // unsealed deals that have been fetched from the storage subsystem for - // serving retrievals. When empty or omitted, the default value applies. - // Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment) - // or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment) - TransientsDir string - - // Path to indices directory. When empty or omitted, the default value applies. - // Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment) - // or $LOTUS_MINER_PATH/dagStore/index (monolith deployment) - IndexDir string - - // Path to datastore directory. The datastore is a KV store tracking the - // state of shards known to the DAG store. 
- // Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment)
- // or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment)
- DatastoreDir string
+ // Path to the dagstore root directory. This directory contains three
+ // subdirectories, which can be symlinked to alternative locations if
+ // need be:
+ //
+ // - ./transients: caches unsealed deals that have been fetched from the
+ // storage subsystem for serving retrievals.
+ // - ./index: stores shard indices.
+ // - ./datastore: holds the KV store tracking the state of every shard
+ // known to the DAG store.
+ //
+ // Default value: $LOTUS_MARKETS_PATH/dagStore (split deployment) or
+ // $LOTUS_MINER_PATH/dagStore (monolith deployment)
+ RootDir string

 // The maximum amount of indexing jobs that can run simultaneously.
 // Default value: 5.
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index f9107006e..888af6980 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -19,16 +19,6 @@ import (
 dtimpl "github.com/filecoin-project/go-data-transfer/impl"
 dtnet "github.com/filecoin-project/go-data-transfer/network"
 dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
- piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
- piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
- "github.com/filecoin-project/go-fil-markets/retrievalmarket"
- retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
- rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
- "github.com/filecoin-project/go-fil-markets/shared"
- "github.com/filecoin-project/go-fil-markets/storagemarket"
- storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
- "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
- smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
 "github.com/filecoin-project/go-jsonrpc/auth"
 "github.com/filecoin-project/go-paramfetch"
 "github.com/filecoin-project/go-state-types/abi"
@@ -47,6 +37,17 @@ import (
 "github.com/libp2p/go-libp2p-core/host"
 "github.com/libp2p/go-libp2p-core/routing"
+ piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
+ piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
+ "github.com/filecoin-project/go-fil-markets/retrievalmarket"
+ retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
+ rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
+ "github.com/filecoin-project/go-fil-markets/shared"
+ "github.com/filecoin-project/go-fil-markets/storagemarket"
+ storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
+ "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
+ smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
+
 sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
 "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
 "github.com/filecoin-project/lotus/extern/sector-storage/stores"
@@ -590,13 +591,20 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
 return nil, err
 }
- dagStorePath := filepath.Join(r.Path(), DefaultDAGStoreDir)
-
 opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
- shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), dagStorePath, dsw, pieceStore, spn)
- return storageimpl.NewProvider(net, namespace.Wrap(ds, 
datastore.NewKey("/deals/provider")), store, dsw, pieceStore, - dataTransfer, spn, address.Address(minerAddress), storedAsk, shardMigrator, opt) + return storageimpl.NewProvider( + net, + namespace.Wrap(ds, datastore.NewKey("/deals/provider")), + store, + dsw, + pieceStore, + dataTransfer, + spn, + address.Address(minerAddress), + storedAsk, + opt, + ) } func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index 6f032ed34..934198cff 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -8,10 +8,11 @@ import ( "strconv" "github.com/filecoin-project/dagstore" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + mdagstore "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -68,16 +69,9 @@ func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) ( return nil, nil, err } - // populate default directories if not explicitly set in the config. - defaultDir := filepath.Join(r.Path(), DefaultDAGStoreDir) - if cfg.TransientsDir == "" { - cfg.TransientsDir = filepath.Join(defaultDir, "transients") - } - if cfg.IndexDir == "" { - cfg.IndexDir = filepath.Join(defaultDir, "index") - } - if cfg.DatastoreDir == "" { - cfg.DatastoreDir = filepath.Join(defaultDir, "datastore") + // fall back to default root directory if not explicitly set in the config. + if cfg.RootDir == "" { + cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir) } v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)