From 7e02868ce2b5e4364ccb2fda79482fb9f8997dea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 12:10:12 +0100 Subject: [PATCH 01/14] consolidate blockstore utils. --- markets/dagstore/blockstore.go | 33 ++++++++++++++++++++++++++ markets/dagstore/readonlyblockstore.go | 33 -------------------------- markets/dagstore/wrapper.go | 9 +------ 3 files changed, 34 insertions(+), 41 deletions(-) create mode 100644 markets/dagstore/blockstore.go delete mode 100644 markets/dagstore/readonlyblockstore.go diff --git a/markets/dagstore/blockstore.go b/markets/dagstore/blockstore.go new file mode 100644 index 000000000..8980d40cf --- /dev/null +++ b/markets/dagstore/blockstore.go @@ -0,0 +1,33 @@ +package dagstore + +import ( + "io" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + bstore "github.com/ipfs/go-ipfs-blockstore" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore" +) + +// Blockstore promotes a dagstore.ReadBlockstore to a full closeable Blockstore, +// stubbing out the write methods with erroring implementations. +type Blockstore struct { + dagstore.ReadBlockstore + io.Closer +} + +var _ bstore.Blockstore = (*Blockstore)(nil) + +func (b *Blockstore) DeleteBlock(c cid.Cid) error { + return xerrors.Errorf("DeleteBlock called but not implemented") +} + +func (b *Blockstore) Put(block blocks.Block) error { + return xerrors.Errorf("Put called but not implemented") +} + +func (b *Blockstore) PutMany(blocks []blocks.Block) error { + return xerrors.Errorf("PutMany called but not implemented") +} diff --git a/markets/dagstore/readonlyblockstore.go b/markets/dagstore/readonlyblockstore.go deleted file mode 100644 index b8f19313a..000000000 --- a/markets/dagstore/readonlyblockstore.go +++ /dev/null @@ -1,33 +0,0 @@ -package dagstore - -import ( - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - bstore "github.com/ipfs/go-ipfs-blockstore" - "golang.org/x/xerrors" - - "github.com/filecoin-project/dagstore" -) - -// ReadOnlyBlockstore stubs out Blockstore mutators with methods that error out -type ReadOnlyBlockstore struct { - dagstore.ReadBlockstore -} - -func NewReadOnlyBlockstore(rbs dagstore.ReadBlockstore) bstore.Blockstore { - return ReadOnlyBlockstore{ReadBlockstore: rbs} -} - -func (r ReadOnlyBlockstore) DeleteBlock(c cid.Cid) error { - return xerrors.Errorf("DeleteBlock called but not implemented") -} - -func (r ReadOnlyBlockstore) Put(block blocks.Block) error { - return xerrors.Errorf("Put called but not implemented") -} - -func (r ReadOnlyBlockstore) PutMany(blocks []blocks.Block) error { - return xerrors.Errorf("PutMany called but not implemented") -} - -var _ bstore.Blockstore = (*ReadOnlyBlockstore)(nil) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 4c285be35..7ee4aad89 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -3,14 +3,12 @@ package dagstore import ( "context" "errors" - "io" "sync" "time" "github.com/filecoin-project/dagstore/index" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - bstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" @@ -46,11 +44,6 @@ type DAGStore interface { Start(ctx context.Context) error } -type closableBlockstore struct { - bstore.Blockstore - io.Closer -} - type Wrapper struct { ctx context.Context cancel context.CancelFunc @@ -216,7 +209,7 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl } log.Debugf("successfully loaded blockstore for piece CID %s", pieceCid) - return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil + return &Blockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil } func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { From f1d98361a6e4efbf13a56119dd825685385e0b25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 12:17:50 +0100 Subject: [PATCH 02/14] small DAGStoreWrapper code reorg. --- node/builder_miner.go | 2 +- node/modules/storageminer.go | 95 --------------------- node/modules/storageminer_dagstore.go | 115 ++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 96 deletions(-) create mode 100644 node/modules/storageminer_dagstore.go diff --git a/node/builder_miner.go b/node/builder_miner.go index a9b38200b..469b7dc21 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -149,7 +149,7 @@ func ConfigStorageMiner(c interface{}) Option { // DAG Store Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor), - Override(new(*dagstore.Wrapper), modules.DagStoreWrapper), + Override(new(*dagstore.Wrapper), modules.DAGStoreWrapper), // Markets (retrieval) Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index bc54fef38..6207a5ff4 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -581,101 +581,6 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside } } -func NewLotusAccessor(lc fx.Lifecycle, - pieceStore dtypes.ProviderPieceStore, - rpn retrievalmarket.RetrievalProviderNode, -) (dagstore.LotusAccessor, error) { - mountApi := dagstore.NewLotusAccessor(pieceStore, rpn) - ready := make(chan error, 1) - pieceStore.OnReady(func(err error) { - ready <- err - }) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - if err := <-ready; err != nil { - return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err) - } - return mountApi.Start(ctx) - }, - OnStop: func(context.Context) error { - return nil - }, - }) - - return mountApi, nil -} - -func DagStoreWrapper( - lc fx.Lifecycle, - r repo.LockedRepo, - lotusAccessor dagstore.LotusAccessor, -) (*dagstore.Wrapper, error) { - dagStoreDir := filepath.Join(r.Path(), dagStore) - dagStoreDS, err := createDAGStoreDatastore(dagStoreDir) - if err != nil { - return nil, err - } - - var maxCopies = 2 - // TODO replace env with config.toml attribute. - v, ok := os.LookupEnv("LOTUS_DAGSTORE_COPY_CONCURRENCY") - if ok { - concurrency, err := strconv.Atoi(v) - if err == nil { - maxCopies = concurrency - } - } - - cfg := dagstore.MarketDAGStoreConfig{ - TransientsDir: filepath.Join(dagStoreDir, "transients"), - IndexDir: filepath.Join(dagStoreDir, "index"), - Datastore: dagStoreDS, - GCInterval: 1 * time.Minute, - MaxConcurrentIndex: 5, - MaxConcurrentReadyFetches: maxCopies, - } - - dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor) - if err != nil { - return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) - } - - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return dsw.Start(ctx) - }, - OnStop: func(context.Context) error { - return dsw.Close() - }, - }) - return dsw, nil -} - -// createDAGStoreDatastore creates a datastore under the given base directory -// for DAG store metadata -func createDAGStoreDatastore(baseDir string) (datastore.Batching, error) { - // Create a datastore directory under the base dir if it doesn't already exist - dsDir := path.Join(baseDir, "datastore") - if err := os.MkdirAll(dsDir, 0755); err != nil { - return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dsDir, err) - } - - // Create a new LevelDB datastore - ds, err := levelds.NewDatastore(dsDir, &levelds.Options{ - Compression: ldbopts.NoCompression, - NoSync: false, - Strict: ldbopts.StrictAll, - ReadOnly: false, - }) - if err != nil { - return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err) - } - - // Keep statistics about the datastore - mds := measure.New("measure.", ds) - return mds, nil -} - func StorageProvider(minerAddress dtypes.MinerAddress, storedAsk *storedask.StoredAsk, h host.Host, ds dtypes.MetadataDS, diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go new file mode 100644 index 000000000..a6cf921e3 --- /dev/null +++ b/node/modules/storageminer_dagstore.go @@ -0,0 +1,115 @@ +package modules + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/lotus/markets/dagstore" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" + "github.com/ipfs/go-datastore" + levelds "github.com/ipfs/go-ds-leveldb" + measure "github.com/ipfs/go-ds-measure" + ldbopts "github.com/syndtr/goleveldb/leveldb/opt" + "go.uber.org/fx" + "golang.org/x/xerrors" +) + +func NewLotusAccessor(lc fx.Lifecycle, + pieceStore dtypes.ProviderPieceStore, + rpn retrievalmarket.RetrievalProviderNode, +) (dagstore.LotusAccessor, error) { + mountApi := dagstore.NewLotusAccessor(pieceStore, rpn) + ready := make(chan error, 1) + pieceStore.OnReady(func(err error) { + ready <- err + }) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + if err := <-ready; err != nil { + return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err) + } + return mountApi.Start(ctx) + }, + OnStop: func(context.Context) error { + return nil + }, + }) + + return mountApi, nil +} + +func DAGStoreWrapper( + lc fx.Lifecycle, + r repo.LockedRepo, + lotusAccessor dagstore.LotusAccessor, +) (*dagstore.Wrapper, error) { + dir := filepath.Join(r.Path(), dagStore) + ds, err := newDAGStoreDatastore(dir) + if err != nil { + return nil, err + } + + var maxCopies = 2 + // TODO replace env with config.toml attribute. + v, ok := os.LookupEnv("LOTUS_DAGSTORE_COPY_CONCURRENCY") + if ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + maxCopies = concurrency + } + } + + cfg := dagstore.MarketDAGStoreConfig{ + TransientsDir: filepath.Join(dir, "transients"), + IndexDir: filepath.Join(dir, "index"), + Datastore: ds, + GCInterval: 1 * time.Minute, + MaxConcurrentIndex: 5, + MaxConcurrentReadyFetches: maxCopies, + } + + dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor) + if err != nil { + return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) + } + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return dsw.Start(ctx) + }, + OnStop: func(context.Context) error { + return dsw.Close() + }, + }) + return dsw, nil +} + +// newDAGStoreDatastore creates a datastore under the given base directory +// for dagstore metadata. +func newDAGStoreDatastore(baseDir string) (datastore.Batching, error) { + // Create a datastore directory under the base dir if it doesn't already exist + dsDir := filepath.Join(baseDir, "datastore") + if err := os.MkdirAll(dsDir, 0755); err != nil { + return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dsDir, err) + } + + // Create a new LevelDB datastore + ds, err := levelds.NewDatastore(dsDir, &levelds.Options{ + Compression: ldbopts.NoCompression, + NoSync: false, + Strict: ldbopts.StrictAll, + ReadOnly: false, + }) + if err != nil { + return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err) + } + // Keep statistics about the datastore + mds := measure.New("measure.", ds) + return mds, nil +} From 407674614181ed11618a49a64e4fbab9c1575a46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 12:22:40 +0100 Subject: [PATCH 03/14] minor renaming. --- markets/dagstore/lotusaccessor.go | 6 +- markets/dagstore/lotusaccessor_test.go | 6 +- markets/dagstore/mount.go | 16 +++--- markets/dagstore/mount_test.go | 2 +- markets/dagstore/wrapper.go | 77 +++++++++++++------------- markets/dagstore/wrapper_test.go | 8 +-- node/builder_miner.go | 2 +- node/modules/storageminer_dagstore.go | 8 +-- 8 files changed, 63 insertions(+), 62 deletions(-) diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/lotusaccessor.go index 268720fae..fb624aef6 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/lotusaccessor.go @@ -30,7 +30,7 @@ var MaxConcurrentStorageCalls = func() int { return 100 }() -type LotusAccessor interface { +type MinerAPI interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) @@ -44,9 +44,9 @@ type lotusAccessor struct { readyMgr *shared.ReadyManager } -var _ LotusAccessor = (*lotusAccessor)(nil) +var _ MinerAPI = (*lotusAccessor)(nil) -func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) LotusAccessor { +func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) MinerAPI { return &lotusAccessor{ pieceStore: store, rm: rm, diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/lotusaccessor_test.go index 6d8513f56..38a865ebd 100644 --- a/markets/dagstore/lotusaccessor_test.go +++ b/markets/dagstore/lotusaccessor_test.go @@ -74,7 +74,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { rpn := &mockRPN{ sectors: mockData, } - api := NewLotusAccessor(ps, rpn) + api := NewMinerAPI(ps, rpn) require.NoError(t, api.Start(ctx)) // Add deals to piece store @@ -114,7 +114,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { ps := getPieceStore(t) rpn := &mockRPN{} - api := NewLotusAccessor(ps, rpn) + api := NewMinerAPI(ps, rpn) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 @@ -143,7 +143,7 @@ func TestThrottle(t *testing.T) { unsealedSectorID: "foo", }, } - api := NewLotusAccessor(ps, rpn) + api := NewMinerAPI(ps, rpn) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index f53c31c7c..3d3c2142f 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -18,7 +18,7 @@ var _ mount.Mount = (*LotusMount)(nil) // LotusMount is the Lotus implementation of a Sharded DAG Store Mount. // A Filecoin Piece is treated as a Shard by this implementation. type LotusMount struct { - Api LotusAccessor + API MinerAPI PieceCid cid.Cid } @@ -29,14 +29,14 @@ type LotusMount struct { // When the registry needs to deserialize a mount it clones the template then // calls Deserialize on the cloned instance, which will have a reference to the // lotus mount API supplied here. -func NewLotusMountTemplate(api LotusAccessor) *LotusMount { - return &LotusMount{Api: api} +func NewLotusMountTemplate(api MinerAPI) *LotusMount { + return &LotusMount{API: api} } -func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) { +func NewLotusMount(pieceCid cid.Cid, api MinerAPI) (*LotusMount, error) { return &LotusMount{ PieceCid: pieceCid, - Api: api, + API: api, }, nil } @@ -57,7 +57,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error { } func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { - r, err := l.Api.FetchUnsealedPiece(ctx, l.PieceCid) + r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid) if err != nil { return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err) } @@ -78,11 +78,11 @@ func (l *LotusMount) Close() error { } func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) { - size, err := l.Api.GetUnpaddedCARSize(ctx, l.PieceCid) + size, err := l.API.GetUnpaddedCARSize(ctx, l.PieceCid) if err != nil { return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err) } - isUnsealed, err := l.Api.IsUnsealed(ctx, l.PieceCid) + isUnsealed, err := l.API.IsUnsealed(ctx, l.PieceCid) if err != nil { return mount.Stat{}, xerrors.Errorf("failed to verify if we have the unsealed piece %s: %w", l.PieceCid, err) } diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 817b91760..1b3fa2c89 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -84,7 +84,7 @@ func TestLotusMountDeserialize(t *testing.T) { require.NoError(t, err) require.Equal(t, cid, mnt.PieceCid) - require.Equal(t, api, mnt.Api) + require.Equal(t, api, mnt.API) // fails if cid is not valid us = lotusScheme + "://" + "rand" diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 7ee4aad89..e2d8922f3 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -49,8 +49,8 @@ type Wrapper struct { cancel context.CancelFunc backgroundWg sync.WaitGroup - dagStore DAGStore - mountApi LotusAccessor + dagst DAGStore + mountApi MinerAPI failureCh chan dagstore.ShardResult traceCh chan dagstore.Trace gcInterval time.Duration @@ -58,7 +58,7 @@ type Wrapper struct { var _ shared.DagStoreWrapper = (*Wrapper)(nil) -func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrapper, error) { +func NewWrapper(cfg MarketDAGStoreConfig, mountApi MinerAPI) (*Wrapper, error) { // construct the DAG Store. registry := mount.NewRegistry() if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil { @@ -94,7 +94,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap } return &Wrapper{ - dagStore: dagStore, + dagst: dagStore, mountApi: mountApi, failureCh: failureCh, traceCh: traceCh, @@ -102,68 +102,69 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap }, nil } -func (ds *Wrapper) Start(ctx context.Context) error { - ds.ctx, ds.cancel = context.WithCancel(ctx) +func (w *Wrapper) Start(ctx context.Context) error { + w.ctx, w.cancel = context.WithCancel(ctx) // Run a go-routine to do DagStore GC. - ds.backgroundWg.Add(1) - go ds.dagStoreGCLoop() + w.backgroundWg.Add(1) + go w.gcLoop() // run a go-routine to read the trace for debugging. - ds.backgroundWg.Add(1) - go ds.traceLoop() + w.backgroundWg.Add(1) + go w.traceLoop() // Run a go-routine for shard recovery - if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok { - ds.backgroundWg.Add(1) - go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done) + if dss, ok := w.dagst.(*dagstore.DAGStore); ok { + w.backgroundWg.Add(1) + go dagstore.RecoverImmediately(w.ctx, dss, w.failureCh, maxRecoverAttempts, w.backgroundWg.Done) } - return ds.dagStore.Start(ctx) + return w.dagst.Start(ctx) } -func (ds *Wrapper) traceLoop() { - defer ds.backgroundWg.Done() +func (w *Wrapper) traceLoop() { + defer w.backgroundWg.Done() - for ds.ctx.Err() == nil { + for w.ctx.Err() == nil { select { // Log trace events from the DAG store - case tr := <-ds.traceCh: + case tr := <-w.traceCh: log.Debugw("trace", "shard-key", tr.Key.String(), "op-type", tr.Op.String(), "after", tr.After.String()) - case <-ds.ctx.Done(): + case <-w.ctx.Done(): return } } } -func (ds *Wrapper) dagStoreGCLoop() { - defer ds.backgroundWg.Done() +func (w *Wrapper) gcLoop() { + defer w.backgroundWg.Done() - gcTicker := time.NewTicker(ds.gcInterval) - defer gcTicker.Stop() + ticker := time.NewTicker(w.gcInterval) + defer ticker.Stop() - for ds.ctx.Err() == nil { + for w.ctx.Err() == nil { select { // GC the DAG store on every tick - case <-gcTicker.C: - _, _ = ds.dagStore.GC(ds.ctx) + case <-ticker.C: + _, _ = w.dagst.GC(w.ctx) // Exit when the DAG store wrapper is shutdown - case <-ds.ctx.Done(): + case <-w.ctx.Done(): return } } } -func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { +func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { log.Debugf("acquiring shard for piece CID %s", pieceCid) + key := shard.KeyFromCID(pieceCid) resch := make(chan dagstore.ShardResult, 1) - err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) + err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) log.Debugf("sent message to acquire shard for piece CID %s", pieceCid) if err != nil { @@ -178,13 +179,13 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl // we already have a transient file. However, we don't have it here // and therefore we pass an empty file path. carPath := "" - if err := shared.RegisterShardSync(ctx, ds, pieceCid, carPath, false); err != nil { + if err := shared.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil { return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err) } log.Warnw("successfully re-registered shard", "pieceCID", pieceCid) resch = make(chan dagstore.ShardResult, 1) - if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { + if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err) } } @@ -212,10 +213,10 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl return &Blockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil } -func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { +func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { // Create a lotus mount with the piece CID key := shard.KeyFromCID(pieceCid) - mt, err := NewLotusMount(pieceCid, ds.mountApi) + mt, err := NewLotusMount(pieceCid, w.mountApi) if err != nil { return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err) } @@ -225,7 +226,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath ExistingTransient: carPath, LazyInitialization: !eagerInit, } - err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts) + err = w.dagst.RegisterShard(ctx, key, mt, resch, opts) if err != nil { return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err) } @@ -234,20 +235,20 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath return nil } -func (ds *Wrapper) Close() error { +func (w *Wrapper) Close() error { // Cancel the context - ds.cancel() + w.cancel() // Close the DAG store log.Info("will close the dagstore") - if err := ds.dagStore.Close(); err != nil { + if err := w.dagst.Close(); err != nil { return xerrors.Errorf("failed to close DAG store: %w", err) } log.Info("dagstore closed") // Wait for the background go routine to exit log.Info("waiting for dagstore background wrapper routines to exist") - ds.backgroundWg.Wait() + w.backgroundWg.Wait() log.Info("exited dagstore background warpper routines") return nil diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 967fdcd07..4aad4eb2d 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -26,7 +26,7 @@ func TestWrapperAcquireRecovery(t *testing.T) { require.NoError(t, err) // Create a DAG store wrapper - w, err := NewDagStoreWrapper(MarketDAGStoreConfig{ + w, err := NewWrapper(MarketDAGStoreConfig{ TransientsDir: t.TempDir(), IndexDir: t.TempDir(), GCInterval: time.Millisecond, @@ -45,7 +45,7 @@ func TestWrapperAcquireRecovery(t *testing.T) { }, register: make(chan shard.Key, 1), } - w.dagStore = mock + w.dagst = mock mybs, err := w.LoadShard(ctx, pieceCid) require.NoError(t, err) @@ -76,7 +76,7 @@ func TestWrapperBackground(t *testing.T) { ctx := context.Background() // Create a DAG store wrapper - w, err := NewDagStoreWrapper(MarketDAGStoreConfig{ + w, err := NewWrapper(MarketDAGStoreConfig{ TransientsDir: t.TempDir(), IndexDir: t.TempDir(), GCInterval: time.Millisecond, @@ -89,7 +89,7 @@ func TestWrapperBackground(t *testing.T) { recover: make(chan shard.Key, 1), close: make(chan struct{}, 1), } - w.dagStore = mock + w.dagst = mock // Start up the wrapper err = w.Start(ctx) diff --git a/node/builder_miner.go b/node/builder_miner.go index 469b7dc21..5b9b21084 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -148,7 +148,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), // DAG Store - Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor), + Override(new(dagstore.MinerAPI), modules.NewLotusAccessor), Override(new(*dagstore.Wrapper), modules.DAGStoreWrapper), // Markets (retrieval) diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index a6cf921e3..f251df1eb 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -23,8 +23,8 @@ import ( func NewLotusAccessor(lc fx.Lifecycle, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode, -) (dagstore.LotusAccessor, error) { - mountApi := dagstore.NewLotusAccessor(pieceStore, rpn) +) (dagstore.MinerAPI, error) { + mountApi := dagstore.NewMinerAPI(pieceStore, rpn) ready := make(chan error, 1) pieceStore.OnReady(func(err error) { ready <- err @@ -47,7 +47,7 @@ func NewLotusAccessor(lc fx.Lifecycle, func DAGStoreWrapper( lc fx.Lifecycle, r repo.LockedRepo, - lotusAccessor dagstore.LotusAccessor, + lotusAccessor dagstore.MinerAPI, ) (*dagstore.Wrapper, error) { dir := filepath.Join(r.Path(), dagStore) ds, err := newDAGStoreDatastore(dir) @@ -74,7 +74,7 @@ func DAGStoreWrapper( MaxConcurrentReadyFetches: maxCopies, } - dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor) + dsw, err := dagstore.NewWrapper(cfg, lotusAccessor) if err != nil { return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) } From bd3811e652dd1b2df91d26341d8d8656e283a300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 12:23:04 +0100 Subject: [PATCH 04/14] minor renaming. --- markets/dagstore/{lotusaccessor.go => miner_api.go} | 0 markets/dagstore/{lotusaccessor_test.go => miner_api_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename markets/dagstore/{lotusaccessor.go => miner_api.go} (100%) rename markets/dagstore/{lotusaccessor_test.go => miner_api_test.go} (100%) diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/miner_api.go similarity index 100% rename from markets/dagstore/lotusaccessor.go rename to markets/dagstore/miner_api.go diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/miner_api_test.go similarity index 100% rename from markets/dagstore/lotusaccessor_test.go rename to markets/dagstore/miner_api_test.go From 691da1499663980bd56ecf26ad8b5c442e0e914e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 12:23:36 +0100 Subject: [PATCH 05/14] minor renaming. --- markets/dagstore/miner_api.go | 14 +++++++------- markets/dagstore/mount_test.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/markets/dagstore/miner_api.go b/markets/dagstore/miner_api.go index fb624aef6..3b4c1e5e7 100644 --- a/markets/dagstore/miner_api.go +++ b/markets/dagstore/miner_api.go @@ -37,17 +37,17 @@ type MinerAPI interface { Start(ctx context.Context) error } -type lotusAccessor struct { +type minerAPI struct { pieceStore piecestore.PieceStore rm retrievalmarket.RetrievalProviderNode throttle throttle.Throttler readyMgr *shared.ReadyManager } -var _ MinerAPI = (*lotusAccessor)(nil) +var _ MinerAPI = (*minerAPI)(nil) func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) MinerAPI { - return &lotusAccessor{ + return &minerAPI{ pieceStore: store, rm: rm, throttle: throttle.Fixed(MaxConcurrentStorageCalls), @@ -55,11 +55,11 @@ func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProvid } } -func (m *lotusAccessor) Start(_ context.Context) error { +func (m *minerAPI) Start(_ context.Context) error { return m.readyMgr.FireReady(nil) } -func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { +func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { err := m.readyMgr.AwaitReady() if err != nil { return false, xerrors.Errorf("failed while waiting for accessor to start: %w", err) @@ -107,7 +107,7 @@ func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, return false, nil } -func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { +func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { err := m.readyMgr.AwaitReady() if err != nil { return nil, err @@ -179,7 +179,7 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid return nil, lastErr } -func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { +func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { err := m.readyMgr.AwaitReady() if err != nil { return 0, err diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 1b3fa2c89..5f3705d82 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -69,7 +69,7 @@ func TestLotusMount(t *testing.T) { } func TestLotusMountDeserialize(t *testing.T) { - api := &lotusAccessor{} + api := &minerAPI{} bgen := blocksutil.NewBlockGenerator() cid := bgen.Next().Cid() From c06c8541f9f8022be3c91e4392e82714dfe4c327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 22:46:37 +0100 Subject: [PATCH 06/14] refactor dagstore + carv2 integration (+). - Integrate with config.toml. - Export DAGStore to DI context in preparation for JSON-RPC and cli commands. - Renames. --- go.mod | 4 +- go.sum | 4 + markets/dagstore/miner_api.go | 20 +---- markets/dagstore/mount.go | 24 +++--- markets/dagstore/mount_test.go | 6 +- markets/dagstore/wrapper.go | 93 ++++++++++++--------- markets/dagstore/wrapper_test.go | 23 ++++- node/builder.go | 1 + node/builder_miner.go | 4 +- node/config/def.go | 9 ++ node/config/types.go | 14 +++- node/impl/client/client.go | 67 ++++++++------- node/impl/client/import.go | 101 +++++++++++++--------- node/impl/client/import_test.go | 77 +++++++++-------- node/modules/client.go | 2 +- node/modules/storageminer.go | 33 +++----- node/modules/storageminer_dagstore.go | 116 +++++++++++++------------- 17 files changed, 335 insertions(+), 263 deletions(-) diff --git a/go.mod b/go.mod index fc85120be..a18c34d33 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 - github.com/filecoin-project/dagstore v0.4.0 + github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 @@ -36,7 +36,7 @@ require ( github.com/filecoin-project/go-data-transfer v1.7.2 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1 + github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1 diff --git a/go.sum b/go.sum index d38c6a2c7..123eeca79 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6 github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/dagstore v0.4.0 h1:ZeoL5Gbgn4BSzKgghobfAO9a5nVqfVC4fQ/lNYn1GQo= github.com/filecoin-project/dagstore v0.4.0/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= +github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18 h1:wn+3OyRwQzS+iYEkc3sO33XU3g5fEzP81Guh73ZHthw= +github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -292,6 +294,8 @@ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0 github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1 h1:xXgCsfEaA3wdof/N5mkD7PphDolhc9MAsIrO/QBH5Pg= github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d h1:Izq16s1N4E2q+sSteCTT7fZsXC0ShxYjYNx9kI21AvE= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/dagstore/miner_api.go b/markets/dagstore/miner_api.go index 3b4c1e5e7..0da01c396 100644 --- a/markets/dagstore/miner_api.go +++ b/markets/dagstore/miner_api.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "os" - "strconv" "github.com/filecoin-project/dagstore/throttle" "github.com/ipfs/go-cid" @@ -16,20 +14,6 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) -// MaxConcurrentStorageCalls caps the amount of concurrent calls to the -// storage, so that we don't spam it during heavy processes like bulk migration. -var MaxConcurrentStorageCalls = func() int { - // TODO replace env with config.toml attribute. - v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY") - if ok { - concurrency, err := strconv.Atoi(v) - if err == nil { - return concurrency - } - } - return 100 -}() - type MinerAPI interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) @@ -46,11 +30,11 @@ type minerAPI struct { var _ MinerAPI = (*minerAPI)(nil) -func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) MinerAPI { +func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode, concurrency int) MinerAPI { return &minerAPI{ pieceStore: store, rm: rm, - throttle: throttle.Fixed(MaxConcurrentStorageCalls), + throttle: throttle.Fixed(concurrency), readyMgr: shared.NewReadyManager(), } } diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index 3d3c2142f..c97dcbf86 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -15,24 +15,25 @@ const lotusScheme = "lotus" var _ mount.Mount = (*LotusMount)(nil) -// LotusMount is the Lotus implementation of a Sharded DAG Store Mount. -// A Filecoin Piece is treated as a Shard by this implementation. -type LotusMount struct { - API MinerAPI - PieceCid cid.Cid -} - -// NewLotusMountTemplate is called when registering a mount with -// the DAG store registry. +// mountTemplate returns a templated LotusMount containing the supplied API. +// +// It is called when registering a mount type with the mount registry +// of the DAG store. It is used to reinstantiate mounts after a restart. // -// The DAG store registry receives an instance of the mount (a "template"). // When the registry needs to deserialize a mount it clones the template then // calls Deserialize on the cloned instance, which will have a reference to the // lotus mount API supplied here. -func NewLotusMountTemplate(api MinerAPI) *LotusMount { +func mountTemplate(api MinerAPI) *LotusMount { return &LotusMount{API: api} } +// LotusMount is a DAGStore mount implementation that fetches deal data +// from a PieceCID. +type LotusMount struct { + API MinerAPI + PieceCid cid.Cid +} + func NewLotusMount(pieceCid cid.Cid, api MinerAPI) (*LotusMount, error) { return &LotusMount{ PieceCid: pieceCid, @@ -51,7 +52,6 @@ func (l *LotusMount) Deserialize(u *url.URL) error { if err != nil { return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err) } - l.PieceCid = pieceCid return nil } diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 5f3705d82..09b255d6a 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -55,7 +55,7 @@ func TestLotusMount(t *testing.T) { // serialize url then deserialize from mount template -> should get back // the same mount url := mnt.Serialize() - mnt2 := NewLotusMountTemplate(mockLotusMountAPI) + mnt2 := mountTemplate(mockLotusMountAPI) err = mnt2.Deserialize(url) require.NoError(t, err) @@ -79,7 +79,7 @@ func TestLotusMountDeserialize(t *testing.T) { u, err := url.Parse(us) require.NoError(t, err) - mnt := NewLotusMountTemplate(api) + mnt := mountTemplate(api) err = mnt.Deserialize(u) require.NoError(t, err) @@ -111,7 +111,7 @@ func TestLotusMountRegistration(t *testing.T) { mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) registry := mount.NewRegistry() - err = registry.Register(lotusScheme, NewLotusMountTemplate(mockLotusMountAPI)) + err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI)) require.NoError(t, err) mnt, err := registry.Instantiate(u) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index e2d8922f3..5875aa275 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -3,82 +3,73 @@ package dagstore import ( "context" "errors" + "os" "sync" "time" - "github.com/filecoin-project/dagstore/index" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" + levelds "github.com/ipfs/go-ds-leveldb" + measure "github.com/ipfs/go-ds-measure" logging "github.com/ipfs/go-log/v2" + ldbopts "github.com/syndtr/goleveldb/leveldb/opt" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/index" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" - "github.com/filecoin-project/go-fil-markets/carstore" - "github.com/filecoin-project/go-fil-markets/shared" + + "github.com/filecoin-project/go-fil-markets/stores" ) const maxRecoverAttempts = 1 var log = logging.Logger("dagstore-wrapper") -// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. -type MarketDAGStoreConfig struct { - TransientsDir string - IndexDir string - Datastore ds.Datastore - MaxConcurrentIndex int - MaxConcurrentReadyFetches int - GCInterval time.Duration -} - -// DAGStore provides an interface for the DAG store that can be mocked out -// by tests -type DAGStore interface { - RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error - AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error - RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error - GC(ctx context.Context) (*dagstore.GCResult, error) - Close() error - Start(ctx context.Context) error -} - type Wrapper struct { ctx context.Context cancel context.CancelFunc backgroundWg sync.WaitGroup - dagst DAGStore + dagst dagstore.Interface mountApi MinerAPI failureCh chan dagstore.ShardResult traceCh chan dagstore.Trace gcInterval time.Duration } -var _ shared.DagStoreWrapper = (*Wrapper)(nil) +var _ stores.DAGStoreWrapper = (*Wrapper)(nil) -func NewWrapper(cfg MarketDAGStoreConfig, mountApi MinerAPI) (*Wrapper, error) { +func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGStore, *Wrapper, error) { // construct the DAG Store. registry := mount.NewRegistry() - if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil { - return nil, xerrors.Errorf("failed to create registry: %w", err) + if err := registry.Register(lotusScheme, mountTemplate(mountApi)); err != nil { + return nil, nil, xerrors.Errorf("failed to create registry: %w", err) } // The dagstore will write Shard failures to the `failureCh` here. failureCh := make(chan dagstore.ShardResult, 1) + // The dagstore will write Trace events to the `traceCh` here. traceCh := make(chan dagstore.Trace, 32) + dstore, err := newDatastore(cfg.DatastoreDir) + if err != nil { + return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", cfg.DatastoreDir, err) + } + irepo, err := index.NewFSRepo(cfg.IndexDir) if err != nil { - return nil, xerrors.Errorf("failed to initialise dagstore index repo") + return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo") } dcfg := dagstore.Config{ TransientsDir: cfg.TransientsDir, IndexRepo: irepo, - Datastore: cfg.Datastore, + Datastore: dstore, MountRegistry: registry, FailureCh: failureCh, TraceCh: traceCh, @@ -88,18 +79,44 @@ func NewWrapper(cfg MarketDAGStoreConfig, mountApi MinerAPI) (*Wrapper, error) { MaxConcurrentReadyFetches: cfg.MaxConcurrentReadyFetches, RecoverOnStart: dagstore.RecoverOnAcquire, } - dagStore, err := dagstore.NewDAGStore(dcfg) + + dagst, err := dagstore.NewDAGStore(dcfg) if err != nil { - return nil, xerrors.Errorf("failed to create DAG store: %w", err) + return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) } - return &Wrapper{ - dagst: dagStore, + w := &Wrapper{ + dagst: dagst, mountApi: mountApi, failureCh: failureCh, traceCh: traceCh, gcInterval: cfg.GCInterval, - }, nil + } + + return dagst, w, nil +} + +// newDatastore creates a datastore under the given base directory +// for dagstore metadata. +func newDatastore(dir string) (ds.Batching, error) { + // Create the datastore directory if it doesn't exist yet. + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dir, err) + } + + // Create a new LevelDB datastore + dstore, err := levelds.NewDatastore(dir, &levelds.Options{ + Compression: ldbopts.NoCompression, + NoSync: false, + Strict: ldbopts.StrictAll, + ReadOnly: false, + }) + if err != nil { + return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err) + } + // Keep statistics about the datastore + mds := measure.New("measure.", dstore) + return mds, nil } func (w *Wrapper) Start(ctx context.Context) error { @@ -159,7 +176,7 @@ func (w *Wrapper) gcLoop() { } } -func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { +func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) { log.Debugf("acquiring shard for piece CID %s", pieceCid) key := shard.KeyFromCID(pieceCid) @@ -179,7 +196,7 @@ func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Clo // we already have a transient file. However, we don't have it here // and therefore we pass an empty file path. carPath := "" - if err := shared.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil { + if err := stores.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil { return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err) } log.Warnw("successfully re-registered shard", "pieceCID", pieceCid) diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 4aad4eb2d..1278253f0 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/filecoin-project/lotus/node/config" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore" @@ -26,13 +27,16 @@ func TestWrapperAcquireRecovery(t *testing.T) { require.NoError(t, err) // Create a DAG store wrapper - w, err := NewWrapper(MarketDAGStoreConfig{ + dagst, w, err := NewDAGStore(config.DAGStoreConfig{ TransientsDir: t.TempDir(), IndexDir: t.TempDir(), + DatastoreDir: t.TempDir(), GCInterval: time.Millisecond, }, mockLotusMount{}) require.NoError(t, err) + defer dagst.Close() + // Return an error from acquire shard the first time acquireShardErr := make(chan error, 1) acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown) @@ -76,13 +80,16 @@ func TestWrapperBackground(t *testing.T) { ctx := context.Background() // Create a DAG store wrapper - w, err := NewWrapper(MarketDAGStoreConfig{ + dagst, w, err := NewDAGStore(config.DAGStoreConfig{ TransientsDir: t.TempDir(), IndexDir: t.TempDir(), + DatastoreDir: t.TempDir(), GCInterval: time.Millisecond, }, mockLotusMount{}) require.NoError(t, err) + defer dagst.Close() + // Create a mock DAG store in place of the real DAG store mock := &mockDagStore{ gc: make(chan struct{}, 1), @@ -128,6 +135,18 @@ type mockDagStore struct { close chan struct{} } +func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error { + panic("implement me") +} + +func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) { + panic("implement me") +} + +func (m *mockDagStore) AllShardsInfo() dagstore.AllShardsInfo { + panic("implement me") +} + func (m *mockDagStore) Start(_ context.Context) error { return nil } diff --git a/node/builder.go b/node/builder.go index e0efd8462..397df7d49 100644 --- a/node/builder.go +++ b/node/builder.go @@ -66,6 +66,7 @@ var ( AutoNATSvcKey = special{10} // Libp2p option BandwidthReporterKey = special{11} // Libp2p option ConnGaterKey = special{12} // libp2p option + DAGStoreKey = special{13} // constructor returns multiple values ) type invoke int diff --git a/node/builder_miner.go b/node/builder_miner.go index 5b9b21084..0bfcfdd25 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -148,8 +148,8 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), // DAG Store - Override(new(dagstore.MinerAPI), modules.NewLotusAccessor), - Override(new(*dagstore.Wrapper), modules.DAGStoreWrapper), + Override(new(dagstore.MinerAPI), modules.NewMinerAPI), + Override(DAGStoreKey, modules.DAGStore), // Markets (retrieval) Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), diff --git a/node/config/def.go b/node/config/def.go index c5c455c68..06e48c731 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -188,6 +188,15 @@ func DefaultStorageMiner() *StorageMiner { TerminateControl: []string{}, DealPublishControl: []string{}, }, + + DAGStore: DAGStoreConfig{ + // zero value paths for transients, indices, datastore. + // by default they'll end up under the node repo. + MaxConcurrentIndex: 5, + MaxConcurrentReadyFetches: 2, + MaxConcurrencyStorageCalls: 100, + GCInterval: time.Minute, + }, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345" diff --git a/node/config/types.go b/node/config/types.go index fe42aa27e..ddcb27031 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -1,10 +1,11 @@ package config import ( - "github.com/ipfs/go-cid" + "time" "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/ipfs/go-cid" ) // // NOTE: ONLY PUT STRUCT DEFINITIONS IN THIS FILE @@ -50,6 +51,17 @@ type StorageMiner struct { Storage sectorstorage.SealerConfig Fees MinerFeeConfig Addresses MinerAddressConfig + DAGStore DAGStoreConfig +} + +type DAGStoreConfig struct { + TransientsDir string + IndexDir string + DatastoreDir string + MaxConcurrentIndex int + MaxConcurrentReadyFetches int + MaxConcurrencyStorageCalls int + GCInterval time.Duration } type MinerSubsystemConfig struct { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index f4401324b..9dfb71019 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -10,8 +10,7 @@ import ( "sort" "time" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/ipld/go-car" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" @@ -26,7 +25,7 @@ import ( files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" - "github.com/ipld/go-car" + basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" @@ -47,6 +46,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" @@ -54,6 +54,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/markets/utils" @@ -188,17 +189,17 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs) result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{ - Addr: params.Wallet, - Info: &providerInfo, - Data: params.Data, - StartEpoch: dealStart, - EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), - Price: params.EpochPrice, - Collateral: params.ProviderCollateral, - Rt: st, - FastRetrieval: params.FastRetrieval, - VerifiedDeal: params.VerifiedDeal, - FilestoreCARv2FilePath: CARV2FilePath, + Addr: params.Wallet, + Info: &providerInfo, + Data: params.Data, + StartEpoch: dealStart, + EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), + Price: params.EpochPrice, + Collateral: params.ProviderCollateral, + Rt: st, + FastRetrieval: params.FastRetrieval, + VerifiedDeal: params.VerifiedDeal, + IndexedCAR: CARV2FilePath, }) if err != nil { @@ -519,7 +520,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor } }() - root, err = a.importNormalFileToFilestoreCARv2(ctx, id, ref.Path, carFile) + root, err = a.doImport(ctx, ref.Path, carFile) if err != nil { return nil, xerrors.Errorf("failed to import normal file to CARv2: %w", err) } @@ -1031,21 +1032,21 @@ func (w *lenWriter) Write(p []byte) (n int, err error) { } func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) { - fc, err := a.imgr().FilestoreCARV2FilePathFor(root) + path, err := a.imgr().FilestoreCARV2FilePathFor(root) if err != nil { return api.DataSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err) } - if len(fc) == 0 { + if len(path) == 0 { return api.DataSize{}, xerrors.New("no CARv2 file for root") } - rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(fc) + fs, err := stores.ReadOnlyFilestore(path) if err != nil { - return api.DataSize{}, xerrors.Errorf("failed to open read only filestore: %w", err) + return api.DataSize{}, xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", path, err) } - defer rdOnly.Close() //nolint:errcheck + defer fs.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly))) + dag := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) w := lenWriter(0) err = car.WriteCar(ctx, dag, []cid.Cid{root}, &w) @@ -1062,21 +1063,21 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e } func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) { - fc, err := a.imgr().FilestoreCARV2FilePathFor(root) + path, err := a.imgr().FilestoreCARV2FilePathFor(root) if err != nil { return api.DataCIDSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err) } - if len(fc) == 0 { + if len(path) == 0 { return api.DataCIDSize{}, xerrors.New("no CARv2 file for root") } - rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(fc) + fs, err := stores.ReadOnlyFilestore(path) if err != nil { - return api.DataCIDSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err) + return api.DataCIDSize{}, xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", path, err) } - defer rdOnly.Close() //nolint:errcheck + defer fs.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly))) + dag := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) w := &writer.Writer{} bw := bufio.NewWriterSize(w, int(writer.CommPBuf)) @@ -1095,22 +1096,20 @@ func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCID func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { id := importmgr.ImportID(rand.Uint64()) - tmpCARv2File, err := a.imgr().NewTempFile(id) + tmp, err := a.imgr().NewTempFile(id) if err != nil { return xerrors.Errorf("failed to create temp file: %w", err) } - defer os.Remove(tmpCARv2File) //nolint:errcheck + defer os.Remove(tmp) //nolint:errcheck - root, err := a.importNormalFileToFilestoreCARv2(ctx, id, ref.Path, tmpCARv2File) + root, err := a.doImport(ctx, ref.Path, tmp) if err != nil { return xerrors.Errorf("failed to import normal file to CARv2") } - // generate a deterministic CARv1 payload from the UnixFS DAG by doing an IPLD - // traversal over the Unixfs DAG in the CARv2 file using the "all selector" i.e the entire DAG selector. - fs, err := filestorecaradapter.NewReadOnlyFileStore(tmpCARv2File) + fs, err := stores.ReadOnlyFilestore(tmp) if err != nil { - return xerrors.Errorf("failed to open read only CARv2 blockstore: %w", err) + return xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", tmp, err) } defer fs.Close() //nolint:errcheck diff --git a/node/impl/client/import.go b/node/impl/client/import.go index eb0ed5472..07db53bb1 100644 --- a/node/impl/client/import.go +++ b/node/impl/client/import.go @@ -2,18 +2,14 @@ package client import ( "context" + "io/ioutil" "os" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" - bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-filestore" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" files2 "github.com/ipfs/go-ipfs-files" @@ -21,56 +17,81 @@ import ( "github.com/ipfs/go-merkledag" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" "golang.org/x/xerrors" ) -// importNormalFileToFilestoreCARv2 transforms the client's "normal file" to a Unixfs IPLD DAG and writes out the DAG to a CARv2 file -// that can be used to back a filestore. -func (a *API) importNormalFileToFilestoreCARv2(ctx context.Context, importID importmgr.ImportID, inputFilePath string, outputCARv2Path string) (c cid.Cid, finalErr error) { - - // TODO: We've currently put in a hack to create the Unixfs DAG as a CARv2 without using Badger. - // We first create the Unixfs DAG using a filestore to get the root of the Unixfs DAG. - // We can't create the UnixfsDAG right away using a CARv2 read-write blockstore as the blockstore - // needs the root of the DAG during instantiation to write out a valid CARv2 file. +// doImport takes a standard file (src), forms a UnixFS DAG, and writes a +// CARv2 file with positional mapping (backed by the go-filestore library). +func (a *API) doImport(ctx context.Context, src string, dst string) (cid.Cid, error) { + // This method uses a two-phase approach with a staging CAR blockstore and + // a final CAR blockstore. // - // In the second pass, we create a CARv2 file with the root present using the root node we get in the above step. - // This hack should be fixed when CARv2 allows specifying the root AFTER finishing the CARv2 streaming write. - fm := filestore.NewFileManager(ds_sync.MutexWrap(datastore.NewMapDatastore()), "/") - fm.AllowFiles = true - fstore := filestore.NewFilestore(bstore.NewMemorySync(), fm) + // This is necessary because of https://github.com/ipld/go-car/issues/196 + // + // TODO: do we need to chunk twice? Isn't the first output already in the + // right order? Can't we just copy the CAR file and replace the header? + f, err := ioutil.TempFile("", "") + if err != nil { + return cid.Undef, xerrors.Errorf("failed to create temp file: %w", err) + } + _ = f.Close() // close; we only want the path. + tmp := f.Name() + defer os.Remove(tmp) + + // Step 1. Compute the UnixFS DAG and write it to a CARv2 file to get + // the root CID of the DAG. + fstore, err := stores.ReadWriteFilestore(tmp) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to create temporary filestore: %w", err) + } + bsvc := blockservice.New(fstore, offline.Exchange(fstore)) - defer bsvc.Close() //nolint:errcheck + dags := merkledag.NewDAGService(bsvc) - // ---- First Pass --- Write out the UnixFS DAG to a rootless CARv2 file by instantiating a read-write CARv2 blockstore without the root. - root, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc)) + root, err := buildUnixFS(ctx, src, dags) if err != nil { - return cid.Undef, xerrors.Errorf("failed to import file to store: %w", err) + _ = fstore.Close() + return cid.Undef, xerrors.Errorf("failed to import file to store to compute root: %w", err) } - //------ Second Pass --- Now that we have the root of the Unixfs DAG -> write out the Unixfs DAG to a CARv2 file with the root present by using a - // filestore backed by a read-write CARv2 blockstore. - fsb, err := filestorecaradapter.NewReadWriteFileStore(outputCARv2Path, []cid.Cid{root}) - if err != nil { - return cid.Undef, xerrors.Errorf("failed to create a CARv2 read-write blockstore: %w", err) - } - defer fsb.Close() //nolint:errcheck - - bsvc = blockservice.New(fsb, offline.Exchange(fsb)) - root2, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc)) - if err != nil { - return cid.Undef, xerrors.Errorf("failed to create Unixfs DAG with CARv2 blockstore: %w", err) + if err := fstore.Close(); err != nil { + return cid.Undef, xerrors.Errorf("failed to finalize car filestore: %w", err) } - if root != root2 { + // Step 2. We now have the root of the UnixFS DAG, and we can write the + // final CAR for real under `dst`. + bs, err := blockstore.OpenReadWrite(dst, []cid.Cid{root}, + car.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true)) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to create a carv2 read/write blockstore: %w", err) + } + + bsvc = blockservice.New(bs, offline.Exchange(bs)) + dags = merkledag.NewDAGService(bsvc) + finalRoot, err := buildUnixFS(ctx, src, dags) + if err != nil { + _ = bs.Close() + return cid.Undef, xerrors.Errorf("failed to create UnixFS DAG with carv2 blockstore: %w", err) + } + + if err := bs.Finalize(); err != nil { + return cid.Undef, xerrors.Errorf("failed to finalize car blockstore: %w", err) + } + + if root != finalRoot { return cid.Undef, xerrors.New("roots do not match") } return root, nil } -// importNormalFileToUnixfsDAG transforms a client's normal file to a UnixfsDAG and imports the DAG to the given DAG service. -func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag ipld.DAGService) (cid.Cid, error) { - f, err := os.Open(inputFilePath) +// buildUnixFS builds a UnixFS DAG out of the supplied ordinary file, +// and imports the DAG into the supplied service. +func buildUnixFS(ctx context.Context, src string, dag ipld.DAGService) (cid.Cid, error) { + f, err := os.Open(src) if err != nil { return cid.Undef, xerrors.Errorf("failed to open input file: %w", err) } @@ -81,7 +102,7 @@ func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag return cid.Undef, xerrors.Errorf("failed to stat file :%w", err) } - file, err := files2.NewReaderPathFile(inputFilePath, f, stat) + file, err := files2.NewReaderPathFile(src, f, stat) if err != nil { return cid.Undef, xerrors.Errorf("failed to create reader path file: %w", err) } diff --git a/node/impl/client/import_test.go b/node/impl/client/import_test.go index aea8b6adb..a6d580e5c 100644 --- a/node/impl/client/import_test.go +++ b/node/impl/client/import_test.go @@ -4,12 +4,11 @@ import ( "context" "io" "io/ioutil" - "math/rand" "os" "strings" "testing" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -22,39 +21,50 @@ import ( "github.com/stretchr/testify/require" ) -func TestImportNormalFileToUnixfsDAG(t *testing.T) { +// This test uses a full "dense" CARv2, and not a filestore (positional mapping). +func TestRoundtripUnixFS_Dense(t *testing.T) { ctx := context.Background() - inputPath, inputContents := genNormalInputFile(t) + + inputPath, inputContents := genInputFile(t) defer os.Remove(inputPath) //nolint:errcheck - carv2File := genTmpFile(t) + + carv2File := newTmpFile(t) defer os.Remove(carv2File) //nolint:errcheck - // import a normal file to a Unixfs DAG using a CARv2 read-write blockstore and flush it out to a CARv2 file. - tempCARv2Store, err := blockstore.OpenReadWrite(carv2File, []cid.Cid{}, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) + // import a file to a Unixfs DAG using a CARv2 read/write blockstore. + path, err := blockstore.OpenReadWrite(carv2File, nil, + carv2.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true)) require.NoError(t, err) - bsvc := blockservice.New(tempCARv2Store, offline.Exchange(tempCARv2Store)) - root, err := importNormalFileToUnixfsDAG(ctx, inputPath, merkledag.NewDAGService(bsvc)) + + bsvc := blockservice.New(path, offline.Exchange(path)) + dags := merkledag.NewDAGService(bsvc) + + root, err := buildUnixFS(ctx, inputPath, dags) require.NoError(t, err) require.NotEqual(t, cid.Undef, root) - require.NoError(t, tempCARv2Store.Finalize()) + require.NoError(t, path.Finalize()) - // convert the CARv2 file to a normal file again and ensure the contents match. - readOnly, err := blockstore.OpenReadOnly(carv2File, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) + // reconstruct the file. + readOnly, err := blockstore.OpenReadOnly(carv2File, + carv2.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true)) require.NoError(t, err) defer readOnly.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly))) - nd, err := dag.Get(ctx, root) - require.NoError(t, err) - file, err := unixfile.NewUnixfsFile(ctx, dag, nd) + dags = merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly))) + + nd, err := dags.Get(ctx, root) require.NoError(t, err) - tmpOutput := genTmpFile(t) + file, err := unixfile.NewUnixfsFile(ctx, dags, nd) + require.NoError(t, err) + + tmpOutput := newTmpFile(t) defer os.Remove(tmpOutput) //nolint:errcheck require.NoError(t, files.WriteTo(file, tmpOutput)) // ensure contents of the initial input file and the output file are identical. - fo, err := os.Open(tmpOutput) require.NoError(t, err) bz2, err := ioutil.ReadAll(fo) @@ -63,35 +73,36 @@ func TestImportNormalFileToUnixfsDAG(t *testing.T) { require.Equal(t, inputContents, bz2) } -func TestImportNormalFileToCARv2(t *testing.T) { +func TestRoundtripUnixFS_Filestore(t *testing.T) { ctx := context.Background() a := &API{ Imports: &importmgr.Mgr{}, } - importID := importmgr.ImportID(rand.Uint64()) - inputFilePath, inputContents := genNormalInputFile(t) + inputFilePath, inputContents := genInputFile(t) defer os.Remove(inputFilePath) //nolint:errcheck - outputCARv2 := genTmpFile(t) - defer os.Remove(outputCARv2) //nolint:errcheck + path := newTmpFile(t) + defer os.Remove(path) //nolint:errcheck - root, err := a.importNormalFileToFilestoreCARv2(ctx, importID, inputFilePath, outputCARv2) + root, err := a.doImport(ctx, inputFilePath, path) require.NoError(t, err) require.NotEqual(t, cid.Undef, root) // convert the CARv2 to a normal file again and ensure the contents match - readOnly, err := filestorecaradapter.NewReadOnlyFileStore(outputCARv2) + fs, err := stores.ReadOnlyFilestore(path) require.NoError(t, err) - defer readOnly.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly))) + defer fs.Close() //nolint:errcheck - nd, err := dag.Get(ctx, root) - require.NoError(t, err) - file, err := unixfile.NewUnixfsFile(ctx, dag, nd) + dags := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) + + nd, err := dags.Get(ctx, root) require.NoError(t, err) - tmpOutput := genTmpFile(t) + file, err := unixfile.NewUnixfsFile(ctx, dags, nd) + require.NoError(t, err) + + tmpOutput := newTmpFile(t) defer os.Remove(tmpOutput) //nolint:errcheck require.NoError(t, files.WriteTo(file, tmpOutput)) @@ -104,14 +115,14 @@ func TestImportNormalFileToCARv2(t *testing.T) { require.Equal(t, inputContents, bz2) } -func genTmpFile(t *testing.T) string { +func newTmpFile(t *testing.T) string { f, err := os.CreateTemp("", "") require.NoError(t, err) require.NoError(t, f.Close()) return f.Name() } -func genNormalInputFile(t *testing.T) (filepath string, contents []byte) { +func genInputFile(t *testing.T) (filepath string, contents []byte) { s := strings.Repeat("abcde", 100) tmp, err := os.CreateTemp("", "") require.NoError(t, err) diff --git a/node/modules/client.go b/node/modules/client.go index 44b555556..b7757f8df 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -182,7 +182,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) { - carsPath := filepath.Join(r.Path(), dagStore, "retrieval-cars") + carsPath := filepath.Join(r.Path(), DefaultDAGStoreDir, "retrieval-cars") if err := os.MkdirAll(carsPath, 0755); err != nil { return nil, xerrors.Errorf("failed to create dir") diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 6207a5ff4..f9107006e 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -7,9 +7,7 @@ import ( "fmt" "net/http" "os" - "path" "path/filepath" - "strconv" "strings" "time" @@ -17,22 +15,6 @@ import ( "go.uber.org/multierr" "golang.org/x/xerrors" - "github.com/ipfs/go-bitswap" - "github.com/ipfs/go-bitswap/network" - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - levelds "github.com/ipfs/go-ds-leveldb" - measure "github.com/ipfs/go-ds-measure" - graphsync "github.com/ipfs/go-graphsync/impl" - gsnet "github.com/ipfs/go-graphsync/network" - "github.com/ipfs/go-graphsync/storeutil" - "github.com/ipfs/go-merkledag" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" - ldbopts "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/filecoin-project/go-address" dtimpl "github.com/filecoin-project/go-data-transfer/impl" dtnet "github.com/filecoin-project/go-data-transfer/network" @@ -52,6 +34,18 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + graphsync "github.com/ipfs/go-graphsync/impl" + gsnet "github.com/ipfs/go-graphsync/network" + "github.com/ipfs/go-graphsync/storeutil" + "github.com/ipfs/go-merkledag" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/routing" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" @@ -82,7 +76,6 @@ import ( ) var StorageCounterDSPrefix = "/storage/nextid" -var dagStore = "dagStore" func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { maddrb, err := ds.Get(datastore.NewKey("miner-address")) @@ -597,7 +590,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, return nil, err } - dagStorePath := filepath.Join(r.Path(), dagStore) + dagStorePath := filepath.Join(r.Path(), DefaultDAGStoreDir) opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df)) shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), dagStorePath, dsw, pieceStore, spn) diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index f251df1eb..88b9db0e8 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -6,25 +6,39 @@ import ( "os" "path/filepath" "strconv" - "time" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/lotus/markets/dagstore" + mdagstore "github.com/filecoin-project/lotus/markets/dagstore" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" - "github.com/ipfs/go-datastore" - levelds "github.com/ipfs/go-ds-leveldb" - measure "github.com/ipfs/go-ds-measure" - ldbopts "github.com/syndtr/goleveldb/leveldb/opt" "go.uber.org/fx" "golang.org/x/xerrors" ) -func NewLotusAccessor(lc fx.Lifecycle, - pieceStore dtypes.ProviderPieceStore, - rpn retrievalmarket.RetrievalProviderNode, -) (dagstore.MinerAPI, error) { - mountApi := dagstore.NewMinerAPI(pieceStore, rpn) +const ( + EnvDAGStoreCopyConcurrency = "LOTUS_DAGSTORE_COPY_CONCURRENCY" + DefaultDAGStoreDir = "dagStore" +) + +// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts. +func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode) (mdagstore.MinerAPI, error) { + cfg, err := extractDAGStoreConfig(r) + if err != nil { + return nil, err + } + + // caps the amount of concurrent calls to the storage, so that we don't + // spam it during heavy processes like bulk migration. + if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + cfg.MaxConcurrencyStorageCalls = concurrency + } + } + + mountApi := mdagstore.NewMinerAPI(pieceStore, rpn, cfg.MaxConcurrencyStorageCalls) ready := make(chan error, 1) pieceStore.OnReady(func(err error) { ready <- err @@ -44,72 +58,60 @@ func NewLotusAccessor(lc fx.Lifecycle, return mountApi, nil } -func DAGStoreWrapper( - lc fx.Lifecycle, - r repo.LockedRepo, - lotusAccessor dagstore.MinerAPI, -) (*dagstore.Wrapper, error) { - dir := filepath.Join(r.Path(), dagStore) - ds, err := newDAGStoreDatastore(dir) +// DAGStore constructs a DAG store using the supplied minerAPI, and the +// user configuration. It returns both the DAGStore and the Wrapper suitable for +// passing to markets. +func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { + cfg, err := extractDAGStoreConfig(r) if err != nil { - return nil, err + return nil, nil, err } - var maxCopies = 2 - // TODO replace env with config.toml attribute. - v, ok := os.LookupEnv("LOTUS_DAGSTORE_COPY_CONCURRENCY") + // populate default directories if not explicitly set in the config. + defaultDir := filepath.Join(r.Path(), DefaultDAGStoreDir) + if cfg.TransientsDir == "" { + cfg.TransientsDir = filepath.Join(defaultDir, "transients") + } + if cfg.IndexDir == "" { + cfg.IndexDir = filepath.Join(defaultDir, "index") + } + if cfg.DatastoreDir == "" { + cfg.DatastoreDir = filepath.Join(defaultDir, "datastore") + } + + v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency) if ok { concurrency, err := strconv.Atoi(v) if err == nil { - maxCopies = concurrency + cfg.MaxConcurrentReadyFetches = concurrency } } - cfg := dagstore.MarketDAGStoreConfig{ - TransientsDir: filepath.Join(dir, "transients"), - IndexDir: filepath.Join(dir, "index"), - Datastore: ds, - GCInterval: 1 * time.Minute, - MaxConcurrentIndex: 5, - MaxConcurrentReadyFetches: maxCopies, - } - - dsw, err := dagstore.NewWrapper(cfg, lotusAccessor) + dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI) if err != nil { - return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) + return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) } lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - return dsw.Start(ctx) + return w.Start(ctx) }, OnStop: func(context.Context) error { - return dsw.Close() + return w.Close() }, }) - return dsw, nil + + return dagst, w, nil } -// newDAGStoreDatastore creates a datastore under the given base directory -// for dagstore metadata. -func newDAGStoreDatastore(baseDir string) (datastore.Batching, error) { - // Create a datastore directory under the base dir if it doesn't already exist - dsDir := filepath.Join(baseDir, "datastore") - if err := os.MkdirAll(dsDir, 0755); err != nil { - return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dsDir, err) - } - - // Create a new LevelDB datastore - ds, err := levelds.NewDatastore(dsDir, &levelds.Options{ - Compression: ldbopts.NoCompression, - NoSync: false, - Strict: ldbopts.StrictAll, - ReadOnly: false, - }) +func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) { + cfg, err := r.Config() if err != nil { - return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err) + return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err) } - // Keep statistics about the datastore - mds := measure.New("measure.", ds) - return mds, nil + mcfg, ok := cfg.(config.StorageMiner) + if !ok { + return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg) + } + return mcfg.DAGStore, nil } From da28416598ad907209742651e5735eb50dcaba43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 23:22:46 +0100 Subject: [PATCH 07/14] add docs to config parameters. --- markets/dagstore/wrapper.go | 2 +- markets/dagstore/wrapper_test.go | 19 ++++++------ node/config/def.go | 2 +- node/config/types.go | 43 ++++++++++++++++++++++----- node/modules/storageminer_dagstore.go | 7 +++-- 5 files changed, 51 insertions(+), 22 deletions(-) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 5875aa275..620c97abe 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -90,7 +90,7 @@ func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGSto mountApi: mountApi, failureCh: failureCh, traceCh: traceCh, - gcInterval: cfg.GCInterval, + gcInterval: time.Duration(cfg.GCIntervalMillis) * time.Millisecond, } return dagst, w, nil diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 1278253f0..c6e4e2f38 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -8,9 +8,10 @@ import ( "testing" "time" - "github.com/filecoin-project/lotus/node/config" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" @@ -28,10 +29,10 @@ func TestWrapperAcquireRecovery(t *testing.T) { // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ - TransientsDir: t.TempDir(), - IndexDir: t.TempDir(), - DatastoreDir: t.TempDir(), - GCInterval: time.Millisecond, + TransientsDir: t.TempDir(), + IndexDir: t.TempDir(), + DatastoreDir: t.TempDir(), + GCIntervalMillis: 1, }, mockLotusMount{}) require.NoError(t, err) @@ -81,10 +82,10 @@ func TestWrapperBackground(t *testing.T) { // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ - TransientsDir: t.TempDir(), - IndexDir: t.TempDir(), - DatastoreDir: t.TempDir(), - GCInterval: time.Millisecond, + TransientsDir: t.TempDir(), + IndexDir: t.TempDir(), + DatastoreDir: t.TempDir(), + GCIntervalMillis: 1, }, mockLotusMount{}) require.NoError(t, err) diff --git a/node/config/def.go b/node/config/def.go index 06e48c731..9b76a2226 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -195,7 +195,7 @@ func DefaultStorageMiner() *StorageMiner { MaxConcurrentIndex: 5, MaxConcurrentReadyFetches: 2, MaxConcurrencyStorageCalls: 100, - GCInterval: time.Minute, + GCIntervalMillis: 60000, }, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" diff --git a/node/config/types.go b/node/config/types.go index ddcb27031..b3f038df4 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -1,11 +1,10 @@ package config import ( - "time" + "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" - "github.com/ipfs/go-cid" ) // // NOTE: ONLY PUT STRUCT DEFINITIONS IN THIS FILE @@ -55,13 +54,41 @@ type StorageMiner struct { } type DAGStoreConfig struct { - TransientsDir string - IndexDir string - DatastoreDir string - MaxConcurrentIndex int - MaxConcurrentReadyFetches int + // Path to the transients directory. The transients directory caches + // unsealed deals that have been fetched from the storage subsystem for + // serving retrievals. When empty or omitted, the default value applies. + // Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment) + // or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment) + TransientsDir string + + // Path to indices directory. When empty or omitted, the default value applies. + // Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment) + // or $LOTUS_MINER_PATH/dagStore/index (monolith deployment) + IndexDir string + + // Path to datastore directory. The datastore is a KV store tracking the + // state of shards known to the DAG store. + // Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment) + // or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment) + DatastoreDir string + + // The maximum amount of indexing jobs that can run simultaneously. + // Default value: 5. + MaxConcurrentIndex int + + // The maximum amount of unsealed deals that can be fetched simultaneously + // from the storage subsystem. + // Default value: 2. + MaxConcurrentReadyFetches int + + // The maximum number of simultaneous inflight API calls to the storage + // subsystem. + // Default value: 100. MaxConcurrencyStorageCalls int - GCInterval time.Duration + + // The number of milliseconds between calls to periodic dagstore GC. + // Default value: 60000 (60 seconds = 1 minute). + GCIntervalMillis int } type MinerSubsystemConfig struct { diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index 88b9db0e8..6f032ed34 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -9,12 +9,13 @@ import ( "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "go.uber.org/fx" + "golang.org/x/xerrors" + mdagstore "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" - "go.uber.org/fx" - "golang.org/x/xerrors" ) const ( @@ -109,7 +110,7 @@ func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) { if err != nil { return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err) } - mcfg, ok := cfg.(config.StorageMiner) + mcfg, ok := cfg.(*config.StorageMiner) if !ok { return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg) } From be24acbfbe1f83574a821bf285c8af6583f084f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 23:43:37 +0100 Subject: [PATCH 08/14] go mod tidy. --- go.sum | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go.sum b/go.sum index 123eeca79..7993576d9 100644 --- a/go.sum +++ b/go.sum @@ -257,8 +257,6 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= -github.com/filecoin-project/dagstore v0.4.0 h1:ZeoL5Gbgn4BSzKgghobfAO9a5nVqfVC4fQ/lNYn1GQo= -github.com/filecoin-project/dagstore v0.4.0/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18 h1:wn+3OyRwQzS+iYEkc3sO33XU3g5fEzP81Guh73ZHthw= github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -292,8 +290,6 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1 h1:xXgCsfEaA3wdof/N5mkD7PphDolhc9MAsIrO/QBH5Pg= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d h1:Izq16s1N4E2q+sSteCTT7fZsXC0ShxYjYNx9kI21AvE= github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= From 970f3eff693a99b5b454cf25c8b17176e88b0649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 23:46:48 +0100 Subject: [PATCH 09/14] fix gen. --- build/openrpc/miner.json.gz | Bin 9603 -> 9605 bytes documentation/en/api-v0-methods-miner.md | 2 +- node/config/doc_gen.go | 65 +++++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 25fcc2e04995338f50b220c44495a2778691aebf..2aa83203441ea516d0d707d23a1157c18907d9a3 100644 GIT binary patch delta 6461 zcmV-D8N%j+ONC3Yf((BqQfnC~;?{Q-@#%>T5Ozq_9T2p2jqbP5qXo;$zu`6Vkb5UA z^K#f+kcG1&*6i$B`i1f~rtcAj7NqbB(zSd*ctDR--7BB#?5ECtdLH&uK#7%PWN>C@ zm1nQfvLy8f>AJ1c&NckPQ-Vu8;W)HBT7knMl++3*LL$wf+}VFKWew1nL9OpF@qL#U z#ll&39$N8xWRAx;A$eOT<#<#t;SXR>j}$5Yk&W0u$*~HU%9>8c@&hT$9^xdYXIJYK z9{Xl>;TtO&*iX|GTmyfJrLFT!yEbBNaBxfJn0Wb=ua{9@PeOU12p-8d9epGZ3hGx0hKYtpjvBv47AO5doerGNH@tyhw zJHJ)uw|WkKD=l*-$>>^(FOef(jiuvmAQg3#=z!u4%CUj=BLjMRlv_#{u2VA#Gz(3t zM+=gkC18J=$$XaM*NU$E;qqB0iY~}v<*$*)n6ROGbqTa5DOi`hw;95ePe`|9U4|B< z6m7QD1JnVoxZl;UclGOC{rZy#!)b11;=FGC`dn4N&P5(uNAK`8UR@B%f|iA>(9G?D?UC$yp_rWKRKS0im5kw39FwI=R)ZGQtIe=MCqiVa$v#c0^TB#yq@NpvdYax^_rbY>65 zVnfC4^TOhGu2f_t+&YS&xVpW9Mo1ytdZ)LpetqBT*E^AOIXda9isdoEWc8YT{Yr8s zd|ZFgIYZbmH+VZoG&iIY0&1pDMjWqpnq$OgmX#{0yE@`zhOIDG!ZX{#U%w!{T2te+ zIQX$o?#+{dqKukbINm!%XBX$vV3iK#rhybRc3`x;<&RLuMTh=Fc@&*~K_hf6!Mb$~ z*>!jLhTR>cq9;p;Spg&+qa}t!{H%#d`*D9YF3c_+lNT%aLan5kQ$47cetES>aLG*L z>Ve2>Wb0=am8j0%6O}Mj)H8>2G0HQmr4&Li<n3iG)M>ne);<$Ip;YU^uYGzQ4 z(%SaMgj+LOB95LJ&e>mcz`>+vY%qU`{Yq~Y z=T!>y+La;odzI(qQze*4mq21Cku>f2C>!dp>{QluTsN@l+59k>pwbORI2JNa`>_h@ zWRvQ$K?Dv?>r@nF#dep3P(!yKE9wY@E_jP5&mXL+So!m%v}z?C%3srr0u3;Jk|zC~ zhU_$Ck%sJKbtkKRS$(1?bMSv97beigf7fv%$N)Wo^iI65Au*nwLnqHWdEUwMPM$v> zc|Pv<6*UJ!guQp}kT0Zb2=f?}_9c1jWzC53B=(&Q?__uKZn!x`v(V>QvWrR9&29I<3`dt^L$m zM~W)PNIqY=sF{sd1gN6!LQQSIB2dk#vBc~;P1b3$XQ;_gw-4(y+P-VFiK28^BC6|d z#UenJG@6h^Gi-Scsj~)mWr?YE>a0^|&rY31+@nb)VLZ68)=z))M&Xqtyy)fS>M7&P z@FY!^I=$WLZ4JGBoZGv=Z@pbJLof!V{alXN%bL;4N$k5Ej!q#z3x(Xt@J@#NGJLA4 zk>3%DH)y_Fcn=)Wd3auQ znXVxP&_w_dxUhfO+c5!jjXi*gMltQibHqo6C)EPmY zh8OZ?ps2+4d2(*9k#&yYs*cPFfvDt;I1~2;W&(ln<=215JSCro+DOBC394#2l)=E- zZMIBN3t+$#_W`3AYiA=Dcob69Sbw>1cfe}w6J@YU-qVn%Y8fa{smCvie#Ka3V95J-{!69$^A@n3feki}i$0OyZ6+kK zXUkdqc@jhV$y+>=b2x70KJNqI-X4#cp9^Z#~0SaHd*t_`}h7G!(xW zrj94hP;CB;i--jLZj%2*E?#0}u~!ExF|#ou)HpSk{lRcFHhRXD>)c_B*=f=RxkoEA}koGJzFjf>)$9e1u5le(%>Paadj!I7Ho~a@MnJb+tAFd+Dy;vp%NCiPT2yE<8 z;4Hz`zvIiMKpatr?a!HX0X#m306~BEG?G$*9BM%J2WYkF;EDwU9O4aUBD_4Mj?N2wh;L!fS@n-!-4OQ_POK}`2sQJ=i+hqHLwu0J+MGa z_Jb))w&(YPFj&V-&zqT!6$LheMBYR9_O0W7*sfe?Rm;_t0m_R_iSgo#5eUw?cTo00L^tW;%bzYxuDsoN~DcC6nsqgqn zgKEb2lZM)+`$sYZE#Xq^1d4=AfggyAo6ZN+`G6u09uAZ#-R#3%qm6%aSI4dv02;HW z1-NGH=n%EMRLa;(UH%`xG(tFJmael2)6&2N1(t)jhNj5fU;-)Ma606`TtiHHK%B;I zLh^|$5U?EL+XLb~H^D=PfWMOQHY64xP6Uu$v+*5@Wq{c>FyYQaAmSD#8lcTsquC^_ zZWHkA`FW^JndU_=jr)Ie6%bV%pL8D;vAY6zGyJc{yhtFZ(DGXH;pVaBcjI8{xbt#6 z9OdR&&zo@#&+}cw)%dW$H(Y`F;y7w>T!eu499+R2Pwop(j&rlCc!pq9J6f41V_+Jy zhT}jRV#_3&g=`r|AgB(2wv8;{kkCZM1iaa7=>u?F@CU^7oO6F6FFNu*5$XWy;?)Yd zVPz76uKmy%u0IwV9_1zwg?pnKJQ%`gi~&Q4&2V2F!6Md+ULEc=6I;_@(+OU%Z6R4P zbw?clqn)Hfe235yyB>YnV^5P$1n#J_ffO@7eh4?cm?k#69drIz%sI);6bfmvZfGe9 zNbckP$;8&-3|oK2-rXd$h6-_|XlgO4I!Y9IiwdJeG46e$I8m_P*-kvsjRKaHMrD$~ zX4nEQBYlRm$G6M&@<5DPaO=W{0WHf#o`)=eLulOfhPj=;K^9m%00{nxT#x6yKrnMG zWP%sM`G$#wO)e}U*z9bIO?LNS3(Z?(g>PZ3`;t4~Ab5YzMy;W@K1@m!-&-AUxgTEo zzWMw(H+3l_<@)iQG^l3cIccbE#&aYy&=Ld1@tnX@6tQ|2(di;O+B`hX&6x5ou~)T6 zPxyQ>w~8-P%&{DtCY$<3d3jb1aF+8Y#{k+k+mWt9-;viSH~l6zTBSoYx>^l}quyXR z?iyAV@xp&eZb}rViwiR6o`||U5x`MxEm`;Dq`N_23@5idoFV1GzDFeF*>k$ca3syfn-LY_)VX@y-a?iLVq{lmN~ zmERuD|HNeJXmGrEO<~U|-qxxDpNF%^@7nziQQi7!3y7n&8EQO0bHE_K(5~0 z?oNN-uB}}}42s4bCAvFBlIVnYKcVaeytxc_=ggeV<~->8#C}`{Kg7v|r^dgC-T!m` zdTo+_tkM7e{IemtwRpj}8*JVRvG`BkY{|cz>TNVfqFYMdm@uuC=XggbB`#be>T-cR z9`=kICnyB+*Y^^7^o{Fg8$ew;Ap~*$44;48v}c@|6yJ%seb|}lev94YB{t(f5MHgR z-}oussYqokWY^7=Jto5O=sET6r?jgMKom>Af$Rpy#y!xY6j1*m{{@}3}t(JCJui# z$fIzx)tmW^3(ed<`ASnKVk^ByT&MseRxAmhx^4(jRo}3WE?{? zdO8$YdX{4-omwz7eJNa~5=XzcmM`0%sKzGQ=ZvbTF;Gr>HszS+leHj|si$&MW#4uZ zquexUZ|<8?b+!@$gxXgS^JIO-M5BLUZ2+>sSCE2RX#)}BuHHHLzW^pnhwUAja+^!0 zAoU_-$7ppo^-bN=!LVQ86lj2T_l;}Dx#`PZ-D9(|VDip_h>qpsZZ^L`v?x&xx(UC2 zpy+~F=$`kBR<)KM`#XDRUqfO!8-hHq2ewBbNO34k1oIj^eoMLv@vhMfV}^fT>&H+e zR#=n4gzcd`gM!;KH=HM?l}?#>HUZ>hbJk!|EjFhtYCWx>{1*FYkNcx(6RotfjjO-Q zj^Ja3%PTHczUz~)kGdSN26Mo=#L~<0=qR`N!uznb>g9a<5K4LU6LI=N#n0h4rSk)1 zpX9jB@7y6@HWMJU{X2%O!LWZ5Rhfoh!86OcbWE6C6D^ia!cZIk?4#weW)e^7=&I!g#k=CF+*%kKLA z{Y)GthON$~SIj!*tj<78hgc%t@i*D!COAR&&}X=XCZ-QO;C*%nu?-gt^OK(OcAorw zjXdPuFl(X{u+7o%(fk~j#>(V6d%1SL_EXd=3y(2<%_lvzB_dk8-E4q`i5_$Uwr&5Q*b#! zj}&tGZ6GtC%f&si3sHPdU*iY<{%$)Bi-#Kbo^N2U#3B>1_GYDoYp*!WzWr}PQG zFv`+Zb z5-anhW^pIU^fSetyCf@25c}@DP0z__R&#ughe9Bv5?lVtT+p^aUWyVLUf;OM9>!z}*@>nwrE@ZC`XR9+>C9LhI%2wfNAM+<* z>M+?gkDE(m#OkuK@ydb}zS=^%HVRl&)sA3K*kGcn!U$>Mwk8+cHsd3!9#ddAG!#wn zr}JcSgwCn;gE+OmpJ{YahRgBzIKLF;8Zm+L3xa<(tyMG-hocvY^?!*+#N)*RGX9!| z1Z4{ICn;)sPOgtanU(rTx;@+|q{5TXOlo`2kn~6^sVh|drUG2M-1(Zh^R+wibRzq5 zJf7xPc=>fDl|?msUdKb7C}h#t^s!+%d%k3aScT1=kM|GnmrfTLb>&C`ykrhvcFh3j*c_iM@KFI#dj$$@}fC*`}l4j-|gcgCFtj*e2YYtXDV`! z`ig3qKOFZKxig0l#sF0$ZURE(h8GDabvN&6l+;#X^0gG%wm5>6I2HobVRI389Pot? z^}zMjtijd*U6YKb=Y@fy_&^ZDKb%Fi0~mjz$JODo-40uaA3lV!T_Iz`C-Y2+2z){8 ztl%A`c2|ixwH8P#z*rhrD}F}DIzh{+b#elS2b1aK=wLjUo*YbWZtGoUy##6lY_bX<9T<^#q5L4r{&wW)cHBOCTf7m}53=RgvoBqk^aCq83 z{?q6gzc?QCmwoMc)MzEHAH784Oe@_1Q#{aYGwKbzzYqWilb*4`B=#$vHxzxJXJf8r z{b;ddhmacdd#yju$%VxH)mS?21~PxLwjf0Z6mO8MrM+O13VrwYYA?JRMEO)1EQA`rsL-lX1^@$N&0hz;{<4z+f~wHa;_UeM8{Fj#%?O z+I;Yk^&S(h1cRUOp8tHe-3dRNkM)fIHctDG1nN1~d0BpJh^#bu*8f6{ zDdY5amAy1^yUddCPp7jW@BB59`pLWrTGR61#z{@{0YS1&>E$?4Y9bKQ^i#&zo~u$D X_l?KL&yW8<009600?zqLjl=-}pvBIm delta 6459 zcmV-B8N}v=OM^?Wf((CVc6NO?WMq2<=~}8bBE_e;^_|6pdSZ)&9a43V1Z`cj{4Mlo z!3W@9`5Jl1y%V;3Ic)OwrtcAj7Nk)5X`?fGq#9rOTxUXcCe-sVp#n;*BqoDnJF7f< zjg}>;Kgif^op!F_7oHVd;t9v0<Fw)GYt7Db0v3AT=rFr#gy5@Tq3ffn%ejG<b~mFtG0nEFK8aVUpEdmi`;3WqnG zx3y=M&^Wgq>IxC|>Fe4+aWHc0CpJQyB3fkW#6jlRI4OU)73`T5?yXodDJ(N9oxQJ< z5uQ!@z?pS zI=|I(@LOq_Gf76*T6~Ed`D!d3cLS*?r9=l5Z%~d6v>!>()1%yKx^SJEQJ`69QaxIb z_bdU^O!9xT9KTj{^JjR3#m8?skJxR^FOTaK-(uguN?a?@HL8L>NwUixcN{OW5bC5_T@~*gATLukq@FP%h_E7e)Xs@ZoF? zG2v!j4{XQ$(iOBUWQAsK4{VQQ*9*nuJf{f0tYm)-ry3WA3}c2}t2ImURnuejzEQOA ztY%r}$)e|oWmfbg39dD9&&&H85cy;21X66!;w(nP1}1Uz-A9&4 zZ=V;|w{xW;tK!yC1jPmJ6*NK$;nthIbtUZkUc%mqoXgQkUsW%U2_~!8?CV#OGvVWk z<{5v&hPlDpIik5Cl@L%ftuo?xwbLIXKC`S;N!`^ECo^n?u@auy7XJDL;nkWNr^UgK zeR6M}3>1~r+`{qRAv(J_mj0BB;sdHOxlmDabbUU>6pA4!53;J&7A5%z4XhgMS@Fa8dncQ zUL#vSyQoBg_MWJOp`xTYl#5ZGSuLdyf+?pK!pF1}LpZuw)ey(MQw~3}a!@maYLwQt zHzwSg(GsyVp{Gpm>C!{@HZ|g8q^MXHQt}debcQWP^wn}AvS4bQ*_#E{_<6lje7k?M zdiUg5h3CqIV?~WqM2+IFnk*>(YJ>sBUoEmOPf8lgQ6Gdtvs8b-c6r|n$B81R_L@`InI+=ul^7=pLgiUtn=-tF zCOVu!+xF8Ld3Zl^E^J5Or{uUdg2|LY+vL0JMr$)ODNbjM(EY=4Z;>lNG?smsUNud= znukSd3Jw#Mpho{#{Qp4mui*biNp=*Qv~c69k%Ljpw;OnWApj00J!6AO>{ow!t2nPx zpx3Spso$dD8jLjaoUenP$&CT zmklCta9RhWC@Z$RB!n8e^;l6yAauc7M0x&TRmIAmFQrv0=}`WfW)x_E@ssrF?=)nm zA&WF*C#ySI?aS&DMVW&yxiEi$HvYSg8$kx>38Z)8eGQ56^c*^Q-pTV$o_F&6`N;Ee zzptn{5F+fobBBB(T|=11ptLW^V=rq)j3=@0WOygTI~m@|@aH7M2Z|yNA-}E>tR0*P zVU&{J3&*jm2PMLHHY3L4Qz#Bgbb`DSuEkb2b}Hvw}un zp2wh+lI8vhQ-}Vuzwc=E9YYL6I(v5NEaDzbDhcDkjkSK7Hwu5RB;iFbFIP_)Uxp`X zvefDAPH$`I?c?0u1%B)8ni+yIDDCHR#9r2nUQS})<#2Qg`B^CBPKI|f+?U}~RgL_P zP`p9&-NJkDIO|lB;b|!H22SFca8Oplj>_OgsvSi4JQwf=W#}^U<>Ri<$nTA&xgNg!vr&#bjnw!zB(> z@r=*U$NTg=Id0HbR66@>d$w@g>;Vrt%oYcrYb)iLL~(F+t)UcZt+$EpjG)d4>NLEN zHv>f_uFsQmbB(NX3|DnzP6$LLcf^^vFEA4bj4!`F<|%*qG}J~K)=N-T%b^Sg)^4+9 zidp~zp12Pf#aKHVxxk~4qQ?5meY*ozW1lF4Rq~#OL{-Z`fl{@t%`;`Rp`u(;NYC0e9YwLQ{Um8U=ZCiD|UYi8++>+wt_R&g2Nw<#-pM5#V~a| zafV{^XIw-i;CGY!Cvx!;Ba6K{Sc#d95uwJZvFs0qqp{I5u3YC1Tg*<2u3f?fveSN^ zo-wD;rQ#6YQ+`@*&v-+uXt`S z3g3VK*fV}bXd8u3I6;&@@k95}j`UY8;|KGM zRlNg20}?GeRT;^Zg%PJZtpiu298mVHN#+e#Kd|+{Y4&pj|i^@a*&cXB?eopKsEyxAZNLyGJN;2~mp z{3H!_A8bSrN0{7!MGRLf7p)k*@G}v_(8EvU+IOQf0}9|}^f2_u|4;^+*7ldHmrH+g zq)E=u3YnQ@$+YfUh;3wvZ@`CcA!VQND)Ic`PkCXpm>_=dt;*}~*xQ?RUnX6Y zGJ&*bp@FfYpgPWDSBO|51XE8+`EXQnO7~0^3CLXORQYffIqtoc;P1f@PCz^anF>kCWc<~<4cc& zX^MeDfbDWk9Pa{Fi7*QJgR_OmM*#$Vu^$e6f3(jfpU4-8DL)sFyRU(TpzVPLTCyKZ zS+YI97lgq&W_sStbgU?_5hU^+y0>o~_rrGOLaSP?t_)CKY)Xt5UnDmLB@4EGDvA`< z)>Bf_({7E9PpVRHmo0HQ9!!4~Iql-pg*8@nJw-`C#m=H|yuQCE`3g;!QRY1rv{Pno zdHN}ZrObB}4w1W$77SD0cN7l&2JWNG(&c)PWTU^86RGq1d{dEgqDa9`F-U#KPa0G+ zzMnMIHr+px8E6TYVkb}}WD5L1T-Q#v08gX?2@` zXV1?=Wy&-!dTHFJtAKx~;`pTdsEFMaz?;ZLTrZn;s_S8Ui9j4ubJ4I2AfXsf^7@Qim5y5 z02u8g9pXELme}>^%N~20d?Ij1oeiXz@$o~r>BTg$+3lF~$70S&Zl+L3i*-XwNkDQR z?@uPS4rkaZ_U?Zsp*2*9D@9X_QPokR$XiqxC5mzH6UB*w_0D$UiEb3ItTZZ<1UADK za2e?{oISo>wwDKD%z|4NJ`8ACF7iBN0USc(t~bo>{0*|e;sHSLPvm+$?*)RHV<8i~ z5Y9JDENpUN3BhJ(Q*5%k2U}>~A}f3gW8IhB`3AvzHfnzjz4c*IqWIqGfXn^x()Z2h z$GNFXAt~37=cGY36VFLQZ8M%DnSquVD30d@o}!4=yNFH~(b4ANX>P`pcZt2KJ$l0D zi@8;Nkz$VJ;56CPH_FSiYJjtxKRE`_w%Lw!75a|6M!D%XxzQ>eqS4iAFdX#;!*SQJ zs)!d(a#MezI9*(jIrl`=<%s}}YIFJVbLcans1COTFM1NLiOovABjp?v^Y(0^OR>4L zp&NR68rm)<7O~=~0%?1#BFT3w|14R4)Xz;x@~+Xc7OS)8$<%-bo)ujs$h#V5Wi>M9_A)1c}y$pYH_!Kpz9yzU8(%` zaQ-JIOGks_#cK+CPVu%@75F@yMSj=rcZdp)vB1Vz0ti<4E^Iah77yU!{0DON=5}}b zc5Q#{B4SW9?kLgSDUw7dy!#1dFW}8(xI1U&Y&PdX-zWCtI`|<@COkF%MeP2c^Ve&W z{9}#&_vfDt(XGV`#@%4^R*1!a@@7l^qi#TVX(tyM4Q+lNrfqo0V=7b<=ZzbTy`Ap0c8 zZGPtt`LdY+q3z!>Yz>B;sLFpd1Ph*7)}>>@?3!q?Y!Zgr_-7w2k2RBUYBOgBttU7H z&J_U}4YE#zHagm_LEFcwDhHncD1~Ol@`gBsyiG0gg13oK@^%Q8-i~7X@GTGPYEV6Y zM^iK(Ae^s|0?hqrq=oJqiyCJarNl&q~J8Td!X!=CXT-Ur8o^4~K^=J7LVAYr#rrPEnLSTcug1X*_1_wQ%o zFfnX(HoaojF=urKVmibU`HsKIE;qpmx`#f)Ei^HG-~sQmJBV$#V3?oujJNaT?`z~C z_m1Im!WWbO_k+hTlnsB(Mu4^AvLTW>qZ4>*oj=XyqBAgB0WJFR^hS0^VAjw{Q&mNd z;PltXqi445n03@r2&DMvT_NHb7B>%DaV%f7jqu&6quBT>NY^)fBmLszXPJV_33{ZE z%Wngj0bMTckzI)5bNU)T@He@*?YX;J*^VX2pf%)2RnX{USRHruU$1`5`EKB}GRU{_9!8tN*=pw<-byQ0NP{GFM;yk5K_=QoP z_8I%>w?&(x1R4$PjnY#~hv-^Y8hpw0q?6V@sB$s0o?6g8%3cvXEN6i~S@mdx8s|IG z|JhjAx66tHS9YX-O1=DmC>Dm zyc~}Qsv1xs4&Gv7)fbQvl;Y^JFC%j-*-_Sy5*W8J_l6(xEaN|k(A09Eq~B1*jgnZI zCpC*ZNv5AE_S_{|VS?Cq=WTjUMzfmZb39Cvq2HUI>B@h_0${NX#DZ6E zrWQS)*GvfjAt!P^|w;JR%-17Lf7RG$bff zm_JET+jDY#6w0jBN7C)#Mj;iRgl1CPdxoS(T1j1@>NgeO+U3sI%$={@iKi3Um*ep? zzrxF}E2%81+4DLc>O>)n#-@)A%h~fKE5s^n_I$j5c)xVIz^E%f+9wgZC!6(?{EDC( z_o098e%%Mr_Llv{!f|w***-dQ0Vuvpd65^*vD?RY`}l4jA1OgUC*@ltsytJXd(>A{ z%lzTEx5%A2gfIrEB5@NCDmT1HK&iWVPot!^3X`v;$hO51q{OihpbndhxZ{8?e5ePm zuVxLl2I!h(JUuTA6vYRE82;fbsvW=(J+6Nam+f}gI{feK zD7Cvv%&E0NS^>t=xLWZuI@SqVPOXy@I6Rn4Cr1b4!Sv){x?IA8lO8D()+CvWdeLu&=IG*&Ug9Yp#Oa=?9oasB8e5@%ZJ4w&MBW}8uO;QfUFIGFT|4JNT)>Aa!n`#c+SHS0%< zB|C)FpxX19 zHS(ODYsTb4?8;Z;J=$EYA3SVA+c+H_kEh4SC;iFEc-&*R=3n5}I2{b9)A4^ve=wa+ z`}}g`E*lQU$K&Z}GM@H~+0_TnIGv1p#ykGkM+3gQ`Tz!_(XsKFvFjTG7k0#&@6qOi zhphLQa3vW0g!laCyX{W+*?g>L{I_x1e=JabWin!Ym|91v1jx(sV?$)6$+P|!YD^iY zzpLz}iQ8qCgnv4n1$pO>)K4?!P0*T_|29r)nhyw)ZAvf4iBc1Rkfxt9#`av5+PH5# VK7M}u{{a91|NpluC5n*50Ra6smiPbw diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index da94dc682..5ebfe9a8e 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -709,7 +709,7 @@ Response: "ID": 3 }, "SectorNumber": 9, - "CARv2FilePath": "string value" + "InboundCAR": "string value" } ``` diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 5d4a91d5f..006ff601b 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -125,6 +125,65 @@ and storage providers`, Comment: ``, }, }, + "DAGStoreConfig": []DocField{ + { + Name: "TransientsDir", + Type: "string", + + Comment: `Path to the transients directory. The transients directory caches +unsealed deals that have been fetched from the storage subsystem for +serving retrievals. When empty or omitted, the default value applies. +Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment) +or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment)`, + }, + { + Name: "IndexDir", + Type: "string", + + Comment: `Path to indices directory. When empty or omitted, the default value applies. +Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment) +or $LOTUS_MINER_PATH/dagStore/index (monolith deployment)`, + }, + { + Name: "DatastoreDir", + Type: "string", + + Comment: `Path to datastore directory. The datastore is a KV store tracking the +state of shards known to the DAG store. +Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment) +or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment)`, + }, + { + Name: "MaxConcurrentIndex", + Type: "int", + + Comment: `The maximum amount of indexing jobs that can run simultaneously. +Default value: 5.`, + }, + { + Name: "MaxConcurrentReadyFetches", + Type: "int", + + Comment: `The maximum amount of unsealed deals that can be fetched simultaneously +from the storage subsystem. +Default value: 2.`, + }, + { + Name: "MaxConcurrencyStorageCalls", + Type: "int", + + Comment: `The maximum number of simultaneous inflight API calls to the storage +subsystem. +Default value: 100.`, + }, + { + Name: "GCIntervalMillis", + Type: "int", + + Comment: `The number of milliseconds between calls to periodic dagstore GC. +Default value: 60000 (60 seconds = 1 minute).`, + }, + }, "DealmakingConfig": []DocField{ { Name: "ConsiderOnlineStorageDeals", @@ -753,6 +812,12 @@ Default is 20 (about once a week).`, Name: "Addresses", Type: "MinerAddressConfig", + Comment: ``, + }, + { + Name: "DAGStore", + Type: "DAGStoreConfig", + Comment: ``, }, }, From 35895e897a6e7c9744d659203b4028d31a04743a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 3 Aug 2021 23:52:11 +0100 Subject: [PATCH 10/14] fix test. --- markets/dagstore/miner_api_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/markets/dagstore/miner_api_test.go b/markets/dagstore/miner_api_test.go index 38a865ebd..4a61c62a8 100644 --- a/markets/dagstore/miner_api_test.go +++ b/markets/dagstore/miner_api_test.go @@ -74,7 +74,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { rpn := &mockRPN{ sectors: mockData, } - api := NewMinerAPI(ps, rpn) + api := NewMinerAPI(ps, rpn, 100) require.NoError(t, api.Start(ctx)) // Add deals to piece store @@ -114,7 +114,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { ps := getPieceStore(t) rpn := &mockRPN{} - api := NewMinerAPI(ps, rpn) + api := NewMinerAPI(ps, rpn, 100) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 @@ -131,8 +131,6 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { } func TestThrottle(t *testing.T) { - MaxConcurrentStorageCalls = 3 - ctx := context.Background() cid1, err := cid.Parse("bafkqaaa") require.NoError(t, err) @@ -143,7 +141,7 @@ func TestThrottle(t *testing.T) { unsealedSectorID: "foo", }, } - api := NewMinerAPI(ps, rpn) + api := NewMinerAPI(ps, rpn, 3) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 @@ -170,7 +168,7 @@ func TestThrottle(t *testing.T) { } time.Sleep(500 * time.Millisecond) - require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled + require.EqualValues(t, 3, atomic.LoadInt32(&rpn.calls)) // throttled // allow to proceed. rpn.lk.Unlock() From 583a3c62774fb2102514673527800d3acbf773e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 4 Aug 2021 12:20:51 +0100 Subject: [PATCH 11/14] switch DAGStoreConfig#GCInterval to Duration type. --- markets/dagstore/wrapper.go | 2 +- markets/dagstore/wrapper_test.go | 16 ++++++++-------- node/config/def.go | 2 +- node/config/types.go | 7 ++++--- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 620c97abe..9a31d68c3 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -90,7 +90,7 @@ func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGSto mountApi: mountApi, failureCh: failureCh, traceCh: traceCh, - gcInterval: time.Duration(cfg.GCIntervalMillis) * time.Millisecond, + gcInterval: time.Duration(cfg.GCInterval), } return dagst, w, nil diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index c6e4e2f38..a48497d85 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -29,10 +29,10 @@ func TestWrapperAcquireRecovery(t *testing.T) { // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ - TransientsDir: t.TempDir(), - IndexDir: t.TempDir(), - DatastoreDir: t.TempDir(), - GCIntervalMillis: 1, + TransientsDir: t.TempDir(), + IndexDir: t.TempDir(), + DatastoreDir: t.TempDir(), + GCInterval: config.Duration(1 * time.Millisecond), }, mockLotusMount{}) require.NoError(t, err) @@ -82,10 +82,10 @@ func TestWrapperBackground(t *testing.T) { // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ - TransientsDir: t.TempDir(), - IndexDir: t.TempDir(), - DatastoreDir: t.TempDir(), - GCIntervalMillis: 1, + TransientsDir: t.TempDir(), + IndexDir: t.TempDir(), + DatastoreDir: t.TempDir(), + GCInterval: config.Duration(1 * time.Millisecond), }, mockLotusMount{}) require.NoError(t, err) diff --git a/node/config/def.go b/node/config/def.go index 9b76a2226..7637142cd 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -195,7 +195,7 @@ func DefaultStorageMiner() *StorageMiner { MaxConcurrentIndex: 5, MaxConcurrentReadyFetches: 2, MaxConcurrencyStorageCalls: 100, - GCIntervalMillis: 60000, + GCInterval: Duration(1 * time.Minute), }, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" diff --git a/node/config/types.go b/node/config/types.go index b3f038df4..9f77ee7a4 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -86,9 +86,10 @@ type DAGStoreConfig struct { // Default value: 100. MaxConcurrencyStorageCalls int - // The number of milliseconds between calls to periodic dagstore GC. - // Default value: 60000 (60 seconds = 1 minute). - GCIntervalMillis int + // The time between calls to periodic dagstore GC, in time.Duration string + // representation, e.g. 1m, 5m, 1h. + // Default value: 1 minute. + GCInterval Duration } type MinerSubsystemConfig struct { From 6a244ee8f0fc2c5846ecb70c2c3d93f706237ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 4 Aug 2021 12:22:23 +0100 Subject: [PATCH 12/14] clarify comment. --- node/config/def.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/node/config/def.go b/node/config/def.go index 7637142cd..bef8fa8cf 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -189,9 +189,10 @@ func DefaultStorageMiner() *StorageMiner { DealPublishControl: []string{}, }, + // The default DAGStoreConfig doesn't define any paths for transients, + // indices and the datastore. Empty values will lead to these being + // placed under /dagStore. DAGStore: DAGStoreConfig{ - // zero value paths for transients, indices, datastore. - // by default they'll end up under the node repo. MaxConcurrentIndex: 5, MaxConcurrentReadyFetches: 2, MaxConcurrencyStorageCalls: 100, From 07ffbf85e4bcf7d0585bd1fdfdd3ac1b4da70011 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 4 Aug 2021 12:48:53 +0100 Subject: [PATCH 13/14] fix lint; run gen. --- markets/dagstore/wrapper_test.go | 4 ++-- node/config/doc_gen.go | 9 +++++---- node/impl/client/import.go | 5 +++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index a48497d85..8748e4dbf 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -36,7 +36,7 @@ func TestWrapperAcquireRecovery(t *testing.T) { }, mockLotusMount{}) require.NoError(t, err) - defer dagst.Close() + defer dagst.Close() //nolint:errcheck // Return an error from acquire shard the first time acquireShardErr := make(chan error, 1) @@ -89,7 +89,7 @@ func TestWrapperBackground(t *testing.T) { }, mockLotusMount{}) require.NoError(t, err) - defer dagst.Close() + defer dagst.Close() //nolint:errcheck // Create a mock DAG store in place of the real DAG store mock := &mockDagStore{ diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 006ff601b..421516625 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -177,11 +177,12 @@ subsystem. Default value: 100.`, }, { - Name: "GCIntervalMillis", - Type: "int", + Name: "GCInterval", + Type: "Duration", - Comment: `The number of milliseconds between calls to periodic dagstore GC. -Default value: 60000 (60 seconds = 1 minute).`, + Comment: `The time between calls to periodic dagstore GC, in time.Duration string +representation, e.g. 1m, 5m, 1h. +Default value: 1 minute.`, }, }, "DealmakingConfig": []DocField{ diff --git a/node/impl/client/import.go b/node/impl/client/import.go index 07db53bb1..d44d73ac5 100644 --- a/node/impl/client/import.go +++ b/node/impl/client/import.go @@ -6,7 +6,6 @@ import ( "os" "github.com/filecoin-project/go-fil-markets/stores" - "github.com/filecoin-project/lotus/build" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" @@ -20,6 +19,8 @@ import ( "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" ) // doImport takes a standard file (src), forms a UnixFS DAG, and writes a @@ -38,7 +39,7 @@ func (a *API) doImport(ctx context.Context, src string, dst string) (cid.Cid, er } _ = f.Close() // close; we only want the path. tmp := f.Name() - defer os.Remove(tmp) + defer os.Remove(tmp) //nolint:errcheck // Step 1. Compute the UnixFS DAG and write it to a CARv2 file to get // the root CID of the DAG. From 8f5e6a864f8a69b4d4c16bd97267e19b20eb5ee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 4 Aug 2021 13:22:24 +0100 Subject: [PATCH 14/14] upgrade go-fil-markets. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a18c34d33..75a67af52 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/filecoin-project/go-data-transfer v1.7.2 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d + github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1 diff --git a/go.sum b/go.sum index 7993576d9..17eafbd23 100644 --- a/go.sum +++ b/go.sum @@ -290,8 +290,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d h1:Izq16s1N4E2q+sSteCTT7fZsXC0ShxYjYNx9kI21AvE= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210803210132-4d3607a44c8d/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 h1:zubk4E8s5KLw5Y2Or39A3Ob8c7DAT6FL/mJBs1dMkrQ= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=