sector import: Initial api scaffolding

This commit is contained in:
Łukasz Magiera 2022-08-24 15:27:27 -04:00
parent 4abc38dacc
commit 29135aa77c
11 changed files with 327 additions and 14 deletions

View File

@ -3,6 +3,7 @@ package api
import (
"bytes"
"context"
"net/http"
"time"
"github.com/google/uuid"
@ -17,6 +18,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v8/market"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
abinetwork "github.com/filecoin-project/go-state-types/network"
@ -144,6 +146,8 @@ type StorageMiner interface {
// SectorNumFree drops a sector reservation
SectorNumFree(ctx context.Context, name string) error //perm:admin
SectorReceive(ctx context.Context, meta RemoteSectorMeta) error
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error //perm:admin retry:true
WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) //perm:admin
@ -504,3 +508,87 @@ type NumAssignerMeta struct {
Next abi.SectorNumber
}
type RemoteSectorMeta struct {
////////
// BASIC SECTOR INFORMATION
// State specifies the first state the sector will enter after being imported
// Must be one of the following states:
// * Packing
// * GetTicket
// * PreCommitting
// * SubmitCommit
// * Proving/Available
State SectorState
Sector abi.SectorID
Type abi.RegisteredSealProof
////////
// SEALING METADATA
// (allows lotus to continue the sealing process)
// Required in Packing and later
Pieces []SectorPiece // todo better type?
// Required in PreCommitting and later
TicketValue abi.SealRandomness
TicketEpoch abi.ChainEpoch
PreCommit1Out storiface.PreCommit1Out // todo specify better
CommD *cid.Cid
CommR *cid.Cid // SectorKey
// Required in SubmitCommit and later
PreCommitInfo *miner.SectorPreCommitInfo
PreCommitDeposit big.Int
PreCommitMessage *cid.Cid
PreCommitTipSet types.TipSetKey
SeedValue abi.InteractiveSealRandomness
SeedEpoch abi.ChainEpoch
CommitProof []byte
// Required in Proving/Available
CommitMessage *cid.Cid
// Optional sector metadata to import
Log []SectorLog
////////
// SECTOR DATA SOURCE
// Sector urls - lotus will use those for fetching files into local storage
// Required in all states
DataUnsealed *SectorData
// Required in PreCommitting and later
DataSealed *SectorData
DataCache *SectorData
////////
// SEALING SERVICE HOOKS
// todo Commit1Provider
// todo OnDone / OnStateChange
}
type SectorData struct {
// Local when set to true indicates to lotus that sector data is already
// available locally; When set lotus will skip fetching sector data, and
// only check that sector data exists in sector storage
Local bool
// URL to the sector data
// For sealed/unsealed sector, lotus expects octet-stream
// For cache, lotus expects a tar archive with cache files (todo maybe use not-tar; specify what files with what paths must be present)
// Valid schemas:
// - http:// / https://
URL string
// optional http headers to use when requesting sector data
Headers http.Header
}

View File

@ -6,6 +6,7 @@ import (
"go/ast"
"go/parser"
"go/token"
"net/http"
"path/filepath"
"reflect"
"strings"
@ -340,6 +341,9 @@ func init() {
"": bitfield.NewFromSet([]uint64{5, 6, 7, 10}),
})
addExample(http.Header{
"Authorization": []string{"Bearer ey.."},
})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -846,6 +846,8 @@ type StorageMinerStruct struct {
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorReceive func(p0 context.Context, p1 RemoteSectorMeta) error ``
SectorRemove func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
SectorSetExpectedSealDuration func(p0 context.Context, p1 time.Duration) error `perm:"write"`
@ -5039,6 +5041,17 @@ func (s *StorageMinerStub) SectorPreCommitPending(p0 context.Context) ([]abi.Sec
return *new([]abi.SectorID), ErrNotSupported
}
func (s *StorageMinerStruct) SectorReceive(p0 context.Context, p1 RemoteSectorMeta) error {
if s.Internal.SectorReceive == nil {
return ErrNotSupported
}
return s.Internal.SectorReceive(p0, p1)
}
func (s *StorageMinerStub) SectorReceive(p0 context.Context, p1 RemoteSectorMeta) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) SectorRemove(p0 context.Context, p1 abi.SectorNumber) error {
if s.Internal.SectorRemove == nil {
return ErrNotSupported

Binary file not shown.

Binary file not shown.

View File

@ -150,6 +150,7 @@
* [SectorNumReserveCount](#SectorNumReserveCount)
* [SectorPreCommitFlush](#SectorPreCommitFlush)
* [SectorPreCommitPending](#SectorPreCommitPending)
* [SectorReceive](#SectorReceive)
* [SectorRemove](#SectorRemove)
* [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration)
* [SectorSetSealDelay](#SectorSetSealDelay)
@ -3151,6 +3152,131 @@ Response:
]
```
### SectorReceive
There are not yet any comments for this method.
Perms:
Inputs:
```json
[
{
"State": "Proving",
"Sector": {
"Miner": 1000,
"Number": 9
},
"Type": 8,
"Pieces": [
{
"Piece": {
"Size": 1032,
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
},
"DealInfo": {
"PublishCid": null,
"DealID": 5432,
"DealProposal": {
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"PieceSize": 1032,
"VerifiedDeal": true,
"Client": "f01234",
"Provider": "f01234",
"Label": "",
"StartEpoch": 10101,
"EndEpoch": 10101,
"StoragePricePerEpoch": "0",
"ProviderCollateral": "0",
"ClientCollateral": "0"
},
"DealSchedule": {
"StartEpoch": 10101,
"EndEpoch": 10101
},
"KeepUnsealed": true
}
}
],
"TicketValue": "Bw==",
"TicketEpoch": 10101,
"PreCommit1Out": "Bw==",
"CommD": null,
"CommR": null,
"PreCommitInfo": {
"SealProof": 8,
"SectorNumber": 9,
"SealedCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"SealRandEpoch": 10101,
"DealIDs": [
5432
],
"Expiration": 10101,
"ReplaceCapacity": true,
"ReplaceSectorDeadline": 42,
"ReplaceSectorPartition": 42,
"ReplaceSectorNumber": 9
},
"PreCommitDeposit": "0",
"PreCommitMessage": null,
"PreCommitTipSet": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"SeedValue": "Bw==",
"SeedEpoch": 10101,
"CommitProof": "Ynl0ZSBhcnJheQ==",
"CommitMessage": null,
"Log": [
{
"Kind": "string value",
"Timestamp": 42,
"Trace": "string value",
"Message": "string value"
}
],
"DataUnsealed": {
"Local": true,
"URL": "string value",
"Headers": {
"Authorization": [
"Bearer ey.."
]
}
},
"DataSealed": {
"Local": true,
"URL": "string value",
"Headers": {
"Authorization": [
"Bearer ey.."
]
}
},
"DataCache": {
"Local": true,
"URL": "string value",
"Headers": {
"Authorization": [
"Bearer ey.."
]
}
}
}
]
```
Response: `{}`
### SectorRemove
SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.

View File

@ -448,6 +448,10 @@ func (sm *StorageMinerAPI) SectorNumFree(ctx context.Context, name string) error
return sm.Miner.NumFree(ctx, name)
}
func (sm *StorageMinerAPI) SectorReceive(ctx context.Context, meta api.RemoteSectorMeta) error {
return sm.Miner.Receive(ctx, meta)
}
func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]minertypes.SubmitWindowedPoStParams, error) {
var ts *types.TipSet
var err error

View File

@ -41,7 +41,7 @@ type ErrCommitWaitFailed struct{ error }
type ErrBadRU struct{ error }
type ErrBadPR struct{ error }
func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error {
func checkPieces(ctx context.Context, maddr address.Address, sn abi.SectorNumber, pieces []Piece, api SealingAPI, mustHaveDeals bool) error {
ts, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
@ -49,13 +49,13 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
dealCount := 0
for i, p := range si.Pieces {
for i, p := range pieces {
// if no deal is associated with the piece, ensure that we added it as
// filler (i.e. ensure that it has a zero PieceCID)
if p.DealInfo == nil {
exp := zerocomm.ZeroPieceCommitment(p.Piece.Size.Unpadded())
if !p.Piece.PieceCID.Equals(exp) {
return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", si.SectorNumber, i, p.Piece.PieceCID)}
return &ErrInvalidPiece{xerrors.Errorf("sector %d piece %d had non-zero PieceCID %+v", sn, i, p.Piece.PieceCID)}
}
continue
}
@ -68,24 +68,24 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
}
if deal.Proposal.Provider != maddr {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, deal.Proposal.Provider, maddr)}
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong provider: %s != %s", i, len(pieces), sn, p.DealInfo.DealID, deal.Proposal.Provider, maddr)}
}
if deal.Proposal.PieceCID != p.Piece.PieceCID {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, deal.Proposal.PieceCID)}
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(pieces), sn, p.DealInfo.DealID, p.Piece.PieceCID, deal.Proposal.PieceCID)}
}
if p.Piece.Size != deal.Proposal.PieceSize {
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.Size, deal.Proposal.PieceSize)}
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(pieces), sn, p.DealInfo.DealID, p.Piece.Size, deal.Proposal.PieceSize)}
}
if ts.Height() >= deal.Proposal.StartEpoch {
return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, deal.Proposal.StartEpoch, ts.Height())}
return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(pieces), sn, p.DealInfo.DealID, deal.Proposal.StartEpoch, ts.Height())}
}
}
if mustHaveDeals && dealCount <= 0 {
return &ErrNoDeals{(xerrors.Errorf("sector %d must have deals, but does not", si.SectorNumber))}
return &ErrNoDeals{xerrors.Errorf("sector %d must have deals, but does not", sn)}
}
return nil
@ -95,7 +95,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
//
// matches pieces, and that the seal ticket isn't expired
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, height abi.ChainEpoch, api SealingAPI) (err error) {
if err := checkPieces(ctx, maddr, si, api, false); err != nil {
if err := checkPieces(ctx, maddr, si.SectorNumber, si.Pieces, api, false); err != nil {
return err
}
@ -210,7 +210,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")}
}
if err := checkPieces(ctx, m.maddr, si, m.Api, false); err != nil {
if err := checkPieces(ctx, m.maddr, si.SectorNumber, si.Pieces, m.Api, false); err != nil {
return err
}
@ -220,7 +220,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
// check that sector info is good after running a replica update
func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, api SealingAPI) error {
if err := checkPieces(ctx, maddr, si, api, true); err != nil {
if err := checkPieces(ctx, maddr, si.SectorNumber, si.Pieces, api, true); err != nil {
return err
}
if !si.CCUpdate {

View File

@ -0,0 +1,78 @@
package sealing
import (
"context"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
)
func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error {
if err := m.checkSectorMeta(ctx, meta); err != nil {
return err
}
panic("impl me")
}
func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) error {
{
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
panic(err)
}
if meta.Sector.Miner != abi.ActorID(mid) {
return xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner)
}
}
{
// initial sanity check, doesn't prevent races
_, err := m.GetSectorInfo(meta.Sector.Number)
if err != nil && !xerrors.Is(err, datastore.ErrNotFound) {
return err
}
if err == nil {
return xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number)
}
}
{
spt, err := m.currentSealProof(ctx)
if err != nil {
return err
}
if meta.Type != spt {
return xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt)
}
}
switch SectorState(meta.State) {
case Packing:
//checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false)
fallthrough
case GetTicket:
fallthrough
case PreCommitting:
fallthrough
case SubmitCommit:
fallthrough
case Proving, Available:
return nil
default:
return xerrors.Errorf("imported sector State in not supported")
}
}

View File

@ -26,7 +26,7 @@ func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorAbortUpgrade{xerrors.New("sector had no deals")})
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}
out, err := m.sealer.ReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.pieceInfos())
@ -66,7 +66,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
return ctx.Send(SectorProveReplicaUpdateFailed{xerrors.Errorf("prove replica update (1) failed: %w", err)})
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector)
}

View File

@ -207,7 +207,7 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
}
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, false); err != nil { // Sanity check state
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)