diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 25fcc2e04..2aa832034 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index da94dc682..5ebfe9a8e 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -709,7 +709,7 @@ Response: "ID": 3 }, "SectorNumber": 9, - "CARv2FilePath": "string value" + "InboundCAR": "string value" } ``` diff --git a/go.mod b/go.mod index fc85120be..75a67af52 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 - github.com/filecoin-project/dagstore v0.4.0 + github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 @@ -36,7 +36,7 @@ require ( github.com/filecoin-project/go-data-transfer v1.7.2 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1 + github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1 diff --git a/go.sum b/go.sum index d38c6a2c7..17eafbd23 100644 --- a/go.sum +++ b/go.sum @@ -257,8 +257,8 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= -github.com/filecoin-project/dagstore v0.4.0 h1:ZeoL5Gbgn4BSzKgghobfAO9a5nVqfVC4fQ/lNYn1GQo= -github.com/filecoin-project/dagstore v0.4.0/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= +github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18 h1:wn+3OyRwQzS+iYEkc3sO33XU3g5fEzP81Guh73ZHthw= +github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -290,8 +290,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1 h1:xXgCsfEaA3wdof/N5mkD7PphDolhc9MAsIrO/QBH5Pg= -github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 h1:zubk4E8s5KLw5Y2Or39A3Ob8c7DAT6FL/mJBs1dMkrQ= +github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/dagstore/blockstore.go b/markets/dagstore/blockstore.go new file mode 100644 index 000000000..8980d40cf --- /dev/null +++ b/markets/dagstore/blockstore.go @@ -0,0 +1,33 @@ +package dagstore + +import ( + "io" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + bstore "github.com/ipfs/go-ipfs-blockstore" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore" +) + +// Blockstore promotes a dagstore.ReadBlockstore to a full closeable Blockstore, +// stubbing out the write methods with erroring implementations. +type Blockstore struct { + dagstore.ReadBlockstore + io.Closer +} + +var _ bstore.Blockstore = (*Blockstore)(nil) + +func (b *Blockstore) DeleteBlock(c cid.Cid) error { + return xerrors.Errorf("DeleteBlock called but not implemented") +} + +func (b *Blockstore) Put(block blocks.Block) error { + return xerrors.Errorf("Put called but not implemented") +} + +func (b *Blockstore) PutMany(blocks []blocks.Block) error { + return xerrors.Errorf("PutMany called but not implemented") +} diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/miner_api.go similarity index 82% rename from markets/dagstore/lotusaccessor.go rename to markets/dagstore/miner_api.go index 268720fae..0da01c396 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/miner_api.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "os" - "strconv" "github.com/filecoin-project/dagstore/throttle" "github.com/ipfs/go-cid" @@ -16,50 +14,36 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) -// MaxConcurrentStorageCalls caps the amount of concurrent calls to the -// storage, so that we don't spam it during heavy processes like bulk migration. -var MaxConcurrentStorageCalls = func() int { - // TODO replace env with config.toml attribute. - v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY") - if ok { - concurrency, err := strconv.Atoi(v) - if err == nil { - return concurrency - } - } - return 100 -}() - -type LotusAccessor interface { +type MinerAPI interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) Start(ctx context.Context) error } -type lotusAccessor struct { +type minerAPI struct { pieceStore piecestore.PieceStore rm retrievalmarket.RetrievalProviderNode throttle throttle.Throttler readyMgr *shared.ReadyManager } -var _ LotusAccessor = (*lotusAccessor)(nil) +var _ MinerAPI = (*minerAPI)(nil) -func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) LotusAccessor { - return &lotusAccessor{ +func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode, concurrency int) MinerAPI { + return &minerAPI{ pieceStore: store, rm: rm, - throttle: throttle.Fixed(MaxConcurrentStorageCalls), + throttle: throttle.Fixed(concurrency), readyMgr: shared.NewReadyManager(), } } -func (m *lotusAccessor) Start(_ context.Context) error { +func (m *minerAPI) Start(_ context.Context) error { return m.readyMgr.FireReady(nil) } -func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { +func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { err := m.readyMgr.AwaitReady() if err != nil { return false, xerrors.Errorf("failed while waiting for accessor to start: %w", err) @@ -107,7 +91,7 @@ func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, return false, nil } -func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { +func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { err := m.readyMgr.AwaitReady() if err != nil { return nil, err @@ -179,7 +163,7 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid return nil, lastErr } -func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { +func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { err := m.readyMgr.AwaitReady() if err != nil { return 0, err diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/miner_api_test.go similarity index 96% rename from markets/dagstore/lotusaccessor_test.go rename to markets/dagstore/miner_api_test.go index 6d8513f56..4a61c62a8 100644 --- a/markets/dagstore/lotusaccessor_test.go +++ b/markets/dagstore/miner_api_test.go @@ -74,7 +74,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { rpn := &mockRPN{ sectors: mockData, } - api := NewLotusAccessor(ps, rpn) + api := NewMinerAPI(ps, rpn, 100) require.NoError(t, api.Start(ctx)) // Add deals to piece store @@ -114,7 +114,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { ps := getPieceStore(t) rpn := &mockRPN{} - api := NewLotusAccessor(ps, rpn) + api := NewMinerAPI(ps, rpn, 100) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 @@ -131,8 +131,6 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { } func TestThrottle(t *testing.T) { - MaxConcurrentStorageCalls = 3 - ctx := context.Background() cid1, err := cid.Parse("bafkqaaa") require.NoError(t, err) @@ -143,7 +141,7 @@ func TestThrottle(t *testing.T) { unsealedSectorID: "foo", }, } - api := NewLotusAccessor(ps, rpn) + api := NewMinerAPI(ps, rpn, 3) require.NoError(t, api.Start(ctx)) // Add a deal with data Length 10 @@ -170,7 +168,7 @@ func TestThrottle(t *testing.T) { } time.Sleep(500 * time.Millisecond) - require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled + require.EqualValues(t, 3, atomic.LoadInt32(&rpn.calls)) // throttled // allow to proceed. rpn.lk.Unlock() diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index f53c31c7c..c97dcbf86 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -15,28 +15,29 @@ const lotusScheme = "lotus" var _ mount.Mount = (*LotusMount)(nil) -// LotusMount is the Lotus implementation of a Sharded DAG Store Mount. -// A Filecoin Piece is treated as a Shard by this implementation. -type LotusMount struct { - Api LotusAccessor - PieceCid cid.Cid -} - -// NewLotusMountTemplate is called when registering a mount with -// the DAG store registry. +// mountTemplate returns a templated LotusMount containing the supplied API. +// +// It is called when registering a mount type with the mount registry +// of the DAG store. It is used to reinstantiate mounts after a restart. // -// The DAG store registry receives an instance of the mount (a "template"). // When the registry needs to deserialize a mount it clones the template then // calls Deserialize on the cloned instance, which will have a reference to the // lotus mount API supplied here. -func NewLotusMountTemplate(api LotusAccessor) *LotusMount { - return &LotusMount{Api: api} +func mountTemplate(api MinerAPI) *LotusMount { + return &LotusMount{API: api} } -func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) { +// LotusMount is a DAGStore mount implementation that fetches deal data +// from a PieceCID. +type LotusMount struct { + API MinerAPI + PieceCid cid.Cid +} + +func NewLotusMount(pieceCid cid.Cid, api MinerAPI) (*LotusMount, error) { return &LotusMount{ PieceCid: pieceCid, - Api: api, + API: api, }, nil } @@ -51,13 +52,12 @@ func (l *LotusMount) Deserialize(u *url.URL) error { if err != nil { return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err) } - l.PieceCid = pieceCid return nil } func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { - r, err := l.Api.FetchUnsealedPiece(ctx, l.PieceCid) + r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid) if err != nil { return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err) } @@ -78,11 +78,11 @@ func (l *LotusMount) Close() error { } func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) { - size, err := l.Api.GetUnpaddedCARSize(ctx, l.PieceCid) + size, err := l.API.GetUnpaddedCARSize(ctx, l.PieceCid) if err != nil { return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err) } - isUnsealed, err := l.Api.IsUnsealed(ctx, l.PieceCid) + isUnsealed, err := l.API.IsUnsealed(ctx, l.PieceCid) if err != nil { return mount.Stat{}, xerrors.Errorf("failed to verify if we have the unsealed piece %s: %w", l.PieceCid, err) } diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 817b91760..09b255d6a 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -55,7 +55,7 @@ func TestLotusMount(t *testing.T) { // serialize url then deserialize from mount template -> should get back // the same mount url := mnt.Serialize() - mnt2 := NewLotusMountTemplate(mockLotusMountAPI) + mnt2 := mountTemplate(mockLotusMountAPI) err = mnt2.Deserialize(url) require.NoError(t, err) @@ -69,7 +69,7 @@ func TestLotusMount(t *testing.T) { } func TestLotusMountDeserialize(t *testing.T) { - api := &lotusAccessor{} + api := &minerAPI{} bgen := blocksutil.NewBlockGenerator() cid := bgen.Next().Cid() @@ -79,12 +79,12 @@ func TestLotusMountDeserialize(t *testing.T) { u, err := url.Parse(us) require.NoError(t, err) - mnt := NewLotusMountTemplate(api) + mnt := mountTemplate(api) err = mnt.Deserialize(u) require.NoError(t, err) require.Equal(t, cid, mnt.PieceCid) - require.Equal(t, api, mnt.Api) + require.Equal(t, api, mnt.API) // fails if cid is not valid us = lotusScheme + "://" + "rand" @@ -111,7 +111,7 @@ func TestLotusMountRegistration(t *testing.T) { mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) registry := mount.NewRegistry() - err = registry.Register(lotusScheme, NewLotusMountTemplate(mockLotusMountAPI)) + err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI)) require.NoError(t, err) mnt, err := registry.Instantiate(u) diff --git a/markets/dagstore/readonlyblockstore.go b/markets/dagstore/readonlyblockstore.go deleted file mode 100644 index b8f19313a..000000000 --- a/markets/dagstore/readonlyblockstore.go +++ /dev/null @@ -1,33 +0,0 @@ -package dagstore - -import ( - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - bstore "github.com/ipfs/go-ipfs-blockstore" - "golang.org/x/xerrors" - - "github.com/filecoin-project/dagstore" -) - -// ReadOnlyBlockstore stubs out Blockstore mutators with methods that error out -type ReadOnlyBlockstore struct { - dagstore.ReadBlockstore -} - -func NewReadOnlyBlockstore(rbs dagstore.ReadBlockstore) bstore.Blockstore { - return ReadOnlyBlockstore{ReadBlockstore: rbs} -} - -func (r ReadOnlyBlockstore) DeleteBlock(c cid.Cid) error { - return xerrors.Errorf("DeleteBlock called but not implemented") -} - -func (r ReadOnlyBlockstore) Put(block blocks.Block) error { - return xerrors.Errorf("Put called but not implemented") -} - -func (r ReadOnlyBlockstore) PutMany(blocks []blocks.Block) error { - return xerrors.Errorf("PutMany called but not implemented") -} - -var _ bstore.Blockstore = (*ReadOnlyBlockstore)(nil) diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 4c285be35..9a31d68c3 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -3,89 +3,73 @@ package dagstore import ( "context" "errors" - "io" + "os" "sync" "time" - "github.com/filecoin-project/dagstore/index" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - bstore "github.com/ipfs/go-ipfs-blockstore" + levelds "github.com/ipfs/go-ds-leveldb" + measure "github.com/ipfs/go-ds-measure" logging "github.com/ipfs/go-log/v2" + ldbopts "github.com/syndtr/goleveldb/leveldb/opt" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/index" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" - "github.com/filecoin-project/go-fil-markets/carstore" - "github.com/filecoin-project/go-fil-markets/shared" + + "github.com/filecoin-project/go-fil-markets/stores" ) const maxRecoverAttempts = 1 var log = logging.Logger("dagstore-wrapper") -// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. -type MarketDAGStoreConfig struct { - TransientsDir string - IndexDir string - Datastore ds.Datastore - MaxConcurrentIndex int - MaxConcurrentReadyFetches int - GCInterval time.Duration -} - -// DAGStore provides an interface for the DAG store that can be mocked out -// by tests -type DAGStore interface { - RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error - AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error - RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error - GC(ctx context.Context) (*dagstore.GCResult, error) - Close() error - Start(ctx context.Context) error -} - -type closableBlockstore struct { - bstore.Blockstore - io.Closer -} - type Wrapper struct { ctx context.Context cancel context.CancelFunc backgroundWg sync.WaitGroup - dagStore DAGStore - mountApi LotusAccessor + dagst dagstore.Interface + mountApi MinerAPI failureCh chan dagstore.ShardResult traceCh chan dagstore.Trace gcInterval time.Duration } -var _ shared.DagStoreWrapper = (*Wrapper)(nil) +var _ stores.DAGStoreWrapper = (*Wrapper)(nil) -func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrapper, error) { +func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGStore, *Wrapper, error) { // construct the DAG Store. registry := mount.NewRegistry() - if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil { - return nil, xerrors.Errorf("failed to create registry: %w", err) + if err := registry.Register(lotusScheme, mountTemplate(mountApi)); err != nil { + return nil, nil, xerrors.Errorf("failed to create registry: %w", err) } // The dagstore will write Shard failures to the `failureCh` here. failureCh := make(chan dagstore.ShardResult, 1) + // The dagstore will write Trace events to the `traceCh` here. traceCh := make(chan dagstore.Trace, 32) + dstore, err := newDatastore(cfg.DatastoreDir) + if err != nil { + return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", cfg.DatastoreDir, err) + } + irepo, err := index.NewFSRepo(cfg.IndexDir) if err != nil { - return nil, xerrors.Errorf("failed to initialise dagstore index repo") + return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo") } dcfg := dagstore.Config{ TransientsDir: cfg.TransientsDir, IndexRepo: irepo, - Datastore: cfg.Datastore, + Datastore: dstore, MountRegistry: registry, FailureCh: failureCh, TraceCh: traceCh, @@ -95,82 +79,109 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap MaxConcurrentReadyFetches: cfg.MaxConcurrentReadyFetches, RecoverOnStart: dagstore.RecoverOnAcquire, } - dagStore, err := dagstore.NewDAGStore(dcfg) + + dagst, err := dagstore.NewDAGStore(dcfg) if err != nil { - return nil, xerrors.Errorf("failed to create DAG store: %w", err) + return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) } - return &Wrapper{ - dagStore: dagStore, + w := &Wrapper{ + dagst: dagst, mountApi: mountApi, failureCh: failureCh, traceCh: traceCh, - gcInterval: cfg.GCInterval, - }, nil -} - -func (ds *Wrapper) Start(ctx context.Context) error { - ds.ctx, ds.cancel = context.WithCancel(ctx) - - // Run a go-routine to do DagStore GC. - ds.backgroundWg.Add(1) - go ds.dagStoreGCLoop() - - // run a go-routine to read the trace for debugging. - ds.backgroundWg.Add(1) - go ds.traceLoop() - - // Run a go-routine for shard recovery - if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok { - ds.backgroundWg.Add(1) - go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done) + gcInterval: time.Duration(cfg.GCInterval), } - return ds.dagStore.Start(ctx) + return dagst, w, nil } -func (ds *Wrapper) traceLoop() { - defer ds.backgroundWg.Done() +// newDatastore creates a datastore under the given base directory +// for dagstore metadata. +func newDatastore(dir string) (ds.Batching, error) { + // Create the datastore directory if it doesn't exist yet. + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dir, err) + } - for ds.ctx.Err() == nil { + // Create a new LevelDB datastore + dstore, err := levelds.NewDatastore(dir, &levelds.Options{ + Compression: ldbopts.NoCompression, + NoSync: false, + Strict: ldbopts.StrictAll, + ReadOnly: false, + }) + if err != nil { + return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err) + } + // Keep statistics about the datastore + mds := measure.New("measure.", dstore) + return mds, nil +} + +func (w *Wrapper) Start(ctx context.Context) error { + w.ctx, w.cancel = context.WithCancel(ctx) + + // Run a go-routine to do DagStore GC. + w.backgroundWg.Add(1) + go w.gcLoop() + + // run a go-routine to read the trace for debugging. + w.backgroundWg.Add(1) + go w.traceLoop() + + // Run a go-routine for shard recovery + if dss, ok := w.dagst.(*dagstore.DAGStore); ok { + w.backgroundWg.Add(1) + go dagstore.RecoverImmediately(w.ctx, dss, w.failureCh, maxRecoverAttempts, w.backgroundWg.Done) + } + + return w.dagst.Start(ctx) +} + +func (w *Wrapper) traceLoop() { + defer w.backgroundWg.Done() + + for w.ctx.Err() == nil { select { // Log trace events from the DAG store - case tr := <-ds.traceCh: + case tr := <-w.traceCh: log.Debugw("trace", "shard-key", tr.Key.String(), "op-type", tr.Op.String(), "after", tr.After.String()) - case <-ds.ctx.Done(): + case <-w.ctx.Done(): return } } } -func (ds *Wrapper) dagStoreGCLoop() { - defer ds.backgroundWg.Done() +func (w *Wrapper) gcLoop() { + defer w.backgroundWg.Done() - gcTicker := time.NewTicker(ds.gcInterval) - defer gcTicker.Stop() + ticker := time.NewTicker(w.gcInterval) + defer ticker.Stop() - for ds.ctx.Err() == nil { + for w.ctx.Err() == nil { select { // GC the DAG store on every tick - case <-gcTicker.C: - _, _ = ds.dagStore.GC(ds.ctx) + case <-ticker.C: + _, _ = w.dagst.GC(w.ctx) // Exit when the DAG store wrapper is shutdown - case <-ds.ctx.Done(): + case <-w.ctx.Done(): return } } } -func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { +func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) { log.Debugf("acquiring shard for piece CID %s", pieceCid) + key := shard.KeyFromCID(pieceCid) resch := make(chan dagstore.ShardResult, 1) - err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) + err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) log.Debugf("sent message to acquire shard for piece CID %s", pieceCid) if err != nil { @@ -185,13 +196,13 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl // we already have a transient file. However, we don't have it here // and therefore we pass an empty file path. carPath := "" - if err := shared.RegisterShardSync(ctx, ds, pieceCid, carPath, false); err != nil { + if err := stores.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil { return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err) } log.Warnw("successfully re-registered shard", "pieceCID", pieceCid) resch = make(chan dagstore.ShardResult, 1) - if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { + if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err) } } @@ -216,13 +227,13 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl } log.Debugf("successfully loaded blockstore for piece CID %s", pieceCid) - return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil + return &Blockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil } -func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { +func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { // Create a lotus mount with the piece CID key := shard.KeyFromCID(pieceCid) - mt, err := NewLotusMount(pieceCid, ds.mountApi) + mt, err := NewLotusMount(pieceCid, w.mountApi) if err != nil { return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err) } @@ -232,7 +243,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath ExistingTransient: carPath, LazyInitialization: !eagerInit, } - err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts) + err = w.dagst.RegisterShard(ctx, key, mt, resch, opts) if err != nil { return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err) } @@ -241,20 +252,20 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath return nil } -func (ds *Wrapper) Close() error { +func (w *Wrapper) Close() error { // Cancel the context - ds.cancel() + w.cancel() // Close the DAG store log.Info("will close the dagstore") - if err := ds.dagStore.Close(); err != nil { + if err := w.dagst.Close(); err != nil { return xerrors.Errorf("failed to close DAG store: %w", err) } log.Info("dagstore closed") // Wait for the background go routine to exit log.Info("waiting for dagstore background wrapper routines to exist") - ds.backgroundWg.Wait() + w.backgroundWg.Wait() log.Info("exited dagstore background warpper routines") return nil diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 967fdcd07..8748e4dbf 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -10,6 +10,8 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" @@ -26,13 +28,16 @@ func TestWrapperAcquireRecovery(t *testing.T) { require.NoError(t, err) // Create a DAG store wrapper - w, err := NewDagStoreWrapper(MarketDAGStoreConfig{ + dagst, w, err := NewDAGStore(config.DAGStoreConfig{ TransientsDir: t.TempDir(), IndexDir: t.TempDir(), - GCInterval: time.Millisecond, + DatastoreDir: t.TempDir(), + GCInterval: config.Duration(1 * time.Millisecond), }, mockLotusMount{}) require.NoError(t, err) + defer dagst.Close() //nolint:errcheck + // Return an error from acquire shard the first time acquireShardErr := make(chan error, 1) acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown) @@ -45,7 +50,7 @@ func TestWrapperAcquireRecovery(t *testing.T) { }, register: make(chan shard.Key, 1), } - w.dagStore = mock + w.dagst = mock mybs, err := w.LoadShard(ctx, pieceCid) require.NoError(t, err) @@ -76,20 +81,23 @@ func TestWrapperBackground(t *testing.T) { ctx := context.Background() // Create a DAG store wrapper - w, err := NewDagStoreWrapper(MarketDAGStoreConfig{ + dagst, w, err := NewDAGStore(config.DAGStoreConfig{ TransientsDir: t.TempDir(), IndexDir: t.TempDir(), - GCInterval: time.Millisecond, + DatastoreDir: t.TempDir(), + GCInterval: config.Duration(1 * time.Millisecond), }, mockLotusMount{}) require.NoError(t, err) + defer dagst.Close() //nolint:errcheck + // Create a mock DAG store in place of the real DAG store mock := &mockDagStore{ gc: make(chan struct{}, 1), recover: make(chan shard.Key, 1), close: make(chan struct{}, 1), } - w.dagStore = mock + w.dagst = mock // Start up the wrapper err = w.Start(ctx) @@ -128,6 +136,18 @@ type mockDagStore struct { close chan struct{} } +func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error { + panic("implement me") +} + +func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) { + panic("implement me") +} + +func (m *mockDagStore) AllShardsInfo() dagstore.AllShardsInfo { + panic("implement me") +} + func (m *mockDagStore) Start(_ context.Context) error { return nil } diff --git a/node/builder.go b/node/builder.go index e0efd8462..397df7d49 100644 --- a/node/builder.go +++ b/node/builder.go @@ -66,6 +66,7 @@ var ( AutoNATSvcKey = special{10} // Libp2p option BandwidthReporterKey = special{11} // Libp2p option ConnGaterKey = special{12} // libp2p option + DAGStoreKey = special{13} // constructor returns multiple values ) type invoke int diff --git a/node/builder_miner.go b/node/builder_miner.go index a9b38200b..0bfcfdd25 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -148,8 +148,8 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), // DAG Store - Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor), - Override(new(*dagstore.Wrapper), modules.DagStoreWrapper), + Override(new(dagstore.MinerAPI), modules.NewMinerAPI), + Override(DAGStoreKey, modules.DAGStore), // Markets (retrieval) Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), diff --git a/node/config/def.go b/node/config/def.go index c5c455c68..bef8fa8cf 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -188,6 +188,16 @@ func DefaultStorageMiner() *StorageMiner { TerminateControl: []string{}, DealPublishControl: []string{}, }, + + // The default DAGStoreConfig doesn't define any paths for transients, + // indices and the datastore. Empty values will lead to these being + // placed under /dagStore. + DAGStore: DAGStoreConfig{ + MaxConcurrentIndex: 5, + MaxConcurrentReadyFetches: 2, + MaxConcurrencyStorageCalls: 100, + GCInterval: Duration(1 * time.Minute), + }, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345" diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 5d4a91d5f..421516625 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -125,6 +125,66 @@ and storage providers`, Comment: ``, }, }, + "DAGStoreConfig": []DocField{ + { + Name: "TransientsDir", + Type: "string", + + Comment: `Path to the transients directory. The transients directory caches +unsealed deals that have been fetched from the storage subsystem for +serving retrievals. When empty or omitted, the default value applies. +Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment) +or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment)`, + }, + { + Name: "IndexDir", + Type: "string", + + Comment: `Path to indices directory. When empty or omitted, the default value applies. +Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment) +or $LOTUS_MINER_PATH/dagStore/index (monolith deployment)`, + }, + { + Name: "DatastoreDir", + Type: "string", + + Comment: `Path to datastore directory. The datastore is a KV store tracking the +state of shards known to the DAG store. +Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment) +or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment)`, + }, + { + Name: "MaxConcurrentIndex", + Type: "int", + + Comment: `The maximum amount of indexing jobs that can run simultaneously. +Default value: 5.`, + }, + { + Name: "MaxConcurrentReadyFetches", + Type: "int", + + Comment: `The maximum amount of unsealed deals that can be fetched simultaneously +from the storage subsystem. +Default value: 2.`, + }, + { + Name: "MaxConcurrencyStorageCalls", + Type: "int", + + Comment: `The maximum number of simultaneous inflight API calls to the storage +subsystem. +Default value: 100.`, + }, + { + Name: "GCInterval", + Type: "Duration", + + Comment: `The time between calls to periodic dagstore GC, in time.Duration string +representation, e.g. 1m, 5m, 1h. +Default value: 1 minute.`, + }, + }, "DealmakingConfig": []DocField{ { Name: "ConsiderOnlineStorageDeals", @@ -753,6 +813,12 @@ Default is 20 (about once a week).`, Name: "Addresses", Type: "MinerAddressConfig", + Comment: ``, + }, + { + Name: "DAGStore", + Type: "DAGStoreConfig", + Comment: ``, }, }, diff --git a/node/config/types.go b/node/config/types.go index fe42aa27e..9f77ee7a4 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -50,6 +50,46 @@ type StorageMiner struct { Storage sectorstorage.SealerConfig Fees MinerFeeConfig Addresses MinerAddressConfig + DAGStore DAGStoreConfig +} + +type DAGStoreConfig struct { + // Path to the transients directory. The transients directory caches + // unsealed deals that have been fetched from the storage subsystem for + // serving retrievals. When empty or omitted, the default value applies. + // Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment) + // or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment) + TransientsDir string + + // Path to indices directory. When empty or omitted, the default value applies. + // Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment) + // or $LOTUS_MINER_PATH/dagStore/index (monolith deployment) + IndexDir string + + // Path to datastore directory. The datastore is a KV store tracking the + // state of shards known to the DAG store. + // Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment) + // or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment) + DatastoreDir string + + // The maximum amount of indexing jobs that can run simultaneously. + // Default value: 5. + MaxConcurrentIndex int + + // The maximum amount of unsealed deals that can be fetched simultaneously + // from the storage subsystem. + // Default value: 2. + MaxConcurrentReadyFetches int + + // The maximum number of simultaneous inflight API calls to the storage + // subsystem. + // Default value: 100. + MaxConcurrencyStorageCalls int + + // The time between calls to periodic dagstore GC, in time.Duration string + // representation, e.g. 1m, 5m, 1h. + // Default value: 1 minute. + GCInterval Duration } type MinerSubsystemConfig struct { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index f4401324b..9dfb71019 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -10,8 +10,7 @@ import ( "sort" "time" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/ipld/go-car" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" @@ -26,7 +25,7 @@ import ( files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" - "github.com/ipld/go-car" + basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" @@ -47,6 +46,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" @@ -54,6 +54,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/markets/utils" @@ -188,17 +189,17 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs) result, err := a.SMDealClient.ProposeStorageDeal(ctx, storagemarket.ProposeStorageDealParams{ - Addr: params.Wallet, - Info: &providerInfo, - Data: params.Data, - StartEpoch: dealStart, - EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), - Price: params.EpochPrice, - Collateral: params.ProviderCollateral, - Rt: st, - FastRetrieval: params.FastRetrieval, - VerifiedDeal: params.VerifiedDeal, - FilestoreCARv2FilePath: CARV2FilePath, + Addr: params.Wallet, + Info: &providerInfo, + Data: params.Data, + StartEpoch: dealStart, + EndEpoch: calcDealExpiration(params.MinBlocksDuration, md, dealStart), + Price: params.EpochPrice, + Collateral: params.ProviderCollateral, + Rt: st, + FastRetrieval: params.FastRetrieval, + VerifiedDeal: params.VerifiedDeal, + IndexedCAR: CARV2FilePath, }) if err != nil { @@ -519,7 +520,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor } }() - root, err = a.importNormalFileToFilestoreCARv2(ctx, id, ref.Path, carFile) + root, err = a.doImport(ctx, ref.Path, carFile) if err != nil { return nil, xerrors.Errorf("failed to import normal file to CARv2: %w", err) } @@ -1031,21 +1032,21 @@ func (w *lenWriter) Write(p []byte) (n int, err error) { } func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) { - fc, err := a.imgr().FilestoreCARV2FilePathFor(root) + path, err := a.imgr().FilestoreCARV2FilePathFor(root) if err != nil { return api.DataSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err) } - if len(fc) == 0 { + if len(path) == 0 { return api.DataSize{}, xerrors.New("no CARv2 file for root") } - rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(fc) + fs, err := stores.ReadOnlyFilestore(path) if err != nil { - return api.DataSize{}, xerrors.Errorf("failed to open read only filestore: %w", err) + return api.DataSize{}, xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", path, err) } - defer rdOnly.Close() //nolint:errcheck + defer fs.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly))) + dag := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) w := lenWriter(0) err = car.WriteCar(ctx, dag, []cid.Cid{root}, &w) @@ -1062,21 +1063,21 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e } func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) { - fc, err := a.imgr().FilestoreCARV2FilePathFor(root) + path, err := a.imgr().FilestoreCARV2FilePathFor(root) if err != nil { return api.DataCIDSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err) } - if len(fc) == 0 { + if len(path) == 0 { return api.DataCIDSize{}, xerrors.New("no CARv2 file for root") } - rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(fc) + fs, err := stores.ReadOnlyFilestore(path) if err != nil { - return api.DataCIDSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err) + return api.DataCIDSize{}, xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", path, err) } - defer rdOnly.Close() //nolint:errcheck + defer fs.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly))) + dag := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) w := &writer.Writer{} bw := bufio.NewWriterSize(w, int(writer.CommPBuf)) @@ -1095,22 +1096,20 @@ func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCID func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error { id := importmgr.ImportID(rand.Uint64()) - tmpCARv2File, err := a.imgr().NewTempFile(id) + tmp, err := a.imgr().NewTempFile(id) if err != nil { return xerrors.Errorf("failed to create temp file: %w", err) } - defer os.Remove(tmpCARv2File) //nolint:errcheck + defer os.Remove(tmp) //nolint:errcheck - root, err := a.importNormalFileToFilestoreCARv2(ctx, id, ref.Path, tmpCARv2File) + root, err := a.doImport(ctx, ref.Path, tmp) if err != nil { return xerrors.Errorf("failed to import normal file to CARv2") } - // generate a deterministic CARv1 payload from the UnixFS DAG by doing an IPLD - // traversal over the Unixfs DAG in the CARv2 file using the "all selector" i.e the entire DAG selector. - fs, err := filestorecaradapter.NewReadOnlyFileStore(tmpCARv2File) + fs, err := stores.ReadOnlyFilestore(tmp) if err != nil { - return xerrors.Errorf("failed to open read only CARv2 blockstore: %w", err) + return xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", tmp, err) } defer fs.Close() //nolint:errcheck diff --git a/node/impl/client/import.go b/node/impl/client/import.go index eb0ed5472..d44d73ac5 100644 --- a/node/impl/client/import.go +++ b/node/impl/client/import.go @@ -2,18 +2,13 @@ package client import ( "context" + "io/ioutil" "os" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" - bstore "github.com/filecoin-project/lotus/blockstore" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/node/repo/importmgr" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-filestore" chunker "github.com/ipfs/go-ipfs-chunker" offline "github.com/ipfs/go-ipfs-exchange-offline" files2 "github.com/ipfs/go-ipfs-files" @@ -21,56 +16,83 @@ import ( "github.com/ipfs/go-merkledag" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" ) -// importNormalFileToFilestoreCARv2 transforms the client's "normal file" to a Unixfs IPLD DAG and writes out the DAG to a CARv2 file -// that can be used to back a filestore. -func (a *API) importNormalFileToFilestoreCARv2(ctx context.Context, importID importmgr.ImportID, inputFilePath string, outputCARv2Path string) (c cid.Cid, finalErr error) { - - // TODO: We've currently put in a hack to create the Unixfs DAG as a CARv2 without using Badger. - // We first create the Unixfs DAG using a filestore to get the root of the Unixfs DAG. - // We can't create the UnixfsDAG right away using a CARv2 read-write blockstore as the blockstore - // needs the root of the DAG during instantiation to write out a valid CARv2 file. +// doImport takes a standard file (src), forms a UnixFS DAG, and writes a +// CARv2 file with positional mapping (backed by the go-filestore library). +func (a *API) doImport(ctx context.Context, src string, dst string) (cid.Cid, error) { + // This method uses a two-phase approach with a staging CAR blockstore and + // a final CAR blockstore. // - // In the second pass, we create a CARv2 file with the root present using the root node we get in the above step. - // This hack should be fixed when CARv2 allows specifying the root AFTER finishing the CARv2 streaming write. - fm := filestore.NewFileManager(ds_sync.MutexWrap(datastore.NewMapDatastore()), "/") - fm.AllowFiles = true - fstore := filestore.NewFilestore(bstore.NewMemorySync(), fm) + // This is necessary because of https://github.com/ipld/go-car/issues/196 + // + // TODO: do we need to chunk twice? Isn't the first output already in the + // right order? Can't we just copy the CAR file and replace the header? + f, err := ioutil.TempFile("", "") + if err != nil { + return cid.Undef, xerrors.Errorf("failed to create temp file: %w", err) + } + _ = f.Close() // close; we only want the path. + tmp := f.Name() + defer os.Remove(tmp) //nolint:errcheck + + // Step 1. Compute the UnixFS DAG and write it to a CARv2 file to get + // the root CID of the DAG. + fstore, err := stores.ReadWriteFilestore(tmp) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to create temporary filestore: %w", err) + } + bsvc := blockservice.New(fstore, offline.Exchange(fstore)) - defer bsvc.Close() //nolint:errcheck + dags := merkledag.NewDAGService(bsvc) - // ---- First Pass --- Write out the UnixFS DAG to a rootless CARv2 file by instantiating a read-write CARv2 blockstore without the root. - root, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc)) + root, err := buildUnixFS(ctx, src, dags) if err != nil { - return cid.Undef, xerrors.Errorf("failed to import file to store: %w", err) + _ = fstore.Close() + return cid.Undef, xerrors.Errorf("failed to import file to store to compute root: %w", err) } - //------ Second Pass --- Now that we have the root of the Unixfs DAG -> write out the Unixfs DAG to a CARv2 file with the root present by using a - // filestore backed by a read-write CARv2 blockstore. - fsb, err := filestorecaradapter.NewReadWriteFileStore(outputCARv2Path, []cid.Cid{root}) - if err != nil { - return cid.Undef, xerrors.Errorf("failed to create a CARv2 read-write blockstore: %w", err) - } - defer fsb.Close() //nolint:errcheck - - bsvc = blockservice.New(fsb, offline.Exchange(fsb)) - root2, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc)) - if err != nil { - return cid.Undef, xerrors.Errorf("failed to create Unixfs DAG with CARv2 blockstore: %w", err) + if err := fstore.Close(); err != nil { + return cid.Undef, xerrors.Errorf("failed to finalize car filestore: %w", err) } - if root != root2 { + // Step 2. We now have the root of the UnixFS DAG, and we can write the + // final CAR for real under `dst`. + bs, err := blockstore.OpenReadWrite(dst, []cid.Cid{root}, + car.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true)) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to create a carv2 read/write blockstore: %w", err) + } + + bsvc = blockservice.New(bs, offline.Exchange(bs)) + dags = merkledag.NewDAGService(bsvc) + finalRoot, err := buildUnixFS(ctx, src, dags) + if err != nil { + _ = bs.Close() + return cid.Undef, xerrors.Errorf("failed to create UnixFS DAG with carv2 blockstore: %w", err) + } + + if err := bs.Finalize(); err != nil { + return cid.Undef, xerrors.Errorf("failed to finalize car blockstore: %w", err) + } + + if root != finalRoot { return cid.Undef, xerrors.New("roots do not match") } return root, nil } -// importNormalFileToUnixfsDAG transforms a client's normal file to a UnixfsDAG and imports the DAG to the given DAG service. -func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag ipld.DAGService) (cid.Cid, error) { - f, err := os.Open(inputFilePath) +// buildUnixFS builds a UnixFS DAG out of the supplied ordinary file, +// and imports the DAG into the supplied service. +func buildUnixFS(ctx context.Context, src string, dag ipld.DAGService) (cid.Cid, error) { + f, err := os.Open(src) if err != nil { return cid.Undef, xerrors.Errorf("failed to open input file: %w", err) } @@ -81,7 +103,7 @@ func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag return cid.Undef, xerrors.Errorf("failed to stat file :%w", err) } - file, err := files2.NewReaderPathFile(inputFilePath, f, stat) + file, err := files2.NewReaderPathFile(src, f, stat) if err != nil { return cid.Undef, xerrors.Errorf("failed to create reader path file: %w", err) } diff --git a/node/impl/client/import_test.go b/node/impl/client/import_test.go index aea8b6adb..a6d580e5c 100644 --- a/node/impl/client/import_test.go +++ b/node/impl/client/import_test.go @@ -4,12 +4,11 @@ import ( "context" "io" "io/ioutil" - "math/rand" "os" "strings" "testing" - "github.com/filecoin-project/go-fil-markets/filestorecaradapter" + "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -22,39 +21,50 @@ import ( "github.com/stretchr/testify/require" ) -func TestImportNormalFileToUnixfsDAG(t *testing.T) { +// This test uses a full "dense" CARv2, and not a filestore (positional mapping). +func TestRoundtripUnixFS_Dense(t *testing.T) { ctx := context.Background() - inputPath, inputContents := genNormalInputFile(t) + + inputPath, inputContents := genInputFile(t) defer os.Remove(inputPath) //nolint:errcheck - carv2File := genTmpFile(t) + + carv2File := newTmpFile(t) defer os.Remove(carv2File) //nolint:errcheck - // import a normal file to a Unixfs DAG using a CARv2 read-write blockstore and flush it out to a CARv2 file. - tempCARv2Store, err := blockstore.OpenReadWrite(carv2File, []cid.Cid{}, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) + // import a file to a Unixfs DAG using a CARv2 read/write blockstore. + path, err := blockstore.OpenReadWrite(carv2File, nil, + carv2.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true)) require.NoError(t, err) - bsvc := blockservice.New(tempCARv2Store, offline.Exchange(tempCARv2Store)) - root, err := importNormalFileToUnixfsDAG(ctx, inputPath, merkledag.NewDAGService(bsvc)) + + bsvc := blockservice.New(path, offline.Exchange(path)) + dags := merkledag.NewDAGService(bsvc) + + root, err := buildUnixFS(ctx, inputPath, dags) require.NoError(t, err) require.NotEqual(t, cid.Undef, root) - require.NoError(t, tempCARv2Store.Finalize()) + require.NoError(t, path.Finalize()) - // convert the CARv2 file to a normal file again and ensure the contents match. - readOnly, err := blockstore.OpenReadOnly(carv2File, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true)) + // reconstruct the file. + readOnly, err := blockstore.OpenReadOnly(carv2File, + carv2.ZeroLengthSectionAsEOF(true), + blockstore.UseWholeCIDs(true)) require.NoError(t, err) defer readOnly.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly))) - nd, err := dag.Get(ctx, root) - require.NoError(t, err) - file, err := unixfile.NewUnixfsFile(ctx, dag, nd) + dags = merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly))) + + nd, err := dags.Get(ctx, root) require.NoError(t, err) - tmpOutput := genTmpFile(t) + file, err := unixfile.NewUnixfsFile(ctx, dags, nd) + require.NoError(t, err) + + tmpOutput := newTmpFile(t) defer os.Remove(tmpOutput) //nolint:errcheck require.NoError(t, files.WriteTo(file, tmpOutput)) // ensure contents of the initial input file and the output file are identical. - fo, err := os.Open(tmpOutput) require.NoError(t, err) bz2, err := ioutil.ReadAll(fo) @@ -63,35 +73,36 @@ func TestImportNormalFileToUnixfsDAG(t *testing.T) { require.Equal(t, inputContents, bz2) } -func TestImportNormalFileToCARv2(t *testing.T) { +func TestRoundtripUnixFS_Filestore(t *testing.T) { ctx := context.Background() a := &API{ Imports: &importmgr.Mgr{}, } - importID := importmgr.ImportID(rand.Uint64()) - inputFilePath, inputContents := genNormalInputFile(t) + inputFilePath, inputContents := genInputFile(t) defer os.Remove(inputFilePath) //nolint:errcheck - outputCARv2 := genTmpFile(t) - defer os.Remove(outputCARv2) //nolint:errcheck + path := newTmpFile(t) + defer os.Remove(path) //nolint:errcheck - root, err := a.importNormalFileToFilestoreCARv2(ctx, importID, inputFilePath, outputCARv2) + root, err := a.doImport(ctx, inputFilePath, path) require.NoError(t, err) require.NotEqual(t, cid.Undef, root) // convert the CARv2 to a normal file again and ensure the contents match - readOnly, err := filestorecaradapter.NewReadOnlyFileStore(outputCARv2) + fs, err := stores.ReadOnlyFilestore(path) require.NoError(t, err) - defer readOnly.Close() //nolint:errcheck - dag := merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly))) + defer fs.Close() //nolint:errcheck - nd, err := dag.Get(ctx, root) - require.NoError(t, err) - file, err := unixfile.NewUnixfsFile(ctx, dag, nd) + dags := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs))) + + nd, err := dags.Get(ctx, root) require.NoError(t, err) - tmpOutput := genTmpFile(t) + file, err := unixfile.NewUnixfsFile(ctx, dags, nd) + require.NoError(t, err) + + tmpOutput := newTmpFile(t) defer os.Remove(tmpOutput) //nolint:errcheck require.NoError(t, files.WriteTo(file, tmpOutput)) @@ -104,14 +115,14 @@ func TestImportNormalFileToCARv2(t *testing.T) { require.Equal(t, inputContents, bz2) } -func genTmpFile(t *testing.T) string { +func newTmpFile(t *testing.T) string { f, err := os.CreateTemp("", "") require.NoError(t, err) require.NoError(t, f.Close()) return f.Name() } -func genNormalInputFile(t *testing.T) (filepath string, contents []byte) { +func genInputFile(t *testing.T) (filepath string, contents []byte) { s := strings.Repeat("abcde", 100) tmp, err := os.CreateTemp("", "") require.NoError(t, err) diff --git a/node/modules/client.go b/node/modules/client.go index 44b555556..b7757f8df 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -182,7 +182,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) { - carsPath := filepath.Join(r.Path(), dagStore, "retrieval-cars") + carsPath := filepath.Join(r.Path(), DefaultDAGStoreDir, "retrieval-cars") if err := os.MkdirAll(carsPath, 0755); err != nil { return nil, xerrors.Errorf("failed to create dir") diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index bc54fef38..f9107006e 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -7,9 +7,7 @@ import ( "fmt" "net/http" "os" - "path" "path/filepath" - "strconv" "strings" "time" @@ -17,22 +15,6 @@ import ( "go.uber.org/multierr" "golang.org/x/xerrors" - "github.com/ipfs/go-bitswap" - "github.com/ipfs/go-bitswap/network" - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - levelds "github.com/ipfs/go-ds-leveldb" - measure "github.com/ipfs/go-ds-measure" - graphsync "github.com/ipfs/go-graphsync/impl" - gsnet "github.com/ipfs/go-graphsync/network" - "github.com/ipfs/go-graphsync/storeutil" - "github.com/ipfs/go-merkledag" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" - ldbopts "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/filecoin-project/go-address" dtimpl "github.com/filecoin-project/go-data-transfer/impl" dtnet "github.com/filecoin-project/go-data-transfer/network" @@ -52,6 +34,18 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + graphsync "github.com/ipfs/go-graphsync/impl" + gsnet "github.com/ipfs/go-graphsync/network" + "github.com/ipfs/go-graphsync/storeutil" + "github.com/ipfs/go-merkledag" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/routing" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" @@ -82,7 +76,6 @@ import ( ) var StorageCounterDSPrefix = "/storage/nextid" -var dagStore = "dagStore" func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { maddrb, err := ds.Get(datastore.NewKey("miner-address")) @@ -581,101 +574,6 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside } } -func NewLotusAccessor(lc fx.Lifecycle, - pieceStore dtypes.ProviderPieceStore, - rpn retrievalmarket.RetrievalProviderNode, -) (dagstore.LotusAccessor, error) { - mountApi := dagstore.NewLotusAccessor(pieceStore, rpn) - ready := make(chan error, 1) - pieceStore.OnReady(func(err error) { - ready <- err - }) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - if err := <-ready; err != nil { - return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err) - } - return mountApi.Start(ctx) - }, - OnStop: func(context.Context) error { - return nil - }, - }) - - return mountApi, nil -} - -func DagStoreWrapper( - lc fx.Lifecycle, - r repo.LockedRepo, - lotusAccessor dagstore.LotusAccessor, -) (*dagstore.Wrapper, error) { - dagStoreDir := filepath.Join(r.Path(), dagStore) - dagStoreDS, err := createDAGStoreDatastore(dagStoreDir) - if err != nil { - return nil, err - } - - var maxCopies = 2 - // TODO replace env with config.toml attribute. - v, ok := os.LookupEnv("LOTUS_DAGSTORE_COPY_CONCURRENCY") - if ok { - concurrency, err := strconv.Atoi(v) - if err == nil { - maxCopies = concurrency - } - } - - cfg := dagstore.MarketDAGStoreConfig{ - TransientsDir: filepath.Join(dagStoreDir, "transients"), - IndexDir: filepath.Join(dagStoreDir, "index"), - Datastore: dagStoreDS, - GCInterval: 1 * time.Minute, - MaxConcurrentIndex: 5, - MaxConcurrentReadyFetches: maxCopies, - } - - dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor) - if err != nil { - return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err) - } - - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return dsw.Start(ctx) - }, - OnStop: func(context.Context) error { - return dsw.Close() - }, - }) - return dsw, nil -} - -// createDAGStoreDatastore creates a datastore under the given base directory -// for DAG store metadata -func createDAGStoreDatastore(baseDir string) (datastore.Batching, error) { - // Create a datastore directory under the base dir if it doesn't already exist - dsDir := path.Join(baseDir, "datastore") - if err := os.MkdirAll(dsDir, 0755); err != nil { - return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dsDir, err) - } - - // Create a new LevelDB datastore - ds, err := levelds.NewDatastore(dsDir, &levelds.Options{ - Compression: ldbopts.NoCompression, - NoSync: false, - Strict: ldbopts.StrictAll, - ReadOnly: false, - }) - if err != nil { - return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err) - } - - // Keep statistics about the datastore - mds := measure.New("measure.", ds) - return mds, nil -} - func StorageProvider(minerAddress dtypes.MinerAddress, storedAsk *storedask.StoredAsk, h host.Host, ds dtypes.MetadataDS, @@ -692,7 +590,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, return nil, err } - dagStorePath := filepath.Join(r.Path(), dagStore) + dagStorePath := filepath.Join(r.Path(), DefaultDAGStoreDir) opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df)) shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), dagStorePath, dsw, pieceStore, spn) diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go new file mode 100644 index 000000000..6f032ed34 --- /dev/null +++ b/node/modules/storageminer_dagstore.go @@ -0,0 +1,118 @@ +package modules + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "go.uber.org/fx" + "golang.org/x/xerrors" + + mdagstore "github.com/filecoin-project/lotus/markets/dagstore" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" +) + +const ( + EnvDAGStoreCopyConcurrency = "LOTUS_DAGSTORE_COPY_CONCURRENCY" + DefaultDAGStoreDir = "dagStore" +) + +// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts. +func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode) (mdagstore.MinerAPI, error) { + cfg, err := extractDAGStoreConfig(r) + if err != nil { + return nil, err + } + + // caps the amount of concurrent calls to the storage, so that we don't + // spam it during heavy processes like bulk migration. + if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + cfg.MaxConcurrencyStorageCalls = concurrency + } + } + + mountApi := mdagstore.NewMinerAPI(pieceStore, rpn, cfg.MaxConcurrencyStorageCalls) + ready := make(chan error, 1) + pieceStore.OnReady(func(err error) { + ready <- err + }) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + if err := <-ready; err != nil { + return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err) + } + return mountApi.Start(ctx) + }, + OnStop: func(context.Context) error { + return nil + }, + }) + + return mountApi, nil +} + +// DAGStore constructs a DAG store using the supplied minerAPI, and the +// user configuration. It returns both the DAGStore and the Wrapper suitable for +// passing to markets. +func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { + cfg, err := extractDAGStoreConfig(r) + if err != nil { + return nil, nil, err + } + + // populate default directories if not explicitly set in the config. + defaultDir := filepath.Join(r.Path(), DefaultDAGStoreDir) + if cfg.TransientsDir == "" { + cfg.TransientsDir = filepath.Join(defaultDir, "transients") + } + if cfg.IndexDir == "" { + cfg.IndexDir = filepath.Join(defaultDir, "index") + } + if cfg.DatastoreDir == "" { + cfg.DatastoreDir = filepath.Join(defaultDir, "datastore") + } + + v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency) + if ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + cfg.MaxConcurrentReadyFetches = concurrency + } + } + + dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI) + if err != nil { + return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) + } + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return w.Start(ctx) + }, + OnStop: func(context.Context) error { + return w.Close() + }, + }) + + return dagst, w, nil +} + +func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) { + cfg, err := r.Config() + if err != nil { + return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err) + } + mcfg, ok := cfg.(*config.StorageMiner) + if !ok { + return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg) + } + return mcfg.DAGStore, nil +}