diff --git a/go.mod b/go.mod index e7110860a..fc85120be 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 - github.com/filecoin-project/dagstore v0.3.2 + github.com/filecoin-project/dagstore v0.4.0 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 @@ -56,7 +56,7 @@ require ( github.com/gdamore/tcell/v2 v2.2.0 github.com/go-kit/kit v0.10.0 github.com/go-ole/go-ole v1.2.4 // indirect - github.com/golang/mock v1.5.0 + github.com/golang/mock v1.6.0 github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.7.4 github.com/gorilla/websocket v1.4.2 diff --git a/go.sum b/go.sum index 20703cb92..d38c6a2c7 100644 --- a/go.sum +++ b/go.sum @@ -257,8 +257,8 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= -github.com/filecoin-project/dagstore v0.3.2 h1:YgLW+0VpbjsWuRDUaPmjaS/KRz1cRbi0zAjZV/xfIwY= -github.com/filecoin-project/dagstore v0.3.2/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= +github.com/filecoin-project/dagstore v0.4.0 h1:ZeoL5Gbgn4BSzKgghobfAO9a5nVqfVC4fQ/lNYn1GQo= +github.com/filecoin-project/dagstore v0.4.0/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -435,8 +435,8 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= -github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g= -github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1989,6 +1989,7 @@ golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/lotusaccessor.go index be895e476..268720fae 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/lotusaccessor.go @@ -33,6 +33,7 @@ var MaxConcurrentStorageCalls = func() int { type LotusAccessor interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) + IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) Start(ctx context.Context) error } @@ -58,6 +59,54 @@ func (m *lotusAccessor) Start(_ context.Context) error { return m.readyMgr.FireReady(nil) } +func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { + err := m.readyMgr.AwaitReady() + if err != nil { + return false, xerrors.Errorf("failed while waiting for accessor to start: %w", err) + } + + var pieceInfo piecestore.PieceInfo + err = m.throttle.Do(ctx, func(ctx context.Context) (err error) { + pieceInfo, err = m.pieceStore.GetPieceInfo(pieceCid) + return err + }) + + if err != nil { + return false, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) + } + + if len(pieceInfo.Deals) == 0 { + return false, xerrors.Errorf("no storage deals found for piece %s", pieceCid) + } + + // check if we have an unsealed deal for the given piece in any of the unsealed sectors. + for _, deal := range pieceInfo.Deals { + deal := deal + + var isUnsealed bool + // Throttle this path to avoid flooding the storage subsystem. + err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { + isUnsealed, err = m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err != nil { + return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorID, deal.DealID, err) + } + return nil + }) + + if err != nil { + log.Warnf("failed to check/retrieve unsealed sector: %s", err) + continue // move on to the next match. + } + + if isUnsealed { + return true, nil + } + } + + // we don't have an unsealed sector containing the piece + return false, nil +} + func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { err := m.readyMgr.AwaitReady() if err != nil { diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/lotusaccessor_test.go index b6a7a4b25..6d8513f56 100644 --- a/markets/dagstore/lotusaccessor_test.go +++ b/markets/dagstore/lotusaccessor_test.go @@ -45,7 +45,9 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { name string deals []abi.SectorNumber fetchedData string - expectErr bool + isUnsealed bool + + expectErr bool }{{ // Expect error if there is no deal info for piece CID name: "no deals", @@ -56,11 +58,13 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { name: "prefer unsealed deal", deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID}, fetchedData: unsealedSectorData, + isUnsealed: true, }, { // Expect the API to unseal the data if there are no unsealed deals name: "unseal if necessary", deals: []abi.SectorNumber{sealedSectorID}, fetchedData: sealedSectorData, + isUnsealed: false, }} for _, tc := range testCases { @@ -95,6 +99,10 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { require.NoError(t, err) require.Equal(t, tc.fetchedData, string(bz)) + + uns, err := api.IsUnsealed(ctx, cid1) + require.NoError(t, err) + require.Equal(t, tc.isUnsealed, uns) }) } } @@ -179,8 +187,14 @@ func getPieceStore(t *testing.T) piecestore.PieceStore { ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore())) require.NoError(t, err) + ch := make(chan struct{}, 1) + ps.OnReady(func(_ error) { + ch <- struct{}{} + }) + err = ps.Start(context.Background()) require.NoError(t, err) + <-ch return ps } diff --git a/markets/dagstore/mocks/mock_lotus_accessor.go b/markets/dagstore/mocks/mock_lotus_accessor.go index edd5f856f..2e19b4482 100644 --- a/markets/dagstore/mocks/mock_lotus_accessor.go +++ b/markets/dagstore/mocks/mock_lotus_accessor.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: markets/dagstore/lotusaccessor.go +// Source: lotusaccessor.go // Package mock_dagstore is a generated GoMock package. package mock_dagstore @@ -31,10 +31,6 @@ func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor { return mock } -func (mr *MockLotusAccessor) Start(_ context.Context) error { - return nil -} - // EXPECT returns an object that allows the caller to indicate expected use. func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder { return m.recorder @@ -69,3 +65,32 @@ func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interf mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid) } + +// IsUnsealed mocks base method. +func (m *MockLotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsUnsealed", ctx, pieceCid) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsUnsealed indicates an expected call of IsUnsealed. +func (mr *MockLotusAccessorMockRecorder) IsUnsealed(ctx, pieceCid interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockLotusAccessor)(nil).IsUnsealed), ctx, pieceCid) +} + +// Start mocks base method. +func (m *MockLotusAccessor) Start(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Start", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Start indicates an expected call of Start. +func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx) +} diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index e87914732..f53c31c7c 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -82,11 +82,16 @@ func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) { if err != nil { return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err) } + isUnsealed, err := l.Api.IsUnsealed(ctx, l.PieceCid) + if err != nil { + return mount.Stat{}, xerrors.Errorf("failed to verify if we have the unsealed piece %s: %w", l.PieceCid, err) + } // TODO Mark false when storage deal expires. return mount.Stat{ Exists: true, Size: int64(size), + Ready: isUnsealed, }, nil } diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 836506ad0..817b91760 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -27,6 +27,9 @@ func TestLotusMount(t *testing.T) { // create a mock lotus api that returns the reader we want mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) + + mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1) + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) @@ -114,8 +117,10 @@ func TestLotusMountRegistration(t *testing.T) { mnt, err := registry.Instantiate(u) require.NoError(t, err) + mockLotusMountAPI.EXPECT().IsUnsealed(ctx, cid).Return(true, nil) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) stat, err := mnt.Stat(context.Background()) require.NoError(t, err) require.EqualValues(t, 100, stat.Size) + require.True(t, stat.Ready) } diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index c775d50a5..4c285be35 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -27,12 +27,12 @@ var log = logging.Logger("dagstore-wrapper") // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. type MarketDAGStoreConfig struct { - TransientsDir string - IndexDir string - Datastore ds.Datastore - MaxConcurrentIndex int - MaxConcurrentCopies int - GCInterval time.Duration + TransientsDir string + IndexDir string + Datastore ds.Datastore + MaxConcurrentIndex int + MaxConcurrentReadyFetches int + GCInterval time.Duration } // DAGStore provides an interface for the DAG store that can be mocked out @@ -91,9 +91,9 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap TraceCh: traceCh, // not limiting fetches globally, as the Lotus mount does // conditional throttling. - MaxConcurrentIndex: cfg.MaxConcurrentIndex, - MaxConcurrentCopies: cfg.MaxConcurrentCopies, - RecoverOnStart: dagstore.RecoverOnAcquire, + MaxConcurrentIndex: cfg.MaxConcurrentIndex, + MaxConcurrentReadyFetches: cfg.MaxConcurrentReadyFetches, + RecoverOnStart: dagstore.RecoverOnAcquire, } dagStore, err := dagstore.NewDAGStore(dcfg) if err != nil { diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index a86b213aa..967fdcd07 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -92,7 +92,8 @@ func TestWrapperBackground(t *testing.T) { w.dagStore = mock // Start up the wrapper - w.Start(ctx) + err = w.Start(ctx) + require.NoError(t, err) // Expect GC to be called automatically tctx, cancel := context.WithTimeout(ctx, time.Second) @@ -127,6 +128,10 @@ type mockDagStore struct { close chan struct{} } +func (m *mockDagStore) Start(_ context.Context) error { + return nil +} + func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error { m.register <- key out <- dagstore.ShardResult{Key: key} @@ -178,6 +183,10 @@ func (m mockLotusMount) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid panic("implement me") } +func (m mockLotusMount) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { + panic("implement me") +} + func getShardAccessor(t *testing.T) *dagstore.ShardAccessor { data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car") require.NoError(t, err) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index bb6dc6101..bc54fef38 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -627,12 +627,12 @@ func DagStoreWrapper( } cfg := dagstore.MarketDAGStoreConfig{ - TransientsDir: filepath.Join(dagStoreDir, "transients"), - IndexDir: filepath.Join(dagStoreDir, "index"), - Datastore: dagStoreDS, - GCInterval: 1 * time.Minute, - MaxConcurrentIndex: 5, - MaxConcurrentCopies: maxCopies, + TransientsDir: filepath.Join(dagStoreDir, "transients"), + IndexDir: filepath.Join(dagStoreDir, "index"), + Datastore: dagStoreDS, + GCInterval: 1 * time.Minute, + MaxConcurrentIndex: 5, + MaxConcurrentReadyFetches: maxCopies, } dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)