sector import: Plumbing for DownloadSectorData in the sealing system

This commit is contained in:
Łukasz Magiera 2022-08-31 13:56:25 +02:00
parent 39e4845f42
commit fbb487ae2b
17 changed files with 235 additions and 27 deletions

View File

@ -3,7 +3,6 @@ package api
import (
"bytes"
"context"
"net/http"
"time"
"github.com/google/uuid"
@ -170,6 +169,7 @@ type StorageMiner interface {
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error //perm:admin retry:true
ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
// SealingSchedDiag dumps internal sealing scheduler state
@ -563,11 +563,11 @@ type RemoteSectorMeta struct {
// Sector urls - lotus will use those for fetching files into local storage
// Required in all states
DataUnsealed *SectorData
DataUnsealed *storiface.SectorData
// Required in PreCommitting and later
DataSealed *SectorData
DataCache *SectorData
DataSealed *storiface.SectorData
DataCache *storiface.SectorData
////////
// SEALING SERVICE HOOKS
@ -575,20 +575,3 @@ type RemoteSectorMeta struct {
// todo Commit1Provider
// todo OnDone / OnStateChange
}
type SectorData struct {
// Local when set to true indicates to lotus that sector data is already
// available locally; When set lotus will skip fetching sector data, and
// only check that sector data exists in sector storage
Local bool
// URL to the sector data
// For sealed/unsealed sector, lotus expects octet-stream
// For cache, lotus expects a tar archive with cache files (todo maybe use not-tar; specify what files with what paths must be present)
// Valid schemas:
// - http:// / https://
URL string
// optional http headers to use when requesting sector data
Headers http.Header
}

View File

@ -49,6 +49,7 @@ type Worker interface {
MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) //perm:admin

View File

@ -344,6 +344,14 @@ func init() {
addExample(http.Header{
"Authorization": []string{"Bearer ey.."},
})
addExample(map[storiface.SectorFileType]storiface.SectorData{
storiface.FTSealed: {
Local: false,
URL: "https://example.com/sealingservice/sectors/s-f0123-12345",
Headers: nil,
},
})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -778,6 +778,8 @@ type StorageMinerStruct struct {
ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
ReturnDownloadSector func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
@ -953,6 +955,8 @@ type WorkerStruct struct {
DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data) (storiface.CallID, error) `perm:"admin"`
DownloadSectorData func(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) `perm:"admin"`
Enabled func(p0 context.Context) (bool, error) `perm:"admin"`
Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
@ -4667,6 +4671,17 @@ func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnDownloadSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnDownloadSector == nil {
return ErrNotSupported
}
return s.Internal.ReturnDownloadSector(p0, p1, p2)
}
func (s *StorageMinerStub) ReturnDownloadSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFetch == nil {
return ErrNotSupported
@ -5536,6 +5551,17 @@ func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 st
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
if s.Internal.DownloadSectorData == nil {
return *new(storiface.CallID), ErrNotSupported
}
return s.Internal.DownloadSectorData(p0, p1, p2, p3)
}
func (s *WorkerStub) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) {
if s.Internal.Enabled == nil {
return false, ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -113,6 +113,7 @@
* [Return](#Return)
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnDataCid](#ReturnDataCid)
* [ReturnDownloadSector](#ReturnDownloadSector)
* [ReturnFetch](#ReturnFetch)
* [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate)
* [ReturnFinalizeSector](#ReturnFinalizeSector)
@ -2389,6 +2390,30 @@ Inputs:
Response: `{}`
### ReturnDownloadSector
Perms: admin
Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Code": 0,
"Message": "string value"
}
]
```
Response: `{}`
### ReturnFetch

View File

@ -12,6 +12,8 @@
* [AddPiece](#AddPiece)
* [Data](#Data)
* [DataCid](#DataCid)
* [Download](#Download)
* [DownloadSectorData](#DownloadSectorData)
* [Finalize](#Finalize)
* [FinalizeReplicaUpdate](#FinalizeReplicaUpdate)
* [FinalizeSector](#FinalizeSector)
@ -1542,6 +1544,46 @@ Response:
}
```
## Download
### DownloadSectorData
Perms: admin
Inputs:
```json
[
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
true,
{
"2": {
"Local": false,
"URL": "https://example.com/sealingservice/sectors/s-f0123-12345",
"Headers": null
}
}
]
```
Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```
## Finalize

View File

@ -2,12 +2,13 @@ package sealing
import (
"context"
"github.com/filecoin-project/go-statemachine"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
)
@ -125,6 +126,7 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error {
// todo fetch stuff
// m.sealer.DownloadSectorData(ctx, m.minerSector(sector.SectorType, sector.SectorNumber), )
panic("todo")
return ctx.Send(SectorReceived{})
}

View File

@ -1127,6 +1127,10 @@ func (sb *Sealer) Remove(ctx context.Context, sector storiface.SectorRef) error
return xerrors.Errorf("not supported at this layer") // happens in localworker
}
func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
panic("todo")
}
func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
padPieces := make([]abi.PaddedPieceSize, 0)

View File

@ -5,6 +5,7 @@ import (
"errors"
"io"
"net/http"
"sort"
"sync"
"github.com/google/uuid"
@ -1084,6 +1085,78 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storiface.Sect
return out, waitErr
}
func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var toFetch storiface.SectorFileType
// get a sorted list of sectors files to make a consistent work key from
ents := make([]struct {
T storiface.SectorFileType
S storiface.SectorData
}, 0, len(src))
for fileType, data := range src {
if len(fileType.AllSet()) != 1 {
return xerrors.Errorf("sector data entry must be for a single file type")
}
toFetch |= fileType
ents = append(ents, struct {
T storiface.SectorFileType
S storiface.SectorData
}{T: fileType, S: data})
}
sort.Slice(ents, func(i, j int) bool {
return ents[i].T < ents[j].T
})
// get a work key
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTDownloadSector, sector, ents)
if err != nil {
return xerrors.Errorf("getWork: %w", err)
}
defer cancel()
var waitErr error
waitRes := func() {
_, werr := m.waitWork(ctx, wk)
if werr != nil {
waitErr = werr
return
}
}
if wait { // already in progress
waitRes()
return waitErr
}
ptype := storiface.PathSealing
if finalized {
ptype = storiface.PathStorage
}
selector := newAllocSelector(m.index, toFetch, ptype)
err = m.sched.Schedule(ctx, sector, sealtasks.TTDownloadSector, selector, schedNop, func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, w, wk)(w.DownloadSectorData(ctx, sector, finalized, src))
if err != nil {
return err
}
waitRes()
return nil
})
if err != nil {
return err
}
return waitErr
}
func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(ctx, callID, pi, err)
}
@ -1148,6 +1221,10 @@ func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID,
return m.returnResult(ctx, callID, ok, err)
}
func (m *Manager) ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}

View File

@ -31,6 +31,8 @@ const (
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTFinalizeReplicaUpdate TaskType = "seal/v0/finalize/replicaupdate"
TTDownloadSector TaskType = "seal/v0/download/sector"
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
)
@ -48,11 +50,12 @@ var order = map[TaskType]int{
TTCommit1: 2,
TTUnseal: 1,
TTFetch: -1,
TTFinalize: -2,
TTFetch: -1,
TTDownloadSector: -2,
TTFinalize: -3,
TTGenerateWindowPoSt: -3,
TTGenerateWinningPoSt: -4, // most priority
TTGenerateWindowPoSt: -4,
TTGenerateWinningPoSt: -5, // most priority
}
var shortNames = map[TaskType]string{
@ -75,6 +78,8 @@ var shortNames = map[TaskType]string{
TTRegenSectorKey: "GSK",
TTFinalizeReplicaUpdate: "FRU",
TTDownloadSector: "DL",
TTGenerateWindowPoSt: "WDP",
TTGenerateWinningPoSt: "WNP",
}

View File

@ -3,6 +3,7 @@ package storiface
import (
"context"
"io"
"net/http"
"github.com/ipfs/go-cid"
@ -85,6 +86,8 @@ type Sealer interface {
GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorData) error
}
type Unsealer interface {
@ -119,3 +122,20 @@ type Prover interface {
AggregateSealProofs(aggregateInfo proof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error)
}
type SectorData struct {
// Local when set to true indicates to lotus that sector data is already
// available locally; When set lotus will skip fetching sector data, and
// only check that sector data exists in sector storage
Local bool
// URL to the sector data
// For sealed/unsealed sector, lotus expects octet-stream
// For cache, lotus expects a tar archive with cache files (todo maybe use not-tar; specify what files with what paths must be present)
// Valid schemas:
// - http:// / https://
URL string
// optional http headers to use when requesting sector data
Headers http.Header
}

View File

@ -134,6 +134,7 @@ type WorkerCalls interface {
MoveStorage(ctx context.Context, sector SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorData) (CallID, error)
// sync
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error)
@ -215,5 +216,6 @@ type WorkerReturn interface {
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnDownloadSector(ctx context.Context, callID CallID, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
}

View File

@ -202,6 +202,7 @@ const (
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
DownloadSector ReturnType = "DownloadSector"
Fetch ReturnType = "Fetch"
)
@ -255,6 +256,7 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac
FinalizeReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnFinalizeReplicaUpdate),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
DownloadSector: rfunc(storiface.WorkerReturn.ReturnDownloadSector),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
@ -586,6 +588,17 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe
})
}
func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
return l.asyncCall(ctx, sector, DownloadSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
return nil, sb.DownloadSectorData(ctx, sector, finalized, src)
})
}
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
sb, err := l.executor()
if err != nil {