diff --git a/api/api_storage.go b/api/api_storage.go index e9c7aad61..02af793c9 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -3,6 +3,7 @@ package api import ( "bytes" "context" + "net/http" "time" "github.com/google/uuid" @@ -17,6 +18,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin/v8/market" "github.com/filecoin-project/go-state-types/builtin/v9/miner" abinetwork "github.com/filecoin-project/go-state-types/network" @@ -144,6 +146,8 @@ type StorageMiner interface { // SectorNumFree drops a sector reservation SectorNumFree(ctx context.Context, name string) error //perm:admin + SectorReceive(ctx context.Context, meta RemoteSectorMeta) error + // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error //perm:admin retry:true WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) //perm:admin @@ -504,3 +508,87 @@ type NumAssignerMeta struct { Next abi.SectorNumber } + +type RemoteSectorMeta struct { + //////// + // BASIC SECTOR INFORMATION + + // State specifies the first state the sector will enter after being imported + // Must be one of the following states: + // * Packing + // * GetTicket + // * PreCommitting + // * SubmitCommit + // * Proving/Available + State SectorState + + Sector abi.SectorID + Type abi.RegisteredSealProof + + //////// + // SEALING METADATA + // (allows lotus to continue the sealing process) + + // Required in Packing and later + Pieces []SectorPiece // todo better type? + + // Required in PreCommitting and later + TicketValue abi.SealRandomness + TicketEpoch abi.ChainEpoch + PreCommit1Out storiface.PreCommit1Out // todo specify better + + CommD *cid.Cid + CommR *cid.Cid // SectorKey + + // Required in SubmitCommit and later + PreCommitInfo *miner.SectorPreCommitInfo + PreCommitDeposit big.Int + PreCommitMessage *cid.Cid + PreCommitTipSet types.TipSetKey + + SeedValue abi.InteractiveSealRandomness + SeedEpoch abi.ChainEpoch + + CommitProof []byte + + // Required in Proving/Available + CommitMessage *cid.Cid + + // Optional sector metadata to import + Log []SectorLog + + //////// + // SECTOR DATA SOURCE + + // Sector urls - lotus will use those for fetching files into local storage + + // Required in all states + DataUnsealed *SectorData + + // Required in PreCommitting and later + DataSealed *SectorData + DataCache *SectorData + + //////// + // SEALING SERVICE HOOKS + + // todo Commit1Provider + // todo OnDone / OnStateChange +} + +type SectorData struct { + // Local when set to true indicates to lotus that sector data is already + // available locally; When set lotus will skip fetching sector data, and + // only check that sector data exists in sector storage + Local bool + + // URL to the sector data + // For sealed/unsealed sector, lotus expects octet-stream + // For cache, lotus expects a tar archive with cache files (todo maybe use not-tar; specify what files with what paths must be present) + // Valid schemas: + // - http:// / https:// + URL string + + // optional http headers to use when requesting sector data + Headers http.Header +} diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 40b6e6078..47192bfd9 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -6,6 +6,7 @@ import ( "go/ast" "go/parser" "go/token" + "net/http" "path/filepath" "reflect" "strings" @@ -340,6 +341,9 @@ func init() { "": bitfield.NewFromSet([]uint64{5, 6, 7, 10}), }) + addExample(http.Header{ + "Authorization": []string{"Bearer ey.."}, + }) } func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 43bd40f83..fd31166a9 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -846,6 +846,8 @@ type StorageMinerStruct struct { SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` + SectorReceive func(p0 context.Context, p1 RemoteSectorMeta) error `` + SectorRemove func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` SectorSetExpectedSealDuration func(p0 context.Context, p1 time.Duration) error `perm:"write"` @@ -5039,6 +5041,17 @@ func (s *StorageMinerStub) SectorPreCommitPending(p0 context.Context) ([]abi.Sec return *new([]abi.SectorID), ErrNotSupported } +func (s *StorageMinerStruct) SectorReceive(p0 context.Context, p1 RemoteSectorMeta) error { + if s.Internal.SectorReceive == nil { + return ErrNotSupported + } + return s.Internal.SectorReceive(p0, p1) +} + +func (s *StorageMinerStub) SectorReceive(p0 context.Context, p1 RemoteSectorMeta) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) SectorRemove(p0 context.Context, p1 abi.SectorNumber) error { if s.Internal.SectorRemove == nil { return ErrNotSupported diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index e9637ed2e..99ca3b366 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 17e767708..f0d2db3c6 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index b42a2379b..32912980b 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -150,6 +150,7 @@ * [SectorNumReserveCount](#SectorNumReserveCount) * [SectorPreCommitFlush](#SectorPreCommitFlush) * [SectorPreCommitPending](#SectorPreCommitPending) + * [SectorReceive](#SectorReceive) * [SectorRemove](#SectorRemove) * [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration) * [SectorSetSealDelay](#SectorSetSealDelay) @@ -3151,6 +3152,131 @@ Response: ] ``` +### SectorReceive +There are not yet any comments for this method. + +Perms: + +Inputs: +```json +[ + { + "State": "Proving", + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "Type": 8, + "Pieces": [ + { + "Piece": { + "Size": 1032, + "PieceCID": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + }, + "DealInfo": { + "PublishCid": null, + "DealID": 5432, + "DealProposal": { + "PieceCID": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "PieceSize": 1032, + "VerifiedDeal": true, + "Client": "f01234", + "Provider": "f01234", + "Label": "", + "StartEpoch": 10101, + "EndEpoch": 10101, + "StoragePricePerEpoch": "0", + "ProviderCollateral": "0", + "ClientCollateral": "0" + }, + "DealSchedule": { + "StartEpoch": 10101, + "EndEpoch": 10101 + }, + "KeepUnsealed": true + } + } + ], + "TicketValue": "Bw==", + "TicketEpoch": 10101, + "PreCommit1Out": "Bw==", + "CommD": null, + "CommR": null, + "PreCommitInfo": { + "SealProof": 8, + "SectorNumber": 9, + "SealedCID": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "SealRandEpoch": 10101, + "DealIDs": [ + 5432 + ], + "Expiration": 10101, + "ReplaceCapacity": true, + "ReplaceSectorDeadline": 42, + "ReplaceSectorPartition": 42, + "ReplaceSectorNumber": 9 + }, + "PreCommitDeposit": "0", + "PreCommitMessage": null, + "PreCommitTipSet": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], + "SeedValue": "Bw==", + "SeedEpoch": 10101, + "CommitProof": "Ynl0ZSBhcnJheQ==", + "CommitMessage": null, + "Log": [ + { + "Kind": "string value", + "Timestamp": 42, + "Trace": "string value", + "Message": "string value" + } + ], + "DataUnsealed": { + "Local": true, + "URL": "string value", + "Headers": { + "Authorization": [ + "Bearer ey.." + ] + } + }, + "DataSealed": { + "Local": true, + "URL": "string value", + "Headers": { + "Authorization": [ + "Bearer ey.." + ] + } + }, + "DataCache": { + "Local": true, + "URL": "string value", + "Headers": { + "Authorization": [ + "Bearer ey.." + ] + } + } + } +] +``` + +Response: `{}` + ### SectorRemove SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. diff --git a/node/impl/storminer.go b/node/impl/storminer.go index c32fba856..9f1b9af35 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -448,6 +448,10 @@ func (sm *StorageMinerAPI) SectorNumFree(ctx context.Context, name string) error return sm.Miner.NumFree(ctx, name) } +func (sm *StorageMinerAPI) SectorReceive(ctx context.Context, meta api.RemoteSectorMeta) error { + return sm.Miner.Receive(ctx, meta) +} + func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]minertypes.SubmitWindowedPoStParams, error) { var ts *types.TipSet var err error diff --git a/storage/pipeline/checks.go b/storage/pipeline/checks.go index 2192db5cf..ce0512c2b 100644 --- a/storage/pipeline/checks.go +++ b/storage/pipeline/checks.go @@ -41,7 +41,7 @@ type ErrCommitWaitFailed struct{ error } type ErrBadRU struct{ error } type ErrBadPR struct{ error } -func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error { +func checkPieces(ctx context.Context, maddr address.Address, sn abi.SectorNumber, pieces []Piece, api SealingAPI, mustHaveDeals bool) error { ts, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} @@ -49,13 +49,13 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api dealCount := 0 - for i, p := range si.Pieces { + for i, p := range pieces { // if no deal is associated with the piece, ensure that we added it as // filler (i.e. ensure that it has a zero PieceCID) if p.DealInfo == nil { exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded()) if !p.Piece.PieceCID.Equals(exp) { - return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", si.SectorNumber, i, p.Piece.PieceCID)} + return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sn, i, p.Piece.PieceCID)} } continue } @@ -68,24 +68,24 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api } if deal.Proposal.Provider != maddr { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, deal.Proposal.Provider, maddr)} + return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(pieces), sn, p.DealInfo.DealID, deal.Proposal.Provider, maddr)} } if deal.Proposal.PieceCID != p.Piece.PieceCID { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, deal.Proposal.PieceCID)} + return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(pieces), sn, p.DealInfo.DealID, p.Piece.PieceCID, deal.Proposal.PieceCID)} } if p.Piece.Size != deal.Proposal.PieceSize { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.Size, deal.Proposal.PieceSize)} + return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(pieces), sn, p.DealInfo.DealID, p.Piece.Size, deal.Proposal.PieceSize)} } if ts.Height() >= deal.Proposal.StartEpoch { - return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, deal.Proposal.StartEpoch, ts.Height())} + return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(pieces), sn, p.DealInfo.DealID, deal.Proposal.StartEpoch, ts.Height())} } } if mustHaveDeals && dealCount <= 0 { - return &ErrNoDeals{(xerrors.Errorf("sector %d must have deals, but does not", si.SectorNumber))} + return &ErrNoDeals{xerrors.Errorf("sector %d must have deals, but does not", sn)} } return nil @@ -95,7 +95,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api // // matches pieces, and that the seal ticket isn't expired func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, height abi.ChainEpoch, api SealingAPI) (err error) { - if err := checkPieces(ctx, maddr, si, api, false); err != nil { + if err := checkPieces(ctx, maddr, si.SectorNumber, si.Pieces, api, false); err != nil { return err } @@ -210,7 +210,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")} } - if err := checkPieces(ctx, m.maddr, si, m.Api, false); err != nil { + if err := checkPieces(ctx, m.maddr, si.SectorNumber, si.Pieces, m.Api, false); err != nil { return err } @@ -220,7 +220,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, // check that sector info is good after running a replica update func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, api SealingAPI) error { - if err := checkPieces(ctx, maddr, si, api, true); err != nil { + if err := checkPieces(ctx, maddr, si.SectorNumber, si.Pieces, api, true); err != nil { return err } if !si.CCUpdate { diff --git a/storage/pipeline/receive.go b/storage/pipeline/receive.go new file mode 100644 index 000000000..e32fd4f86 --- /dev/null +++ b/storage/pipeline/receive.go @@ -0,0 +1,78 @@ +package sealing + +import ( + "context" + + "github.com/ipfs/go-datastore" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api" +) + +func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error { + if err := m.checkSectorMeta(ctx, meta); err != nil { + return err + } + + panic("impl me") +} + +func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) error { + { + mid, err := address.IDFromAddress(m.maddr) + if err != nil { + panic(err) + } + + if meta.Sector.Miner != abi.ActorID(mid) { + return xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner) + } + } + + { + // initial sanity check, doesn't prevent races + _, err := m.GetSectorInfo(meta.Sector.Number) + if err != nil && !xerrors.Is(err, datastore.ErrNotFound) { + return err + } + if err == nil { + return xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number) + } + } + + { + spt, err := m.currentSealProof(ctx) + if err != nil { + return err + } + + if meta.Type != spt { + return xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt) + } + } + + switch SectorState(meta.State) { + case Packing: + //checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false) + + fallthrough + case GetTicket: + + fallthrough + case PreCommitting: + + fallthrough + case SubmitCommit: + + fallthrough + case Proving, Available: + + return nil + default: + return xerrors.Errorf("imported sector State in not supported") + } + +} diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 0261201f3..a78d6dee8 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -26,7 +26,7 @@ func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorAbortUpgrade{xerrors.New("sector had no deals")}) } - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, true); err != nil { // Sanity check state return handleErrors(ctx, err, sector) } out, err := m.sealer.ReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.pieceInfos()) @@ -66,7 +66,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect return ctx.Send(SectorProveReplicaUpdateFailed{xerrors.Errorf("prove replica update (1) failed: %w", err)}) } - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, true); err != nil { // Sanity check state return handleErrors(ctx, err, sector) } diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index f769341dd..f2d61fc62 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -207,7 +207,7 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e } func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, false); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)