Merge pull request #6948 from filecoin-project/feat/dagst-throttle0-fetch-copy
Dagstore throttle fetch and copy
This commit is contained in:
commit
3be2681824
4
go.mod
4
go.mod
@ -26,7 +26,7 @@ require (
|
||||
github.com/elastic/gosigar v0.12.0
|
||||
github.com/etclabscore/go-openrpc-reflect v0.0.36
|
||||
github.com/fatih/color v1.9.0
|
||||
github.com/filecoin-project/dagstore v0.3.2
|
||||
github.com/filecoin-project/dagstore v0.4.0
|
||||
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
|
||||
github.com/filecoin-project/go-address v0.0.5
|
||||
github.com/filecoin-project/go-bitfield v0.2.4
|
||||
@ -56,7 +56,7 @@ require (
|
||||
github.com/gdamore/tcell/v2 v2.2.0
|
||||
github.com/go-kit/kit v0.10.0
|
||||
github.com/go-ole/go-ole v1.2.4 // indirect
|
||||
github.com/golang/mock v1.5.0
|
||||
github.com/golang/mock v1.6.0
|
||||
github.com/google/uuid v1.2.0
|
||||
github.com/gorilla/mux v1.7.4
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
|
9
go.sum
9
go.sum
@ -257,8 +257,8 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
|
||||
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
|
||||
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
|
||||
github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
|
||||
github.com/filecoin-project/dagstore v0.3.2 h1:YgLW+0VpbjsWuRDUaPmjaS/KRz1cRbi0zAjZV/xfIwY=
|
||||
github.com/filecoin-project/dagstore v0.3.2/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
|
||||
github.com/filecoin-project/dagstore v0.4.0 h1:ZeoL5Gbgn4BSzKgghobfAO9a5nVqfVC4fQ/lNYn1GQo=
|
||||
github.com/filecoin-project/dagstore v0.4.0/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
|
||||
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
||||
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
|
||||
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
||||
@ -435,8 +435,8 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU
|
||||
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
|
||||
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
|
||||
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
|
||||
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
|
||||
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
|
||||
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
|
||||
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
@ -1989,6 +1989,7 @@ golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3/go.mod h1:njjCfa9FT2d7l9Bc
|
||||
golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
@ -33,6 +33,7 @@ var MaxConcurrentStorageCalls = func() int {
|
||||
type LotusAccessor interface {
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
||||
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
|
||||
Start(ctx context.Context) error
|
||||
}
|
||||
|
||||
@ -58,6 +59,54 @@ func (m *lotusAccessor) Start(_ context.Context) error {
|
||||
return m.readyMgr.FireReady(nil)
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
|
||||
err := m.readyMgr.AwaitReady()
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed while waiting for accessor to start: %w", err)
|
||||
}
|
||||
|
||||
var pieceInfo piecestore.PieceInfo
|
||||
err = m.throttle.Do(ctx, func(ctx context.Context) (err error) {
|
||||
pieceInfo, err = m.pieceStore.GetPieceInfo(pieceCid)
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
}
|
||||
|
||||
if len(pieceInfo.Deals) == 0 {
|
||||
return false, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
|
||||
}
|
||||
|
||||
// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
|
||||
for _, deal := range pieceInfo.Deals {
|
||||
deal := deal
|
||||
|
||||
var isUnsealed bool
|
||||
// Throttle this path to avoid flooding the storage subsystem.
|
||||
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
|
||||
isUnsealed, err = m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorID, deal.DealID, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
|
||||
continue // move on to the next match.
|
||||
}
|
||||
|
||||
if isUnsealed {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// we don't have an unsealed sector containing the piece
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||
err := m.readyMgr.AwaitReady()
|
||||
if err != nil {
|
||||
|
@ -45,6 +45,8 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
name string
|
||||
deals []abi.SectorNumber
|
||||
fetchedData string
|
||||
isUnsealed bool
|
||||
|
||||
expectErr bool
|
||||
}{{
|
||||
// Expect error if there is no deal info for piece CID
|
||||
@ -56,11 +58,13 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
name: "prefer unsealed deal",
|
||||
deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID},
|
||||
fetchedData: unsealedSectorData,
|
||||
isUnsealed: true,
|
||||
}, {
|
||||
// Expect the API to unseal the data if there are no unsealed deals
|
||||
name: "unseal if necessary",
|
||||
deals: []abi.SectorNumber{sealedSectorID},
|
||||
fetchedData: sealedSectorData,
|
||||
isUnsealed: false,
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -95,6 +99,10 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.fetchedData, string(bz))
|
||||
|
||||
uns, err := api.IsUnsealed(ctx, cid1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.isUnsealed, uns)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -179,8 +187,14 @@ func getPieceStore(t *testing.T) piecestore.PieceStore {
|
||||
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||
require.NoError(t, err)
|
||||
|
||||
ch := make(chan struct{}, 1)
|
||||
ps.OnReady(func(_ error) {
|
||||
ch <- struct{}{}
|
||||
})
|
||||
|
||||
err = ps.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
<-ch
|
||||
return ps
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: markets/dagstore/lotusaccessor.go
|
||||
// Source: lotusaccessor.go
|
||||
|
||||
// Package mock_dagstore is a generated GoMock package.
|
||||
package mock_dagstore
|
||||
@ -31,10 +31,6 @@ func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor {
|
||||
return mock
|
||||
}
|
||||
|
||||
func (mr *MockLotusAccessor) Start(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder {
|
||||
return m.recorder
|
||||
@ -69,3 +65,32 @@ func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interf
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid)
|
||||
}
|
||||
|
||||
// IsUnsealed mocks base method.
|
||||
func (m *MockLotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "IsUnsealed", ctx, pieceCid)
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// IsUnsealed indicates an expected call of IsUnsealed.
|
||||
func (mr *MockLotusAccessorMockRecorder) IsUnsealed(ctx, pieceCid interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockLotusAccessor)(nil).IsUnsealed), ctx, pieceCid)
|
||||
}
|
||||
|
||||
// Start mocks base method.
|
||||
func (m *MockLotusAccessor) Start(ctx context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Start", ctx)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Start indicates an expected call of Start.
|
||||
func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx)
|
||||
}
|
||||
|
@ -82,11 +82,16 @@ func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
|
||||
if err != nil {
|
||||
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err)
|
||||
}
|
||||
isUnsealed, err := l.Api.IsUnsealed(ctx, l.PieceCid)
|
||||
if err != nil {
|
||||
return mount.Stat{}, xerrors.Errorf("failed to verify if we have the unsealed piece %s: %w", l.PieceCid, err)
|
||||
}
|
||||
|
||||
// TODO Mark false when storage deal expires.
|
||||
return mount.Stat{
|
||||
Exists: true,
|
||||
Size: int64(size),
|
||||
Ready: isUnsealed,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,9 @@ func TestLotusMount(t *testing.T) {
|
||||
|
||||
// create a mock lotus api that returns the reader we want
|
||||
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
|
||||
|
||||
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
|
||||
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||
@ -114,8 +117,10 @@ func TestLotusMountRegistration(t *testing.T) {
|
||||
mnt, err := registry.Instantiate(u)
|
||||
require.NoError(t, err)
|
||||
|
||||
mockLotusMountAPI.EXPECT().IsUnsealed(ctx, cid).Return(true, nil)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||
stat, err := mnt.Stat(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 100, stat.Size)
|
||||
require.True(t, stat.Ready)
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ type MarketDAGStoreConfig struct {
|
||||
IndexDir string
|
||||
Datastore ds.Datastore
|
||||
MaxConcurrentIndex int
|
||||
MaxConcurrentCopies int
|
||||
MaxConcurrentReadyFetches int
|
||||
GCInterval time.Duration
|
||||
}
|
||||
|
||||
@ -92,7 +92,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
||||
// not limiting fetches globally, as the Lotus mount does
|
||||
// conditional throttling.
|
||||
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
|
||||
MaxConcurrentCopies: cfg.MaxConcurrentCopies,
|
||||
MaxConcurrentReadyFetches: cfg.MaxConcurrentReadyFetches,
|
||||
RecoverOnStart: dagstore.RecoverOnAcquire,
|
||||
}
|
||||
dagStore, err := dagstore.NewDAGStore(dcfg)
|
||||
|
@ -92,7 +92,8 @@ func TestWrapperBackground(t *testing.T) {
|
||||
w.dagStore = mock
|
||||
|
||||
// Start up the wrapper
|
||||
w.Start(ctx)
|
||||
err = w.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Expect GC to be called automatically
|
||||
tctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
@ -127,6 +128,10 @@ type mockDagStore struct {
|
||||
close chan struct{}
|
||||
}
|
||||
|
||||
func (m *mockDagStore) Start(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error {
|
||||
m.register <- key
|
||||
out <- dagstore.ShardResult{Key: key}
|
||||
@ -178,6 +183,10 @@ func (m mockLotusMount) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m mockLotusMount) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func getShardAccessor(t *testing.T) *dagstore.ShardAccessor {
|
||||
data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car")
|
||||
require.NoError(t, err)
|
||||
|
@ -632,7 +632,7 @@ func DagStoreWrapper(
|
||||
Datastore: dagStoreDS,
|
||||
GCInterval: 1 * time.Minute,
|
||||
MaxConcurrentIndex: 5,
|
||||
MaxConcurrentCopies: maxCopies,
|
||||
MaxConcurrentReadyFetches: maxCopies,
|
||||
}
|
||||
|
||||
dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)
|
||||
|
Loading…
Reference in New Issue
Block a user