Merge pull request #9210 from filecoin-project/feat/recievesector

feat: sealing: Partially sealed sector import
This commit is contained in:
Łukasz Magiera 2022-09-19 13:04:39 +02:00 committed by GitHub
commit 4cdeb6cdb1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 2952 additions and 287 deletions

View File

@ -974,6 +974,16 @@ workflows:
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"
- test:
name: test-itest-sector_import_full
suite: itest-sector_import_full
target: "./itests/sector_import_full_test.go"
- test:
name: test-itest-sector_import_simple
suite: itest-sector_import_simple
target: "./itests/sector_import_simple_test.go"
- test:
name: test-itest-sector_make_cc_avail
suite: itest-sector_make_cc_avail

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v8/market"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
abinetwork "github.com/filecoin-project/go-state-types/network"
@ -144,6 +145,8 @@ type StorageMiner interface {
// SectorNumFree drops a sector reservation
SectorNumFree(ctx context.Context, name string) error //perm:admin
SectorReceive(ctx context.Context, meta RemoteSectorMeta) error //perm:admin
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error //perm:admin retry:true
WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) //perm:admin
@ -166,6 +169,7 @@ type StorageMiner interface {
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error //perm:admin retry:true
ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
// SealingSchedDiag dumps internal sealing scheduler state
@ -504,3 +508,109 @@ type NumAssignerMeta struct {
Next abi.SectorNumber
}
type RemoteSectorMeta struct {
////////
// BASIC SECTOR INFORMATION
// State specifies the first state the sector will enter after being imported
// Must be one of the following states:
// * Packing
// * GetTicket
// * PreCommitting
// * SubmitCommit
// * Proving/Available
State SectorState
Sector abi.SectorID
Type abi.RegisteredSealProof
////////
// SEALING METADATA
// (allows lotus to continue the sealing process)
// Required in Packing and later
Pieces []SectorPiece // todo better type?
// Required in PreCommitting and later
TicketValue abi.SealRandomness
TicketEpoch abi.ChainEpoch
PreCommit1Out storiface.PreCommit1Out // todo specify better
CommD *cid.Cid
CommR *cid.Cid // SectorKey
// Required in SubmitCommit and later
PreCommitInfo *miner.SectorPreCommitInfo
PreCommitDeposit *big.Int
PreCommitMessage *cid.Cid
PreCommitTipSet types.TipSetKey
SeedValue abi.InteractiveSealRandomness
SeedEpoch abi.ChainEpoch
CommitProof []byte
// Required in Proving/Available
CommitMessage *cid.Cid
// Optional sector metadata to import
Log []SectorLog
////////
// SECTOR DATA SOURCE
// Sector urls - lotus will use those for fetching files into local storage
// Required in all states
DataUnsealed *storiface.SectorLocation
// Required in PreCommitting and later
DataSealed *storiface.SectorLocation
DataCache *storiface.SectorLocation
////////
// SEALING SERVICE HOOKS
// URL
// RemoteCommit1Endpoint is an URL of POST endpoint which lotus will call requesting Commit1 (seal_commit_phase1)
// request body will be json-serialized RemoteCommit1Params struct
RemoteCommit1Endpoint string
// RemoteCommit2Endpoint is an URL of POST endpoint which lotus will call requesting Commit2 (seal_commit_phase2)
// request body will be json-serialized RemoteCommit2Params struct
RemoteCommit2Endpoint string
// RemoteSealingDoneEndpoint is called after the sector exists the sealing pipeline
// request body will be json-serialized RemoteSealingDoneParams struct
RemoteSealingDoneEndpoint string
}
type RemoteCommit1Params struct {
Ticket, Seed []byte
Unsealed cid.Cid
Sealed cid.Cid
ProofType abi.RegisteredSealProof
}
type RemoteCommit2Params struct {
Sector abi.SectorID
ProofType abi.RegisteredSealProof
// todo spec better
Commit1Out storiface.Commit1Out
}
type RemoteSealingDoneParams struct {
// Successful is true if the sector has entered state considered as "successfully sealed"
Successful bool
// State is the state the sector has entered
// For example "Proving" / "Removing"
State string
// Optional commit message CID
CommitMessage *cid.Cid
}

View File

@ -49,6 +49,7 @@ type Worker interface {
MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) //perm:admin

View File

@ -1005,6 +1005,129 @@ func (t *PieceDealInfo) UnmarshalCBOR(r io.Reader) (err error) {
return nil
}
func (t *SectorPiece) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{162}); err != nil {
return err
}
// t.Piece (abi.PieceInfo) (struct)
if len("Piece") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Piece\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Piece"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Piece")); err != nil {
return err
}
if err := t.Piece.MarshalCBOR(cw); err != nil {
return err
}
// t.DealInfo (api.PieceDealInfo) (struct)
if len("DealInfo") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"DealInfo\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("DealInfo"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("DealInfo")); err != nil {
return err
}
if err := t.DealInfo.MarshalCBOR(cw); err != nil {
return err
}
return nil
}
func (t *SectorPiece) UnmarshalCBOR(r io.Reader) (err error) {
*t = SectorPiece{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("SectorPiece: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Piece (abi.PieceInfo) (struct)
case "Piece":
{
if err := t.Piece.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.Piece: %w", err)
}
}
// t.DealInfo (api.PieceDealInfo) (struct)
case "DealInfo":
{
b, err := cr.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := cr.UnreadByte(); err != nil {
return err
}
t.DealInfo = new(PieceDealInfo)
if err := t.DealInfo.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.DealInfo pointer: %w", err)
}
}
}
default:
// Field doesn't exist on this type, so ignore it
cbg.ScanForLinks(r, func(cid.Cid) {})
}
}
return nil
}
func (t *DealSchedule) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)

View File

@ -6,6 +6,7 @@ import (
"go/ast"
"go/parser"
"go/token"
"net/http"
"path/filepath"
"reflect"
"strings"
@ -340,6 +341,17 @@ func init() {
"": bitfield.NewFromSet([]uint64{5, 6, 7, 10}),
})
addExample(http.Header{
"Authorization": []string{"Bearer ey.."},
})
addExample(map[storiface.SectorFileType]storiface.SectorLocation{
storiface.FTSealed: {
Local: false,
URL: "https://example.com/sealingservice/sectors/s-f0123-12345",
Headers: nil,
},
})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -778,6 +778,8 @@ type StorageMinerStruct struct {
ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
ReturnDownloadSector func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
@ -846,6 +848,8 @@ type StorageMinerStruct struct {
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorReceive func(p0 context.Context, p1 RemoteSectorMeta) error `perm:"admin"`
SectorRemove func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
SectorSetExpectedSealDuration func(p0 context.Context, p1 time.Duration) error `perm:"write"`
@ -951,6 +955,8 @@ type WorkerStruct struct {
DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data) (storiface.CallID, error) `perm:"admin"`
DownloadSectorData func(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) `perm:"admin"`
Enabled func(p0 context.Context) (bool, error) `perm:"admin"`
Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
@ -4665,6 +4671,17 @@ func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnDownloadSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnDownloadSector == nil {
return ErrNotSupported
}
return s.Internal.ReturnDownloadSector(p0, p1, p2)
}
func (s *StorageMinerStub) ReturnDownloadSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFetch == nil {
return ErrNotSupported
@ -5039,6 +5056,17 @@ func (s *StorageMinerStub) SectorPreCommitPending(p0 context.Context) ([]abi.Sec
return *new([]abi.SectorID), ErrNotSupported
}
func (s *StorageMinerStruct) SectorReceive(p0 context.Context, p1 RemoteSectorMeta) error {
if s.Internal.SectorReceive == nil {
return ErrNotSupported
}
return s.Internal.SectorReceive(p0, p1)
}
func (s *StorageMinerStub) SectorReceive(p0 context.Context, p1 RemoteSectorMeta) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) SectorRemove(p0 context.Context, p1 abi.SectorNumber) error {
if s.Internal.SectorRemove == nil {
return ErrNotSupported
@ -5523,6 +5551,17 @@ func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 st
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
if s.Internal.DownloadSectorData == nil {
return *new(storiface.CallID), ErrNotSupported
}
return s.Internal.DownloadSectorData(p0, p1, p2, p3)
}
func (s *WorkerStub) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) {
if s.Internal.Enabled == nil {
return false, ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -498,6 +498,8 @@ var stateList = []stateMeta{
{col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgMagenta, state: sealing.ReceiveSector},
{col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals},
{col: color.FgBlue, state: sealing.AddPiece},
@ -543,6 +545,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.SealPreCommit2Failed},
{col: color.FgRed, state: sealing.PreCommitFailed},
{col: color.FgRed, state: sealing.ComputeProofFailed},
{col: color.FgRed, state: sealing.RemoteCommitFailed},
{col: color.FgRed, state: sealing.CommitFailed},
{col: color.FgRed, state: sealing.CommitFinalizeFailed},
{col: color.FgRed, state: sealing.PackingFailed},

View File

@ -314,7 +314,7 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string
info := &pipeline.SectorInfo{
State: pipeline.Proving,
SectorNumber: sector.SectorID,
Pieces: []pipeline.Piece{
Pieces: []lapi.SectorPiece{
{
Piece: abi.PieceInfo{
Size: abi.PaddedPieceSize(meta.SectorSize),

View File

@ -224,6 +224,12 @@ var runCmd = &cli.Command{
Value: true,
EnvVars: []string{"LOTUS_WORKER_REGEN_SECTOR_KEY"},
},
&cli.BoolFlag{
Name: "sector-download",
Usage: "enable external sector data download",
Value: false,
EnvVars: []string{"LOTUS_WORKER_SECTOR_DOWNLOAD"},
},
&cli.BoolFlag{
Name: "windowpost",
Usage: "enable window post",
@ -373,6 +379,9 @@ var runCmd = &cli.Command{
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece, sealtasks.TTDataCid)
}
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("sector-download")) && cctx.Bool("sector-download") {
taskTypes = append(taskTypes, sealtasks.TTDownloadSector)
}
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
}

View File

@ -113,6 +113,7 @@
* [Return](#Return)
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnDataCid](#ReturnDataCid)
* [ReturnDownloadSector](#ReturnDownloadSector)
* [ReturnFetch](#ReturnFetch)
* [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate)
* [ReturnFinalizeSector](#ReturnFinalizeSector)
@ -150,6 +151,7 @@
* [SectorNumReserveCount](#SectorNumReserveCount)
* [SectorPreCommitFlush](#SectorPreCommitFlush)
* [SectorPreCommitPending](#SectorPreCommitPending)
* [SectorReceive](#SectorReceive)
* [SectorRemove](#SectorRemove)
* [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration)
* [SectorSetSealDelay](#SectorSetSealDelay)
@ -2388,6 +2390,30 @@ Inputs:
Response: `{}`
### ReturnDownloadSector
Perms: admin
Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Code": 0,
"Message": "string value"
}
]
```
Response: `{}`
### ReturnFetch
@ -3151,6 +3177,134 @@ Response:
]
```
### SectorReceive
Perms: admin
Inputs:
```json
[
{
"State": "Proving",
"Sector": {
"Miner": 1000,
"Number": 9
},
"Type": 8,
"Pieces": [
{
"Piece": {
"Size": 1032,
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
},
"DealInfo": {
"PublishCid": null,
"DealID": 5432,
"DealProposal": {
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"PieceSize": 1032,
"VerifiedDeal": true,
"Client": "f01234",
"Provider": "f01234",
"Label": "",
"StartEpoch": 10101,
"EndEpoch": 10101,
"StoragePricePerEpoch": "0",
"ProviderCollateral": "0",
"ClientCollateral": "0"
},
"DealSchedule": {
"StartEpoch": 10101,
"EndEpoch": 10101
},
"KeepUnsealed": true
}
}
],
"TicketValue": "Bw==",
"TicketEpoch": 10101,
"PreCommit1Out": "Bw==",
"CommD": null,
"CommR": null,
"PreCommitInfo": {
"SealProof": 8,
"SectorNumber": 9,
"SealedCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"SealRandEpoch": 10101,
"DealIDs": [
5432
],
"Expiration": 10101,
"UnsealedCid": null
},
"PreCommitDeposit": "0",
"PreCommitMessage": null,
"PreCommitTipSet": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"SeedValue": "Bw==",
"SeedEpoch": 10101,
"CommitProof": "Ynl0ZSBhcnJheQ==",
"CommitMessage": null,
"Log": [
{
"Kind": "string value",
"Timestamp": 42,
"Trace": "string value",
"Message": "string value"
}
],
"DataUnsealed": {
"Local": true,
"URL": "string value",
"Headers": [
{
"Key": "string value",
"Value": "string value"
}
]
},
"DataSealed": {
"Local": true,
"URL": "string value",
"Headers": [
{
"Key": "string value",
"Value": "string value"
}
]
},
"DataCache": {
"Local": true,
"URL": "string value",
"Headers": [
{
"Key": "string value",
"Value": "string value"
}
]
},
"RemoteCommit1Endpoint": "string value",
"RemoteCommit2Endpoint": "string value",
"RemoteSealingDoneEndpoint": "string value"
}
]
```
Response: `{}`
### SectorRemove
SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.

View File

@ -12,6 +12,8 @@
* [AddPiece](#AddPiece)
* [Data](#Data)
* [DataCid](#DataCid)
* [Download](#Download)
* [DownloadSectorData](#DownloadSectorData)
* [Finalize](#Finalize)
* [FinalizeReplicaUpdate](#FinalizeReplicaUpdate)
* [FinalizeSector](#FinalizeSector)
@ -1542,6 +1544,46 @@ Response:
}
```
## Download
### DownloadSectorData
Perms: admin
Inputs:
```json
[
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
true,
{
"2": {
"Local": false,
"URL": "https://example.com/sealingservice/sectors/s-f0123-12345",
"Headers": null
}
}
]
```
Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```
## Finalize

View File

@ -53,6 +53,7 @@ OPTIONS:
--prove-replica-update2 enable prove replica update 2 (default: true) [$LOTUS_WORKER_PROVE_REPLICA_UPDATE2]
--regen-sector-key enable regen sector key (default: true) [$LOTUS_WORKER_REGEN_SECTOR_KEY]
--replica-update enable replica update (default: true) [$LOTUS_WORKER_REPLICA_UPDATE]
--sector-download enable external sector data download (default: false) [$LOTUS_WORKER_SECTOR_DOWNLOAD]
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT]
--unseal enable unsealing (32G sectors: 1 core, 128GiB Memory) (default: true) [$LOTUS_WORKER_UNSEAL]
--windowpost enable window post (default: false) [$LOTUS_WORKER_WINDOWPOST]

View File

@ -612,8 +612,10 @@
# env var: LOTUS_STORAGE_PARALLELFETCHLIMIT
#ParallelFetchLimit = 10
# Local worker config
#
# type: bool
# env var: LOTUS_STORAGE_ALLOWSECTORDOWNLOAD
#AllowSectorDownload = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWADDPIECE
#AllowAddPiece = true

View File

@ -65,6 +65,7 @@ func main() {
api.SealTicket{},
api.SealSeed{},
api.PieceDealInfo{},
api.SectorPiece{},
api.DealSchedule{},
)
if err != nil {
@ -102,6 +103,8 @@ func main() {
err = gen.WriteMapEncodersToFile("./storage/sealer/storiface/cbor_gen.go", "storiface",
storiface.CallID{},
storiface.SecDataHttpHeader{},
storiface.SectorLocation{},
)
if err != nil {
fmt.Println(err)

View File

@ -636,6 +636,7 @@ func (n *Ensemble) Start() *Ensemble {
scfg := config.DefaultStorageMiner()
if noLocal {
scfg.Storage.AllowSectorDownload = false
scfg.Storage.AllowAddPiece = false
scfg.Storage.AllowPreCommit1 = false
scfg.Storage.AllowPreCommit2 = false
@ -962,7 +963,7 @@ func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.Metad
info := &pipeline.SectorInfo{
State: pipeline.Proving,
SectorNumber: sector.SectorID,
Pieces: []pipeline.Piece{
Pieces: []api.SectorPiece{
{
Piece: abi.PieceInfo{
Size: abi.PaddedPieceSize(meta.SectorSize),

View File

@ -97,6 +97,10 @@ func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNo
}
func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.SectorNumber]struct{}) {
tm.WaitSectorsProvingAllowFails(ctx, toCheck, map[api.SectorState]struct{}{})
}
func (tm *TestMiner) WaitSectorsProvingAllowFails(ctx context.Context, toCheck map[abi.SectorNumber]struct{}, okFails map[api.SectorState]struct{}) {
for len(toCheck) > 0 {
tm.FlushSealingBatches(ctx)
@ -105,11 +109,13 @@ func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.Sec
st, err := tm.StorageMiner.SectorsStatus(ctx, n, false)
require.NoError(tm.t, err)
states[st.State]++
if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) {
if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) || st.State == api.SectorState(sealing.Removed) {
delete(toCheck, n)
}
if strings.Contains(string(st.State), "Fail") {
tm.t.Fatal("sector in a failed state", st.State)
if _, ok := okFails[st.State]; !ok {
tm.t.Fatal("sector in a failed state", st.State)
}
}
}

View File

@ -0,0 +1,494 @@
package itests
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
spaths "github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)
func TestSectorImport(t *testing.T) {
type testCase struct {
c1handler func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request)
mutateRemoteMeta func(*api.RemoteSectorMeta)
expectImportErrContains string
expectDoneSuccess bool
expectDoneState string
}
makeTest := func(mut func(*testCase)) *testCase {
tc := &testCase{
c1handler: testRemoteCommit1,
expectDoneSuccess: true,
expectDoneState: "Proving",
}
mut(tc)
return tc
}
runTest := func(tc *testCase) func(t *testing.T) {
return func(t *testing.T) {
kit.QuietMiningLogs()
var blockTime = 50 * time.Millisecond
////////
// Start a miner node
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
ctx := context.Background()
////////
// Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely"
snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16)
require.NoError(t, err)
sectorDir := t.TempDir()
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
mid, err := address.IDFromAddress(maddr)
require.NoError(t, err)
mi, err := client.StateMinerInfo(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
ver, err := client.StateNetworkVersion(ctx, types.EmptyTSK)
require.NoError(t, err)
spt, err := lminer.PreferredSealProofTypeFromWindowPoStType(ver, mi.WindowPoStProofType)
require.NoError(t, err)
ssize, err := spt.SectorSize()
require.NoError(t, err)
pieceSize := abi.PaddedPieceSize(ssize)
////////
// Create/Seal a sector up to pc2 outside of the pipeline
// get one sector number from the reservation done on the miner above
sn, err := snums.First()
require.NoError(t, err)
// create all the sector identifiers
snum := abi.SectorNumber(sn)
sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum}
sref := storiface.SectorRef{ID: sid, ProofType: spt}
// create a low-level sealer instance
sealer, err := ffiwrapper.New(&basicfs.Provider{
Root: sectorDir,
})
require.NoError(t, err)
// CREATE THE UNSEALED FILE
// create a reader for all-zero (CC) data
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
// create the unsealed CC sector file
pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader)
require.NoError(t, err)
// GENERATE THE TICKET
// get most recent valid ticket epoch
ts, err := client.ChainHead(ctx)
require.NoError(t, err)
ticketEpoch := ts.Height() - policy.SealRandomnessLookback
// ticket entropy is cbor-seriasized miner addr
buf := new(bytes.Buffer)
require.NoError(t, maddr.MarshalCBOR(buf))
// generate ticket randomness
rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
require.NoError(t, err)
// EXECUTE PRECOMMIT 1 / 2
// run PC1
pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo})
require.NoError(t, err)
// run pc2
scids, err := sealer.SealPreCommit2(ctx, sref, pc1out)
require.NoError(t, err)
// make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1
finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum))
require.NoError(t, os.MkdirAll(finDst, 0777))
require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst))
////////
// start http server serving sector data
doneResp := new(*api.RemoteSealingDoneParams)
m := mux.NewRouter()
m.HandleFunc("/sectors/{type}/{id}", testRemoteGetSector(sectorDir)).Methods("GET")
m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer, miner)).Methods("POST")
m.HandleFunc("/sectors/{id}/sealed", testRemoteDone(doneResp)).Methods("POST")
m.HandleFunc("/commit2", testRemoteCommit2(sealer)).Methods("POST")
srv := httptest.NewServer(m)
unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL)
doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum)
////////
// import the sector and continue sealing
rmeta := api.RemoteSectorMeta{
State: "PreCommitting",
Sector: sid,
Type: spt,
Pieces: []api.SectorPiece{
{
Piece: pieceInfo,
DealInfo: nil,
},
},
TicketValue: abi.SealRandomness(rand),
TicketEpoch: ticketEpoch,
PreCommit1Out: pc1out,
CommD: &scids.Unsealed,
CommR: &scids.Sealed,
DataUnsealed: &storiface.SectorLocation{
Local: false,
URL: unsealedURL,
},
DataSealed: &storiface.SectorLocation{
Local: false,
URL: sealedURL,
},
DataCache: &storiface.SectorLocation{
Local: false,
URL: cacheURL,
},
RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit2Endpoint: remoteC2URL,
RemoteSealingDoneEndpoint: doneURL,
}
if tc.mutateRemoteMeta != nil {
tc.mutateRemoteMeta(&rmeta)
}
err = miner.SectorReceive(ctx, rmeta)
if tc.expectImportErrContains != "" {
require.ErrorContains(t, err, tc.expectImportErrContains)
return
}
require.NoError(t, err)
// check that we see the imported sector
ng, err := miner.SectorsListNonGenesis(ctx)
require.NoError(t, err)
require.Len(t, ng, 1)
require.Equal(t, snum, ng[0])
miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommitFailed): {}})
require.NotNil(t, *doneResp)
require.Equal(t, tc.expectDoneSuccess, (*doneResp).Successful)
require.Equal(t, tc.expectDoneState, (*doneResp).State)
if tc.expectDoneSuccess {
require.NotNil(t, (*doneResp).CommitMessage)
}
}
}
// fail first remote c1, verifies that c1 retry works
t.Run("c1-retry", runTest(makeTest(func(testCase *testCase) {
prt := sealing.MinRetryTime
sealing.MinRetryTime = time.Second
t.Cleanup(func() {
sealing.MinRetryTime = prt
})
testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
var failedOnce bool
return func(w http.ResponseWriter, r *http.Request) {
if !failedOnce {
failedOnce = true
w.WriteHeader(http.StatusBadGateway)
return
}
testRemoteCommit1(s, m)(w, r)
}
}
})))
t.Run("c1-fail-remove", runTest(makeTest(func(testCase *testCase) {
prt := sealing.MinRetryTime
sealing.MinRetryTime = time.Second
t.Cleanup(func() {
sealing.MinRetryTime = prt
})
testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
panic(err)
}
err = m.SectorRemove(r.Context(), id.Number)
if err != nil {
panic(err)
}
w.WriteHeader(http.StatusBadGateway)
}
}
testCase.expectDoneSuccess = false
testCase.expectDoneState = "Removing"
})))
t.Run("nil-commd", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.CommD = nil
}
testCase.expectImportErrContains = "both CommR/CommD cids need to be set for sectors in PreCommitting and later states"
})))
t.Run("nil-commr", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.CommR = nil
}
testCase.expectImportErrContains = "both CommR/CommD cids need to be set for sectors in PreCommitting and later states"
})))
t.Run("nil-uns", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.DataUnsealed = nil
}
testCase.expectImportErrContains = "expected DataUnsealed to be set"
})))
t.Run("nil-sealed", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.DataSealed = nil
}
testCase.expectImportErrContains = "expected DataSealed to be set"
})))
t.Run("nil-cache", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.DataCache = nil
}
testCase.expectImportErrContains = "expected DataCache to be set"
})))
t.Run("bad-commd", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.CommD = meta.CommR
}
testCase.expectImportErrContains = "CommD cid has wrong prefix"
})))
t.Run("bad-commr", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.CommR = meta.CommD
}
testCase.expectImportErrContains = "CommR cid has wrong prefix"
})))
t.Run("bad-ticket", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
// flip one bit
meta.TicketValue[23] ^= 4
}
testCase.expectImportErrContains = "tickets differ"
})))
}
// note: stuff below is almost the same as in _simple version of this file; We need
// to copy it because on Circle we can't call those functions between test files,
// and for the _simple test we want everything in one file to make it easy to follow
func testRemoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
*rs = new(api.RemoteSealingDoneParams)
if err := json.NewDecoder(r.Body).Decode(*rs); err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
}
}
func testRemoteCommit1(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// validate sector id
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
w.WriteHeader(500)
return
}
var params api.RemoteCommit1Params
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
w.WriteHeader(500)
return
}
sref := storiface.SectorRef{
ID: id,
ProofType: params.ProofType,
}
ssize, err := params.ProofType.SectorSize()
if err != nil {
w.WriteHeader(500)
return
}
p, err := s.SealCommit1(r.Context(), sref, params.Ticket, params.Seed, []abi.PieceInfo{
{
Size: abi.PaddedPieceSize(ssize),
PieceCID: params.Unsealed,
},
}, storiface.SectorCids{
Unsealed: params.Unsealed,
Sealed: params.Sealed,
})
if err != nil {
w.WriteHeader(500)
return
}
if _, err := w.Write(p); err != nil {
fmt.Println("c1 write error")
}
}
}
func testRemoteCommit2(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var params api.RemoteCommit2Params
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
w.WriteHeader(500)
return
}
sref := storiface.SectorRef{
ID: params.Sector,
ProofType: params.ProofType,
}
p, err := s.SealCommit2(r.Context(), sref, params.Commit1Out)
if err != nil {
fmt.Println("c2 error: ", err)
w.WriteHeader(500)
return
}
if _, err := w.Write(p); err != nil {
fmt.Println("c2 write error")
}
}
}
func testRemoteGetSector(sectorRoot string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// validate sector id
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
w.WriteHeader(500)
return
}
// validate type
_, err = spaths.FileTypeFromString(vars["type"])
if err != nil {
w.WriteHeader(500)
return
}
typ := vars["type"]
if typ == "cache" {
// if cache is requested, send the finalized cache we've created above
typ = "fin-cache"
}
path := filepath.Join(sectorRoot, typ, vars["id"])
stat, err := os.Stat(path)
if err != nil {
w.WriteHeader(500)
return
}
if stat.IsDir() {
if _, has := r.Header["Range"]; has {
w.WriteHeader(500)
return
}
w.Header().Set("Content-Type", "application/x-tar")
w.WriteHeader(200)
err := tarutil.TarDirectory(path, w, make([]byte, 1<<20))
if err != nil {
return
}
} else {
w.Header().Set("Content-Type", "application/octet-stream")
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
http.ServeFile(w, r, path)
}
fmt.Printf("served sector file/dir, sectorID=%+v, fileType=%s, path=%s\n", id, vars["type"], path)
}
}

View File

@ -0,0 +1,345 @@
package itests
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
spaths "github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)
func TestSectorImportAfterPC2(t *testing.T) {
kit.QuietMiningLogs()
var blockTime = 50 * time.Millisecond
////////
// Start a miner node
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
ctx := context.Background()
////////
// Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely"
snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16)
require.NoError(t, err)
sectorDir := t.TempDir()
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
mid, err := address.IDFromAddress(maddr)
require.NoError(t, err)
mi, err := client.StateMinerInfo(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
ver, err := client.StateNetworkVersion(ctx, types.EmptyTSK)
require.NoError(t, err)
spt, err := lminer.PreferredSealProofTypeFromWindowPoStType(ver, mi.WindowPoStProofType)
require.NoError(t, err)
ssize, err := spt.SectorSize()
require.NoError(t, err)
pieceSize := abi.PaddedPieceSize(ssize)
////////
// Create/Seal a sector up to pc2 outside of the pipeline
// get one sector number from the reservation done on the miner above
sn, err := snums.First()
require.NoError(t, err)
// create all the sector identifiers
snum := abi.SectorNumber(sn)
sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum}
sref := storiface.SectorRef{ID: sid, ProofType: spt}
// create a low-level sealer instance
sealer, err := ffiwrapper.New(&basicfs.Provider{
Root: sectorDir,
})
require.NoError(t, err)
// CREATE THE UNSEALED FILE
// create a reader for all-zero (CC) data
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
// create the unsealed CC sector file
pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader)
require.NoError(t, err)
// GENERATE THE TICKET
// get most recent valid ticket epoch
ts, err := client.ChainHead(ctx)
require.NoError(t, err)
ticketEpoch := ts.Height() - policy.SealRandomnessLookback
// ticket entropy is cbor-seriasized miner addr
buf := new(bytes.Buffer)
require.NoError(t, maddr.MarshalCBOR(buf))
// generate ticket randomness
rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
require.NoError(t, err)
// EXECUTE PRECOMMIT 1 / 2
// run PC1
pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo})
require.NoError(t, err)
// run pc2
scids, err := sealer.SealPreCommit2(ctx, sref, pc1out)
require.NoError(t, err)
// make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1
finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum))
require.NoError(t, os.MkdirAll(finDst, 0777))
require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst))
////////
// start http server serving sector data
doneResp := new(*api.RemoteSealingDoneParams)
m := mux.NewRouter()
m.HandleFunc("/sectors/{type}/{id}", remoteGetSector(sectorDir)).Methods("GET")
m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST")
m.HandleFunc("/sectors/{id}/sealed", remoteDone(doneResp)).Methods("POST")
m.HandleFunc("/commit2", remoteCommit2(sealer)).Methods("POST")
srv := httptest.NewServer(m)
unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL)
doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum)
////////
// import the sector and continue sealing
err = miner.SectorReceive(ctx, api.RemoteSectorMeta{
State: "PreCommitting",
Sector: sid,
Type: spt,
Pieces: []api.SectorPiece{
{
Piece: pieceInfo,
DealInfo: nil,
},
},
TicketValue: abi.SealRandomness(rand),
TicketEpoch: ticketEpoch,
PreCommit1Out: pc1out,
CommD: &scids.Unsealed,
CommR: &scids.Sealed,
DataUnsealed: &storiface.SectorLocation{
Local: false,
URL: unsealedURL,
},
DataSealed: &storiface.SectorLocation{
Local: false,
URL: sealedURL,
},
DataCache: &storiface.SectorLocation{
Local: false,
URL: cacheURL,
},
RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit2Endpoint: remoteC2URL,
RemoteSealingDoneEndpoint: doneURL,
})
require.NoError(t, err)
// check that we see the imported sector
ng, err := miner.SectorsListNonGenesis(ctx)
require.NoError(t, err)
require.Len(t, ng, 1)
require.Equal(t, snum, ng[0])
miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{snum: {}})
require.NotNil(t, *doneResp)
require.True(t, (*doneResp).Successful)
require.Equal(t, "Proving", (*doneResp).State)
require.NotNil(t, (*doneResp).CommitMessage)
}
func remoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// validate sector id
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
w.WriteHeader(500)
return
}
var params api.RemoteCommit1Params
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
w.WriteHeader(500)
return
}
sref := storiface.SectorRef{
ID: id,
ProofType: params.ProofType,
}
ssize, err := params.ProofType.SectorSize()
if err != nil {
w.WriteHeader(500)
return
}
p, err := s.SealCommit1(r.Context(), sref, params.Ticket, params.Seed, []abi.PieceInfo{
{
Size: abi.PaddedPieceSize(ssize),
PieceCID: params.Unsealed,
},
}, storiface.SectorCids{
Unsealed: params.Unsealed,
Sealed: params.Sealed,
})
if err != nil {
w.WriteHeader(500)
return
}
if _, err := w.Write(p); err != nil {
fmt.Println("c1 write error")
}
}
}
func remoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
*rs = new(api.RemoteSealingDoneParams)
if err := json.NewDecoder(r.Body).Decode(*rs); err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
}
}
func remoteCommit2(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var params api.RemoteCommit2Params
if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
w.WriteHeader(500)
return
}
sref := storiface.SectorRef{
ID: params.Sector,
ProofType: params.ProofType,
}
p, err := s.SealCommit2(r.Context(), sref, params.Commit1Out)
if err != nil {
fmt.Println("c2 error: ", err)
w.WriteHeader(500)
return
}
if _, err := w.Write(p); err != nil {
fmt.Println("c2 write error")
}
}
}
func remoteGetSector(sectorRoot string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// validate sector id
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
w.WriteHeader(500)
return
}
// validate type
_, err = spaths.FileTypeFromString(vars["type"])
if err != nil {
w.WriteHeader(500)
return
}
typ := vars["type"]
if typ == "cache" {
// if cache is requested, send the finalized cache we've created above
typ = "fin-cache"
}
path := filepath.Join(sectorRoot, typ, vars["id"])
stat, err := os.Stat(path)
if err != nil {
w.WriteHeader(500)
return
}
if stat.IsDir() {
if _, has := r.Header["Range"]; has {
w.WriteHeader(500)
return
}
w.Header().Set("Content-Type", "application/x-tar")
w.WriteHeader(200)
err := tarutil.TarDirectory(path, w, make([]byte, 1<<20))
if err != nil {
return
}
} else {
w.Header().Set("Content-Type", "application/octet-stream")
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
http.ServeFile(w, r, path)
}
fmt.Printf("served sector file/dir, sectorID=%+v, fileType=%s, path=%s\n", id, vars["type"], path)
}
}

View File

@ -146,6 +146,7 @@ func DefaultStorageMiner() *StorageMiner {
},
Storage: SealerConfig{
AllowSectorDownload: true,
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,

View File

@ -796,11 +796,17 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b
Comment: ``,
},
{
Name: "AllowSectorDownload",
Type: "bool",
Comment: ``,
},
{
Name: "AllowAddPiece",
Type: "bool",
Comment: `Local worker config`,
Comment: ``,
},
{
Name: "AllowPreCommit1",

View File

@ -54,6 +54,7 @@ func WriteStorageFile(path string, config paths.StorageConfig) error {
func (c *StorageMiner) StorageManager() sealer.Config {
return sealer.Config{
ParallelFetchLimit: c.Storage.ParallelFetchLimit,
AllowSectorDownload: c.Storage.AllowSectorDownload,
AllowAddPiece: c.Storage.AllowAddPiece,
AllowPreCommit1: c.Storage.AllowPreCommit1,
AllowPreCommit2: c.Storage.AllowPreCommit2,

View File

@ -409,7 +409,7 @@ type SealingConfig struct {
type SealerConfig struct {
ParallelFetchLimit int
// Local worker config
AllowSectorDownload bool
AllowAddPiece bool
AllowPreCommit1 bool
AllowPreCommit2 bool

View File

@ -182,16 +182,20 @@ func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, erro
return abi.SectorID{}, err
}
return sm.waitSectorStarted(ctx, sr.ID)
}
func (sm *StorageMinerAPI) waitSectorStarted(ctx context.Context, si abi.SectorID) (abi.SectorID, error) {
// wait for the sector to enter the Packing state
// TODO: instead of polling implement some pubsub-type thing in storagefsm
for {
info, err := sm.Miner.SectorsStatus(ctx, sr.ID.Number, false)
info, err := sm.Miner.SectorsStatus(ctx, si.Number, false)
if err != nil {
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
}
if info.State != api.SectorState(sealing.UndefinedSectorState) {
return sr.ID, nil
return si, nil
}
select {
@ -448,6 +452,15 @@ func (sm *StorageMinerAPI) SectorNumFree(ctx context.Context, name string) error
return sm.Miner.NumFree(ctx, name)
}
func (sm *StorageMinerAPI) SectorReceive(ctx context.Context, meta api.RemoteSectorMeta) error {
if err := sm.Miner.Receive(ctx, meta); err != nil {
return err
}
_, err := sm.waitSectorStarted(ctx, meta.Sector)
return err
}
func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]minertypes.SubmitWindowedPoStParams, error) {
var ts *types.TipSet
var err error

105
storage/paths/fetch.go Normal file
View File

@ -0,0 +1,105 @@
package paths
import (
"context"
"io"
"mime"
"net/http"
"os"
"time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)
func fetch(ctx context.Context, url, outname string, header http.Header) (rerr error) {
log.Infof("Fetch %s -> %s", url, outname)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = header
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
start := time.Now()
var bytes int64
defer func() {
took := time.Now().Sub(start)
mibps := float64(bytes) / 1024 / 1024 * float64(time.Second) / float64(took)
log.Infow("Fetch done", "url", url, "out", outname, "took", took.Round(time.Millisecond), "bytes", bytes, "MiB/s", mibps, "err", rerr)
}()
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}
if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}
switch mediatype {
case "application/x-tar":
bytes, err = tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
return err
case "application/octet-stream":
f, err := os.Create(outname)
if err != nil {
return err
}
bytes, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
if err != nil {
f.Close() // nolint
return err
}
return f.Close()
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}
// FetchWithTemp fetches data into a temp 'fetching' directory, then moves the file to destination
// The set of URLs must refer to the same object, if one fails, another one will be tried.
func FetchWithTemp(ctx context.Context, urls []string, dest string, header http.Header) (string, error) {
var merr error
for _, url := range urls {
tempDest, err := tempFetchDest(dest, true)
if err != nil {
return "", err
}
if err := os.RemoveAll(dest); err != nil {
return "", xerrors.Errorf("removing dest: %w", err)
}
err = fetch(ctx, url, tempDest, header)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s -> %s: %w", url, tempDest, err))
continue
}
if err := move(tempDest, dest); err != nil {
return "", xerrors.Errorf("fetch move error %s -> %s: %w", tempDest, dest, err)
}
if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
}
return url, nil
}
return "", xerrors.Errorf("failed to fetch sector file (tried %v): %w", urls, merr)
}

View File

@ -95,7 +95,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
return
}
ft, err := ftFromString(vars["type"])
ft, err := FileTypeFromString(vars["type"])
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
@ -167,7 +167,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
return
}
ft, err := ftFromString(vars["type"])
ft, err := FileTypeFromString(vars["type"])
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
@ -195,9 +195,9 @@ func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.R
return
}
ft, err := ftFromString(vars["type"])
ft, err := FileTypeFromString(vars["type"])
if err != nil {
log.Errorf("ftFromString: %+v", err)
log.Errorf("FileTypeFromString: %+v", err)
w.WriteHeader(500)
return
}
@ -311,7 +311,7 @@ func (handler *FetchHandler) generateSingleVanillaProof(w http.ResponseWriter, r
http.ServeContent(w, r, "", time.Time{}, bytes.NewReader(vanilla))
}
func ftFromString(t string) (storiface.SectorFileType, error) {
func FileTypeFromString(t string) (storiface.SectorFileType, error) {
switch t {
case storiface.FTUnsealed.String():
return storiface.FTUnsealed, nil

View File

@ -7,7 +7,6 @@ import (
"io"
"io/ioutil"
"math/bits"
"mime"
"net/http"
"net/url"
"os"
@ -24,7 +23,6 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)
var FetchTempSubdir = "fetching"
@ -236,7 +234,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("removing dest: %w", err)
}
err = r.fetch(ctx, url, tempDest)
err = r.fetchThrottled(ctx, url, tempDest)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
continue
@ -256,9 +254,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
log.Infof("Fetch %s -> %s", url, outname)
func (r *Remote) fetchThrottled(ctx context.Context, url, outname string) (rerr error) {
if len(r.limit) >= cap(r.limit) {
log.Infof("Throttling fetch, %d already running", len(r.limit))
}
@ -274,59 +270,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
/*bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()*/
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}
if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}
switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname, make([]byte, CopyBuf))
case "application/octet-stream":
f, err := os.Create(outname)
if err != nil {
return err
}
_, err = io.CopyBuffer(f, resp.Body, make([]byte, CopyBuf))
if err != nil {
f.Close() // nolint
return err
}
return f.Close()
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
return fetch(ctx, url, outname, r.auth)
}
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {

View File

@ -15,6 +15,7 @@ import (
abi "github.com/filecoin-project/go-state-types/abi"
api "github.com/filecoin-project/lotus/api"
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var _ = xerrors.Errorf
@ -22,129 +23,6 @@ var _ = cid.Undef
var _ = math.E
var _ = sort.Sort
func (t *Piece) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{162}); err != nil {
return err
}
// t.Piece (abi.PieceInfo) (struct)
if len("Piece") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Piece\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Piece"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Piece")); err != nil {
return err
}
if err := t.Piece.MarshalCBOR(cw); err != nil {
return err
}
// t.DealInfo (api.PieceDealInfo) (struct)
if len("DealInfo") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"DealInfo\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("DealInfo"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("DealInfo")); err != nil {
return err
}
if err := t.DealInfo.MarshalCBOR(cw); err != nil {
return err
}
return nil
}
func (t *Piece) UnmarshalCBOR(r io.Reader) (err error) {
*t = Piece{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("Piece: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Piece (abi.PieceInfo) (struct)
case "Piece":
{
if err := t.Piece.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.Piece: %w", err)
}
}
// t.DealInfo (api.PieceDealInfo) (struct)
case "DealInfo":
{
b, err := cr.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := cr.UnreadByte(); err != nil {
return err
}
t.DealInfo = new(api.PieceDealInfo)
if err := t.DealInfo.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.DealInfo pointer: %w", err)
}
}
}
default:
// Field doesn't exist on this type, so ignore it
cbg.ScanForLinks(r, func(cid.Cid) {})
}
}
return nil
}
func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -153,7 +31,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{184, 31}); err != nil {
if _, err := cw.Write([]byte{184, 38}); err != nil {
return err
}
@ -240,7 +118,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.Pieces ([]sealing.Piece) (slice)
// t.Pieces ([]api.SectorPiece) (slice)
if len("Pieces") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Pieces\" was too long")
}
@ -573,7 +451,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.CCPieces ([]sealing.Piece) (slice)
// t.CCPieces ([]api.SectorPiece) (slice)
if len("CCPieces") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"CCPieces\" was too long")
}
@ -777,6 +655,139 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.RemoteDataUnsealed (storiface.SectorLocation) (struct)
if len("RemoteDataUnsealed") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataUnsealed\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteDataUnsealed"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteDataUnsealed")); err != nil {
return err
}
if err := t.RemoteDataUnsealed.MarshalCBOR(cw); err != nil {
return err
}
// t.RemoteDataSealed (storiface.SectorLocation) (struct)
if len("RemoteDataSealed") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataSealed\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteDataSealed"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteDataSealed")); err != nil {
return err
}
if err := t.RemoteDataSealed.MarshalCBOR(cw); err != nil {
return err
}
// t.RemoteDataCache (storiface.SectorLocation) (struct)
if len("RemoteDataCache") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataCache\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteDataCache"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteDataCache")); err != nil {
return err
}
if err := t.RemoteDataCache.MarshalCBOR(cw); err != nil {
return err
}
// t.RemoteCommit1Endpoint (string) (string)
if len("RemoteCommit1Endpoint") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteCommit1Endpoint\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteCommit1Endpoint"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteCommit1Endpoint")); err != nil {
return err
}
if len(t.RemoteCommit1Endpoint) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.RemoteCommit1Endpoint was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.RemoteCommit1Endpoint))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.RemoteCommit1Endpoint)); err != nil {
return err
}
// t.RemoteCommit2Endpoint (string) (string)
if len("RemoteCommit2Endpoint") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteCommit2Endpoint\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteCommit2Endpoint"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteCommit2Endpoint")); err != nil {
return err
}
if len(t.RemoteCommit2Endpoint) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.RemoteCommit2Endpoint was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.RemoteCommit2Endpoint))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.RemoteCommit2Endpoint)); err != nil {
return err
}
// t.RemoteSealingDoneEndpoint (string) (string)
if len("RemoteSealingDoneEndpoint") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteSealingDoneEndpoint\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteSealingDoneEndpoint"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteSealingDoneEndpoint")); err != nil {
return err
}
if len(t.RemoteSealingDoneEndpoint) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.RemoteSealingDoneEndpoint was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.RemoteSealingDoneEndpoint))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.RemoteSealingDoneEndpoint)); err != nil {
return err
}
// t.RemoteDataFinalized (bool) (bool)
if len("RemoteDataFinalized") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataFinalized\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteDataFinalized"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteDataFinalized")); err != nil {
return err
}
if err := cbg.WriteBool(w, t.RemoteDataFinalized); err != nil {
return err
}
// t.LastErr (string) (string)
if len("LastErr") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"LastErr\" was too long")
@ -943,7 +954,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
t.CreationTime = int64(extraI)
}
// t.Pieces ([]sealing.Piece) (slice)
// t.Pieces ([]api.SectorPiece) (slice)
case "Pieces":
maj, extra, err = cr.ReadHeader()
@ -960,12 +971,12 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
}
if extra > 0 {
t.Pieces = make([]Piece, extra)
t.Pieces = make([]api.SectorPiece, extra)
}
for i := 0; i < int(extra); i++ {
var v Piece
var v api.SectorPiece
if err := v.UnmarshalCBOR(cr); err != nil {
return err
}
@ -1273,7 +1284,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
default:
return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
}
// t.CCPieces ([]sealing.Piece) (slice)
// t.CCPieces ([]api.SectorPiece) (slice)
case "CCPieces":
maj, extra, err = cr.ReadHeader()
@ -1290,12 +1301,12 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
}
if extra > 0 {
t.CCPieces = make([]Piece, extra)
t.CCPieces = make([]api.SectorPiece, extra)
}
for i := 0; i < int(extra); i++ {
var v Piece
var v api.SectorPiece
if err := v.UnmarshalCBOR(cr); err != nil {
return err
}
@ -1477,6 +1488,117 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
t.TerminatedAt = abi.ChainEpoch(extraI)
}
// t.RemoteDataUnsealed (storiface.SectorLocation) (struct)
case "RemoteDataUnsealed":
{
b, err := cr.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := cr.UnreadByte(); err != nil {
return err
}
t.RemoteDataUnsealed = new(storiface.SectorLocation)
if err := t.RemoteDataUnsealed.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.RemoteDataUnsealed pointer: %w", err)
}
}
}
// t.RemoteDataSealed (storiface.SectorLocation) (struct)
case "RemoteDataSealed":
{
b, err := cr.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := cr.UnreadByte(); err != nil {
return err
}
t.RemoteDataSealed = new(storiface.SectorLocation)
if err := t.RemoteDataSealed.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.RemoteDataSealed pointer: %w", err)
}
}
}
// t.RemoteDataCache (storiface.SectorLocation) (struct)
case "RemoteDataCache":
{
b, err := cr.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := cr.UnreadByte(); err != nil {
return err
}
t.RemoteDataCache = new(storiface.SectorLocation)
if err := t.RemoteDataCache.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.RemoteDataCache pointer: %w", err)
}
}
}
// t.RemoteCommit1Endpoint (string) (string)
case "RemoteCommit1Endpoint":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.RemoteCommit1Endpoint = string(sval)
}
// t.RemoteCommit2Endpoint (string) (string)
case "RemoteCommit2Endpoint":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.RemoteCommit2Endpoint = string(sval)
}
// t.RemoteSealingDoneEndpoint (string) (string)
case "RemoteSealingDoneEndpoint":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.RemoteSealingDoneEndpoint = string(sval)
}
// t.RemoteDataFinalized (bool) (bool)
case "RemoteDataFinalized":
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajOther {
return fmt.Errorf("booleans must be major type 7")
}
switch extra {
case 20:
t.RemoteDataFinalized = false
case 21:
t.RemoteDataFinalized = true
default:
return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
}
// t.LastErr (string) (string)
case "LastErr":

View File

@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
prooftypes "github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
)
@ -41,21 +42,28 @@ type ErrCommitWaitFailed struct{ error }
type ErrBadRU struct{ error }
type ErrBadPR struct{ error }
func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error {
func checkPieces(ctx context.Context, maddr address.Address, sn abi.SectorNumber, pieces []api.SectorPiece, api SealingAPI, mustHaveDeals bool) error {
ts, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
dealCount := 0
var offset abi.PaddedPieceSize
for i, p := range pieces {
// check that the piece is correctly aligned
if offset%p.Piece.Size != 0 {
return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d is not aligned: size=%xh offset=%xh off-by=%xh", sn, i, p.Piece.Size, offset, offset%p.Piece.Size)}
}
offset += p.Piece.Size
for i, p := range si.Pieces {
// if no deal is associated with the piece, ensure that we added it as
// filler (i.e. ensure that it has a zero PieceCID)
if p.DealInfo == nil {
exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded())
if !p.Piece.PieceCID.Equals(exp) {
return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", si.SectorNumber, i, p.Piece.PieceCID)}
return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sn, i, p.Piece.PieceCID)}
}
continue
}
@ -68,24 +76,24 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
}
if deal.Proposal.Provider != maddr {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, deal.Proposal.Provider, maddr)}
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(pieces), sn, p.DealInfo.DealID, deal.Proposal.Provider, maddr)}
}
if deal.Proposal.PieceCID != p.Piece.PieceCID {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, deal.Proposal.PieceCID)}
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(pieces), sn, p.DealInfo.DealID, p.Piece.PieceCID, deal.Proposal.PieceCID)}
}
if p.Piece.Size != deal.Proposal.PieceSize {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.Size, deal.Proposal.PieceSize)}
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(pieces), sn, p.DealInfo.DealID, p.Piece.Size, deal.Proposal.PieceSize)}
}
if ts.Height() >= deal.Proposal.StartEpoch {
return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, deal.Proposal.StartEpoch, ts.Height())}
return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(pieces), sn, p.DealInfo.DealID, deal.Proposal.StartEpoch, ts.Height())}
}
}
if mustHaveDeals && dealCount <= 0 {
return &ErrNoDeals{(xerrors.Errorf("sector %d must have deals, but does not", si.SectorNumber))}
return &ErrNoDeals{xerrors.Errorf("sector %d must have deals, but does not", sn)}
}
return nil
@ -95,7 +103,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
//
// matches pieces, and that the seal ticket isn't expired
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, height abi.ChainEpoch, api SealingAPI) (err error) {
if err := checkPieces(ctx, maddr, si, api, false); err != nil {
if err := checkPieces(ctx, maddr, si.SectorNumber, si.Pieces, api, false); err != nil {
return err
}
@ -210,7 +218,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")}
}
if err := checkPieces(ctx, m.maddr, si, m.Api, false); err != nil {
if err := checkPieces(ctx, m.maddr, si.SectorNumber, si.Pieces, m.Api, false); err != nil {
return err
}
@ -220,7 +228,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
// check that sector info is good after running a replica update
func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, api SealingAPI) error {
if err := checkPieces(ctx, maddr, si, api, true); err != nil {
if err := checkPieces(ctx, maddr, si.SectorNumber, si.Pieces, api, true); err != nil {
return err
}
if !si.CCUpdate {

View File

@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"time"
@ -14,6 +15,8 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
)
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
@ -40,11 +43,17 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
}
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
// external import
ReceiveSector: planOne(
onReturning(SectorReceived{}),
),
// Sealing
UndefinedSectorState: planOne(
on(SectorStart{}, WaitDeals),
on(SectorStartCC{}, Packing),
on(SectorReceive{}, ReceiveSector),
),
Empty: planOne( // deprecated
on(SectorAddPiece{}, AddPiece),
@ -136,8 +145,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
),
FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
on(SectorFinalizedAvailable{}, Available),
onWithCB(SectorFinalized{}, Proving, maybeNotifyRemoteDone(true, "Proving")),
onWithCB(SectorFinalizedAvailable{}, Available, maybeNotifyRemoteDone(true, "Available")),
on(SectorFinalizeFailed{}, FinalizeFailed),
),
@ -212,13 +221,16 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryWaitSeed{}, WaitSeed),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
ComputeProofFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
RemoteCommitFailed: planOne(
on(SectorRetryComputeProof{}, Committing),
),
CommitFinalizeFailed: planOne(
on(SectorRetryFinalize{}, CommitFinalize),
),
@ -232,9 +244,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
on(SectorRetrySubmitCommit{}, SubmitCommit),
on(SectorDealsExpired{}, DealsExpired),
onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
on(SectorTicketExpired{}, Removing),
onWithCB(SectorTicketExpired{}, Removing, maybeNotifyRemoteDone(false, "Removing")),
),
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
@ -457,6 +469,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
}
switch state.State {
case ReceiveSector:
return m.handleReceiveSector, processed, nil
// Happy path
case Empty:
fallthrough
@ -530,6 +545,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handlePreCommitFailed, processed, nil
case ComputeProofFailed:
return m.handleComputeProofFailed, processed, nil
case RemoteCommitFailed:
return m.handleRemoteCommitFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, processed, nil
case CommitFinalizeFailed:
@ -657,6 +674,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
return uint64(i + 1), nil
case SectorComputeProofFailed:
state.State = ComputeProofFailed
case SectorRemoteCommit1Failed, SectorRemoteCommit2Failed:
state.State = RemoteCommitFailed
case SectorSealPreCommit1Failed:
state.State = SealPreCommit1Failed
case SectorCommitFailed:
@ -720,6 +739,16 @@ func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool,
}
}
func onWithCB(mut mutator, next SectorState, cb func(info *SectorInfo)) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
cb(state)
state.State = next
return false, nil
}
}
}
// like `on`, but doesn't change state
func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
@ -796,3 +825,44 @@ func planOneOrIgnore(ts ...func() (mut mutator, next func(*SectorInfo) (more boo
return cnt, nil
}
}
// maybeNotifyRemoteDone will send sealing-done notification to the RemoteSealingDone
// if the RemoteSealingDoneEndpoint is set. If RemoteSealingDoneEndpoint is not set,
// this is no-op
func maybeNotifyRemoteDone(success bool, state string) func(*SectorInfo) {
return func(sector *SectorInfo) {
if sector.RemoteSealingDoneEndpoint == "" {
return
}
reqData := api.RemoteSealingDoneParams{
Successful: success,
State: state,
CommitMessage: sector.CommitMessage,
}
reqBody, err := json.Marshal(&reqData)
if err != nil {
log.Errorf("marshaling remote done notification request params: %s", err)
return
}
req, err := http.NewRequest("POST", sector.RemoteSealingDoneEndpoint, bytes.NewReader(reqBody))
if err != nil {
log.Errorf("creating new remote done notification request: %s", err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Errorf("sending remote done notification: %s", err)
return
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
log.Errorf("remote done notification received non-200 http response %s", resp.Status)
return
}
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -87,7 +88,7 @@ func (evt SectorAddPiece) apply(state *SectorInfo) {
}
type SectorPieceAdded struct {
NewPieces []Piece
NewPieces []api.SectorPiece
}
func (evt SectorPieceAdded) apply(state *SectorInfo) {
@ -113,7 +114,7 @@ type SectorPacked struct{ FillerPieces []abi.PieceInfo }
func (evt SectorPacked) apply(state *SectorInfo) {
for idx := range evt.FillerPieces {
state.Pieces = append(state.Pieces, Piece{
state.Pieces = append(state.Pieces, api.SectorPiece{
Piece: evt.FillerPieces[idx],
DealInfo: nil, // filler pieces don't have deals associated with them
})
@ -217,6 +218,16 @@ func (evt SectorSeedReady) apply(state *SectorInfo) {
state.SeedValue = evt.SeedValue
}
type SectorRemoteCommit1Failed struct{ error }
func (evt SectorRemoteCommit1Failed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoteCommit1Failed) apply(*SectorInfo) {}
type SectorRemoteCommit2Failed struct{ error }
func (evt SectorRemoteCommit2Failed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoteCommit2Failed) apply(*SectorInfo) {}
type SectorComputeProofFailed struct{ error }
func (evt SectorComputeProofFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
@ -513,6 +524,9 @@ func (evt SectorTerminateFailed) apply(*SectorInfo) {}
type SectorRemove struct{}
func (evt SectorRemove) applyGlobal(state *SectorInfo) bool {
// because this event is global we need to send the notification here instead through an fsm callback
maybeNotifyRemoteDone(false, "Removing")(state)
state.State = Removing
return true
}
@ -525,3 +539,15 @@ type SectorRemoveFailed struct{ error }
func (evt SectorRemoveFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoveFailed) apply(*SectorInfo) {}
type SectorReceive struct {
State SectorInfo
}
func (evt SectorReceive) apply(state *SectorInfo) {
*state = evt.State
}
type SectorReceived struct{}
func (evt SectorReceived) apply(state *SectorInfo) {}

View File

@ -11,7 +11,6 @@ import (
func main() {
err := gen.WriteMapEncodersToFile("./cbor_gen.go", "sealing",
sealing.Piece{},
sealing.SectorInfo{},
sealing.Log{},
)

View File

@ -234,7 +234,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
}
pieceSizes = append(pieceSizes, p.Unpadded())
res.NewPieces = append(res.NewPieces, Piece{
res.NewPieces = append(res.NewPieces, api.SectorPiece{
Piece: ppi,
})
}
@ -262,7 +262,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
offset += deal.size
pieceSizes = append(pieceSizes, deal.size)
res.NewPieces = append(res.NewPieces, Piece{
res.NewPieces = append(res.NewPieces, api.SectorPiece{
Piece: ppi,
DealInfo: &deal.deal,
})

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v8/miner"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
@ -16,7 +17,7 @@ import (
)
type PreCommitPolicy interface {
Expiration(ctx context.Context, ps ...Piece) (abi.ChainEpoch, error)
Expiration(ctx context.Context, ps ...api.SectorPiece) (abi.ChainEpoch, error)
}
type Chain interface {
@ -59,7 +60,7 @@ func NewBasicPreCommitPolicy(api Chain, cfgGetter dtypes.GetSealingConfigFunc, p
// Expiration produces the pre-commit sector expiration epoch for an encoded
// replica containing the provided enumeration of pieces and deals.
func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi.ChainEpoch, error) {
func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...api.SectorPiece) (abi.ChainEpoch, error) {
ts, err := p.api.ChainHead(ctx)
if err != nil {
return 0, err

View File

@ -95,7 +95,7 @@ func TestBasicPolicyMostConstrictiveSchedule(t *testing.T) {
h: abi.ChainEpoch(55),
}, cfg, 2)
longestDealEpochEnd := abi.ChainEpoch(547300)
pieces := []pipeline.Piece{
pieces := []api.SectorPiece{
{
Piece: abi.PieceInfo{
Size: abi.PaddedPieceSize(1024),
@ -136,7 +136,7 @@ func TestBasicPolicyIgnoresExistingScheduleIfExpired(t *testing.T) {
h: abi.ChainEpoch(55),
}, cfg, 0)
pieces := []pipeline.Piece{
pieces := []api.SectorPiece{
{
Piece: abi.PieceInfo{
Size: abi.PaddedPieceSize(1024),
@ -165,7 +165,7 @@ func TestMissingDealIsIgnored(t *testing.T) {
h: abi.ChainEpoch(55),
}, cfg, 0)
pieces := []pipeline.Piece{
pieces := []api.SectorPiece{
{
Piece: abi.PieceInfo{
Size: abi.PaddedPieceSize(1024),

304
storage/pipeline/receive.go Normal file
View File

@ -0,0 +1,304 @@
package sealing
import (
"bytes"
"context"
"net/url"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/multiformats/go-multihash"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error {
m.inputLk.Lock()
defer m.inputLk.Unlock()
si, err := m.checkSectorMeta(ctx, meta)
if err != nil {
return err
}
exists, err := m.sectors.Has(uint64(meta.Sector.Number))
if err != nil {
return xerrors.Errorf("checking if sector exists: %w", err)
}
if exists {
return xerrors.Errorf("sector %d state already exists", meta.Sector.Number)
}
err = m.sectors.Send(uint64(meta.Sector.Number), SectorReceive{
State: si,
})
if err != nil {
return xerrors.Errorf("receiving sector: %w", err)
}
return nil
}
func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) (SectorInfo, error) {
{
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
panic(err)
}
if meta.Sector.Miner != abi.ActorID(mid) {
return SectorInfo{}, xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner)
}
}
{
// initial sanity check, doesn't prevent races
_, err := m.GetSectorInfo(meta.Sector.Number)
if err != nil && !xerrors.Is(err, datastore.ErrNotFound) {
return SectorInfo{}, err
}
if err == nil {
return SectorInfo{}, xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number)
}
}
{
spt, err := m.currentSealProof(ctx)
if err != nil {
return SectorInfo{}, err
}
if meta.Type != spt {
return SectorInfo{}, xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt)
}
}
ts, err := m.Api.ChainHead(ctx)
if err != nil {
return SectorInfo{}, xerrors.Errorf("getting chain head: %w", err)
}
var info SectorInfo
var validatePoRep bool
switch SectorState(meta.State) {
case Proving, Available:
if meta.CommitMessage != nil {
if err := checkMessagePrefix(*meta.CommitMessage); err != nil {
return SectorInfo{}, xerrors.Errorf("commit message prefix: %w", err)
}
info.CommitMessage = meta.CommitMessage
}
fallthrough
case SubmitCommit:
if meta.PreCommitDeposit == nil {
return SectorInfo{}, xerrors.Errorf("sector PreCommitDeposit was null")
}
info.PreCommitDeposit = *meta.PreCommitDeposit
info.PreCommitTipSet = meta.PreCommitTipSet
if info.PreCommitMessage != nil {
if err := checkMessagePrefix(*meta.PreCommitMessage); err != nil {
return SectorInfo{}, xerrors.Errorf("commit message prefix: %w", err)
}
info.PreCommitMessage = meta.PreCommitMessage
}
// check provided seed
if len(meta.SeedValue) != abi.RandomnessLength {
return SectorInfo{}, xerrors.Errorf("seed randomness had wrong length %d", len(meta.SeedValue))
}
maddrBuf := new(bytes.Buffer)
if err := m.maddr.MarshalCBOR(maddrBuf); err != nil {
return SectorInfo{}, xerrors.Errorf("marshal miner address for seed check: %w", err)
}
rand, err := m.Api.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, meta.SeedEpoch, maddrBuf.Bytes(), ts.Key())
if err != nil {
return SectorInfo{}, xerrors.Errorf("generating check seed: %w", err)
}
if !bytes.Equal(rand, meta.SeedValue) {
return SectorInfo{}, xerrors.Errorf("provided(%x) and generated(%x) seeds differ", meta.SeedValue, rand)
}
info.SeedValue = meta.SeedValue
info.SeedEpoch = meta.SeedEpoch
info.Proof = meta.CommitProof
validatePoRep = true
fallthrough
case PreCommitting:
// check provided ticket
if len(meta.TicketValue) != abi.RandomnessLength {
return SectorInfo{}, xerrors.Errorf("ticket randomness had wrong length %d", len(meta.TicketValue))
}
maddrBuf := new(bytes.Buffer)
if err := m.maddr.MarshalCBOR(maddrBuf); err != nil {
return SectorInfo{}, xerrors.Errorf("marshal miner address for ticket check: %w", err)
}
rand, err := m.Api.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, meta.TicketEpoch, maddrBuf.Bytes(), ts.Key())
if err != nil {
return SectorInfo{}, xerrors.Errorf("generating check ticket: %w", err)
}
if !bytes.Equal(rand, meta.TicketValue) {
return SectorInfo{}, xerrors.Errorf("provided(%x) and generated(%x) tickets differ", meta.TicketValue, rand)
}
info.TicketValue = meta.TicketValue
info.TicketEpoch = meta.TicketEpoch
info.PreCommit1Out = meta.PreCommit1Out
// check CommD/R
if meta.CommD == nil || meta.CommR == nil {
return SectorInfo{}, xerrors.Errorf("both CommR/CommD cids need to be set for sectors in PreCommitting and later states")
}
dp := meta.CommD.Prefix()
if dp.Version != 1 || dp.Codec != cid.FilCommitmentUnsealed || dp.MhType != multihash.SHA2_256_TRUNC254_PADDED || dp.MhLength != 32 {
return SectorInfo{}, xerrors.Errorf("CommD cid has wrong prefix")
}
rp := meta.CommR.Prefix()
if rp.Version != 1 || rp.Codec != cid.FilCommitmentSealed || rp.MhType != multihash.POSEIDON_BLS12_381_A1_FC1 || rp.MhLength != 32 {
return SectorInfo{}, xerrors.Errorf("CommR cid has wrong prefix")
}
info.CommD = meta.CommD
info.CommR = meta.CommR
if meta.DataSealed == nil {
return SectorInfo{}, xerrors.Errorf("expected DataSealed to be set")
}
if meta.DataCache == nil {
return SectorInfo{}, xerrors.Errorf("expected DataCache to be set")
}
info.RemoteDataSealed = meta.DataSealed // todo make head requests to check?
info.RemoteDataCache = meta.DataCache
if meta.RemoteCommit1Endpoint != "" {
// validate the url
if _, err := url.Parse(meta.RemoteCommit1Endpoint); err != nil {
return SectorInfo{}, xerrors.Errorf("parsing remote c1 endpoint url: %w", err)
}
info.RemoteCommit1Endpoint = meta.RemoteCommit1Endpoint
}
if meta.RemoteCommit2Endpoint != "" {
// validate the url
if _, err := url.Parse(meta.RemoteCommit2Endpoint); err != nil {
return SectorInfo{}, xerrors.Errorf("parsing remote c2 endpoint url: %w", err)
}
info.RemoteCommit2Endpoint = meta.RemoteCommit2Endpoint
}
// If we get a sector after PC2, and remote C1 endpoint is set, assume that we're getting finalized sector data
if info.RemoteCommit1Endpoint != "" {
info.RemoteDataFinalized = true
}
fallthrough
case GetTicket, Packing:
info.Return = ReturnState(meta.State)
info.State = ReceiveSector
info.SectorNumber = meta.Sector.Number
info.Pieces = meta.Pieces
info.SectorType = meta.Type
if meta.RemoteSealingDoneEndpoint != "" {
// validate the url
if _, err := url.Parse(meta.RemoteSealingDoneEndpoint); err != nil {
return SectorInfo{}, xerrors.Errorf("parsing remote sealing-done endpoint url: %w", err)
}
info.RemoteSealingDoneEndpoint = meta.RemoteSealingDoneEndpoint
}
if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil {
return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err)
}
if meta.DataUnsealed == nil {
return SectorInfo{}, xerrors.Errorf("expected DataUnsealed to be set")
}
info.RemoteDataUnsealed = meta.DataUnsealed
// some late checks which require previous checks
if validatePoRep {
ok, err := m.verif.VerifySeal(proof.SealVerifyInfo{
SealProof: meta.Type,
SectorID: meta.Sector,
DealIDs: nil,
Randomness: meta.TicketValue,
InteractiveRandomness: meta.SeedValue,
Proof: meta.CommitProof,
SealedCID: *meta.CommR,
UnsealedCID: *meta.CommD,
})
if err != nil {
return SectorInfo{}, xerrors.Errorf("validating seal proof: %w", err)
}
if !ok {
return SectorInfo{}, xerrors.Errorf("seal proof invalid")
}
}
return info, nil
default:
return SectorInfo{}, xerrors.Errorf("imported sector State in not supported")
}
}
func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error {
toFetch := map[storiface.SectorFileType]storiface.SectorLocation{}
for fileType, data := range map[storiface.SectorFileType]*storiface.SectorLocation{
storiface.FTUnsealed: sector.RemoteDataUnsealed,
storiface.FTSealed: sector.RemoteDataSealed,
storiface.FTCache: sector.RemoteDataCache,
} {
if data == nil {
continue
}
if data.Local {
// todo check exists
continue
}
toFetch[fileType] = *data
}
if len(toFetch) > 0 {
if err := m.sealer.DownloadSectorData(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.RemoteDataFinalized, toFetch); err != nil {
return xerrors.Errorf("downloading sector data: %w", err) // todo send err event
}
}
// todo data checks?
return ctx.Send(SectorReceived{})
}
func checkMessagePrefix(c cid.Cid) error {
p := c.Prefix()
if p.Version != 1 || p.MhLength != 32 || p.MhType != multihash.BLAKE2B_MIN+31 || p.Codec != cid.DagCBOR {
return xerrors.New("invalid message prefix")
}
return nil
}

View File

@ -31,6 +31,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
SealPreCommit2Failed: {},
PreCommitFailed: {},
ComputeProofFailed: {},
RemoteCommitFailed: {},
CommitFailed: {},
PackingFailed: {},
FinalizeFailed: {},
@ -63,6 +64,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
ReleaseSectorKeyFailed: {},
FinalizeReplicaUpdateFailed: {},
AbortUpgrade: {},
ReceiveSector: {},
}
// cmd/lotus-miner/info.go defines CLI colors corresponding to these states
@ -113,6 +115,9 @@ const (
UpdateActivating SectorState = "UpdateActivating"
ReleaseSectorKey SectorState = "ReleaseSectorKey"
// external import
ReceiveSector SectorState = "ReceiveSector"
// error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable"
AddPieceFailed SectorState = "AddPieceFailed"
@ -120,6 +125,7 @@ const (
SealPreCommit2Failed SectorState = "SealPreCommit2Failed"
PreCommitFailed SectorState = "PreCommitFailed"
ComputeProofFailed SectorState = "ComputeProofFailed"
RemoteCommitFailed SectorState = "RemoteCommitFailed"
CommitFailed SectorState = "CommitFailed"
PackingFailed SectorState = "PackingFailed" // TODO: deprecated, remove
FinalizeFailed SectorState = "FinalizeFailed"
@ -153,7 +159,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece, AddPieceFailed, SnapDealsWaitDeals, SnapDealsAddPiece:
return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate:
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate, ReceiveSector:
return sstSealing
case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, SubmitReplicaUpdate, ReplicaUpdateWait:
if finEarly {

View File

@ -19,12 +19,12 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
const minRetryTime = 1 * time.Minute
var MinRetryTime = 1 * time.Minute
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Exponential backoff when we see consecutive failures
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(MinRetryTime)
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
log.Infof("%s(%d), waiting %s before retrying", sector.State, sector.SectorNumber, time.Until(retryStart))
select {
@ -184,6 +184,18 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
return ctx.Send(SectorRetryComputeProof{})
}
func (m *Sealing) handleRemoteCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := failedCooldown(ctx, sector); err != nil {
return err
}
if sector.InvalidProofs > 1 {
log.Errorw("consecutive remote commit fails", "sector", sector.SectorNumber, "c1url", sector.RemoteCommit1Endpoint, "c2url", sector.RemoteCommit2Endpoint)
}
return ctx.Send(SectorRetryComputeProof{})
}
func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := failedCooldown(ctx, sector); err != nil {
return err

View File

@ -76,7 +76,7 @@ func TestStateRecoverDealIDs(t *testing.T) {
// TODO sctx should satisfy an interface so it can be useable for mocking. This will fail because we are passing in an empty context now to get this to build.
// https://github.com/filecoin-project/lotus/issues/7867
err := fakeSealing.HandleRecoverDealIDs(statemachine.Context{}, pipeline.SectorInfo{
Pieces: []pipeline.Piece{
Pieces: []api2.SectorPiece{
{
DealInfo: &api2.PieceDealInfo{
DealID: dealId,

View File

@ -26,7 +26,7 @@ func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorAbortUpgrade{xerrors.New("sector had no deals")})
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}
out, err := m.sealer.ReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.pieceInfos())
@ -66,7 +66,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
return ctx.Send(SectorProveReplicaUpdateFailed{xerrors.Errorf("prove replica update (1) failed: %w", err)})
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}

View File

@ -3,6 +3,9 @@ package sealing
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -207,7 +210,7 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
}
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, false); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
@ -569,18 +572,99 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
}
cids := storiface.SectorCids{
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
}
c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)})
var c2in storiface.Commit1Out
if sector.RemoteCommit1Endpoint == "" {
// Local Commit1
cids := storiface.SectorCids{
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
}
c2in, err = m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)})
}
} else {
// Remote Commit1
reqData := api.RemoteCommit1Params{
Ticket: sector.TicketValue,
Seed: sector.SeedValue,
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
ProofType: sector.SectorType,
}
reqBody, err := json.Marshal(&reqData)
if err != nil {
return xerrors.Errorf("marshaling remote commit1 request: %w", err)
}
req, err := http.NewRequest("POST", sector.RemoteCommit1Endpoint, bytes.NewReader(reqBody))
if err != nil {
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("creating new remote commit1 request: %w", err)})
}
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx.Context())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("requesting remote commit1: %w", err)})
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("remote commit1 received non-200 http response %s", resp.Status)})
}
c2in, err = io.ReadAll(resp.Body) // todo some len constraint
if err != nil {
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("reading commit1 response: %w", err)})
}
}
proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
var porepProof storiface.Proof
if sector.RemoteCommit2Endpoint == "" {
// Local Commit2
porepProof, err = m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}
} else {
// Remote Commit2
reqData := api.RemoteCommit2Params{
ProofType: sector.SectorType,
Sector: m.minerSectorID(sector.SectorNumber),
Commit1Out: c2in,
}
reqBody, err := json.Marshal(&reqData)
if err != nil {
return xerrors.Errorf("marshaling remote commit2 request: %w", err)
}
req, err := http.NewRequest("POST", sector.RemoteCommit2Endpoint, bytes.NewReader(reqBody))
if err != nil {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("creating new remote commit2 request: %w", err)})
}
req.Header.Set("Content-Type", "application/json")
req = req.WithContext(ctx.Context())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("requesting remote commit2: %w", err)})
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("remote commit2 received non-200 http response %s", resp.Status)})
}
porepProof, err = io.ReadAll(resp.Body) // todo some len constraint
if err != nil {
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("reading commit2 response: %w", err)})
}
}
{
@ -590,19 +674,19 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, proof, ts.Key()); err != nil {
if err := m.checkCommit(ctx.Context(), sector, porepProof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
}
if cfg.FinalizeEarly {
return ctx.Send(SectorProofReady{
Proof: proof,
Proof: porepProof,
})
}
return ctx.Send(SectorCommitted{
Proof: proof,
Proof: porepProof,
})
}

View File

@ -22,18 +22,6 @@ type Context interface {
Send(evt interface{}) error
}
// Piece is a tuple of piece and deal info
type PieceWithDealInfo struct {
Piece abi.PieceInfo
DealInfo api.PieceDealInfo
}
// Piece is a tuple of piece info and optional deal
type Piece struct {
Piece abi.PieceInfo
DealInfo *api.PieceDealInfo // nil for pieces which do not appear in deals (e.g. filler pieces)
}
type Log struct {
Timestamp uint64
Trace string // for errors
@ -61,7 +49,7 @@ type SectorInfo struct {
// Packing
CreationTime int64 // unix seconds
Pieces []Piece
Pieces []api.SectorPiece
// PreCommit1
TicketValue abi.SealRandomness
@ -89,7 +77,7 @@ type SectorInfo struct {
// CCUpdate
CCUpdate bool
CCPieces []Piece
CCPieces []api.SectorPiece
UpdateSealed *cid.Cid
UpdateUnsealed *cid.Cid
ReplicaUpdateProof storiface.ReplicaUpdateProof
@ -98,13 +86,22 @@ type SectorInfo struct {
// Faults
FaultReportMsg *cid.Cid
// Recovery
// Recovery / Import
Return ReturnState
// Termination
TerminateMessage *cid.Cid
TerminatedAt abi.ChainEpoch
// Remote import
RemoteDataUnsealed *storiface.SectorLocation
RemoteDataSealed *storiface.SectorLocation
RemoteDataCache *storiface.SectorLocation
RemoteCommit1Endpoint string
RemoteCommit2Endpoint string
RemoteSealingDoneEndpoint string
RemoteDataFinalized bool
// Debug
LastErr string
@ -161,7 +158,7 @@ func (t *SectorInfo) sealingCtx(ctx context.Context) context.Context {
// Returns list of offset/length tuples of sector data ranges which clients
// requested to keep unsealed
func (t *SectorInfo) keepUnsealedRanges(pieces []Piece, invert, alwaysKeep bool) []storiface.Range {
func (t *SectorInfo) keepUnsealedRanges(pieces []api.SectorPiece, invert, alwaysKeep bool) []storiface.Range {
var out []storiface.Range
var at abi.UnpaddedPieceSize

View File

@ -43,7 +43,7 @@ func TestSectorInfoSerialization(t *testing.T) {
si := &SectorInfo{
State: "stateful",
SectorNumber: 234,
Pieces: []Piece{{
Pieces: []api.SectorPiece{{
Piece: abi.PieceInfo{
Size: 5,
PieceCID: dummyCid,

View File

@ -11,9 +11,12 @@ import (
"encoding/base64"
"encoding/json"
"io"
"io/ioutil"
"math/bits"
"os"
"path/filepath"
"runtime"
"syscall"
"github.com/detailyang/go-fallocate"
"github.com/ipfs/go-cid"
@ -28,6 +31,7 @@ import (
"github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/lotus/lib/nullreader"
spaths "github.com/filecoin-project/lotus/storage/paths"
nr "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/fr32"
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
@ -1068,6 +1072,44 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storiface.SectorRef
return ffi.ClearCache(uint64(ssize), paths.Cache)
}
// FinalizeSectorInto is like FinalizeSector, but writes finalized sector cache into a new path
func (sb *Sealer) FinalizeSectorInto(ctx context.Context, sector storiface.SectorRef, dest string) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
files, err := ioutil.ReadDir(paths.Cache)
if err != nil {
return err
}
for _, file := range files {
if file.Name() != "t_aux" && file.Name() != "p_aux" {
// link all the non-aux files
if err := syscall.Link(filepath.Join(paths.Cache, file.Name()), filepath.Join(dest, file.Name())); err != nil {
return xerrors.Errorf("link %s: %w", file.Name(), err)
}
continue
}
d, err := os.ReadFile(filepath.Join(paths.Cache, file.Name()))
if err != nil {
return xerrors.Errorf("read %s: %w", file.Name(), err)
}
if err := os.WriteFile(filepath.Join(dest, file.Name()), d, 0666); err != nil {
return xerrors.Errorf("write %s: %w", file.Name(), err)
}
}
return ffi.ClearCache(uint64(ssize), dest)
}
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
@ -1127,6 +1169,39 @@ func (sb *Sealer) Remove(ctx context.Context, sector storiface.SectorRef) error
return xerrors.Errorf("not supported at this layer") // happens in localworker
}
func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
var todo storiface.SectorFileType
for fileType := range src {
todo |= fileType
}
ptype := storiface.PathSealing
if finalized {
ptype = storiface.PathStorage
}
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, todo, ptype)
if err != nil {
return xerrors.Errorf("failed to acquire sector paths: %w", err)
}
defer done()
for fileType, data := range src {
out := storiface.PathByType(paths, fileType)
if data.Local {
return xerrors.Errorf("sector(%v) with local data (%#v) requested in DownloadSectorData", sector, data)
}
_, err := spaths.FetchWithTemp(ctx, []string{data.URL}, out, data.HttpHeaders())
if err != nil {
return xerrors.Errorf("downloading sector data: %w", err)
}
}
return nil
}
func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
padPieces := make([]abi.PaddedPieceSize, 0)

View File

@ -5,6 +5,7 @@ import (
"errors"
"io"
"net/http"
"sort"
"sync"
"github.com/google/uuid"
@ -107,6 +108,7 @@ type Config struct {
ParallelFetchLimit int
// Local worker config
AllowSectorDownload bool
AllowAddPiece bool
AllowPreCommit1 bool
AllowPreCommit2 bool
@ -181,6 +183,9 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
localTasks := []sealtasks.TaskType{
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
}
if sc.AllowSectorDownload {
localTasks = append(localTasks, sealtasks.TTDownloadSector)
}
if sc.AllowAddPiece {
localTasks = append(localTasks, sealtasks.TTAddPiece, sealtasks.TTDataCid)
}
@ -1084,6 +1089,78 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storiface.Sect
return out, waitErr
}
func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var toFetch storiface.SectorFileType
// get a sorted list of sectors files to make a consistent work key from
ents := make([]struct {
T storiface.SectorFileType
S storiface.SectorLocation
}, 0, len(src))
for fileType, data := range src {
if len(fileType.AllSet()) != 1 {
return xerrors.Errorf("sector data entry must be for a single file type")
}
toFetch |= fileType
ents = append(ents, struct {
T storiface.SectorFileType
S storiface.SectorLocation
}{T: fileType, S: data})
}
sort.Slice(ents, func(i, j int) bool {
return ents[i].T < ents[j].T
})
// get a work key
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTDownloadSector, sector, ents)
if err != nil {
return xerrors.Errorf("getWork: %w", err)
}
defer cancel()
var waitErr error
waitRes := func() {
_, werr := m.waitWork(ctx, wk)
if werr != nil {
waitErr = werr
return
}
}
if wait { // already in progress
waitRes()
return waitErr
}
ptype := storiface.PathSealing
if finalized {
ptype = storiface.PathStorage
}
selector := newAllocSelector(m.index, toFetch, ptype)
err = m.sched.Schedule(ctx, sector, sealtasks.TTDownloadSector, selector, schedNop, func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, w, wk)(w.DownloadSectorData(ctx, sector, finalized, src))
if err != nil {
return err
}
waitRes()
return nil
})
if err != nil {
return err
}
return waitErr
}
func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(ctx, callID, pi, err)
}
@ -1148,6 +1225,10 @@ func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID,
return m.returnResult(ctx, callID, ok, err)
}
func (m *Manager) ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}

View File

@ -517,6 +517,10 @@ func (mgr *SectorMgr) ReleaseSectorKey(ctx context.Context, sector storiface.Sec
return nil
}
func (mgr *SectorMgr) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
return xerrors.Errorf("not supported")
}
func (mgr *SectorMgr) Remove(ctx context.Context, sector storiface.SectorRef) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
@ -613,6 +617,10 @@ func (mgr *SectorMgr) ReturnFinalizeReplicaUpdate(ctx context.Context, callID st
panic("not supported")
}
func (mgr *SectorMgr) ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (m mockVerifProver) VerifySeal(svi prooftypes.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -67,6 +67,10 @@ type schedTestWorker struct {
ignoreResources bool
}
func (s *schedTestWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) {
panic("implement me")
}

View File

@ -31,6 +31,8 @@ const (
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTFinalizeReplicaUpdate TaskType = "seal/v0/finalize/replicaupdate"
TTDownloadSector TaskType = "seal/v0/download/sector"
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
)
@ -48,11 +50,12 @@ var order = map[TaskType]int{
TTCommit1: 2,
TTUnseal: 1,
TTFetch: -1,
TTFinalize: -2,
TTFetch: -1,
TTDownloadSector: -2,
TTFinalize: -3,
TTGenerateWindowPoSt: -3,
TTGenerateWinningPoSt: -4, // most priority
TTGenerateWindowPoSt: -4,
TTGenerateWinningPoSt: -5, // most priority
}
var shortNames = map[TaskType]string{
@ -75,6 +78,8 @@ var shortNames = map[TaskType]string{
TTRegenSectorKey: "GSK",
TTFinalizeReplicaUpdate: "FRU",
TTDownloadSector: "DL",
TTGenerateWindowPoSt: "WDP",
TTGenerateWinningPoSt: "WNP",
}

View File

@ -153,3 +153,315 @@ func (t *CallID) UnmarshalCBOR(r io.Reader) (err error) {
return nil
}
func (t *SecDataHttpHeader) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{162}); err != nil {
return err
}
// t.Key (string) (string)
if len("Key") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Key\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Key"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Key")); err != nil {
return err
}
if len(t.Key) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Key was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Key))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Key)); err != nil {
return err
}
// t.Value (string) (string)
if len("Value") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Value\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Value"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Value")); err != nil {
return err
}
if len(t.Value) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Value was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Value))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.Value)); err != nil {
return err
}
return nil
}
func (t *SecDataHttpHeader) UnmarshalCBOR(r io.Reader) (err error) {
*t = SecDataHttpHeader{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("SecDataHttpHeader: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Key (string) (string)
case "Key":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.Key = string(sval)
}
// t.Value (string) (string)
case "Value":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.Value = string(sval)
}
default:
// Field doesn't exist on this type, so ignore it
cbg.ScanForLinks(r, func(cid.Cid) {})
}
}
return nil
}
func (t *SectorLocation) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{163}); err != nil {
return err
}
// t.Local (bool) (bool)
if len("Local") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Local\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Local"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Local")); err != nil {
return err
}
if err := cbg.WriteBool(w, t.Local); err != nil {
return err
}
// t.URL (string) (string)
if len("URL") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"URL\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("URL"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("URL")); err != nil {
return err
}
if len(t.URL) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.URL was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.URL))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.URL)); err != nil {
return err
}
// t.Headers ([]storiface.SecDataHttpHeader) (slice)
if len("Headers") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Headers\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("Headers"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Headers")); err != nil {
return err
}
if len(t.Headers) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Headers was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Headers))); err != nil {
return err
}
for _, v := range t.Headers {
if err := v.MarshalCBOR(cw); err != nil {
return err
}
}
return nil
}
func (t *SectorLocation) UnmarshalCBOR(r io.Reader) (err error) {
*t = SectorLocation{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("SectorLocation: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Local (bool) (bool)
case "Local":
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajOther {
return fmt.Errorf("booleans must be major type 7")
}
switch extra {
case 20:
t.Local = false
case 21:
t.Local = true
default:
return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra)
}
// t.URL (string) (string)
case "URL":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.URL = string(sval)
}
// t.Headers ([]storiface.SecDataHttpHeader) (slice)
case "Headers":
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Headers: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Headers = make([]SecDataHttpHeader, extra)
}
for i := 0; i < int(extra); i++ {
var v SecDataHttpHeader
if err := v.UnmarshalCBOR(cr); err != nil {
return err
}
t.Headers[i] = v
}
default:
// Field doesn't exist on this type, so ignore it
cbg.ScanForLinks(r, func(cid.Cid) {})
}
}
return nil
}

View File

@ -3,6 +3,7 @@ package storiface
import (
"context"
"io"
"net/http"
"github.com/ipfs/go-cid"
@ -85,6 +86,8 @@ type Sealer interface {
GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) error
}
type Unsealer interface {
@ -119,3 +122,34 @@ type Prover interface {
AggregateSealProofs(aggregateInfo proof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error)
}
type SectorLocation struct {
// Local when set to true indicates to lotus that sector data is already
// available locally; When set lotus will skip fetching sector data, and
// only check that sector data exists in sector storage
Local bool
// URL to the sector data
// For sealed/unsealed sector, lotus expects octet-stream
// For cache, lotus expects a tar archive with cache files
// Valid schemas:
// - http:// / https://
URL string
// optional http headers to use when requesting sector data
Headers []SecDataHttpHeader
}
func (sd *SectorLocation) HttpHeaders() http.Header {
out := http.Header{}
for _, header := range sd.Headers {
out[header.Key] = append(out[header.Key], header.Value)
}
return out
}
// note: we can't use http.Header as that's backed by a go map, which is all kinds of messy
type SecDataHttpHeader struct {
Key string
Value string
}

View File

@ -134,6 +134,7 @@ type WorkerCalls interface {
MoveStorage(ctx context.Context, sector SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) (CallID, error)
// sync
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error)
@ -215,5 +216,6 @@ type WorkerReturn interface {
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnDownloadSector(ctx context.Context, callID CallID, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
}

View File

@ -5,6 +5,7 @@ import (
"io"
"os"
"path/filepath"
"strings"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
@ -12,38 +13,110 @@ import (
var log = logging.Logger("tarutil") // nolint
func ExtractTar(body io.Reader, dir string, buf []byte) error {
var CacheFileConstraints = map[string]int64{
"p_aux": 64,
"t_aux": 10240,
"sc-02-data-tree-r-last.dat": 10_000_000, // small sectors
"sc-02-data-tree-r-last-0.dat": 10_000_000,
"sc-02-data-tree-r-last-1.dat": 10_000_000,
"sc-02-data-tree-r-last-2.dat": 10_000_000,
"sc-02-data-tree-r-last-3.dat": 10_000_000,
"sc-02-data-tree-r-last-4.dat": 10_000_000,
"sc-02-data-tree-r-last-5.dat": 10_000_000,
"sc-02-data-tree-r-last-6.dat": 10_000_000,
"sc-02-data-tree-r-last-7.dat": 10_000_000,
"sc-02-data-tree-r-last-8.dat": 10_000_000,
"sc-02-data-tree-r-last-9.dat": 10_000_000,
"sc-02-data-tree-r-last-10.dat": 10_000_000,
"sc-02-data-tree-r-last-11.dat": 10_000_000,
"sc-02-data-tree-r-last-12.dat": 10_000_000,
"sc-02-data-tree-r-last-13.dat": 10_000_000,
"sc-02-data-tree-r-last-14.dat": 10_000_000,
"sc-02-data-tree-r-last-15.dat": 10_000_000,
"sc-02-data-layer-1.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-2.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-3.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-4.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-5.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-6.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-7.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-8.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-9.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-10.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-layer-11.dat": 65 << 30, // 1x sector size + small buffer
"sc-02-data-tree-c-0.dat": 5 << 30, // ~4.6G
"sc-02-data-tree-c-1.dat": 5 << 30,
"sc-02-data-tree-c-2.dat": 5 << 30,
"sc-02-data-tree-c-3.dat": 5 << 30,
"sc-02-data-tree-c-4.dat": 5 << 30,
"sc-02-data-tree-c-5.dat": 5 << 30,
"sc-02-data-tree-c-6.dat": 5 << 30,
"sc-02-data-tree-c-7.dat": 5 << 30,
"sc-02-data-tree-c-8.dat": 5 << 30,
"sc-02-data-tree-c-9.dat": 5 << 30,
"sc-02-data-tree-c-10.dat": 5 << 30,
"sc-02-data-tree-c-11.dat": 5 << 30,
"sc-02-data-tree-c-12.dat": 5 << 30,
"sc-02-data-tree-c-13.dat": 5 << 30,
"sc-02-data-tree-c-14.dat": 5 << 30,
"sc-02-data-tree-c-15.dat": 5 << 30,
"sc-02-data-tree-d.dat": 130 << 30, // 2x sector size, ~130G accunting for small buffer on 64G sectors
}
func ExtractTar(body io.Reader, dir string, buf []byte) (int64, error) {
if err := os.MkdirAll(dir, 0755); err != nil { // nolint
return xerrors.Errorf("mkdir: %w", err)
return 0, xerrors.Errorf("mkdir: %w", err)
}
tr := tar.NewReader(body)
var read int64
for {
header, err := tr.Next()
switch err {
default:
return err
return read, err
case io.EOF:
return nil
return read, nil
case nil:
}
//nolint:gosec
f, err := os.Create(filepath.Join(dir, header.Name))
if err != nil {
//nolint:gosec
return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
sz, found := CacheFileConstraints[header.Name]
if !found {
return read, xerrors.Errorf("tar file %#v isn't expected")
}
if header.Size > sz {
return read, xerrors.Errorf("tar file %#v is bigger than expected: %d > %d", header.Name, header.Size, sz)
}
// This data is coming from a trusted source, no need to check the size.
//nolint:gosec
if _, err := io.CopyBuffer(f, tr, buf); err != nil {
return err
out := filepath.Join(dir, header.Name) //nolint:gosec
if !strings.HasPrefix(out, filepath.Clean(dir)) {
return read, xerrors.Errorf("unsafe tar path %#v (must be within %#v)", out, filepath.Clean(dir))
}
f, err := os.Create(out)
if err != nil {
return read, xerrors.Errorf("creating file %s: %w", out, err)
}
ltr := io.LimitReader(tr, header.Size)
r, err := io.CopyBuffer(f, ltr, buf)
read += r
if err != nil {
return read, err
}
if err := f.Close(); err != nil {
return err
return read, err
}
}
}

View File

@ -21,6 +21,10 @@ type testExec struct {
apch chan chan apres
}
func (t *testExec) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
panic("implement me")
}
func (t *testExec) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) {
panic("implement me")
}

View File

@ -202,6 +202,7 @@ const (
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
DownloadSector ReturnType = "DownloadSector"
Fetch ReturnType = "Fetch"
)
@ -255,6 +256,7 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac
FinalizeReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnFinalizeReplicaUpdate),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
DownloadSector: rfunc(storiface.WorkerReturn.ReturnDownloadSector),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
@ -586,6 +588,17 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe
})
}
func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
return l.asyncCall(ctx, sector, DownloadSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
return nil, sb.DownloadSectorData(ctx, sector, finalized, src)
})
}
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
sb, err := l.executor()
if err != nil {