fix: ensure piece store starts before calling lotus accessor methods
This commit is contained in:
parent
1452a361a9
commit
0dbb1940f0
@ -3,6 +3,7 @@ package dagstore
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
@ -12,26 +13,36 @@ import (
|
||||
)
|
||||
|
||||
type LotusAccessor interface {
|
||||
Start(ctx context.Context) error
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
|
||||
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
||||
}
|
||||
|
||||
type lotusAccessor struct {
|
||||
pieceStore piecestore.PieceStore
|
||||
rm retrievalmarket.RetrievalProviderNode
|
||||
|
||||
startLk sync.Mutex
|
||||
started bool
|
||||
startErr error
|
||||
}
|
||||
|
||||
var _ LotusAccessor = (*lotusAccessor)(nil)
|
||||
|
||||
func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
|
||||
func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
|
||||
return &lotusAccessor{
|
||||
pieceStore: store,
|
||||
rm: rm,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) Start(ctx context.Context) error {
|
||||
func (m *lotusAccessor) start(ctx context.Context) error {
|
||||
m.startLk.Lock()
|
||||
defer m.startLk.Unlock()
|
||||
|
||||
if m.started {
|
||||
return m.startErr
|
||||
}
|
||||
|
||||
// Wait for the piece store to startup
|
||||
ready := make(chan error)
|
||||
m.pieceStore.OnReady(func(err error) {
|
||||
@ -47,6 +58,7 @@ func (m *lotusAccessor) Start(ctx context.Context) error {
|
||||
case err := <-ready:
|
||||
// Piece store has started up, check if there was an error
|
||||
if err != nil {
|
||||
m.startErr = err
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -56,6 +68,11 @@ func (m *lotusAccessor) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||
err := m.start(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
@ -100,7 +117,12 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
func (m *lotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
|
||||
func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||
err := m.start(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
|
@ -65,7 +65,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
rpn := &mockRPN{
|
||||
sectors: mockData,
|
||||
}
|
||||
api := NewLotusMountAPI(ps, rpn)
|
||||
api := NewLotusAccessor(ps, rpn)
|
||||
|
||||
// Add deals to piece store
|
||||
for _, sectorID := range tc.deals {
|
||||
@ -94,12 +94,13 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cid1, err := cid.Parse("bafkqaaa")
|
||||
require.NoError(t, err)
|
||||
|
||||
ps := getPieceStore(t)
|
||||
rpn := &mockRPN{}
|
||||
api := NewLotusMountAPI(ps, rpn)
|
||||
api := NewLotusAccessor(ps, rpn)
|
||||
|
||||
// Add a deal with data Length 10
|
||||
dealInfo := piecestore.DealInfo{
|
||||
@ -109,7 +110,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check that the data length is correct
|
||||
len, err := api.GetUnpaddedCARSize(cid1)
|
||||
len, err := api.GetUnpaddedCARSize(ctx, cid1)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 10, len)
|
||||
}
|
||||
@ -120,14 +121,14 @@ func getPieceStore(t *testing.T) piecestore.PieceStore {
|
||||
|
||||
err = ps.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
ready := make(chan error)
|
||||
ps.OnReady(func(err error) {
|
||||
ready <- err
|
||||
})
|
||||
err = <-ready
|
||||
require.NoError(t, err)
|
||||
|
||||
//
|
||||
//ready := make(chan error)
|
||||
//ps.OnReady(func(err error) {
|
||||
// ready <- err
|
||||
//})
|
||||
//err = <-ready
|
||||
//require.NoError(t, err)
|
||||
//
|
||||
return ps
|
||||
}
|
||||
|
||||
|
@ -52,30 +52,16 @@ func (mr *MockLotusAccessorMockRecorder) FetchUnsealedPiece(ctx, pieceCid interf
|
||||
}
|
||||
|
||||
// GetUnpaddedCARSize mocks base method.
|
||||
func (m *MockLotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
|
||||
func (m *MockLotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", pieceCid)
|
||||
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", ctx, pieceCid)
|
||||
ret0, _ := ret[0].(uint64)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
|
||||
func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(pieceCid interface{}) *gomock.Call {
|
||||
func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), pieceCid)
|
||||
}
|
||||
|
||||
// Start mocks base method.
|
||||
func (m *MockLotusAccessor) Start(ctx context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Start", ctx)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Start indicates an expected call of Start.
|
||||
func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid)
|
||||
}
|
||||
|
@ -75,8 +75,8 @@ func (l *LotusMount) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) {
|
||||
size, err := l.Api.GetUnpaddedCARSize(l.PieceCid)
|
||||
func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
|
||||
size, err := l.Api.GetUnpaddedCARSize(ctx, l.PieceCid)
|
||||
if err != nil {
|
||||
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err)
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ func TestLotusMount(t *testing.T) {
|
||||
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||
|
||||
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
|
||||
require.NoError(t, err)
|
||||
@ -93,6 +93,7 @@ func TestLotusMountDeserialize(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLotusMountRegistration(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
bgen := blocksutil.NewBlockGenerator()
|
||||
cid := bgen.Next().Cid()
|
||||
|
||||
@ -113,7 +114,7 @@ func TestLotusMountRegistration(t *testing.T) {
|
||||
mnt, err := registry.Instantiate(u)
|
||||
require.NoError(t, err)
|
||||
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||
stat, err := mnt.Stat(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 100, stat.Size)
|
||||
|
@ -96,20 +96,13 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ds *Wrapper) Start(ctx context.Context) error {
|
||||
func (ds *Wrapper) Start(ctx context.Context) {
|
||||
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
||||
|
||||
err := ds.mountApi.Start(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to start mount API: %w", err)
|
||||
}
|
||||
|
||||
ds.backgroundWg.Add(1)
|
||||
|
||||
// Run a go-routine to handle failures, traces and GC
|
||||
go ds.background()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *Wrapper) background() {
|
||||
|
@ -189,7 +189,7 @@ func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m mockLotusMount) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
|
||||
func (m mockLotusMount) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -591,7 +591,7 @@ func DagStoreWrapper(
|
||||
Datastore: dagStoreDS,
|
||||
GCInterval: 5 * time.Minute,
|
||||
}
|
||||
mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
|
||||
mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
|
||||
dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
|
||||
@ -599,7 +599,8 @@ func DagStoreWrapper(
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
return dsw.Start(ctx)
|
||||
dsw.Start(ctx)
|
||||
return nil
|
||||
},
|
||||
OnStop: func(context.Context) error {
|
||||
return dsw.Close()
|
||||
|
Loading…
Reference in New Issue
Block a user