Merge pull request #6175 from filecoin-project/feat/dynamic-retreival-pricing
Dynamic Retrieval pricing
This commit is contained in:
commit
44de67cf7d
@ -61,7 +61,14 @@ func TestMinerAllInfo(t *testing.T) {
|
||||
t.Run("pre-info-all", run)
|
||||
|
||||
dh := kit.NewDealHarness(t, client, miner)
|
||||
dh.MakeFullDeal(context.Background(), 6, false, false, 0)
|
||||
_, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: context.Background(),
|
||||
Rseed: 6,
|
||||
CarExport: false,
|
||||
FastRet: false,
|
||||
StartEpoch: 0,
|
||||
DoRetrieval: true,
|
||||
})
|
||||
|
||||
t.Run("post-info-all", run)
|
||||
}
|
||||
|
@ -235,7 +235,7 @@ var retrievalSetAskCmd = &cli.Command{
|
||||
|
||||
var retrievalGetAskCmd = &cli.Command{
|
||||
Name: "get-ask",
|
||||
Usage: "Get the provider's current retrieval ask",
|
||||
Usage: "Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
ctx := lcli.DaemonContext(cctx)
|
||||
|
@ -745,7 +745,7 @@ COMMANDS:
|
||||
selection Configure acceptance criteria for retrieval deal proposals
|
||||
list List all active retrieval deals for this miner
|
||||
set-ask Configure the provider's retrieval ask
|
||||
get-ask Get the provider's current retrieval ask
|
||||
get-ask Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
|
||||
help, h Shows a list of commands or help for one command
|
||||
|
||||
OPTIONS:
|
||||
@ -848,7 +848,7 @@ OPTIONS:
|
||||
### lotus-miner retrieval-deals get-ask
|
||||
```
|
||||
NAME:
|
||||
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask
|
||||
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
|
||||
|
||||
USAGE:
|
||||
lotus-miner retrieval-deals get-ask [command options] [arguments...]
|
||||
|
4
extern/sector-storage/mock/mock.go
vendored
4
extern/sector-storage/mock/mock.go
vendored
@ -119,6 +119,10 @@ func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ForceState(sid storage.SectorRef, st int) error {
|
||||
mgr.lk.Lock()
|
||||
ss, ok := mgr.sectors[sid.ID]
|
||||
|
23
extern/sector-storage/piece_provider.go
vendored
23
extern/sector-storage/piece_provider.go
vendored
@ -24,8 +24,11 @@ type Unsealer interface {
|
||||
type PieceProvider interface {
|
||||
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
|
||||
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
|
||||
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
|
||||
}
|
||||
|
||||
var _ PieceProvider = &pieceProvider{}
|
||||
|
||||
type pieceProvider struct {
|
||||
storage *stores.Remote
|
||||
index stores.SectorIndex
|
||||
@ -40,6 +43,26 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse
|
||||
}
|
||||
}
|
||||
|
||||
// IsUnsealed checks if we have the unsealed piece at the given offset in an already
|
||||
// existing unsealed file either locally or on any of the workers.
|
||||
func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
if err := offset.Valid(); err != nil {
|
||||
return false, xerrors.Errorf("offset is not valid: %w", err)
|
||||
}
|
||||
if err := size.Validate(); err != nil {
|
||||
return false, xerrors.Errorf("size is not a valid piece size: %w", err)
|
||||
}
|
||||
|
||||
ctxLock, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := p.index.StorageLock(ctxLock, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
|
||||
return false, xerrors.Errorf("acquiring read sector lock: %w", err)
|
||||
}
|
||||
|
||||
return p.storage.CheckIsUnsealed(ctxLock, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
|
||||
}
|
||||
|
||||
// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it.
|
||||
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
|
||||
//
|
||||
|
29
extern/sector-storage/piece_provider_test.go
vendored
29
extern/sector-storage/piece_provider_test.go
vendored
@ -53,6 +53,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
|
||||
// pre-commit 1
|
||||
preCommit1 := ppt.preCommit1(t)
|
||||
|
||||
// check if IsUnsealed -> true
|
||||
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
|
||||
// read piece
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
|
||||
false, pieceData)
|
||||
@ -60,6 +62,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
|
||||
// pre-commit 2
|
||||
ppt.preCommit2(t, preCommit1)
|
||||
|
||||
// check if IsUnsealed -> true
|
||||
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
|
||||
// read piece
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
|
||||
false, pieceData)
|
||||
@ -67,10 +71,14 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
|
||||
// finalize -> nil here will remove unsealed file
|
||||
ppt.finalizeSector(t, nil)
|
||||
|
||||
// check if IsUnsealed -> false
|
||||
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
|
||||
// Read the piece -> will have to unseal
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
|
||||
true, pieceData)
|
||||
|
||||
// check if IsUnsealed -> true
|
||||
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), size))
|
||||
// read the piece -> will not have to unseal
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
|
||||
false, pieceData)
|
||||
@ -118,12 +126,18 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
|
||||
|
||||
// pre-commit 1
|
||||
pC1 := ppt.preCommit1(t)
|
||||
|
||||
// check if IsUnsealed -> true
|
||||
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
|
||||
// Read the piece -> no need to unseal
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
|
||||
false, pd1)
|
||||
|
||||
// pre-commit 2
|
||||
ppt.preCommit2(t, pC1)
|
||||
|
||||
// check if IsUnsealed -> true
|
||||
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
|
||||
// Read the piece -> no need to unseal
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
|
||||
false, pd1)
|
||||
@ -133,6 +147,8 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
|
||||
// sending nil here will remove all unsealed files after sector is finalized.
|
||||
ppt.finalizeSector(t, nil)
|
||||
|
||||
// check if IsUnsealed -> false
|
||||
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
|
||||
// Read the piece -> have to unseal since we removed the file.
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
|
||||
true, pd1)
|
||||
@ -142,14 +158,21 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
|
||||
|
||||
// remove the unsealed file and read again -> will have to unseal.
|
||||
ppt.removeAllUnsealedSectorFiles(t)
|
||||
// check if IsUnsealed -> false
|
||||
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(0), pd1size))
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
|
||||
true, pd1)
|
||||
|
||||
// check if IsUnsealed -> true
|
||||
require.True(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
|
||||
// Read Piece 2 -> no unsealing as it got unsealed above.
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)
|
||||
|
||||
// remove all unseal files -> Read Piece 2 -> will have to Unseal.
|
||||
ppt.removeAllUnsealedSectorFiles(t)
|
||||
|
||||
// check if IsUnsealed -> false
|
||||
require.False(t, ppt.isUnsealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
|
||||
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
|
||||
}
|
||||
|
||||
@ -306,6 +329,12 @@ func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreC
|
||||
p.commD = commD
|
||||
}
|
||||
|
||||
func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool {
|
||||
b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size)
|
||||
require.NoError(t, err)
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
|
||||
expectedHadToUnseal bool, expectedBytes []byte) {
|
||||
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
|
||||
|
100
extern/sector-storage/stores/remote.go
vendored
100
extern/sector-storage/stores/remote.go
vendored
@ -484,6 +484,87 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece
|
||||
// either locally or on any of the workers.
|
||||
// Returns true if we have the unsealed piece, false otherwise.
|
||||
func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (bool, error) {
|
||||
ft := storiface.FTUnsealed
|
||||
|
||||
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("acquire local: %w", err)
|
||||
}
|
||||
|
||||
path := storiface.PathByType(paths, ft)
|
||||
if path != "" {
|
||||
// if we have the unsealed file locally, check if it has the unsealed piece.
|
||||
log.Infof("Read local %s (+%d,%d)", path, offset, size)
|
||||
ssize, err := s.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// open the unsealed sector file for the given sector size located at the given path.
|
||||
pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("opening partial file: %w", err)
|
||||
}
|
||||
log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size)
|
||||
|
||||
// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
|
||||
// in the unsealed sector file. That is what `HasAllocated` checks for.
|
||||
has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("has allocated: %w", err)
|
||||
}
|
||||
|
||||
// close the local unsealed file.
|
||||
if err := r.pfHandler.Close(pf); err != nil {
|
||||
return false, xerrors.Errorf("failed to close partial file: %s", err)
|
||||
}
|
||||
log.Debugf("checked if local partial file has the piece %s (+%d,%d), returning answer=%t", path, offset, size, has)
|
||||
|
||||
// Sector files can technically not have a piece unsealed locally, but have it unsealed in remote storage, so we probably
|
||||
// want to return only if has is true
|
||||
if has {
|
||||
return has, nil
|
||||
}
|
||||
}
|
||||
|
||||
// --- We don't have the unsealed piece in an unsealed sector file locally
|
||||
// Check if we have it in a remote cluster.
|
||||
|
||||
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("StorageFindSector: %s", err)
|
||||
}
|
||||
|
||||
if len(si) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
sort.Slice(si, func(i, j int) bool {
|
||||
return si[i].Weight < si[j].Weight
|
||||
})
|
||||
|
||||
for _, info := range si {
|
||||
for _, url := range info.URLs {
|
||||
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
|
||||
if err != nil {
|
||||
log.Warnw("check if remote has piece", "url", url, "error", err)
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
|
||||
// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy.
|
||||
// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file
|
||||
@ -507,7 +588,7 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
||||
if path != "" {
|
||||
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
|
||||
// unsealed piece.
|
||||
log.Infof("Read local %s (+%d,%d)", path, offset, size)
|
||||
log.Debugf("Check local %s (+%d,%d)", path, offset, size)
|
||||
ssize, err := s.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -529,19 +610,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
||||
}
|
||||
log.Debugf("check if partial file is allocated %s (+%d,%d)", path, offset, size)
|
||||
|
||||
if !has {
|
||||
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
|
||||
if err := r.pfHandler.Close(pf); err != nil {
|
||||
return nil, xerrors.Errorf("close partial file: %w", err)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if has {
|
||||
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
|
||||
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
|
||||
}
|
||||
|
||||
// --- We don't have the unsealed sector file locally
|
||||
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
|
||||
if err := r.pfHandler.Close(pf); err != nil {
|
||||
return nil, xerrors.Errorf("close partial file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- We don't have the unsealed piece in an unsealed sector file locally
|
||||
|
||||
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
|
||||
// to determine which workers have the unsealed file and then query those workers to know
|
||||
|
329
extern/sector-storage/stores/remote_test.go
vendored
329
extern/sector-storage/stores/remote_test.go
vendored
@ -27,8 +27,10 @@ func TestReader(t *testing.T) {
|
||||
bz := []byte("Hello World")
|
||||
|
||||
pfPath := "path"
|
||||
ft := storiface.FTUnsealed
|
||||
emptyPartialFile := &partialfile.PartialFile{}
|
||||
sectorSize := abi.SealProofInfos[1].SectorSize
|
||||
|
||||
ft := storiface.FTUnsealed
|
||||
|
||||
sectorRef := storage.SectorRef{
|
||||
ID: abi.SectorID{
|
||||
@ -37,7 +39,6 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
ProofType: 1,
|
||||
}
|
||||
sectorSize := abi.SealProofInfos[1].SectorSize
|
||||
|
||||
offset := abi.PaddedPieceSize(100)
|
||||
size := abi.PaddedPieceSize(1000)
|
||||
@ -151,7 +152,7 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
// --- nil reader when local unsealed file does NOT have unsealed piece
|
||||
"nil reader when local unsealed file does not have the piece": {
|
||||
"nil reader when local unsealed file does not have the unsealed piece and remote sector also dosen't have the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
@ -162,7 +163,20 @@ func TestReader(t *testing.T) {
|
||||
false, nil)
|
||||
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getAllocatedReturnCode: 500,
|
||||
},
|
||||
|
||||
// ---- nil reader when none of the remote unsealed file has unsealed piece
|
||||
@ -231,6 +245,37 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
// --- Success for remote unsealed file
|
||||
// --- Success for remote unsealed file
|
||||
"successfully fetches reader from remote unsealed piece when local unsealed file does NOT have the unsealed Piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getSectorReturnCode: 200,
|
||||
getAllocatedReturnCode: 200,
|
||||
expectedSectorBytes: bz,
|
||||
expectedNonNilReader: true,
|
||||
},
|
||||
|
||||
"successfully fetches reader for piece from remote unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
@ -326,6 +371,283 @@ func TestReader(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckIsUnsealed(t *testing.T) {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
|
||||
pfPath := "path"
|
||||
ft := storiface.FTUnsealed
|
||||
emptyPartialFile := &partialfile.PartialFile{}
|
||||
|
||||
sectorRef := storage.SectorRef{
|
||||
ID: abi.SectorID{
|
||||
Miner: 123,
|
||||
Number: 123,
|
||||
},
|
||||
ProofType: 1,
|
||||
}
|
||||
sectorSize := abi.SealProofInfos[1].SectorSize
|
||||
|
||||
offset := abi.PaddedPieceSize(100)
|
||||
size := abi.PaddedPieceSize(1000)
|
||||
ctx := context.Background()
|
||||
|
||||
tcs := map[string]struct {
|
||||
storeFnc func(s *mocks.MockStore)
|
||||
pfFunc func(s *mocks.MockpartialFileHandler)
|
||||
indexFnc func(s *mocks.MockSectorIndex, serverURL string)
|
||||
|
||||
needHttpServer bool
|
||||
|
||||
getAllocatedReturnCode int
|
||||
|
||||
serverUrl string
|
||||
|
||||
// expectation
|
||||
errStr string
|
||||
expectedIsUnealed bool
|
||||
}{
|
||||
|
||||
// -------- have the unsealed file locally
|
||||
"fails when error while acquiring unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error"))
|
||||
},
|
||||
|
||||
errStr: "acquire error",
|
||||
},
|
||||
|
||||
"fails when error while opening local partial (unsealed) file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error"))
|
||||
},
|
||||
errStr: "pf open error",
|
||||
},
|
||||
|
||||
"fails when error while checking if local unsealed file has piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, xerrors.New("piece check error"))
|
||||
},
|
||||
|
||||
errStr: "piece check error",
|
||||
},
|
||||
|
||||
"fails when error while closing local unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(xerrors.New("close error")).Times(1)
|
||||
},
|
||||
errStr: "close error",
|
||||
},
|
||||
|
||||
// ------------------- don't have the unsealed file locally
|
||||
|
||||
"fails when error while finding sector": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, xerrors.New("find sector error"))
|
||||
},
|
||||
errStr: "find sector error",
|
||||
},
|
||||
|
||||
"false when no worker has unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, nil)
|
||||
},
|
||||
},
|
||||
|
||||
// false when local unsealed file does NOT have unsealed piece
|
||||
"false when local unsealed file does not have the piece and remote sector too dosen't have the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getAllocatedReturnCode: 500,
|
||||
},
|
||||
|
||||
"false when none of the worker has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getAllocatedReturnCode: 500,
|
||||
},
|
||||
|
||||
// ---- Success for local unsealed file
|
||||
"true when local unsealed file has the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, nil)
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
|
||||
},
|
||||
|
||||
expectedIsUnealed: true,
|
||||
},
|
||||
|
||||
// --- Success for remote unsealed file
|
||||
"true if we have a remote unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getAllocatedReturnCode: 200,
|
||||
expectedIsUnealed: true,
|
||||
},
|
||||
|
||||
"true when local unsealed file does NOT have the unsealed Piece but remote sector has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getAllocatedReturnCode: 200,
|
||||
expectedIsUnealed: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tcs {
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
// create go mock controller here
|
||||
mockCtrl := gomock.NewController(t)
|
||||
// when test is done, assert expectations on all mock objects.
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// create them mocks
|
||||
lstore := mocks.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks.NewMockpartialFileHandler(mockCtrl)
|
||||
index := mocks.NewMockSectorIndex(mockCtrl)
|
||||
|
||||
if tc.storeFnc != nil {
|
||||
tc.storeFnc(lstore)
|
||||
}
|
||||
if tc.pfFunc != nil {
|
||||
tc.pfFunc(pfhandler)
|
||||
}
|
||||
|
||||
if tc.needHttpServer {
|
||||
// run http server
|
||||
ts := httptest.NewServer(&mockHttpServer{
|
||||
expectedSectorName: storiface.SectorName(sectorRef.ID),
|
||||
expectedFileType: ft.String(),
|
||||
expectedOffset: fmt.Sprintf("%d", offset.Unpadded()),
|
||||
expectedSize: fmt.Sprintf("%d", size.Unpadded()),
|
||||
expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType),
|
||||
|
||||
getAllocatedReturnCode: tc.getAllocatedReturnCode,
|
||||
})
|
||||
defer ts.Close()
|
||||
tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID))
|
||||
}
|
||||
if tc.indexFnc != nil {
|
||||
tc.indexFnc(index, tc.serverUrl)
|
||||
}
|
||||
|
||||
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
|
||||
isUnsealed, err := remoteStore.CheckIsUnsealed(ctx, sectorRef, offset, size)
|
||||
|
||||
if tc.errStr != "" {
|
||||
require.Error(t, err)
|
||||
require.False(t, isUnsealed)
|
||||
require.Contains(t, err.Error(), tc.errStr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, tc.expectedIsUnealed, isUnsealed)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) {
|
||||
l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -358,6 +680,7 @@ type mockHttpServer struct {
|
||||
expectedSectorType string
|
||||
|
||||
getAllocatedReturnCode int
|
||||
|
||||
getSectorReturnCode int
|
||||
getSectorBytes []byte
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -35,7 +35,7 @@ require (
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
github.com/filecoin-project/go-data-transfer v1.6.0
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
|
||||
github.com/filecoin-project/go-fil-markets v1.4.0
|
||||
github.com/filecoin-project/go-fil-markets v1.5.0
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
|
||||
github.com/filecoin-project/go-multistore v0.0.3
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
|
||||
|
4
go.sum
4
go.sum
@ -283,8 +283,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
|
||||
github.com/filecoin-project/go-fil-markets v1.4.0 h1:J4L6o+FVOmS7ZWV6wxLPiuoDzGC7iS3S5NRFL1enEr0=
|
||||
github.com/filecoin-project/go-fil-markets v1.4.0/go.mod h1:7be6zzFwaN8kxVeYZf/UUj/JilHC0ogPvWqE1TW8Ptk=
|
||||
github.com/filecoin-project/go-fil-markets v1.5.0 h1:3KEs01L8XFCEgujZ6ggFjr1XWjpjTQcmSSeo3I99I0k=
|
||||
github.com/filecoin-project/go-fil-markets v1.5.0/go.mod h1:7be6zzFwaN8kxVeYZf/UUj/JilHC0ogPvWqE1TW8Ptk=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
||||
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
|
||||
|
@ -94,7 +94,14 @@ func runTestCCUpgrade(t *testing.T, b kit.APIBuilder, blocktime time.Duration, u
|
||||
|
||||
dh := kit.NewDealHarness(t, client, miner)
|
||||
|
||||
dh.MakeFullDeal(context.Background(), 6, false, false, 0)
|
||||
_, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: context.Background(),
|
||||
Rseed: 6,
|
||||
CarExport: false,
|
||||
FastRet: false,
|
||||
StartEpoch: 0,
|
||||
DoRetrieval: true,
|
||||
})
|
||||
|
||||
// Validate upgrade
|
||||
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/markets/storageadapter"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
@ -80,6 +81,101 @@ func TestAPIDealFlowReal(t *testing.T) {
|
||||
t.Run("retrieval-second", func(t *testing.T) {
|
||||
runSecondDealRetrievalTest(t, kit.Builder, time.Second)
|
||||
})
|
||||
|
||||
t.Run("quote-price-for-non-unsealed-retrieval", func(t *testing.T) {
|
||||
runQuotePriceForUnsealedRetrieval(t, kit.Builder, time.Second, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func runQuotePriceForUnsealedRetrieval(t *testing.T, b kit.APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
|
||||
ctx := context.Background()
|
||||
fulls, miners := b(t, kit.OneFull, kit.OneMiner)
|
||||
client, miner := fulls[0].FullNode.(*impl.FullNodeAPI), miners[0]
|
||||
kit.ConnectAndStartMining(t, blocktime, miner, client)
|
||||
|
||||
ppb := int64(1)
|
||||
unsealPrice := int64(77)
|
||||
|
||||
// Set unsealed price to non-zero
|
||||
ask, err := miner.MarketGetRetrievalAsk(ctx)
|
||||
require.NoError(t, err)
|
||||
ask.PricePerByte = abi.NewTokenAmount(ppb)
|
||||
ask.UnsealPrice = abi.NewTokenAmount(unsealPrice)
|
||||
err = miner.MarketSetRetrievalAsk(ctx, ask)
|
||||
require.NoError(t, err)
|
||||
|
||||
dh := kit.NewDealHarness(t, client, miner)
|
||||
|
||||
_, info, fcid := dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: ctx,
|
||||
Rseed: 6,
|
||||
CarExport: false,
|
||||
FastRet: false,
|
||||
StartEpoch: startEpoch,
|
||||
DoRetrieval: false,
|
||||
})
|
||||
|
||||
// one more storage deal for the same data
|
||||
_, _, fcid2 := dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: ctx,
|
||||
Rseed: 6,
|
||||
CarExport: false,
|
||||
FastRet: false,
|
||||
StartEpoch: startEpoch,
|
||||
DoRetrieval: false,
|
||||
})
|
||||
require.Equal(t, fcid, fcid2)
|
||||
|
||||
// fetch quote -> zero for unsealed price since unsealed file already exists.
|
||||
offers, err := client.ClientFindData(ctx, fcid, &info.PieceCID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, offers, 2)
|
||||
require.Equal(t, offers[0], offers[1])
|
||||
require.Equal(t, uint64(0), offers[0].UnsealPrice.Uint64())
|
||||
require.Equal(t, info.Size*uint64(ppb), offers[0].MinPrice.Uint64())
|
||||
|
||||
// remove ONLY one unsealed file
|
||||
ss, err := miner.StorageList(context.Background())
|
||||
require.NoError(t, err)
|
||||
_, err = miner.SectorsList(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
iLoop:
|
||||
for storeID, sd := range ss {
|
||||
for _, sector := range sd {
|
||||
require.NoError(t, miner.StorageDropSector(ctx, storeID, sector.SectorID, storiface.FTUnsealed))
|
||||
// remove ONLY one
|
||||
break iLoop
|
||||
}
|
||||
}
|
||||
|
||||
// get retrieval quote -> zero for unsealed price as unsealed file exists.
|
||||
offers, err = client.ClientFindData(ctx, fcid, &info.PieceCID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, offers, 2)
|
||||
require.Equal(t, offers[0], offers[1])
|
||||
require.Equal(t, uint64(0), offers[0].UnsealPrice.Uint64())
|
||||
require.Equal(t, info.Size*uint64(ppb), offers[0].MinPrice.Uint64())
|
||||
|
||||
// remove the other unsealed file as well
|
||||
ss, err = miner.StorageList(context.Background())
|
||||
require.NoError(t, err)
|
||||
_, err = miner.SectorsList(ctx)
|
||||
require.NoError(t, err)
|
||||
for storeID, sd := range ss {
|
||||
for _, sector := range sd {
|
||||
require.NoError(t, miner.StorageDropSector(ctx, storeID, sector.SectorID, storiface.FTUnsealed))
|
||||
}
|
||||
}
|
||||
|
||||
// fetch quote -> non-zero for unseal price as we no more unsealed files.
|
||||
offers, err = client.ClientFindData(ctx, fcid, &info.PieceCID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, offers, 2)
|
||||
require.Equal(t, offers[0], offers[1])
|
||||
require.Equal(t, uint64(unsealPrice), offers[0].UnsealPrice.Uint64())
|
||||
total := (info.Size * uint64(ppb)) + uint64(unsealPrice)
|
||||
require.Equal(t, total, offers[0].MinPrice.Uint64())
|
||||
}
|
||||
|
||||
func TestPublishDealsBatching(t *testing.T) {
|
||||
@ -413,7 +509,14 @@ func runFullDealCycles(t *testing.T, n int, b kit.APIBuilder, blocktime time.Dur
|
||||
|
||||
baseseed := 6
|
||||
for i := 0; i < n; i++ {
|
||||
dh.MakeFullDeal(context.Background(), baseseed+i, carExport, fastRet, startEpoch)
|
||||
_, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: context.Background(),
|
||||
Rseed: baseseed + i,
|
||||
CarExport: carExport,
|
||||
FastRet: fastRet,
|
||||
StartEpoch: startEpoch,
|
||||
DoRetrieval: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -519,5 +622,12 @@ func runZeroPricePerByteRetrievalDealFlow(t *testing.T, b kit.APIBuilder, blockt
|
||||
err = miner.MarketSetRetrievalAsk(ctx, ask)
|
||||
require.NoError(t, err)
|
||||
|
||||
dh.MakeFullDeal(ctx, 6, false, false, startEpoch)
|
||||
_, _, _ = dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: ctx,
|
||||
Rseed: 6,
|
||||
CarExport: false,
|
||||
FastRet: false,
|
||||
StartEpoch: startEpoch,
|
||||
DoRetrieval: true,
|
||||
})
|
||||
}
|
||||
|
@ -206,7 +206,14 @@ func TestGatewayDealFlow(t *testing.T) {
|
||||
dealStartEpoch := abi.ChainEpoch(2 << 12)
|
||||
|
||||
dh := kit.NewDealHarness(t, nodes.lite, nodes.miner)
|
||||
dh.MakeFullDeal(ctx, 6, false, false, dealStartEpoch)
|
||||
dh.MakeFullDeal(kit.MakeFullDealParams{
|
||||
Ctx: ctx,
|
||||
Rseed: 6,
|
||||
CarExport: false,
|
||||
FastRet: false,
|
||||
StartEpoch: dealStartEpoch,
|
||||
DoRetrieval: true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestGatewayCLIDealFlow(t *testing.T) {
|
||||
|
@ -34,6 +34,15 @@ type DealHarness struct {
|
||||
miner TestMiner
|
||||
}
|
||||
|
||||
type MakeFullDealParams struct {
|
||||
Ctx context.Context
|
||||
Rseed int
|
||||
CarExport bool
|
||||
FastRet bool
|
||||
StartEpoch abi.ChainEpoch
|
||||
DoRetrieval bool
|
||||
}
|
||||
|
||||
// NewDealHarness creates a test harness that contains testing utilities for deals.
|
||||
func NewDealHarness(t *testing.T, client api.FullNode, miner TestMiner) *DealHarness {
|
||||
return &DealHarness{
|
||||
@ -43,8 +52,9 @@ func NewDealHarness(t *testing.T, client api.FullNode, miner TestMiner) *DealHar
|
||||
}
|
||||
}
|
||||
|
||||
func (dh *DealHarness) MakeFullDeal(ctx context.Context, rseed int, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
|
||||
res, _, data, err := CreateImportFile(ctx, dh.client, rseed, 0)
|
||||
func (dh *DealHarness) MakeFullDeal(params MakeFullDealParams) ([]byte,
|
||||
*api.DealInfo, cid.Cid) {
|
||||
res, _, data, err := CreateImportFile(params.Ctx, dh.client, params.Rseed, 0)
|
||||
if err != nil {
|
||||
dh.t.Fatal(err)
|
||||
}
|
||||
@ -52,17 +62,21 @@ func (dh *DealHarness) MakeFullDeal(ctx context.Context, rseed int, carExport, f
|
||||
fcid := res.Root
|
||||
fmt.Println("FILE CID: ", fcid)
|
||||
|
||||
deal := dh.StartDeal(ctx, fcid, fastRet, startEpoch)
|
||||
deal := dh.StartDeal(params.Ctx, fcid, params.FastRet, params.StartEpoch)
|
||||
|
||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||
time.Sleep(time.Second)
|
||||
dh.WaitDealSealed(ctx, deal, false, false, nil)
|
||||
dh.WaitDealSealed(params.Ctx, deal, false, false, nil)
|
||||
|
||||
// Retrieval
|
||||
info, err := dh.client.ClientGetDealInfo(ctx, *deal)
|
||||
info, err := dh.client.ClientGetDealInfo(params.Ctx, *deal)
|
||||
require.NoError(dh.t, err)
|
||||
|
||||
dh.TestRetrieval(ctx, fcid, &info.PieceCID, carExport, data)
|
||||
if params.DoRetrieval {
|
||||
dh.TestRetrieval(params.Ctx, fcid, &info.PieceCID, params.CarExport, data)
|
||||
}
|
||||
|
||||
return data, info, fcid
|
||||
}
|
||||
|
||||
func (dh *DealHarness) StartDeal(ctx context.Context, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
|
||||
|
48
markets/pricing/cli.go
Normal file
48
markets/pricing/cli.go
Normal file
@ -0,0 +1,48 @@
|
||||
package pricing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os/exec"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
func ExternalRetrievalPricingFunc(cmd string) dtypes.RetrievalPricingFunc {
|
||||
return func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
|
||||
return runPricingFunc(ctx, cmd, pricingInput)
|
||||
}
|
||||
}
|
||||
|
||||
func runPricingFunc(_ context.Context, cmd string, params interface{}) (retrievalmarket.Ask, error) {
|
||||
j, err := json.Marshal(params)
|
||||
if err != nil {
|
||||
return retrievalmarket.Ask{}, err
|
||||
}
|
||||
|
||||
var out bytes.Buffer
|
||||
var errb bytes.Buffer
|
||||
|
||||
c := exec.Command("sh", "-c", cmd)
|
||||
c.Stdin = bytes.NewReader(j)
|
||||
c.Stdout = &out
|
||||
c.Stderr = &errb
|
||||
|
||||
switch err := c.Run().(type) {
|
||||
case nil:
|
||||
bz := out.Bytes()
|
||||
resp := retrievalmarket.Ask{}
|
||||
|
||||
if err := json.Unmarshal(bz, &resp); err != nil {
|
||||
return resp, xerrors.Errorf("failed to parse pricing output %s, err=%w", string(bz), err)
|
||||
}
|
||||
return resp, nil
|
||||
case *exec.ExitError:
|
||||
return retrievalmarket.Ask{}, xerrors.Errorf("pricing func exited with error: %s", errb.String())
|
||||
default:
|
||||
return retrievalmarket.Ask{}, xerrors.Errorf("pricing func cmd run error: %w", err)
|
||||
}
|
||||
}
|
@ -99,3 +99,67 @@ func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipS
|
||||
|
||||
return head.Key().Bytes(), head.Height(), nil
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
|
||||
si, err := rpn.miner.GetSectorInfo(sectorID)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to get sectorinfo, err=%s", err)
|
||||
}
|
||||
|
||||
mid, err := address.IDFromAddress(rpn.miner.Address())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
ref := specstorage.SectorRef{
|
||||
ID: abi.SectorID{
|
||||
Miner: abi.ActorID(mid),
|
||||
Number: sectorID,
|
||||
},
|
||||
ProofType: si.SectorType,
|
||||
}
|
||||
|
||||
log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length)
|
||||
return rpn.pp.IsUnsealed(ctx, ref, storiface.UnpaddedByteIndex(offset), length)
|
||||
}
|
||||
|
||||
// GetRetrievalPricingInput takes a set of candidate storage deals that can serve a retrieval request,
|
||||
// and returns an minimally populated PricingInput. This PricingInput should be enhanced
|
||||
// with more data, and passed to the pricing function to determine the final quoted price.
|
||||
func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
|
||||
resp := retrievalmarket.PricingInput{}
|
||||
|
||||
head, err := rpn.full.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return resp, xerrors.Errorf("failed to get chain head: %w", err)
|
||||
}
|
||||
tsk := head.Key()
|
||||
|
||||
for _, dealID := range storageDeals {
|
||||
ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk)
|
||||
if err != nil {
|
||||
return resp, xerrors.Errorf("failed to look up deal %d on chain: err=%w", dealID, err)
|
||||
}
|
||||
if ds.Proposal.VerifiedDeal {
|
||||
resp.VerifiedDeal = true
|
||||
}
|
||||
|
||||
if ds.Proposal.PieceCID.Equals(pieceCID) {
|
||||
resp.PieceSize = ds.Proposal.PieceSize.Unpadded()
|
||||
}
|
||||
|
||||
// If we've discovered a verified deal with the required PieceCID, we don't need
|
||||
// to lookup more deals and we're done.
|
||||
if resp.VerifiedDeal && resp.PieceSize != 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Note: The piece size can never actually be zero. We only use it to here
|
||||
// to assert that we didn't find a matching piece.
|
||||
if resp.PieceSize == 0 {
|
||||
return resp, xerrors.New("failed to find matching piece")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
151
markets/retrievaladapter/provider_test.go
Normal file
151
markets/retrievaladapter/provider_test.go
Normal file
@ -0,0 +1,151 @@
|
||||
package retrievaladapter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
testnet "github.com/filecoin-project/go-fil-markets/shared_testutil"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/mocks"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
func TestGetPricingInput(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tsk := &types.TipSet{}
|
||||
key := tsk.Key()
|
||||
|
||||
pcid := testnet.GenerateCids(1)[0]
|
||||
deals := []abi.DealID{1, 2}
|
||||
paddedSize := abi.PaddedPieceSize(128)
|
||||
unpaddedSize := paddedSize.Unpadded()
|
||||
|
||||
tcs := map[string]struct {
|
||||
pieceCid cid.Cid
|
||||
deals []abi.DealID
|
||||
fFnc func(node *mocks.MockFullNode)
|
||||
|
||||
expectedErrorStr string
|
||||
expectedVerified bool
|
||||
expectedPieceSize abi.UnpaddedPieceSize
|
||||
}{
|
||||
"error when fails to fetch chain head": {
|
||||
fFnc: func(n *mocks.MockFullNode) {
|
||||
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, xerrors.New("chain head error")).Times(1)
|
||||
},
|
||||
expectedErrorStr: "chain head error",
|
||||
},
|
||||
|
||||
"error when no piece matches": {
|
||||
fFnc: func(n *mocks.MockFullNode) {
|
||||
out1 := &api.MarketDeal{
|
||||
Proposal: market.DealProposal{
|
||||
PieceCID: testnet.GenerateCids(1)[0],
|
||||
},
|
||||
}
|
||||
out2 := &api.MarketDeal{
|
||||
Proposal: market.DealProposal{
|
||||
PieceCID: testnet.GenerateCids(1)[0],
|
||||
},
|
||||
}
|
||||
|
||||
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
|
||||
gomock.InOrder(
|
||||
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil),
|
||||
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
|
||||
)
|
||||
|
||||
},
|
||||
expectedErrorStr: "failed to find matching piece",
|
||||
},
|
||||
|
||||
"verified is true even if one deal is verified and we get the correct piecesize": {
|
||||
fFnc: func(n *mocks.MockFullNode) {
|
||||
out1 := &api.MarketDeal{
|
||||
Proposal: market.DealProposal{
|
||||
PieceCID: pcid,
|
||||
PieceSize: paddedSize,
|
||||
},
|
||||
}
|
||||
out2 := &api.MarketDeal{
|
||||
Proposal: market.DealProposal{
|
||||
PieceCID: testnet.GenerateCids(1)[0],
|
||||
VerifiedDeal: true,
|
||||
},
|
||||
}
|
||||
|
||||
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
|
||||
gomock.InOrder(
|
||||
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil),
|
||||
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
|
||||
)
|
||||
|
||||
},
|
||||
expectedPieceSize: unpaddedSize,
|
||||
expectedVerified: true,
|
||||
},
|
||||
|
||||
"verified is false if both deals are unverified and we get the correct piece size": {
|
||||
fFnc: func(n *mocks.MockFullNode) {
|
||||
out1 := &api.MarketDeal{
|
||||
Proposal: market.DealProposal{
|
||||
PieceCID: pcid,
|
||||
PieceSize: paddedSize,
|
||||
VerifiedDeal: false,
|
||||
},
|
||||
}
|
||||
out2 := &api.MarketDeal{
|
||||
Proposal: market.DealProposal{
|
||||
PieceCID: testnet.GenerateCids(1)[0],
|
||||
VerifiedDeal: false,
|
||||
},
|
||||
}
|
||||
|
||||
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
|
||||
gomock.InOrder(
|
||||
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil),
|
||||
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
|
||||
)
|
||||
|
||||
},
|
||||
expectedPieceSize: unpaddedSize,
|
||||
expectedVerified: false,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tcs {
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
// when test is done, assert expectations on all mock objects.
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockFull := mocks.NewMockFullNode(mockCtrl)
|
||||
rpn := &retrievalProviderNode{
|
||||
full: mockFull,
|
||||
}
|
||||
if tc.fFnc != nil {
|
||||
tc.fFnc(mockFull)
|
||||
}
|
||||
|
||||
resp, err := rpn.GetRetrievalPricingInput(ctx, pcid, deals)
|
||||
|
||||
if tc.expectedErrorStr != "" {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), tc.expectedErrorStr)
|
||||
require.Equal(t, retrievalmarket.PricingInput{}, resp)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedPieceSize, resp.PieceSize)
|
||||
require.Equal(t, tc.expectedVerified, resp.VerifiedDeal)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -409,8 +409,16 @@ var MinerNode = Options(
|
||||
|
||||
// Markets (retrieval)
|
||||
Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider),
|
||||
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(config.DealmakingConfig{
|
||||
RetrievalPricing: &config.RetrievalPricing{
|
||||
Strategy: config.RetrievalPricingDefaultMode,
|
||||
Default: &config.RetrievalPricingDefault{},
|
||||
},
|
||||
})),
|
||||
Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider),
|
||||
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
|
||||
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
|
||||
|
||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||
|
||||
// Markets (storage)
|
||||
@ -564,6 +572,19 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
|
||||
}
|
||||
|
||||
pricingConfig := cfg.Dealmaking.RetrievalPricing
|
||||
if pricingConfig.Strategy == config.RetrievalPricingExternalMode {
|
||||
if pricingConfig.External == nil {
|
||||
return Error(xerrors.New("retrieval pricing policy has been to set to external but external policy config is nil"))
|
||||
}
|
||||
|
||||
if pricingConfig.External.Path == "" {
|
||||
return Error(xerrors.New("retrieval pricing policy has been to set to external but external script path is empty"))
|
||||
}
|
||||
} else if pricingConfig.Strategy != config.RetrievalPricingDefaultMode {
|
||||
return Error(xerrors.New("retrieval pricing policy must be either default or external"))
|
||||
}
|
||||
|
||||
return Options(
|
||||
ConfigCommon(&cfg.Common),
|
||||
|
||||
@ -575,6 +596,8 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
|
||||
),
|
||||
|
||||
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
|
||||
|
||||
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{
|
||||
Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod),
|
||||
MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg,
|
||||
|
@ -14,6 +14,14 @@ import (
|
||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||
)
|
||||
|
||||
const (
|
||||
// RetrievalPricingDefault configures the node to use the default retrieval pricing policy.
|
||||
RetrievalPricingDefaultMode = "default"
|
||||
// RetrievalPricingExternal configures the node to use the external retrieval pricing script
|
||||
// configured by the user.
|
||||
RetrievalPricingExternalMode = "external"
|
||||
)
|
||||
|
||||
// Common is common config between full node and miner
|
||||
type Common struct {
|
||||
API API
|
||||
@ -70,6 +78,29 @@ type DealmakingConfig struct {
|
||||
|
||||
Filter string
|
||||
RetrievalFilter string
|
||||
|
||||
RetrievalPricing *RetrievalPricing
|
||||
}
|
||||
|
||||
type RetrievalPricing struct {
|
||||
Strategy string // possible values: "default", "external"
|
||||
|
||||
Default *RetrievalPricingDefault
|
||||
External *RetrievalPricingExternal
|
||||
}
|
||||
|
||||
type RetrievalPricingExternal struct {
|
||||
// Path of the external script that will be run to price a retrieval deal.
|
||||
// This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external".
|
||||
Path string
|
||||
}
|
||||
|
||||
type RetrievalPricingDefault struct {
|
||||
// VerifiedDealsFreeTransfer configures zero fees for data transfer for a retrieval deal
|
||||
// of a payloadCid that belongs to a verified storage deal.
|
||||
// This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "default".
|
||||
// default value is true
|
||||
VerifiedDealsFreeTransfer bool
|
||||
}
|
||||
|
||||
type SealingConfig struct {
|
||||
@ -326,6 +357,16 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
PublishMsgPeriod: Duration(time.Hour),
|
||||
MaxDealsPerPublishMsg: 8,
|
||||
MaxProviderCollateralMultiplier: 2,
|
||||
|
||||
RetrievalPricing: &RetrievalPricing{
|
||||
Strategy: RetrievalPricingDefaultMode,
|
||||
Default: &RetrievalPricingDefault{
|
||||
VerifiedDealsFreeTransfer: true,
|
||||
},
|
||||
External: &RetrievalPricingExternal{
|
||||
Path: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
Fees: MinerFeeConfig{
|
||||
|
@ -92,3 +92,5 @@ type GetExpectedSealDurationFunc func() (time.Duration, error)
|
||||
|
||||
type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
|
||||
type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error)
|
||||
|
||||
type RetrievalPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error)
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/markets/pricing"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/multierr"
|
||||
"golang.org/x/xerrors"
|
||||
@ -634,6 +635,20 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt
|
||||
}
|
||||
}
|
||||
|
||||
// RetrievalPricingFunc configures the pricing function to use for retrieval deals.
|
||||
func RetrievalPricingFunc(cfg config.DealmakingConfig) func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
|
||||
_ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc {
|
||||
|
||||
return func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
|
||||
_ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc {
|
||||
if cfg.RetrievalPricing.Strategy == config.RetrievalPricingExternalMode {
|
||||
return pricing.ExternalRetrievalPricingFunc(cfg.RetrievalPricing.External.Path)
|
||||
}
|
||||
|
||||
return retrievalimpl.DefaultPricingFunc(cfg.RetrievalPricing.Default.VerifiedDealsFreeTransfer)
|
||||
}
|
||||
}
|
||||
|
||||
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
|
||||
func RetrievalProvider(h host.Host,
|
||||
miner *storage.Miner,
|
||||
@ -643,6 +658,7 @@ func RetrievalProvider(h host.Host,
|
||||
mds dtypes.StagingMultiDstore,
|
||||
dt dtypes.ProviderDataTransfer,
|
||||
pieceProvider sectorstorage.PieceProvider,
|
||||
pricingFnc dtypes.RetrievalPricingFunc,
|
||||
userFilter dtypes.RetrievalDealFilter,
|
||||
) (retrievalmarket.RetrievalProvider, error) {
|
||||
adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full)
|
||||
@ -655,7 +671,8 @@ func RetrievalProvider(h host.Host,
|
||||
netwk := rmnet.NewFromLibp2pHost(h)
|
||||
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
|
||||
|
||||
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
|
||||
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")),
|
||||
retrievalimpl.RetrievalPricingFunc(pricingFnc), opt)
|
||||
}
|
||||
|
||||
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
|
||||
|
Loading…
Reference in New Issue
Block a user