dpr changes and test based on new unsealing PR

This commit is contained in:
aarshkshah1992 2021-05-22 22:40:21 +05:30
parent 50e023edd3
commit dc6dbc9a11
16 changed files with 715 additions and 29 deletions

View File

@ -235,7 +235,7 @@ var retrievalSetAskCmd = &cli.Command{
var retrievalGetAskCmd = &cli.Command{ var retrievalGetAskCmd = &cli.Command{
Name: "get-ask", Name: "get-ask",
Usage: "Get the provider's current retrieval ask", Usage: "Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command",
Flags: []cli.Flag{}, Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
ctx := lcli.DaemonContext(cctx) ctx := lcli.DaemonContext(cctx)

View File

@ -745,7 +745,7 @@ COMMANDS:
selection Configure acceptance criteria for retrieval deal proposals selection Configure acceptance criteria for retrieval deal proposals
list List all active retrieval deals for this miner list List all active retrieval deals for this miner
set-ask Configure the provider's retrieval ask set-ask Configure the provider's retrieval ask
get-ask Get the provider's current retrieval ask get-ask Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
help, h Shows a list of commands or help for one command help, h Shows a list of commands or help for one command
OPTIONS: OPTIONS:
@ -848,7 +848,7 @@ OPTIONS:
### lotus-miner retrieval-deals get-ask ### lotus-miner retrieval-deals get-ask
``` ```
NAME: NAME:
lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask lotus-miner retrieval-deals get-ask - Get the provider's current retrieval ask configured by the provider in the ask-store using the set-ask CLI command
USAGE: USAGE:
lotus-miner retrieval-deals get-ask [command options] [arguments...] lotus-miner retrieval-deals get-ask [command options] [arguments...]

View File

@ -381,6 +381,10 @@ func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, o
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil
} }
func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
return false, nil
}
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) { func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
psize, err := spt.SectorSize() psize, err := spt.SectorSize()
if err != nil { if err != nil {

View File

@ -24,8 +24,11 @@ type Unsealer interface {
type PieceProvider interface { type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
} }
var _ PieceProvider = &pieceProvider{}
type pieceProvider struct { type pieceProvider struct {
storage *stores.Remote storage *stores.Remote
index stores.SectorIndex index stores.SectorIndex
@ -40,6 +43,26 @@ func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unse
} }
} }
// IsUnsealed checks if we have the unsealed piece at the given offset in an already
// existing unsealed file either locally or on any of the workers.
func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
if err := offset.Valid(); err != nil {
return false, xerrors.Errorf("offset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return false, xerrors.Errorf("size is not a valid piece size: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return false, xerrors.Errorf("acquiring read sector lock: %w", err)
}
defer cancel()
return p.storage.CheckIsUnsealed(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
}
// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it. // tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it.
// It will NOT try to schedule an Unseal of a sealed sector file for the read. // It will NOT try to schedule an Unseal of a sealed sector file for the read.
// //

View File

@ -53,6 +53,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// pre-commit 1 // pre-commit 1
preCommit1 := ppt.preCommit1(t) preCommit1 := ppt.preCommit1(t)
// check if IsUnsealed -> true
require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size))
// read piece // read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData) false, pieceData)
@ -60,6 +62,8 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// pre-commit 2 // pre-commit 2
ppt.preCommit2(t, preCommit1) ppt.preCommit2(t, preCommit1)
// check if IsUnsealed -> true
require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size))
// read piece // read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData) false, pieceData)
@ -67,10 +71,14 @@ func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// finalize -> nil here will remove unsealed file // finalize -> nil here will remove unsealed file
ppt.finalizeSector(t, nil) ppt.finalizeSector(t, nil)
// check if IsUnsealed -> false
require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size))
// Read the piece -> will have to unseal // Read the piece -> will have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
true, pieceData) true, pieceData)
// check if IsUnsealed -> true
require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), size))
// read the piece -> will not have to unseal // read the piece -> will not have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData) false, pieceData)
@ -118,12 +126,18 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
// pre-commit 1 // pre-commit 1
pC1 := ppt.preCommit1(t) pC1 := ppt.preCommit1(t)
// check if IsUnsealed -> true
require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> no need to unseal // Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1) false, pd1)
// pre-commit 2 // pre-commit 2
ppt.preCommit2(t, pC1) ppt.preCommit2(t, pC1)
// check if IsUnsealed -> true
require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> no need to unseal // Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1) false, pd1)
@ -133,6 +147,8 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
// sending nil here will remove all unsealed files after sector is finalized. // sending nil here will remove all unsealed files after sector is finalized.
ppt.finalizeSector(t, nil) ppt.finalizeSector(t, nil)
// check if IsUnsealed -> false
require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size))
// Read the piece -> have to unseal since we removed the file. // Read the piece -> have to unseal since we removed the file.
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1) true, pd1)
@ -142,14 +158,21 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
// remove the unsealed file and read again -> will have to unseal. // remove the unsealed file and read again -> will have to unseal.
ppt.removeAllUnsealedSectorFiles(t) ppt.removeAllUnsealedSectorFiles(t)
// check if IsUnsealed -> false
require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(0), pd1size))
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1) true, pd1)
// check if IsUnsealed -> true
require.True(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
// Read Piece 2 -> no unsealing as it got unsealed above. // Read Piece 2 -> no unsealing as it got unsealed above.
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2) ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)
// remove all unseal files -> Read Piece 2 -> will have to Unseal. // remove all unseal files -> Read Piece 2 -> will have to Unseal.
ppt.removeAllUnsealedSectorFiles(t) ppt.removeAllUnsealedSectorFiles(t)
// check if IsUnsealed -> false
require.False(t, ppt.isUnealed(t, storiface.UnpaddedByteIndex(pd1size), pd2size))
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2) ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
} }
@ -306,6 +329,12 @@ func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreC
p.commD = commD p.commD = commD
} }
func (p *pieceProviderTestHarness) isUnealed(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) bool {
b, err := p.pp.IsUnsealed(p.ctx, p.sector, offset, size)
require.NoError(t, err)
return b
}
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) { expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD) rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)

View File

@ -484,6 +484,78 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa
return resp.Body, nil return resp.Body, nil
} }
// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece
// either locally or on any of the workers.
// Returns true if we have the unsealed piece, false otherwise.
func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (bool, error) {
ft := storiface.FTUnsealed
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return false, xerrors.Errorf("acquire local: %w", err)
}
path := storiface.PathByType(paths, ft)
if path != "" {
// if we have the unsealed file locally, check if it has the unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return false, err
}
// open the unsealed sector file for the given sector size located at the given path.
pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return false, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size)
// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
// in the unsealed sector file. That is what `HasAllocated` checks for.
has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return false, xerrors.Errorf("has allocated: %w", err)
}
if err := r.pfHandler.Close(pf); err != nil {
return false, xerrors.Errorf("failed to close partial file: %s", err)
}
log.Debugf("checked if local partial file has the piece %s (+%d,%d), returning answer=%t", path, offset, size, has)
return has, nil
}
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return false, xerrors.Errorf("StorageFindSector: %s", err)
}
if len(si) == 0 {
return false, nil
}
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
for _, info := range si {
for _, url := range info.URLs {
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}
return true, nil
}
}
return false, nil
}
// Reader returns a reader for an unsealed piece at the given offset in the given sector. // Reader returns a reader for an unsealed piece at the given offset in the given sector.
// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy. // If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy.
// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file // If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file

View File

@ -283,8 +283,6 @@ func TestReader(t *testing.T) {
expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType), expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType),
getAllocatedReturnCode: tc.getAllocatedReturnCode, getAllocatedReturnCode: tc.getAllocatedReturnCode,
getSectorReturnCode: tc.getSectorReturnCode,
getSectorBytes: tc.expectedSectorBytes,
}) })
defer ts.Close() defer ts.Close()
tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID)) tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID))
@ -326,6 +324,244 @@ func TestReader(t *testing.T) {
} }
} }
func TestCheckIsUnsealed(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
pfPath := "path"
ft := storiface.FTUnsealed
emptyPartialFile := &partialfile.PartialFile{}
sectorRef := storage.SectorRef{
ID: abi.SectorID{
Miner: 123,
Number: 123,
},
ProofType: 1,
}
sectorSize := abi.SealProofInfos[1].SectorSize
offset := abi.PaddedPieceSize(100)
size := abi.PaddedPieceSize(1000)
ctx := context.Background()
tcs := map[string]struct {
storeFnc func(s *mocks.MockStore)
pfFunc func(s *mocks.MockpartialFileHandler)
indexFnc func(s *mocks.MockSectorIndex, serverURL string)
needHttpServer bool
getAllocatedReturnCode int
serverUrl string
// expectation
errStr string
expectedIsUnealed bool
}{
// -------- have the unsealed file locally
"fails when error while acquiring unsealed file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error"))
},
errStr: "acquire error",
},
"fails when error while opening local partial (unsealed) file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error"))
},
errStr: "pf open error",
},
"fails when error while checking if local unsealed file has piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
true, xerrors.New("piece check error"))
},
errStr: "piece check error",
},
"fails when error while closing local unsealed file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
false, nil)
pf.EXPECT().Close(emptyPartialFile).Return(xerrors.New("close error")).Times(1)
},
errStr: "close error",
},
// ------------------- don't have the unsealed file locally
"fails when error while finding sector": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return(nil, xerrors.New("find sector error"))
},
errStr: "find sector error",
},
"false when no worker has unsealed file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return(nil, nil)
},
},
// false when local unsealed file does NOT have unsealed piece
"false when local unsealed file does not have the piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
false, nil)
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
},
},
"false when none of the worker has the unsealed piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, url string) {
si := stores.SectorStorageInfo{
URLs: []string{url},
}
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
},
needHttpServer: true,
getAllocatedReturnCode: 500,
},
// ---- Success for local unsealed file
"true when local unsealed file has the piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
true, nil)
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
},
expectedIsUnealed: true,
},
// --- Success for remote unsealed file
"successfully fetches reader for piece from remote unsealed piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, url string) {
si := stores.SectorStorageInfo{
URLs: []string{url},
}
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
},
needHttpServer: true,
getAllocatedReturnCode: 200,
expectedIsUnealed: true,
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create them mocks
lstore := mocks.NewMockStore(mockCtrl)
pfhandler := mocks.NewMockpartialFileHandler(mockCtrl)
index := mocks.NewMockSectorIndex(mockCtrl)
if tc.storeFnc != nil {
tc.storeFnc(lstore)
}
if tc.pfFunc != nil {
tc.pfFunc(pfhandler)
}
if tc.needHttpServer {
// run http server
ts := httptest.NewServer(&mockHttpServer{
expectedSectorName: storiface.SectorName(sectorRef.ID),
expectedFileType: ft.String(),
expectedOffset: fmt.Sprintf("%d", offset.Unpadded()),
expectedSize: fmt.Sprintf("%d", size.Unpadded()),
expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType),
getAllocatedReturnCode: tc.getAllocatedReturnCode,
})
defer ts.Close()
tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID))
}
if tc.indexFnc != nil {
tc.indexFnc(index, tc.serverUrl)
}
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
isUnsealed, err := remoteStore.CheckIsUnsealed(ctx, sectorRef, offset, size)
if tc.errStr != "" {
require.Error(t, err)
require.False(t, isUnsealed)
require.Contains(t, err.Error(), tc.errStr)
} else {
require.NoError(t, err)
}
require.Equal(t, tc.expectedIsUnealed, isUnsealed)
})
}
}
func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) { func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) {
l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed, l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{ storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
@ -358,13 +594,10 @@ type mockHttpServer struct {
expectedSectorType string expectedSectorType string
getAllocatedReturnCode int getAllocatedReturnCode int
getSectorReturnCode int
getSectorBytes []byte
} }
func (m *mockHttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (m *mockHttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux := mux.NewRouter() mux := mux.NewRouter()
mux.HandleFunc("/remote/{type}/{id}", m.getSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", m.getAllocated).Methods("GET") mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", m.getAllocated).Methods("GET")
mux.ServeHTTP(w, r) mux.ServeHTTP(w, r)
} }
@ -399,20 +632,3 @@ func (m *mockHttpServer) getAllocated(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(m.getAllocatedReturnCode) w.WriteHeader(m.getAllocatedReturnCode)
} }
func (m *mockHttpServer) getSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
if vars["id"] != m.expectedSectorName {
w.WriteHeader(http.StatusBadRequest)
return
}
if vars["type"] != m.expectedFileType {
w.WriteHeader(http.StatusBadRequest)
return
}
w.WriteHeader(m.getSectorReturnCode)
_, _ = w.Write(m.getSectorBytes)
}

2
go.mod
View File

@ -35,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.5.0 github.com/filecoin-project/go-data-transfer v1.5.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.3.0 github.com/filecoin-project/go-fil-markets v1.2.6-0.20210522045113-7d33a6e5f793
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20

4
go.sum
View File

@ -277,8 +277,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.3.0 h1:yYWHO5x87i+5UlqBwlMVk4oN2GPNfQ0WG6LdORArL/o= github.com/filecoin-project/go-fil-markets v1.2.6-0.20210522045113-7d33a6e5f793 h1:t2u3m3cQM4MFxtQ2EZQkPGtUNUW/NAADbtmTAL44WSw=
github.com/filecoin-project/go-fil-markets v1.3.0/go.mod h1:v8QjFAGf5h2wKH3saYjGOu3pOFUoVQ1Uhow4gIcUR3I= github.com/filecoin-project/go-fil-markets v1.2.6-0.20210522045113-7d33a6e5f793/go.mod h1:v8QjFAGf5h2wKH3saYjGOu3pOFUoVQ1Uhow4gIcUR3I=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

48
markets/pricing/cli.go Normal file
View File

@ -0,0 +1,48 @@
package pricing
import (
"bytes"
"context"
"encoding/json"
"os/exec"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"golang.org/x/xerrors"
)
func ExternalRetrievalPricingFunc(cmd string) dtypes.RetrievalPricingFunc {
return func(ctx context.Context, pricingInput retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
return runPricingFunc(ctx, cmd, pricingInput)
}
}
func runPricingFunc(_ context.Context, cmd string, params interface{}) (retrievalmarket.Ask, error) {
j, err := json.Marshal(params)
if err != nil {
return retrievalmarket.Ask{}, err
}
var out bytes.Buffer
var errb bytes.Buffer
c := exec.Command("sh", "-c", cmd)
c.Stdin = bytes.NewReader(j)
c.Stdout = &out
c.Stderr = &errb
switch err := c.Run().(type) {
case nil:
bz := out.Bytes()
resp := retrievalmarket.Ask{}
if err := json.Unmarshal(bz, &resp); err != nil {
return resp, xerrors.Errorf("failed to parse pricing output %s, err=%w", string(bz), err)
}
return resp, nil
case *exec.ExitError:
return retrievalmarket.Ask{}, xerrors.Errorf("pricing func exited with error: %s", errb.String())
default:
return retrievalmarket.Ask{}, xerrors.Errorf("pricing func cmd run error: %w", err)
}
}

View File

@ -99,3 +99,69 @@ func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipS
return head.Key().Bytes(), head.Height(), nil return head.Key().Bytes(), head.Height(), nil
} }
func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
si, err := rpn.miner.GetSectorInfo(sectorID)
if err != nil {
return false, xerrors.Errorf("failed to get sectorinfo, err=%s", err)
}
mid, err := address.IDFromAddress(rpn.miner.Address())
if err != nil {
return false, err
}
ref := specstorage.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(mid),
Number: sectorID,
},
ProofType: si.SectorType,
}
log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length)
return rpn.pp.IsUnsealed(ctx, ref, storiface.UnpaddedByteIndex(offset), length)
}
// `storageDeals` param here is the list of storage deals made for the `payloadCID` the retrieval client is looking for.
//
// `pieceCID` is the CID of the specific Piece we want to retrieve the payload from. The client can either mandate that
// we retrieve the payload from a specific piece or we choose a Piece to retrieve the payload from, prioritizing
// a Piece for which an unsealed sector file already exists if possible.
//
// 1. For the `VerifiedDeal` flag in the response `PricingInput`, we are looking to answer the question "does there exist any verified storage deal for this `payloadCID`" ?
//
// 2. We also want to ensure that we return the `PieceSize` for the actual piece we want to retrieve the deal from.
func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
resp := retrievalmarket.PricingInput{}
head, err := rpn.full.ChainHead(ctx)
if err != nil {
return resp, xerrors.Errorf("failed to get chain head: %w", err)
}
tsk := head.Key()
for _, dealID := range storageDeals {
ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk)
if err != nil {
return resp, xerrors.Errorf("failed to look up deal %d on chain: err=%w", dealID, err)
}
if ds.Proposal.VerifiedDeal {
resp.VerifiedDeal = true
}
if ds.Proposal.PieceCID.Equals(pieceCID) {
resp.PieceSize = ds.Proposal.PieceSize.Unpadded()
}
if resp.VerifiedDeal && resp.PieceSize != 0 {
break
}
}
if resp.PieceSize == 0 {
return resp, xerrors.New("failed to find matching piece, PieceSize is zero")
}
return resp, nil
}

View File

@ -0,0 +1,151 @@
package retrievaladapter
import (
"context"
"testing"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
testnet "github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/mocks"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
"github.com/golang/mock/gomock"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
)
func TestGetPricingInput(t *testing.T) {
ctx := context.Background()
tsk := &types.TipSet{}
key := tsk.Key()
pcid := testnet.GenerateCids(1)[0]
deals := []abi.DealID{1, 2}
paddedSize := abi.PaddedPieceSize(128)
unpaddedSize := paddedSize.Unpadded()
tcs := map[string]struct {
pieceCid cid.Cid
deals []abi.DealID
fFnc func(node *mocks.MockFullNode)
expectedErrorStr string
expectedVerified bool
expectedPieceSize abi.UnpaddedPieceSize
}{
"error when fails to fetch chain head": {
fFnc: func(n *mocks.MockFullNode) {
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, xerrors.New("chain head error")).Times(1)
},
expectedErrorStr: "chain head error",
},
"error when no piece matches": {
fFnc: func(n *mocks.MockFullNode) {
out1 := &api.MarketDeal{
Proposal: market.DealProposal{
PieceCID: testnet.GenerateCids(1)[0],
},
}
out2 := &api.MarketDeal{
Proposal: market.DealProposal{
PieceCID: testnet.GenerateCids(1)[0],
},
}
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
gomock.InOrder(
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil),
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
)
},
expectedErrorStr: "failed to find matching piece",
},
"verified is true even if one deal is verified and we get the correct piecesize": {
fFnc: func(n *mocks.MockFullNode) {
out1 := &api.MarketDeal{
Proposal: market.DealProposal{
PieceCID: pcid,
PieceSize: paddedSize,
},
}
out2 := &api.MarketDeal{
Proposal: market.DealProposal{
PieceCID: testnet.GenerateCids(1)[0],
VerifiedDeal: true,
},
}
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
gomock.InOrder(
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil),
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
)
},
expectedPieceSize: unpaddedSize,
expectedVerified: true,
},
"verified is false if both deals are unverified and we get the correct piece size": {
fFnc: func(n *mocks.MockFullNode) {
out1 := &api.MarketDeal{
Proposal: market.DealProposal{
PieceCID: pcid,
PieceSize: paddedSize,
VerifiedDeal: false,
},
}
out2 := &api.MarketDeal{
Proposal: market.DealProposal{
PieceCID: testnet.GenerateCids(1)[0],
VerifiedDeal: false,
},
}
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
gomock.InOrder(
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, nil),
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
)
},
expectedPieceSize: unpaddedSize,
expectedVerified: false,
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
mockFull := mocks.NewMockFullNode(mockCtrl)
rpn := &retrievalProviderNode{
full: mockFull,
}
if tc.fFnc != nil {
tc.fFnc(mockFull)
}
resp, err := rpn.GetRetrievalPricingInput(ctx, pcid, deals)
if tc.expectedErrorStr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrorStr)
require.Equal(t, retrievalmarket.PricingInput{}, resp)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedPieceSize, resp.PieceSize)
require.Equal(t, tc.expectedVerified, resp.VerifiedDeal)
}
})
}
}

View File

@ -407,9 +407,16 @@ var MinerNode = Options(
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
// Markets (retrieval) // Markets (retrieval)
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(config.DealmakingConfig{
RetrievalPricing: &config.RetrievalPricing{
Strategy: config.DefaultRetrievalPricing,
Default: &config.RetrievalPricingDefault{},
},
})),
Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider), Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(HandleRetrievalKey, modules.HandleRetrieval), Override(HandleRetrievalKey, modules.HandleRetrieval),
// Markets (storage) // Markets (storage)
@ -563,6 +570,17 @@ func ConfigStorageMiner(c interface{}) Option {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
} }
pricingConfig := cfg.Dealmaking.RetrievalPricing
if pricingConfig.Strategy == config.ExternalRetrievalPricing {
if pricingConfig.External == nil {
return Error(xerrors.New("retrieval pricing policy has been to set to external but external policy config is nil"))
}
if pricingConfig.External.Path == "" {
return Error(xerrors.New("retrieval pricing policy has been to set to external but external script path is empty"))
}
}
return Options( return Options(
ConfigCommon(&cfg.Common), ConfigCommon(&cfg.Common),
@ -574,6 +592,8 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
), ),
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{
Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg,

View File

@ -10,6 +10,14 @@ import (
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
) )
const (
// DefaultRetrievalPricing configures the node to use the default retrieval pricing policy.
DefaultRetrievalPricing = "default"
// ExternalRetrievalPricing configures the node to use the external retrieval pricing script
// configured by the user.
ExternalRetrievalPricing = "external"
)
// Common is common config between full node and miner // Common is common config between full node and miner
type Common struct { type Common struct {
API API API API
@ -66,6 +74,29 @@ type DealmakingConfig struct {
Filter string Filter string
RetrievalFilter string RetrievalFilter string
RetrievalPricing *RetrievalPricing
}
type RetrievalPricing struct {
Strategy string // possible values: "default", "external"
Default *RetrievalPricingDefault
External *RetrievalPricingExternal
}
type RetrievalPricingExternal struct {
// Path of the external script that will be run to price a retrieval deal.
// This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external".
Path string
}
type RetrievalPricingDefault struct {
// VerifiedDealsFreeTransfer configures zero fees for data transfer for a retrieval deal
// of a payloadCid that belongs to a verified storage deal.
// This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "default".
// default value is true
VerifiedDealsFreeTransfer bool
} }
type SealingConfig struct { type SealingConfig struct {
@ -264,6 +295,13 @@ func DefaultStorageMiner() *StorageMiner {
PublishMsgPeriod: Duration(time.Hour), PublishMsgPeriod: Duration(time.Hour),
MaxDealsPerPublishMsg: 8, MaxDealsPerPublishMsg: 8,
MaxProviderCollateralMultiplier: 2, MaxProviderCollateralMultiplier: 2,
RetrievalPricing: &RetrievalPricing{
Strategy: DefaultRetrievalPricing,
Default: &RetrievalPricingDefault{
VerifiedDealsFreeTransfer: true,
},
},
}, },
Fees: MinerFeeConfig{ Fees: MinerFeeConfig{

View File

@ -90,3 +90,5 @@ type GetExpectedSealDurationFunc func() (time.Duration, error)
type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error)
type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error)
type RetrievalPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error)

View File

@ -10,6 +10,7 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"github.com/filecoin-project/lotus/markets/pricing"
"go.uber.org/fx" "go.uber.org/fx"
"go.uber.org/multierr" "go.uber.org/multierr"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -632,6 +633,20 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt
} }
} }
// RetrievalPricingFunc configures the pricing function to use for retrieval deals.
func RetrievalPricingFunc(cfg config.DealmakingConfig) func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
_ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc {
return func(_ dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
_ dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalPricingFunc {
if cfg.RetrievalPricing.Strategy == config.ExternalRetrievalPricing {
return pricing.ExternalRetrievalPricingFunc(cfg.RetrievalPricing.External.Path)
}
return retrievalimpl.DefaultPricingFunc(cfg.RetrievalPricing.Default.VerifiedDealsFreeTransfer)
}
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore // RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, func RetrievalProvider(h host.Host,
miner *storage.Miner, miner *storage.Miner,
@ -641,6 +656,7 @@ func RetrievalProvider(h host.Host,
mds dtypes.StagingMultiDstore, mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer, dt dtypes.ProviderDataTransfer,
pieceProvider sectorstorage.PieceProvider, pieceProvider sectorstorage.PieceProvider,
pricingFnc dtypes.RetrievalPricingFunc,
userFilter dtypes.RetrievalDealFilter, userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) { ) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full) adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full)
@ -653,7 +669,8 @@ func RetrievalProvider(h host.Host,
netwk := rmnet.NewFromLibp2pHost(h) netwk := rmnet.NewFromLibp2pHost(h)
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")),
retrievalimpl.RetrievalPricingFunc(pricingFnc), opt)
} }
var WorkerCallsPrefix = datastore.NewKey("/worker/calls") var WorkerCallsPrefix = datastore.NewKey("/worker/calls")