pull dagstore migration into Lotus and simplify.

Migration registers all shards with lazy init.

Shards are then initialized as they are retrieved for the
first time, or in bulk through a lotus-shed tool that will
be provided separately.
This commit is contained in:
Raúl Kripalani 2021-08-04 16:59:27 +01:00
parent e8e73e5374
commit 270bad8144
9 changed files with 315 additions and 66 deletions

2
go.mod
View File

@ -36,7 +36,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.7.2
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804153033-3621b28b87bb
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1

4
go.sum
View File

@ -290,8 +290,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 h1:zubk4E8s5KLw5Y2Or39A3Ob8c7DAT6FL/mJBs1dMkrQ=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804153033-3621b28b87bb h1:vJky03MaywXx4KF3yNCiM8VO5zj9eW0P3n3E0xqrsZw=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804153033-3621b28b87bb/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

View File

@ -3,10 +3,13 @@ package dagstore
import (
"context"
"errors"
"math"
"os"
"path/filepath"
"sync"
"time"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
levelds "github.com/ipfs/go-ds-leveldb"
@ -15,6 +18,8 @@ import (
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/dagstore"
@ -25,17 +30,21 @@ import (
"github.com/filecoin-project/go-fil-markets/stores"
)
const maxRecoverAttempts = 1
const (
maxRecoverAttempts = 1
shardRegMarker = ".shard-registration-complete"
)
var log = logging.Logger("dagstore-wrapper")
var log = logging.Logger("dagstore")
type Wrapper struct {
ctx context.Context
cancel context.CancelFunc
backgroundWg sync.WaitGroup
cfg config.DAGStoreConfig
dagst dagstore.Interface
mountApi MinerAPI
minerAPI MinerAPI
failureCh chan dagstore.ShardResult
traceCh chan dagstore.Trace
gcInterval time.Duration
@ -56,18 +65,24 @@ func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGSto
// The dagstore will write Trace events to the `traceCh` here.
traceCh := make(chan dagstore.Trace, 32)
dstore, err := newDatastore(cfg.DatastoreDir)
var (
transientsDir = filepath.Join(cfg.RootDir, "transients")
datastoreDir = filepath.Join(cfg.RootDir, "datastore")
indexDir = filepath.Join(cfg.RootDir, "index")
)
dstore, err := newDatastore(datastoreDir)
if err != nil {
return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", cfg.DatastoreDir, err)
return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", datastoreDir, err)
}
irepo, err := index.NewFSRepo(cfg.IndexDir)
irepo, err := index.NewFSRepo(indexDir)
if err != nil {
return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo")
}
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
TransientsDir: transientsDir,
IndexRepo: irepo,
Datastore: dstore,
MountRegistry: registry,
@ -86,8 +101,9 @@ func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGSto
}
w := &Wrapper{
cfg: cfg,
dagst: dagst,
mountApi: mountApi,
minerAPI: mountApi,
failureCh: failureCh,
traceCh: traceCh,
gcInterval: time.Duration(cfg.GCInterval),
@ -233,7 +249,7 @@ func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.Closa
func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, w.mountApi)
mt, err := NewLotusMount(pieceCid, w.minerAPI)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}
@ -252,6 +268,138 @@ func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath s
return nil
}
// MigrateDeals registers every deal that has been handed off to the sealing
// subsystem as a dagstore shard, using lazy initialization (the shard index
// is populated on first retrieval, or via a bulk init tool).
//
// The migration runs at most once: completion is recorded via a marker file
// (see markRegistrationComplete), and subsequent calls return (false, nil).
//
// Returns true if a migration was performed. Failures registering individual
// shards are logged and skipped; they do not abort the migration.
func (w *Wrapper) MigrateDeals(ctx context.Context, deals []storagemarket.MinerDeal) (bool, error) {
	log := log.Named("migrator")

	// Check if all deals have already been registered as shards.
	isComplete, err := w.registrationComplete()
	if err != nil {
		return false, xerrors.Errorf("failed to get dagstore migration status: %w", err)
	}
	if isComplete {
		// All deals have been registered as shards, bail out.
		log.Info("no shard migration necessary; already marked complete")
		return false, nil
	}

	log.Infow("registering shards for all active deals in sealing subsystem", "count", len(deals))

	// Set of deal states that mean the deal has reached the sealing subsystem.
	inSealingSubsystem := make(map[fsm.StateKey]struct{}, len(providerstates.StatesKnownBySealingSubsystem))
	for _, s := range providerstates.StatesKnownBySealingSubsystem {
		inSealingSubsystem[s] = struct{}{}
	}

	// channel where results will be received, and channel where the total
	// number of registered shards will be sent.
	resch := make(chan dagstore.ShardResult, 32)
	totalCh := make(chan int)
	doneCh := make(chan struct{})

	// Start making progress consuming results. We won't know how many to
	// actually consume until we register all shards.
	//
	// If there are any problems registering shards, just log an error.
	go func() {
		defer close(doneCh)

		// total is typed int64 so that math.MaxInt64 is representable on
		// 32-bit platforms too (assigning it to a plain int would overflow
		// there and fail to compile). It stays at the sentinel maximum until
		// the registration loop reports the real count over totalCh.
		var total int64 = math.MaxInt64
		var res dagstore.ShardResult
		for rcvd := int64(0); rcvd < total; {
			select {
			case n := <-totalCh:
				// we now know the total number of registered shards;
				// nullify the channel so this select no longer receives
				// from it (a nil channel blocks forever).
				total = int64(n)
				totalCh = nil
			case res = <-resch:
				rcvd++
				if res.Error == nil {
					log.Infow("async shard registration completed successfully", "shard_key", res.Key)
				} else {
					log.Warnw("async shard registration failed", "shard_key", res.Key, "error", res.Error)
				}
			}
		}
	}()

	// Filter for deals that are handed off.
	//
	// If the deal has not yet been handed off to the sealing subsystem, we
	// don't need to call RegisterShard in this migration; RegisterShard will
	// be called in the new code once the deal reaches the state where it's
	// handed off to the sealing subsystem.
	var registered int
	for _, deal := range deals {
		if deal.Ref.PieceCid == nil {
			log.Warnw("deal has nil piece CID; skipping", "deal_id", deal.DealID)
			continue
		}

		// enrich log statements in this iteration with deal ID and piece CID.
		log := log.With("deal_id", deal.DealID, "piece_cid", deal.Ref.PieceCid)

		// Filter for deals that have been handed off to the sealing subsystem.
		if _, ok := inSealingSubsystem[deal.State]; !ok {
			log.Infow("deal not ready; skipping")
			continue
		}

		log.Infow("registering deal in dagstore with lazy init")

		// Register the deal as a shard with the DAG store with lazy initialization.
		// The index will be populated the first time the deal is retrieved, or
		// through the bulk initialization script.
		err = w.RegisterShard(ctx, *deal.Ref.PieceCid, "", false, resch)
		if err != nil {
			log.Warnw("failed to register shard", "error", err)
			continue
		}
		registered++
	}

	log.Infow("finished registering all shards", "total", registered)
	totalCh <- registered
	<-doneCh

	log.Infow("confirmed registration of all shards")

	// Completed registering all shards, so mark the migration as complete.
	// A failure to write the marker is logged but not returned: the migration
	// itself succeeded, it would merely be re-run (idempotently) next time.
	err = w.markRegistrationComplete()
	if err != nil {
		log.Errorf("failed to mark shards as registered: %s", err)
	} else {
		log.Info("successfully marked migration as complete")
	}

	log.Infow("dagstore migration complete")

	return true, nil
}
// registrationComplete reports whether the shard-registration migration has
// already run, by probing for the marker file under the dagstore root.
func (w *Wrapper) registrationComplete() (bool, error) {
	marker := filepath.Join(w.cfg.RootDir, shardRegMarker)
	switch _, err := os.Stat(marker); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		// No marker yet: the migration has not completed.
		return false, nil
	default:
		return false, err
	}
}
// markRegistrationComplete records that the shard-registration migration has
// completed, by creating an empty marker file under the dagstore root. Only
// the file's existence matters (see registrationComplete).
func (w *Wrapper) markRegistrationComplete() error {
	path := filepath.Join(w.cfg.RootDir, shardRegMarker)
	// os.WriteFile with a nil payload creates (or truncates) the file with
	// mode 0666 before umask — the same effect as os.Create + Close, without
	// the manual file handle management.
	return os.WriteFile(path, nil, 0666)
}
func (w *Wrapper) Close() error {
// Cancel the context
w.cancel()

View File

@ -0,0 +1,110 @@
package dagstore
import (
"context"
"testing"
"github.com/filecoin-project/dagstore"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/node/config"
)
// TestShardRegistration exercises the deal-to-shard migration: deals handed
// off to the sealing subsystem are registered as lazily-initialized shards,
// while inactive or pre-sealing deals are skipped; a second run is a no-op.
func TestShardRegistration(t *testing.T) {
	ctx := context.Background()

	pieceStore := tut.NewTestPieceStore()
	rpn := testnodes.NewTestRetrievalProviderNode()

	cids := tut.GenerateCids(4)
	pieceCidUnsealed := cids[0]
	pieceCidSealed := cids[1]
	pieceCidUnsealed2 := cids[2]
	pieceCidUnsealed3 := cids[3]

	sealedSector := abi.SectorNumber(1)
	unsealedSector1 := abi.SectorNumber(2)
	unsealedSector2 := abi.SectorNumber(3)
	unsealedSector3 := abi.SectorNumber(4)

	// ps.ExpectPiece(pieceCidUnsealed, piecestore.PieceInfo{
	// 	PieceCID: pieceCidUnsealed,
	// 	Deals: []piecestore.DealInfo{
	// 		{
	// 			SectorID: unsealedSector1,
	// 		},
	// 	},
	// })
	//
	// ps.ExpectPiece(pieceCidSealed, piecestore.PieceInfo{
	// 	PieceCID: pieceCidSealed,
	// 	Deals: []piecestore.DealInfo{
	// 		{
	// 			SectorID: sealedSector,
	// 		},
	// 	},
	// })

	deals := []storagemarket.MinerDeal{
		{
			// handed off to sealing: expected to be registered.
			State:        storagemarket.StorageDealSealing,
			SectorNumber: unsealedSector1,
			Ref:          &storagemarket.DataRef{PieceCid: &pieceCidUnsealed},
		},
		{
			// sealed sector, still in sealing state: registered lazily.
			State:        storagemarket.StorageDealSealing,
			SectorNumber: sealedSector,
			Ref:          &storagemarket.DataRef{PieceCid: &pieceCidSealed},
		},
		{
			// deal no longer active: expected to be skipped.
			State:        storagemarket.StorageDealError,
			SectorNumber: unsealedSector2,
			Ref:          &storagemarket.DataRef{PieceCid: &pieceCidUnsealed2},
		},
		{
			// deal not yet handed off to sealing: expected to be skipped.
			State:        storagemarket.StorageDealFundsReserved,
			SectorNumber: unsealedSector3,
			Ref:          &storagemarket.DataRef{PieceCid: &pieceCidUnsealed3},
		},
	}

	cfg := config.DefaultStorageMiner().DAGStore
	cfg.RootDir = t.TempDir()

	mapi := NewMinerAPI(pieceStore, rpn, 10)
	dagst, w, err := NewDAGStore(cfg, mapi)
	require.NoError(t, err)
	require.NotNil(t, dagst)
	require.NotNil(t, w)

	require.NoError(t, dagst.Start(ctx))

	// First run: the two sealing-state deals become shards, lazily initialized.
	migrated, err := w.MigrateDeals(ctx, deals)
	require.NoError(t, err)
	require.True(t, migrated)

	info := dagst.AllShardsInfo()
	require.Len(t, info, 2)
	for _, i := range info {
		require.Equal(t, dagstore.ShardStateNew, i.ShardState)
	}

	// Second run: the marker file records completion, so nothing migrates.
	migrated, err = w.MigrateDeals(ctx, deals)
	require.NoError(t, err)
	require.False(t, migrated)
	// ps.VerifyExpectations(t)
}

View File

@ -29,10 +29,8 @@ func TestWrapperAcquireRecovery(t *testing.T) {
// Create a DAG store wrapper
dagst, w, err := NewDAGStore(config.DAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
DatastoreDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
RootDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
}, mockLotusMount{})
require.NoError(t, err)
@ -82,10 +80,8 @@ func TestWrapperBackground(t *testing.T) {
// Create a DAG store wrapper
dagst, w, err := NewDAGStore(config.DAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
DatastoreDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
RootDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
}, mockLotusMount{})
require.NoError(t, err)

View File

@ -189,9 +189,6 @@ func DefaultStorageMiner() *StorageMiner {
DealPublishControl: []string{},
},
// The default DAGStoreConfig doesn't define any paths for transients,
// indices and the datastore. Empty values will lead to these being
// placed under <repo>/dagStore.
DAGStore: DAGStoreConfig{
MaxConcurrentIndex: 5,
MaxConcurrentReadyFetches: 2,

View File

@ -54,23 +54,19 @@ type StorageMiner struct {
}
type DAGStoreConfig struct {
// Path to the transients directory. The transients directory caches
// unsealed deals that have been fetched from the storage subsystem for
// serving retrievals. When empty or omitted, the default value applies.
// Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment)
// or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment)
TransientsDir string
// Path to indices directory. When empty or omitted, the default value applies.
// Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment)
// or $LOTUS_MINER_PATH/dagStore/index (monolith deployment)
IndexDir string
// Path to datastore directory. The datastore is a KV store tracking the
// state of shards known to the DAG store.
// Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment)
// or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment)
DatastoreDir string
// Path to the dagstore root directory. This directory contains three
// subdirectories, which can be symlinked to alternative locations if
// need be:
//
// - ./transients: caches unsealed deals that have been fetched from the
// storage subsystem for serving retrievals.
// - ./indices: stores shard indices.
// - ./datastore: holds the KV store tracking the state of every shard
// known to the DAG store.
//
// Default value: $LOTUS_MARKETS_PATH/dagStore (split deployment) or
// $LOTUS_MINER_PATH/dagStore (monolith deployment)
RootDir string
// The maximum amount of indexing jobs that can run simultaneously.
// Default value: 5.

View File

@ -19,16 +19,6 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi"
@ -47,6 +37,17 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -590,13 +591,20 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return nil, err
}
dagStorePath := filepath.Join(r.Path(), DefaultDAGStoreDir)
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), dagStorePath, dsw, pieceStore, spn)
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, dsw, pieceStore,
dataTransfer, spn, address.Address(minerAddress), storedAsk, shardMigrator, opt)
return storageimpl.NewProvider(
net,
namespace.Wrap(ds, datastore.NewKey("/deals/provider")),
store,
dsw,
pieceStore,
dataTransfer,
spn,
address.Address(minerAddress),
storedAsk,
opt,
)
}
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,

View File

@ -8,10 +8,11 @@ import (
"strconv"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
mdagstore "github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -68,16 +69,9 @@ func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (
return nil, nil, err
}
// populate default directories if not explicitly set in the config.
defaultDir := filepath.Join(r.Path(), DefaultDAGStoreDir)
if cfg.TransientsDir == "" {
cfg.TransientsDir = filepath.Join(defaultDir, "transients")
}
if cfg.IndexDir == "" {
cfg.IndexDir = filepath.Join(defaultDir, "index")
}
if cfg.DatastoreDir == "" {
cfg.DatastoreDir = filepath.Join(defaultDir, "datastore")
// fall back to default root directory if not explicitly set in the config.
if cfg.RootDir == "" {
cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir)
}
v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)