diff --git a/cmd/lotus-storage-miner/allinfo_test.go b/cmd/lotus-storage-miner/allinfo_test.go index cbe65524e..9f2dc626e 100644 --- a/cmd/lotus-storage-miner/allinfo_test.go +++ b/cmd/lotus-storage-miner/allinfo_test.go @@ -61,7 +61,14 @@ func TestMinerAllInfo(t *testing.T) { t.Run("pre-info-all", run) dh := kit.NewDealHarness(t, client, miner) - dh.MakeFullDeal(context.Background(), 6, false, false, 0) + _, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: context.Background(), + Rseed: 6, + CarExport: false, + FastRet: false, + StartEpoch: 0, + DoRetrieval: true, + }) t.Run("post-info-all", run) } diff --git a/cmd/lotus-storage-miner/retrieval-deals.go b/cmd/lotus-storage-miner/retrieval-deals.go index 03d397852..0411f7f13 100644 --- a/cmd/lotus-storage-miner/retrieval-deals.go +++ b/cmd/lotus-storage-miner/retrieval-deals.go @@ -235,7 +235,7 @@ var retrievalSetAskCmd = &cli.Command{ var retrievalGetAskCmd = &cli.Command{ Name: "get-ask", - Usage: "Get the provider's current retrieval ask", + Usage: "Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command", Flags: []cli.Flag{}, Action: func(cctx *cli.Context) error { ctx := lcli.DaemonContext(cctx) diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index b4b245514..c8abb574b 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -745,7 +745,7 @@ COMMANDS: selection Configure acceptance criteria for retrieval deal proposals list List all active retrieval deals for this miner set-ask Configure the provider's retrieval ask - get-ask Get the provider's current retrieval ask + get-ask Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command help, h Shows a list of commands or help for one command OPTIONS: @@ -848,7 +848,7 @@ OPTIONS: ### lotus-miner retrieval-deals get-ask ``` NAME: - lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask + lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command USAGE: lotus-miner retrieval-deals get-ask [command options] [arguments...] diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index 1d8a317f1..40db3999f 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -119,6 +119,10 @@ func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) { return id, nil } +func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { + return false, nil +} + func (mgr *SectorMgr) ForceState(sid storage.SectorRef, st int) error { mgr.lk.Lock() ss, ok := mgr.sectors[sid.ID] diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index 553dcb952..ad3a2543e 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -24,8 +24,11 @@ type Unsealer interface { type PieceProvider interface { // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) + IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) } +var _ PieceProvider = &pieceProvider{} + type pieceProvider struct { storage *stores.Remote index stores.SectorIndex @@ -40,6 +43,26 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse } } +// IsUnsealed checks if we have the unsealed piece at the given offset in an already +// existing unsealed file either locally or on any of the workers. +func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { + if err := offset.Valid(); err != nil { + return false, xerrors.Errorf("offset is not valid: %w", err) + } + if err := size.Validate(); err != nil { + return false, xerrors.Errorf("size is not a valid piece size: %w", err) + } + + ctxLock, cancel := context.WithCancel(ctx) + defer cancel() + + if err := p.index.StorageLock(ctxLock, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { + return false, xerrors.Errorf("acquiring read sector lock: %w", err) + } + + return p.storage.CheckIsUnsealed(ctxLock, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded()) +} + // tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it. // It will NOT try to schedule an Unseal of a sealed sector file for the read. // diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index 6a58ad945..d6fa14574 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -53,6 +53,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { // pre-commit 1 preCommit1 := ppt.preCommit1(t) + // check if IsUnsealed -> true + require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size)) // read piece ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, false, pieceData) @@ -60,6 +62,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { // pre-commit 2 ppt.preCommit2(t, preCommit1) + // check if IsUnsealed -> true + require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size)) // read piece ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, false, pieceData) @@ -67,10 +71,14 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { // finalize -> nil here will remove unsealed file ppt.finalizeSector(t, nil) + // check if IsUnsealed -> false + require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size)) // Read the piece -> will have to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, true, pieceData) + // check if IsUnsealed -> true + require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size)) // read the piece -> will not have to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, false, pieceData) @@ -118,12 +126,18 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // pre-commit 1 pC1 := ppt.preCommit1(t) + + // check if IsUnsealed -> true + require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size)) // Read the piece -> no need to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1) // pre-commit 2 ppt.preCommit2(t, pC1) + + // check if IsUnsealed -> true + require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size)) // Read the piece -> no need to unseal ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1) @@ -133,6 +147,8 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // sending nil here will remove all unsealed files after sector is finalized. ppt.finalizeSector(t, nil) + // check if IsUnsealed -> false + require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size)) // Read the piece -> have to unseal since we removed the file. ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, true, pd1) @@ -142,14 +158,21 @@ func TestReadPieceRemoteWorkers(t *testing.T) { // remove the unsealed file and read again -> will have to unseal. ppt.removeAllUnsealedSectorFiles(t) + // check if IsUnsealed -> false + require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size)) ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, true, pd1) + // check if IsUnsealed -> true + require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size)) // Read Piece 2 -> no unsealing as it got unsealed above. ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2) // remove all unseal files -> Read Piece 2 -> will have to Unseal. ppt.removeAllUnsealedSectorFiles(t) + + // check if IsUnsealed -> false + require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size)) ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2) } @@ -306,6 +329,12 @@ func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreC p.commD = commD } +func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool { + b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size) + require.NoError(t, err) + return b +} + func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, expectedHadToUnseal bool, expectedBytes []byte) { rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD) diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 18e20ee37..1e1a54d47 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -484,6 +484,87 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa return resp.Body, nil } +// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece +// either locally or on any of the workers. +// Returns true if we have the unsealed piece, false otherwise. +func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (bool, error) { + ft := storiface.FTUnsealed + + paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + if err != nil { + return false, xerrors.Errorf("acquire local: %w", err) + } + + path := storiface.PathByType(paths, ft) + if path != "" { + // if we have the unsealed file locally, check if it has the unsealed piece. + log.Infof("Read local %s (+%d,%d)", path, offset, size) + ssize, err := s.ProofType.SectorSize() + if err != nil { + return false, err + } + + // open the unsealed sector file for the given sector size located at the given path. + pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) + if err != nil { + return false, xerrors.Errorf("opening partial file: %w", err) + } + log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size) + + // even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece + // in the unsealed sector file. That is what `HasAllocated` checks for. + has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded()) + if err != nil { + return false, xerrors.Errorf("has allocated: %w", err) + } + + // close the local unsealed file. + if err := r.pfHandler.Close(pf); err != nil { + return false, xerrors.Errorf("failed to close partial file: %s", err) + } + log.Debugf("checked if local partial file has the piece %s (+%d,%d), returning answer=%t", path, offset, size, has) + + // Sector files can technically not have a piece unsealed locally, but have it unsealed in remote storage, so we probably + // want to return only if has is true + if has { + return has, nil + } + } + + // --- We don't have the unsealed piece in an unsealed sector file locally + // Check if we have it in a remote cluster. + + si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) + if err != nil { + return false, xerrors.Errorf("StorageFindSector: %s", err) + } + + if len(si) == 0 { + return false, nil + } + + sort.Slice(si, func(i, j int) bool { + return si[i].Weight < si[j].Weight + }) + + for _, info := range si { + for _, url := range info.URLs { + ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size) + if err != nil { + log.Warnw("check if remote has piece", "url", url, "error", err) + continue + } + if !ok { + continue + } + + return true, nil + } + } + + return false, nil +} + // Reader returns a reader for an unsealed piece at the given offset in the given sector. // If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy. // If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file @@ -507,7 +588,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a if path != "" { // if we have the unsealed file locally, return a reader that can be used to read the contents of the // unsealed piece. - log.Infof("Read local %s (+%d,%d)", path, offset, size) + log.Debugf("Check local %s (+%d,%d)", path, offset, size) ssize, err := s.ProofType.SectorSize() if err != nil { return nil, err @@ -529,19 +610,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a } log.Debugf("check if partial file is allocated %s (+%d,%d)", path, offset, size) - if !has { - log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size) - if err := r.pfHandler.Close(pf); err != nil { - return nil, xerrors.Errorf("close partial file: %w", err) - } - return nil, nil + if has { + log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) + return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size) } - log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) - return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size) + log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size) + if err := r.pfHandler.Close(pf); err != nil { + return nil, xerrors.Errorf("close partial file: %w", err) + } } - // --- We don't have the unsealed sector file locally + // --- We don't have the unsealed piece in an unsealed sector file locally // if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index // to determine which workers have the unsealed file and then query those workers to know diff --git a/extern/sector-storage/stores/remote_test.go b/extern/sector-storage/stores/remote_test.go index eb06a713d..b708bb68f 100644 --- a/extern/sector-storage/stores/remote_test.go +++ b/extern/sector-storage/stores/remote_test.go @@ -27,8 +27,10 @@ func TestReader(t *testing.T) { bz := []byte("Hello World") pfPath := "path" - ft := storiface.FTUnsealed emptyPartialFile := &partialfile.PartialFile{} + sectorSize := abi.SealProofInfos[1].SectorSize + + ft := storiface.FTUnsealed sectorRef := storage.SectorRef{ ID: abi.SectorID{ @@ -37,7 +39,6 @@ func TestReader(t *testing.T) { }, ProofType: 1, } - sectorSize := abi.SealProofInfos[1].SectorSize offset := abi.PaddedPieceSize(100) size := abi.PaddedPieceSize(1000) @@ -151,7 +152,7 @@ func TestReader(t *testing.T) { }, // --- nil reader when local unsealed file does NOT have unsealed piece - "nil reader when local unsealed file does not have the piece": { + "nil reader when local unsealed file does not have the unsealed piece and remote sector also dosen't have the unsealed piece": { storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, pfPath, nil) }, @@ -162,7 +163,20 @@ func TestReader(t *testing.T) { false, nil) pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 500, }, // ---- nil reader when none of the remote unsealed file has unsealed piece @@ -231,6 +245,37 @@ func TestReader(t *testing.T) { }, // --- Success for remote unsealed file + // --- Success for remote unsealed file + "successfully fetches reader from remote unsealed piece when local unsealed file does NOT have the unsealed Piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + false, nil) + + pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getSectorReturnCode: 200, + getAllocatedReturnCode: 200, + expectedSectorBytes: bz, + expectedNonNilReader: true, + }, + "successfully fetches reader for piece from remote unsealed piece": { storeFnc: func(l *mocks.MockStore) { mockSectorAcquire(l, sectorRef, "", nil) @@ -326,6 +371,283 @@ func TestReader(t *testing.T) { } } +func TestCheckIsUnsealed(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + pfPath := "path" + ft := storiface.FTUnsealed + emptyPartialFile := &partialfile.PartialFile{} + + sectorRef := storage.SectorRef{ + ID: abi.SectorID{ + Miner: 123, + Number: 123, + }, + ProofType: 1, + } + sectorSize := abi.SealProofInfos[1].SectorSize + + offset := abi.PaddedPieceSize(100) + size := abi.PaddedPieceSize(1000) + ctx := context.Background() + + tcs := map[string]struct { + storeFnc func(s *mocks.MockStore) + pfFunc func(s *mocks.MockpartialFileHandler) + indexFnc func(s *mocks.MockSectorIndex, serverURL string) + + needHttpServer bool + + getAllocatedReturnCode int + + serverUrl string + + // expectation + errStr string + expectedIsUnealed bool + }{ + + // -------- have the unsealed file locally + "fails when error while acquiring unsealed file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error")) + }, + + errStr: "acquire error", + }, + + "fails when error while opening local partial (unsealed) file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error")) + }, + errStr: "pf open error", + }, + + "fails when error while checking if local unsealed file has piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + true, xerrors.New("piece check error")) + }, + + errStr: "piece check error", + }, + + "fails when error while closing local unsealed file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + + mockCheckAllocation(pf, offset, size, emptyPartialFile, + false, nil) + + pf.EXPECT().Close(emptyPartialFile).Return(xerrors.New("close error")).Times(1) + }, + errStr: "close error", + }, + + // ------------------- don't have the unsealed file locally + + "fails when error while finding sector": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, _ string) { + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return(nil, xerrors.New("find sector error")) + }, + errStr: "find sector error", + }, + + "false when no worker has unsealed file": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, _ string) { + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return(nil, nil) + }, + }, + + // false when local unsealed file does NOT have unsealed piece + "false when local unsealed file does not have the piece and remote sector too dosen't have the piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + false, nil) + + pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 500, + }, + + "false when none of the worker has the unsealed piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 500, + }, + + // ---- Success for local unsealed file + "true when local unsealed file has the piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + true, nil) + pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + + }, + + expectedIsUnealed: true, + }, + + // --- Success for remote unsealed file + "true if we have a remote unsealed piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, "", nil) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 200, + expectedIsUnealed: true, + }, + + "true when local unsealed file does NOT have the unsealed Piece but remote sector has the unsealed piece": { + storeFnc: func(l *mocks.MockStore) { + mockSectorAcquire(l, sectorRef, pfPath, nil) + }, + + pfFunc: func(pf *mocks.MockpartialFileHandler) { + mockPartialFileOpen(pf, sectorSize, pfPath, nil) + mockCheckAllocation(pf, offset, size, emptyPartialFile, + false, nil) + + pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1) + }, + + indexFnc: func(in *mocks.MockSectorIndex, url string) { + si := stores.SectorStorageInfo{ + URLs: []string{url}, + } + + in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(), + false).Return([]stores.SectorStorageInfo{si}, nil).Times(1) + }, + + needHttpServer: true, + getAllocatedReturnCode: 200, + expectedIsUnealed: true, + }, + } + + for name, tc := range tcs { + tc := tc + t.Run(name, func(t *testing.T) { + // create go mock controller here + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + // create them mocks + lstore := mocks.NewMockStore(mockCtrl) + pfhandler := mocks.NewMockpartialFileHandler(mockCtrl) + index := mocks.NewMockSectorIndex(mockCtrl) + + if tc.storeFnc != nil { + tc.storeFnc(lstore) + } + if tc.pfFunc != nil { + tc.pfFunc(pfhandler) + } + + if tc.needHttpServer { + // run http server + ts := httptest.NewServer(&mockHttpServer{ + expectedSectorName: storiface.SectorName(sectorRef.ID), + expectedFileType: ft.String(), + expectedOffset: fmt.Sprintf("%d", offset.Unpadded()), + expectedSize: fmt.Sprintf("%d", size.Unpadded()), + expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType), + + getAllocatedReturnCode: tc.getAllocatedReturnCode, + }) + defer ts.Close() + tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID)) + } + if tc.indexFnc != nil { + tc.indexFnc(index, tc.serverUrl) + } + + remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler) + + isUnsealed, err := remoteStore.CheckIsUnsealed(ctx, sectorRef, offset, size) + + if tc.errStr != "" { + require.Error(t, err) + require.False(t, isUnsealed) + require.Contains(t, err.Error(), tc.errStr) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.expectedIsUnealed, isUnsealed) + + }) + } +} + func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) { l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ @@ -358,8 +680,9 @@ type mockHttpServer struct { expectedSectorType string getAllocatedReturnCode int - getSectorReturnCode int - getSectorBytes []byte + + getSectorReturnCode int + getSectorBytes []byte } func (m *mockHttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/go.mod b/go.mod index 137678d81..411522a36 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.6.0 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.4.0 + github.com/filecoin-project/go-fil-markets v1.5.0 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 diff --git a/go.sum b/go.sum index c883dd9bb..5573587fd 100644 --- a/go.sum +++ b/go.sum @@ -283,8 +283,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.4.0 h1:J4L6o+FVOmS7ZWV6wxLPiuoDzGC7iS3S5NRFL1enEr0= -github.com/filecoin-project/go-fil-markets v1.4.0/go.mod h1:7be6zzFwaN8kxVeYZf/UUj/JilHC0ogPvWqE1TW8Ptk= +github.com/filecoin-project/go-fil-markets v1.5.0 h1:3KEs01L8XFCEgujZ6ggFjr1XWjpjTQcmSSeo3I99I0k= +github.com/filecoin-project/go-fil-markets v1.5.0/go.mod h1:7be6zzFwaN8kxVeYZf/UUj/JilHC0ogPvWqE1TW8Ptk= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/itests/ccupgrade_test.go b/itests/ccupgrade_test.go index 28abac171..196cf7106 100644 --- a/itests/ccupgrade_test.go +++ b/itests/ccupgrade_test.go @@ -94,7 +94,14 @@ func runTestCCUpgrade(t *testing.T, b kit.APIBuilder, blocktime time.Duration, u dh := kit.NewDealHarness(t, client, miner) - dh.MakeFullDeal(context.Background(), 6, false, false, 0) + _, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: context.Background(), + Rseed: 6, + CarExport: false, + FastRet: false, + StartEpoch: 0, + DoRetrieval: true, + }) // Validate upgrade diff --git a/itests/deals_test.go b/itests/deals_test.go index 6db335afb..e5909cb80 100644 --- a/itests/deals_test.go +++ b/itests/deals_test.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/miner" @@ -80,6 +81,101 @@ func TestAPIDealFlowReal(t *testing.T) { t.Run("retrieval-second", func(t *testing.T) { runSecondDealRetrievalTest(t, kit.Builder, time.Second) }) + + t.Run("quote-price-for-non-unsealed-retrieval", func(t *testing.T) { + runQuotePriceForUnsealedRetrieval(t, kit.Builder, time.Second, 0) + }) +} + +func runQuotePriceForUnsealedRetrieval(t *testing.T, b kit.APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { + ctx := context.Background() + fulls, miners := b(t, kit.OneFull, kit.OneMiner) + client, miner := fulls[0].FullNode.(*impl.FullNodeAPI), miners[0] + kit.ConnectAndStartMining(t, blocktime, miner, client) + + ppb := int64(1) + unsealPrice := int64(77) + + // Set unsealed price to non-zero + ask, err := miner.MarketGetRetrievalAsk(ctx) + require.NoError(t, err) + ask.PricePerByte = abi.NewTokenAmount(ppb) + ask.UnsealPrice = abi.NewTokenAmount(unsealPrice) + err = miner.MarketSetRetrievalAsk(ctx, ask) + require.NoError(t, err) + + dh := kit.NewDealHarness(t, client, miner) + + _, info, fcid := dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: ctx, + Rseed: 6, + CarExport: false, + FastRet: false, + StartEpoch: startEpoch, + DoRetrieval: false, + }) + + // one more storage deal for the same data + _, _, fcid2 := dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: ctx, + Rseed: 6, + CarExport: false, + FastRet: false, + StartEpoch: startEpoch, + DoRetrieval: false, + }) + require.Equal(t, fcid, fcid2) + + // fetch quote -> zero for unsealed price since unsealed file already exists. + offers, err := client.ClientFindData(ctx, fcid, &info.PieceCID) + require.NoError(t, err) + require.Len(t, offers, 2) + require.Equal(t, offers[0], offers[1]) + require.Equal(t, uint64(0), offers[0].UnsealPrice.Uint64()) + require.Equal(t, info.Size*uint64(ppb), offers[0].MinPrice.Uint64()) + + // remove ONLY one unsealed file + ss, err := miner.StorageList(context.Background()) + require.NoError(t, err) + _, err = miner.SectorsList(ctx) + require.NoError(t, err) + +iLoop: + for storeID, sd := range ss { + for _, sector := range sd { + require.NoError(t, miner.StorageDropSector(ctx, storeID, sector.SectorID, storiface.FTUnsealed)) + // remove ONLY one + break iLoop + } + } + + // get retrieval quote -> zero for unsealed price as unsealed file exists. + offers, err = client.ClientFindData(ctx, fcid, &info.PieceCID) + require.NoError(t, err) + require.Len(t, offers, 2) + require.Equal(t, offers[0], offers[1]) + require.Equal(t, uint64(0), offers[0].UnsealPrice.Uint64()) + require.Equal(t, info.Size*uint64(ppb), offers[0].MinPrice.Uint64()) + + // remove the other unsealed file as well + ss, err = miner.StorageList(context.Background()) + require.NoError(t, err) + _, err = miner.SectorsList(ctx) + require.NoError(t, err) + for storeID, sd := range ss { + for _, sector := range sd { + require.NoError(t, miner.StorageDropSector(ctx, storeID, sector.SectorID, storiface.FTUnsealed)) + } + } + + // fetch quote -> non-zero for unseal price as we no more unsealed files. + offers, err = client.ClientFindData(ctx, fcid, &info.PieceCID) + require.NoError(t, err) + require.Len(t, offers, 2) + require.Equal(t, offers[0], offers[1]) + require.Equal(t, uint64(unsealPrice), offers[0].UnsealPrice.Uint64()) + total := (info.Size * uint64(ppb)) + uint64(unsealPrice) + require.Equal(t, total, offers[0].MinPrice.Uint64()) } func TestPublishDealsBatching(t *testing.T) { @@ -413,7 +509,14 @@ func runFullDealCycles(t *testing.T, n int, b kit.APIBuilder, blocktime time.Dur baseseed := 6 for i := 0; i < n; i++ { - dh.MakeFullDeal(context.Background(), baseseed+i, carExport, fastRet, startEpoch) + _, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: context.Background(), + Rseed: baseseed + i, + CarExport: carExport, + FastRet: fastRet, + StartEpoch: startEpoch, + DoRetrieval: true, + }) } } @@ -519,5 +622,12 @@ func runZeroPricePerByteRetrievalDealFlow(t *testing.T, b kit.APIBuilder, blockt err = miner.MarketSetRetrievalAsk(ctx, ask) require.NoError(t, err) - dh.MakeFullDeal(ctx, 6, false, false, startEpoch) + _, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: ctx, + Rseed: 6, + CarExport: false, + FastRet: false, + StartEpoch: startEpoch, + DoRetrieval: true, + }) } diff --git a/itests/gateway_test.go b/itests/gateway_test.go index 7f1b70f2d..f5eb4e363 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -206,7 +206,14 @@ func TestGatewayDealFlow(t *testing.T) { dealStartEpoch := abi.ChainEpoch(2 << 12) dh := kit.NewDealHarness(t, nodes.lite, nodes.miner) - dh.MakeFullDeal(ctx, 6, false, false, dealStartEpoch) + dh.MakeFullDeal(kit.MakeFullDealParams{ + Ctx: ctx, + Rseed: 6, + CarExport: false, + FastRet: false, + StartEpoch: dealStartEpoch, + DoRetrieval: true, + }) } func TestGatewayCLIDealFlow(t *testing.T) { diff --git a/itests/kit/deals.go b/itests/kit/deals.go index c768eb87f..f85618901 100644 --- a/itests/kit/deals.go +++ b/itests/kit/deals.go @@ -34,6 +34,15 @@ type DealHarness struct { miner TestMiner } +type MakeFullDealParams struct { + Ctx context.Context + Rseed int + CarExport bool + FastRet bool + StartEpoch abi.ChainEpoch + DoRetrieval bool +} + // NewDealHarness creates a test harness that contains testing utilities for deals. func NewDealHarness(t *testing.T, client api.FullNode, miner TestMiner) *DealHarness { return &DealHarness{ @@ -43,8 +52,9 @@ func NewDealHarness(t *testing.T, client api.FullNode, miner TestMiner) *DealHar } } -func (dh *DealHarness) MakeFullDeal(ctx context.Context, rseed int, carExport, fastRet bool, startEpoch abi.ChainEpoch) { - res, _, data, err := CreateImportFile(ctx, dh.client, rseed, 0) +func (dh *DealHarness) MakeFullDeal(params MakeFullDealParams) ([]byte, + *api.DealInfo, cid.Cid) { + res, _, data, err := CreateImportFile(params.Ctx, dh.client, params.Rseed, 0) if err != nil { dh.t.Fatal(err) } @@ -52,17 +62,21 @@ func (dh *DealHarness) MakeFullDeal(ctx context.Context, rseed int, carExport, f fcid := res.Root fmt.Println("FILE CID: ", fcid) - deal := dh.StartDeal(ctx, fcid, fastRet, startEpoch) + deal := dh.StartDeal(params.Ctx, fcid, params.FastRet, params.StartEpoch) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - dh.WaitDealSealed(ctx, deal, false, false, nil) + dh.WaitDealSealed(params.Ctx, deal, false, false, nil) // Retrieval - info, err := dh.client.ClientGetDealInfo(ctx, *deal) + info, err := dh.client.ClientGetDealInfo(params.Ctx, *deal) require.NoError(dh.t, err) - dh.TestRetrieval(ctx, fcid, &info.PieceCID, carExport, data) + if params.DoRetrieval { + dh.TestRetrieval(params.Ctx, fcid, &info.PieceCID, params.CarExport, data) + } + + return data, info, fcid } func (dh *DealHarness) StartDeal(ctx context.Context, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid { diff --git a/markets/pricing/cli.go b/markets/pricing/cli.go new file mode 100644 index 000000000..3c2a5f248 --- /dev/null +++ b/markets/pricing/cli.go @@ -0,0 +1,48 @@ +package pricing + +import ( + "bytes" + "context" + "encoding/json" + "os/exec" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "golang.org/x/xerrors" +) + +func ExternalRetrievalPricingFunc(cmd string) dtypes.RetrievalPricingFunc { + return func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + return runPricingFunc(ctx, cmd, pricingInput) + } +} + +func runPricingFunc(_ context.Context, cmd string, params interface{}) (retrievalmarket.Ask, error) { + j, err := json.Marshal(params) + if err != nil { + return retrievalmarket.Ask{}, err + } + + var out bytes.Buffer + var errb bytes.Buffer + + c := exec.Command("sh", "-c", cmd) + c.Stdin = bytes.NewReader(j) + c.Stdout = &out + c.Stderr = &errb + + switch err := c.Run().(type) { + case nil: + bz := out.Bytes() + resp := retrievalmarket.Ask{} + + if err := json.Unmarshal(bz, &resp); err != nil { + return resp, xerrors.Errorf("failed to parse pricing output %s, err=%w", string(bz), err) + } + return resp, nil + case *exec.ExitError: + return retrievalmarket.Ask{}, xerrors.Errorf("pricing func exited with error: %s", errb.String()) + default: + return retrievalmarket.Ask{}, xerrors.Errorf("pricing func cmd run error: %w", err) + } +} diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index c13a0b03d..95b7e1b3c 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -99,3 +99,67 @@ func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipS return head.Key().Bytes(), head.Height(), nil } + +func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { + si, err := rpn.miner.GetSectorInfo(sectorID) + if err != nil { + return false, xerrors.Errorf("failed to get sectorinfo, err=%s", err) + } + + mid, err := address.IDFromAddress(rpn.miner.Address()) + if err != nil { + return false, err + } + + ref := specstorage.SectorRef{ + ID: abi.SectorID{ + Miner: abi.ActorID(mid), + Number: sectorID, + }, + ProofType: si.SectorType, + } + + log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length) + return rpn.pp.IsUnsealed(ctx, ref, storiface.UnpaddedByteIndex(offset), length) +} + +// GetRetrievalPricingInput takes a set of candidate storage deals that can serve a retrieval request, +// and returns an minimally populated PricingInput. This PricingInput should be enhanced +// with more data, and passed to the pricing function to determine the final quoted price. +func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) { + resp := retrievalmarket.PricingInput{} + + head, err := rpn.full.ChainHead(ctx) + if err != nil { + return resp, xerrors.Errorf("failed to get chain head: %w", err) + } + tsk := head.Key() + + for _, dealID := range storageDeals { + ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk) + if err != nil { + return resp, xerrors.Errorf("failed to look up deal %d on chain: err=%w", dealID, err) + } + if ds.Proposal.VerifiedDeal { + resp.VerifiedDeal = true + } + + if ds.Proposal.PieceCID.Equals(pieceCID) { + resp.PieceSize = ds.Proposal.PieceSize.Unpadded() + } + + // If we've discovered a verified deal with the required PieceCID, we don't need + // to lookup more deals and we're done. + if resp.VerifiedDeal && resp.PieceSize != 0 { + break + } + } + + // Note: The piece size can never actually be zero. We only use it to here + // to assert that we didn't find a matching piece. + if resp.PieceSize == 0 { + return resp, xerrors.New("failed to find matching piece") + } + + return resp, nil +} diff --git a/markets/retrievaladapter/provider_test.go b/markets/retrievaladapter/provider_test.go new file mode 100644 index 000000000..5cdf5d060 --- /dev/null +++ b/markets/retrievaladapter/provider_test.go @@ -0,0 +1,151 @@ +package retrievaladapter + +import ( + "context" + "testing" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + testnet "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/mocks" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/types" + "github.com/golang/mock/gomock" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + "golang.org/x/xerrors" +) + +func TestGetPricingInput(t *testing.T) { + ctx := context.Background() + tsk := &types.TipSet{} + key := tsk.Key() + + pcid := testnet.GenerateCids(1)[0] + deals := []abi.DealID{1, 2} + paddedSize := abi.PaddedPieceSize(128) + unpaddedSize := paddedSize.Unpadded() + + tcs := map[string]struct { + pieceCid cid.Cid + deals []abi.DealID + fFnc func(node *mocks.MockFullNode) + + expectedErrorStr string + expectedVerified bool + expectedPieceSize abi.UnpaddedPieceSize + }{ + "error when fails to fetch chain head": { + fFnc: func(n *mocks.MockFullNode) { + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, xerrors.New("chain head error")).Times(1) + }, + expectedErrorStr: "chain head error", + }, + + "error when no piece matches": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedErrorStr: "failed to find matching piece", + }, + + "verified is true even if one deal is verified and we get the correct piecesize": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: pcid, + PieceSize: paddedSize, + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + VerifiedDeal: true, + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedPieceSize: unpaddedSize, + expectedVerified: true, + }, + + "verified is false if both deals are unverified and we get the correct piece size": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: pcid, + PieceSize: paddedSize, + VerifiedDeal: false, + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + VerifiedDeal: false, + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedPieceSize: unpaddedSize, + expectedVerified: false, + }, + } + + for name, tc := range tcs { + tc := tc + t.Run(name, func(t *testing.T) { + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + mockFull := mocks.NewMockFullNode(mockCtrl) + rpn := &retrievalProviderNode{ + full: mockFull, + } + if tc.fFnc != nil { + tc.fFnc(mockFull) + } + + resp, err := rpn.GetRetrievalPricingInput(ctx, pcid, deals) + + if tc.expectedErrorStr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErrorStr) + require.Equal(t, retrievalmarket.PricingInput{}, resp) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedPieceSize, resp.PieceSize) + require.Equal(t, tc.expectedVerified, resp.VerifiedDeal) + } + }) + } +} diff --git a/node/builder.go b/node/builder.go index f05dd8164..7740e8e97 100644 --- a/node/builder.go +++ b/node/builder.go @@ -409,8 +409,16 @@ var MinerNode = Options( // Markets (retrieval) Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider), + Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(config.DealmakingConfig{ + RetrievalPricing: &config.RetrievalPricing{ + Strategy: config.RetrievalPricingDefaultMode, + Default: &config.RetrievalPricingDefault{}, + }, + })), + Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), + Override(HandleRetrievalKey, modules.HandleRetrieval), // Markets (storage) @@ -564,6 +572,19 @@ func ConfigStorageMiner(c interface{}) Option { return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) } + pricingConfig := cfg.Dealmaking.RetrievalPricing + if pricingConfig.Strategy == config.RetrievalPricingExternalMode { + if pricingConfig.External == nil { + return Error(xerrors.New("retrieval pricing policy has been to set to external but external policy config is nil")) + } + + if pricingConfig.External.Path == "" { + return Error(xerrors.New("retrieval pricing policy has been to set to external but external script path is empty")) + } + } else if pricingConfig.Strategy != config.RetrievalPricingDefaultMode { + return Error(xerrors.New("retrieval pricing policy must be either default or external")) + } + return Options( ConfigCommon(&cfg.Common), @@ -575,6 +596,8 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), + Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, diff --git a/node/config/def.go b/node/config/def.go index 177871cc5..700a3f94f 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -14,6 +14,14 @@ import ( sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" ) +const ( + // RetrievalPricingDefault configures the node to use the default retrieval pricing policy. + RetrievalPricingDefaultMode = "default" + // RetrievalPricingExternal configures the node to use the external retrieval pricing script + // configured by the user. + RetrievalPricingExternalMode = "external" +) + // Common is common config between full node and miner type Common struct { API API @@ -70,6 +78,29 @@ type DealmakingConfig struct { Filter string RetrievalFilter string + + RetrievalPricing *RetrievalPricing +} + +type RetrievalPricing struct { + Strategy string // possible values: "default", "external" + + Default *RetrievalPricingDefault + External *RetrievalPricingExternal +} + +type RetrievalPricingExternal struct { + // Path of the external script that will be run to price a retrieval deal. + // This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external". + Path string +} + +type RetrievalPricingDefault struct { + // VerifiedDealsFreeTransfer configures zero fees for data transfer for a retrieval deal + // of a payloadCid that belongs to a verified storage deal. + // This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "default". + // default value is true + VerifiedDealsFreeTransfer bool } type SealingConfig struct { @@ -326,6 +357,16 @@ func DefaultStorageMiner() *StorageMiner { PublishMsgPeriod: Duration(time.Hour), MaxDealsPerPublishMsg: 8, MaxProviderCollateralMultiplier: 2, + + RetrievalPricing: &RetrievalPricing{ + Strategy: RetrievalPricingDefaultMode, + Default: &RetrievalPricingDefault{ + VerifiedDealsFreeTransfer: true, + }, + External: &RetrievalPricingExternal{ + Path: "", + }, + }, }, Fees: MinerFeeConfig{ diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index b7a1be2e1..3c0c14383 100644 --- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -92,3 +92,5 @@ type GetExpectedSealDurationFunc func() (time.Duration, error) type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) + +type RetrievalPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 296f1c10e..3b346a80f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -10,6 +10,7 @@ import ( "path/filepath" "time" + "github.com/filecoin-project/lotus/markets/pricing" "go.uber.org/fx" "go.uber.org/multierr" "golang.org/x/xerrors" @@ -634,6 +635,20 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt } } +// RetrievalPricingFunc configures the pricing function to use for retrieval deals. +func RetrievalPricingFunc(cfg config.DealmakingConfig) func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + _ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc { + + return func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + _ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc { + if cfg.RetrievalPricing.Strategy == config.RetrievalPricingExternalMode { + return pricing.ExternalRetrievalPricingFunc(cfg.RetrievalPricing.External.Path) + } + + return retrievalimpl.DefaultPricingFunc(cfg.RetrievalPricing.Default.VerifiedDealsFreeTransfer) + } +} + // RetrievalProvider creates a new retrieval provider attached to the provider blockstore func RetrievalProvider(h host.Host, miner *storage.Miner, @@ -643,6 +658,7 @@ func RetrievalProvider(h host.Host, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, pieceProvider sectorstorage.PieceProvider, + pricingFnc dtypes.RetrievalPricingFunc, userFilter dtypes.RetrievalDealFilter, ) (retrievalmarket.RetrievalProvider, error) { adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full) @@ -655,7 +671,8 @@ func RetrievalProvider(h host.Host, netwk := rmnet.NewFromLibp2pHost(h) opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) - return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) + return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), + retrievalimpl.RetrievalPricingFunc(pricingFnc), opt) } var WorkerCallsPrefix = datastore.NewKey("/worker/calls")