From dc6dbc9a11dfabdd9a03f73869044cd15b78af10 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Sat, 22 May 2021 22:40:21 +0530 Subject: [PATCH] dpr changes and test based on new unsealing PR --- cmd/lotus-storage-miner/retrieval-deals.go | 2 +- documentation/en/cli-lotus-miner.md | 4 +- extern/sector-storage/mock/mock.go | 4 + extern/sector-storage/piece_provider.go | 23 ++ extern/sector-storage/piece_provider_test.go | 29 +++ extern/sector-storage/stores/remote.go | 72 +++++ extern/sector-storage/stores/remote_test.go | 260 +++++++++++++++++-- go.mod | 2 +- go.sum | 4 +- markets/pricing/cli.go | 48 ++++ markets/retrievaladapter/provider.go | 66 +++++ markets/retrievaladapter/provider_test.go | 151 +++++++++++ node/builder.go | 20 ++ node/config/def.go | 38 +++ node/modules/dtypes/miner.go | 2 + node/modules/storageminer.go | 19 +- 16 files changed, 715 insertions(+), 29 deletions(-) create mode 100644 markets/pricing/cli.go create mode 100644 markets/retrievaladapter/provider_test.go diff --git a/cmd/lotus-storage-miner/retrieval-deals.go b/cmd/lotus-storage-miner/retrieval-deals.go index 03d397852..0411f7f13 100644 --- a/cmd/lotus-storage-miner/retrieval-deals.go +++ b/cmd/lotus-storage-miner/retrieval-deals.go @@ -235,7 +235,7 @@ var retrievalSetAskCmd = &cli.Command{ var retrievalGetAskCmd = &cli.Command{ Name: "get-ask", - Usage: "Get the provider's current retrieval ask", + Usage: "Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command", Flags: []cli.Flag{}, Action: func(cctx *cli.Context) error { ctx := lcli.DaemonContext(cctx) diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 1b9b80ee9..07dfd9090 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -745,7 +745,7 @@ COMMANDS: selection Configure acceptance criteria for retrieval deal proposals list List all active retrieval deals for this miner set-ask Configure the provider's retrieval ask - get-ask Get the provider's current retrieval ask + get-ask Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command help, h Shows a list of commands or help for one command OPTIONS: @@ -848,7 +848,7 @@ OPTIONS: ### lotus-miner retrieval-deals get-ask ``` NAME: - lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask + lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command USAGE: lotus-miner retrieval-deals get-ask [command options] [arguments...] diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index d3e76e881..9bbfbb099 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -381,6 +381,10 @@ func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, o return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil } +func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { + return false, nil +} + func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) { psize, err := spt.SectorSize() if err != nil { diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index 553dcb952..d73fd26ea 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -24,8 +24,11 @@ type Unsealer interface { type PieceProvider interface { // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) + IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) } +var _ PieceProvider = &pieceProvider{} + type pieceProvider struct { storage *stores.Remote index stores.SectorIndex @@ -40,6 +43,26 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse } } +// IsUnsealed checks if we have the unsealed piece at the given offset in an already +// existing unsealed file either locally or on any of the workers. +func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { + if err := offset.Valid(); err != nil { + return false, xerrors.Errorf("offset is not valid: %w", err) + } + if err := size.Validate(); err != nil { + return false, xerrors.Errorf("size is not a valid piece size: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { + cancel() + return false, xerrors.Errorf("acquiring read sector lock: %w", err) + } + defer cancel() + + return p.storage.CheckIsUnsealed(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded()) +} + // tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it. // It will NOT try to schedule an Unseal of a sealed sector file for the read. // diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index 6a58ad945..173527bbf 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -53,6 +53,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { // pre-commit 1 preCommit1 := ppt.preCommit1(t) + // check if IsUnsealed -> true + require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size)) // read piece ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, false, pieceData) @@ -60,6 +62,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { // pre-commit 2 ppt.preCommit2(t, preCommit1) + // check if IsUnsealed -> true + require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size)) // read piece ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, false, pieceData) @@ -67,10 +71,14 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { // finalize -> nil here will remove unsealed file ppt.finalizeSector(t, nil) + // check if IsUnsealed -> false + require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size)) // Read the piece -> will have to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, true, pieceData) + // check if IsUnsealed -> true + require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size)) // read the piece -> will not have to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, false, pieceData) @@ -118,12 +126,18 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // pre-commit 1 pC1 := ppt.preCommit1(t) + + // check if IsUnsealed -> true + require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size)) // Read the piece -> no need to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1) // pre-commit 2 ppt.preCommit2(t, pC1) + + // check if IsUnsealed -> true + require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size)) // Read the piece -> no need to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1) @@ -133,6 +147,8 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // sending nil here will remove all unsealed files after sector is finalized. ppt.finalizeSector(t, nil) + // check if IsUnsealed -> false + require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size)) // Read the piece -> have to unseal since we removed the file. ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, true, pd1) @@ -142,14 +158,21 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // remove the unsealed file and read again -> will have to unseal. ppt.removeAllUnsealedSectorFiles(t) + // check if IsUnsealed -> false + require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size)) ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, true, pd1) + // check if IsUnsealed -> true + require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size)) // Read Piece 2 -> no unsealing as it got unsealed above. ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2) // remove all unseal files -> Read Piece 2 -> will have to Unseal. ppt.removeAllUnsealedSectorFiles(t) + + // check if IsUnsealed -> false + require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size)) ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2) } @@ -306,6 +329,12 @@ func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreC p.commD = commD } +func (p *pieceProviderTestHarness) isUnealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool { + b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size) + require.NoError(t, err) + return b +} + func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, expectedHadToUnseal bool, expectedBytes []byte) { rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD) diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 18e20ee37..744d16581 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -484,6 +484,78 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa return resp.Body, nil } +// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece +// either locally or on any of the workers. +// Returns true if we have the unsealed piece, false otherwise. +func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (bool, error) { + ft := storiface.FTUnsealed + + paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + if err != nil { + return false, xerrors.Errorf("acquire local: %w", err) + } + + path := storiface.PathByType(paths, ft) + if path != "" { + // if we have the unsealed file locally, check if it has the unsealed piece. + log.Infof("Read local %s (+%d,%d)", path, offset, size) + ssize, err := s.ProofType.SectorSize() + if err != nil { + return false, err + } + + // open the unsealed sector file for the given sector size located at the given path. + pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) + if err != nil { + return false, xerrors.Errorf("opening partial file: %w", err) + } + log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size) + + // even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece + // in the unsealed sector file. That is what `HasAllocated` checks for. + has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded()) + if err != nil { + return false, xerrors.Errorf("has allocated: %w", err) + } + + if err := r.pfHandler.Close(pf); err != nil { + return false, xerrors.Errorf("failed to close partial file: %s", err) + } + log.Debugf("checked if local partial file has the piece %s (+%d,%d), returning answer=%t", path, offset, size, has) + return has, nil + } + + si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) + if err != nil { + return false, xerrors.Errorf("StorageFindSector: %s", err) + } + + if len(si) == 0 { + return false, nil + } + + sort.Slice(si, func(i, j int) bool { + return si[i].Weight < si[j].Weight + }) + + for _, info := range si { + for _, url := range info.URLs { + ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size) + if err != nil { + log.Warnw("check if remote has piece", "url", url, "error", err) + continue + } + if !ok { + continue + } + + return true, nil + } + } + + return false, nil +} + // Reader returns a reader for an unsealed piece at the given offset in the given sector. // If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy. // If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file diff --git a/extern/sector-storage/stores/remote_test.go b/extern/sector-storage/stores/remote_test.go index eb06a713d..6639e1504 100644 --- a/extern/sector-storage/stores/remote_test.go +++ b/extern/sector-storage/stores/remote_test.go @@ -283,8 +283,6 @@ func TestReader(t *testing.T) { expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType), getAllocatedReturnCode: tc.getAllocatedReturnCode, - getSectorReturnCode: tc.getSectorReturnCode, - getSectorBytes: tc.expectedSectorBytes, }) defer ts.Close() tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID)) @@ -326,6 +324,244 @@ func TestReader(t *testing.T) { } } +func TestCheckIsUnsealed(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + pfPath := "path" + ft := storiface.FTUnsealed + emptyPartialFile := &partialfile.PartialFile{} + + sectorRef := storage.SectorRef{ + ID: abi.SectorID{ + Miner: 123, + Number: 123, + }, + ProofType: 1, + } + sectorSize := abi.SealProofInfos[1].SectorSize + + offset := abi.PaddedPieceSize(100) + size := abi.PaddedPieceSize(1000) + ctx := context.Background() + + tcs := map[string]struct { + storeFnc func(s *mocks.MockStore) + pfFunc func(s *mocks.MockpartialFileHandler) + indexFnc func(s *mocks.MockSectorIndex, serverURL string) + + needHttpServer bool + + getAllocatedReturnCode int + + serverUrl string + + // expectation + errStr string + expectedIsUnealed bool + }{ + + // -------- have the unsealed file locally + "fails when error while acquiring unsealed file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error")) + }, + + errStr: "acquire error", + }, + + "fails when error while opening local partial (unsealed) file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error")) + }, + errStr: "pf open error", + }, + + "fails when error while checking if local unsealed file has piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + true, xerrors.New("piece check error")) + }, + + errStr: "piece check error", + }, + + "fails when error while closing local unsealed file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + + mockCheckAllocation(pf, offset, size, emptyPartialFile, + false, nil) + + pf.EXPECT().Close(emptyPartialFile).Return(xerrors.New("close error")).Times(1) + }, + errStr: "close error", + }, + + // ------------------- don't have the unsealed file locally + + "fails when error while finding sector": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, _ string) { + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return(nil, xerrors.New("find sector error")) + }, + errStr: "find sector error", + }, + + "false when no worker has unsealed file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, _ string) { + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return(nil, nil) + }, + }, + + // false when local unsealed file does NOT have unsealed piece + "false when local unsealed file does not have the piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + false, nil) + + pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + }, + }, + + "false when none of the worker has the unsealed piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 500, + }, + + // ---- Success for local unsealed file + "true when local unsealed file has the piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + true, nil) + pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + + }, + + expectedIsUnealed: true, + }, + + // --- Success for remote unsealed file + "successfully fetches reader for piece from remote unsealed piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 200, + expectedIsUnealed: true, + }, + } + + for name, tc := range tcs { + tc := tc + t.Run(name, func(t *testing.T) { + // create go mock controller here + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + // create them mocks + lstore := mocks.NewMockStore(mockCtrl) + pfhandler := mocks.NewMockpartialFileHandler(mockCtrl) + index := mocks.NewMockSectorIndex(mockCtrl) + + if tc.storeFnc != nil { + tc.storeFnc(lstore) + } + if tc.pfFunc != nil { + tc.pfFunc(pfhandler) + } + + if tc.needHttpServer { + // run http server + ts := httptest.NewServer(&mockHttpServer{ + expectedSectorName: storiface.SectorName(sectorRef.ID), + expectedFileType: ft.String(), + expectedOffset: fmt.Sprintf("%d", offset.Unpadded()), + expectedSize: fmt.Sprintf("%d", size.Unpadded()), + expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType), + + getAllocatedReturnCode: tc.getAllocatedReturnCode, + }) + defer ts.Close() + tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID)) + } + if tc.indexFnc != nil { + tc.indexFnc(index, tc.serverUrl) + } + + remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler) + + isUnsealed, err := remoteStore.CheckIsUnsealed(ctx, sectorRef, offset, size) + + if tc.errStr != "" { + require.Error(t, err) + require.False(t, isUnsealed) + require.Contains(t, err.Error(), tc.errStr) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.expectedIsUnealed, isUnsealed) + + }) + } +} + func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) { l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -358,13 +594,10 @@ type mockHttpServer struct { expectedSectorType string getAllocatedReturnCode int - getSectorReturnCode int - getSectorBytes []byte } func (m *mockHttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { mux := mux.NewRouter() - mux.HandleFunc("/remote/{type}/{id}", m.getSector).Methods("GET") mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", m.getAllocated).Methods("GET") mux.ServeHTTP(w, r) } @@ -399,20 +632,3 @@ func (m *mockHttpServer) getAllocated(w http.ResponseWriter, r *http.Request) { w.WriteHeader(m.getAllocatedReturnCode) } - -func (m *mockHttpServer) getSector(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - - if vars["id"] != m.expectedSectorName { - w.WriteHeader(http.StatusBadRequest) - return - } - - if vars["type"] != m.expectedFileType { - w.WriteHeader(http.StatusBadRequest) - return - } - - w.WriteHeader(m.getSectorReturnCode) - _, _ = w.Write(m.getSectorBytes) -} diff --git a/go.mod b/go.mod index 0c2ee70b3..296f3314b 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.5.0 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.3.0 + github.com/filecoin-project/go-fil-markets v1.2.6-0.20210522045113-7d33a6e5f793 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 diff --git a/go.sum b/go.sum index 38a180e01..a04d01b91 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.3.0 h1:yYWHO5x87i+5UlqBwlMVk4oN2GPNfQ0WG6LdORArL/o= -github.com/filecoin-project/go-fil-markets v1.3.0/go.mod h1:v8QjFAGf5h2wKH3saYjGOu3pOFUoVQ1Uhow4gIcUR3I= +github.com/filecoin-project/go-fil-markets v1.2.6-0.20210522045113-7d33a6e5f793 h1:t2u3m3cQM4MFxtQ2EZQkPGtUNUW/NAADbtmTAL44WSw= +github.com/filecoin-project/go-fil-markets v1.2.6-0.20210522045113-7d33a6e5f793/go.mod h1:v8QjFAGf5h2wKH3saYjGOu3pOFUoVQ1Uhow4gIcUR3I= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/pricing/cli.go b/markets/pricing/cli.go new file mode 100644 index 000000000..3c2a5f248 --- /dev/null +++ b/markets/pricing/cli.go @@ -0,0 +1,48 @@ +package pricing + +import ( + "bytes" + "context" + "encoding/json" + "os/exec" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "golang.org/x/xerrors" +) + +func ExternalRetrievalPricingFunc(cmd string) dtypes.RetrievalPricingFunc { + return func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + return runPricingFunc(ctx, cmd, pricingInput) + } +} + +func runPricingFunc(_ context.Context, cmd string, params interface{}) (retrievalmarket.Ask, error) { + j, err := json.Marshal(params) + if err != nil { + return retrievalmarket.Ask{}, err + } + + var out bytes.Buffer + var errb bytes.Buffer + + c := exec.Command("sh", "-c", cmd) + c.Stdin = bytes.NewReader(j) + c.Stdout = &out + c.Stderr = &errb + + switch err := c.Run().(type) { + case nil: + bz := out.Bytes() + resp := retrievalmarket.Ask{} + + if err := json.Unmarshal(bz, &resp); err != nil { + return resp, xerrors.Errorf("failed to parse pricing output %s, err=%w", string(bz), err) + } + return resp, nil + case *exec.ExitError: + return retrievalmarket.Ask{}, xerrors.Errorf("pricing func exited with error: %s", errb.String()) + default: + return retrievalmarket.Ask{}, xerrors.Errorf("pricing func cmd run error: %w", err) + } +} diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index c13a0b03d..748425407 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -99,3 +99,69 @@ func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipS return head.Key().Bytes(), head.Height(), nil } + +func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { + si, err := rpn.miner.GetSectorInfo(sectorID) + if err != nil { + return false, xerrors.Errorf("failed to get sectorinfo, err=%s", err) + } + + mid, err := address.IDFromAddress(rpn.miner.Address()) + if err != nil { + return false, err + } + + ref := specstorage.SectorRef{ + ID: abi.SectorID{ + Miner: abi.ActorID(mid), + Number: sectorID, + }, + ProofType: si.SectorType, + } + + log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length) + return rpn.pp.IsUnsealed(ctx, ref, storiface.UnpaddedByteIndex(offset), length) +} + +// `storageDeals` param here is the list of storage deals made for the `payloadCID` the retrieval client is looking for. +// +// `pieceCID` is the CID of the specific Piece we want to retrieve the payload from. The client can either mandate that +// we retrieve the payload from a specific piece or we choose a Piece to retrieve the payload from, prioritizing +// a Piece for which an unsealed sector file already exists if possible. +// +// 1. For the `VerifiedDeal` flag in the response `PricingInput`, we are looking to answer the question "does there exist any verified storage deal for this `payloadCID`" ? +// +// 2. We also want to ensure that we return the `PieceSize` for the actual piece we want to retrieve the deal from. +func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) { + resp := retrievalmarket.PricingInput{} + + head, err := rpn.full.ChainHead(ctx) + if err != nil { + return resp, xerrors.Errorf("failed to get chain head: %w", err) + } + tsk := head.Key() + + for _, dealID := range storageDeals { + ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk) + if err != nil { + return resp, xerrors.Errorf("failed to look up deal %d on chain: err=%w", dealID, err) + } + if ds.Proposal.VerifiedDeal { + resp.VerifiedDeal = true + } + + if ds.Proposal.PieceCID.Equals(pieceCID) { + resp.PieceSize = ds.Proposal.PieceSize.Unpadded() + } + + if resp.VerifiedDeal && resp.PieceSize != 0 { + break + } + } + + if resp.PieceSize == 0 { + return resp, xerrors.New("failed to find matching piece, PieceSize is zero") + } + + return resp, nil +} diff --git a/markets/retrievaladapter/provider_test.go b/markets/retrievaladapter/provider_test.go new file mode 100644 index 000000000..5cdf5d060 --- /dev/null +++ b/markets/retrievaladapter/provider_test.go @@ -0,0 +1,151 @@ +package retrievaladapter + +import ( + "context" + "testing" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + testnet "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/mocks" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/types" + "github.com/golang/mock/gomock" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + "golang.org/x/xerrors" +) + +func TestGetPricingInput(t *testing.T) { + ctx := context.Background() + tsk := &types.TipSet{} + key := tsk.Key() + + pcid := testnet.GenerateCids(1)[0] + deals := []abi.DealID{1, 2} + paddedSize := abi.PaddedPieceSize(128) + unpaddedSize := paddedSize.Unpadded() + + tcs := map[string]struct { + pieceCid cid.Cid + deals []abi.DealID + fFnc func(node *mocks.MockFullNode) + + expectedErrorStr string + expectedVerified bool + expectedPieceSize abi.UnpaddedPieceSize + }{ + "error when fails to fetch chain head": { + fFnc: func(n *mocks.MockFullNode) { + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, xerrors.New("chain head error")).Times(1) + }, + expectedErrorStr: "chain head error", + }, + + "error when no piece matches": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedErrorStr: "failed to find matching piece", + }, + + "verified is true even if one deal is verified and we get the correct piecesize": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: pcid, + PieceSize: paddedSize, + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + VerifiedDeal: true, + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedPieceSize: unpaddedSize, + expectedVerified: true, + }, + + "verified is false if both deals are unverified and we get the correct piece size": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: pcid, + PieceSize: paddedSize, + VerifiedDeal: false, + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + VerifiedDeal: false, + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedPieceSize: unpaddedSize, + expectedVerified: false, + }, + } + + for name, tc := range tcs { + tc := tc + t.Run(name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + mockFull := mocks.NewMockFullNode(mockCtrl) + rpn := &retrievalProviderNode{ + full: mockFull, + } + if tc.fFnc != nil { + tc.fFnc(mockFull) + } + + resp, err := rpn.GetRetrievalPricingInput(ctx, pcid, deals) + + if tc.expectedErrorStr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErrorStr) + require.Equal(t, retrievalmarket.PricingInput{}, resp) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedPieceSize, resp.PieceSize) + require.Equal(t, tc.expectedVerified, resp.VerifiedDeal) + } + }) + } +} diff --git a/node/builder.go b/node/builder.go index 588ca742d..3ba0c54e9 100644 --- a/node/builder.go +++ b/node/builder.go @@ -407,9 +407,16 @@ var MinerNode = Options( Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), // Markets (retrieval) + Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(config.DealmakingConfig{ + RetrievalPricing: &config.RetrievalPricing{ + Strategy: config.DefaultRetrievalPricing, + Default: &config.RetrievalPricingDefault{}, + }, + })), Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), + Override(HandleRetrievalKey, modules.HandleRetrieval), // Markets (storage) @@ -563,6 +570,17 @@ func ConfigStorageMiner(c interface{}) Option { return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) } + pricingConfig := cfg.Dealmaking.RetrievalPricing + if pricingConfig.Strategy == config.ExternalRetrievalPricing { + if pricingConfig.External == nil { + return Error(xerrors.New("retrieval pricing policy has been to set to external but external policy config is nil")) + } + + if pricingConfig.External.Path == "" { + return Error(xerrors.New("retrieval pricing policy has been to set to external but external script path is empty")) + } + } + return Options( ConfigCommon(&cfg.Common), @@ -574,6 +592,8 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), + Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, diff --git a/node/config/def.go b/node/config/def.go index b4cf5e2fa..00954039d 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -10,6 +10,14 @@ import ( sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" ) +const ( + // DefaultRetrievalPricing configures the node to use the default retrieval pricing policy. + DefaultRetrievalPricing = "default" + // ExternalRetrievalPricing configures the node to use the external retrieval pricing script + // configured by the user. + ExternalRetrievalPricing = "external" +) + // Common is common config between full node and miner type Common struct { API API @@ -66,6 +74,29 @@ type DealmakingConfig struct { Filter string RetrievalFilter string + + RetrievalPricing *RetrievalPricing +} + +type RetrievalPricing struct { + Strategy string // possible values: "default", "external" + + Default *RetrievalPricingDefault + External *RetrievalPricingExternal +} + +type RetrievalPricingExternal struct { + // Path of the external script that will be run to price a retrieval deal. + // This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external". + Path string +} + +type RetrievalPricingDefault struct { + // VerifiedDealsFreeTransfer configures zero fees for data transfer for a retrieval deal + // of a payloadCid that belongs to a verified storage deal. + // This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "default". + // default value is true + VerifiedDealsFreeTransfer bool } type SealingConfig struct { @@ -264,6 +295,13 @@ func DefaultStorageMiner() *StorageMiner { PublishMsgPeriod: Duration(time.Hour), MaxDealsPerPublishMsg: 8, MaxProviderCollateralMultiplier: 2, + + RetrievalPricing: &RetrievalPricing{ + Strategy: DefaultRetrievalPricing, + Default: &RetrievalPricingDefault{ + VerifiedDealsFreeTransfer: true, + }, + }, }, Fees: MinerFeeConfig{ diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index 16af48add..39edd189b 100644 --- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -90,3 +90,5 @@ type GetExpectedSealDurationFunc func() (time.Duration, error) type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) + +type RetrievalPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index dddadd99f..77966cee0 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -10,6 +10,7 @@ import ( "path/filepath" "time" + "github.com/filecoin-project/lotus/markets/pricing" "go.uber.org/fx" "go.uber.org/multierr" "golang.org/x/xerrors" @@ -632,6 +633,20 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt } } +// RetrievalPricingFunc configures the pricing function to use for retrieval deals. +func RetrievalPricingFunc(cfg config.DealmakingConfig) func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + _ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc { + + return func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + _ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc { + if cfg.RetrievalPricing.Strategy == config.ExternalRetrievalPricing { + return pricing.ExternalRetrievalPricingFunc(cfg.RetrievalPricing.External.Path) + } + + return retrievalimpl.DefaultPricingFunc(cfg.RetrievalPricing.Default.VerifiedDealsFreeTransfer) + } +} + // RetrievalProvider creates a new retrieval provider attached to the provider blockstore func RetrievalProvider(h host.Host, miner *storage.Miner, @@ -641,6 +656,7 @@ func RetrievalProvider(h host.Host, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, pieceProvider sectorstorage.PieceProvider, + pricingFnc dtypes.RetrievalPricingFunc, userFilter dtypes.RetrievalDealFilter, ) (retrievalmarket.RetrievalProvider, error) { adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full) @@ -653,7 +669,8 @@ func RetrievalProvider(h host.Host, netwk := rmnet.NewFromLibp2pHost(h) opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) - return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) + return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), + retrievalimpl.RetrievalPricingFunc(pricingFnc), opt) } var WorkerCallsPrefix = datastore.NewKey("/worker/calls")