diff --git a/api/api_storage.go b/api/api_storage.go index 1131f45a0..e1ef8ab6b 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -54,6 +54,13 @@ type StorageMiner interface { // Get the status of a given sector by ID SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error) //perm:read + // Add piece to an open sector. If no sectors with enough space are open, + // either a new sector will be created, or this call will block until more + // sectors can be created. + SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d PieceDealInfo) (SectorOffset, error) //perm:admin + + SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error //perm:admin + // List all staged sectors SectorsList(context.Context) ([]abi.SectorNumber, error) //perm:read @@ -124,8 +131,8 @@ type StorageMiner interface { StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]stores.StorageInfo, error) //perm:admin StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error //perm:admin StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) //perm:admin + StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) //perm:admin - StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) //perm:admin StorageLocal(ctx context.Context) (map[stores.ID]string, error) //perm:admin StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) //perm:admin @@ -289,3 +296,25 @@ type PendingDealInfo struct { PublishPeriodStart time.Time PublishPeriod time.Duration } + +type SectorOffset struct { + Sector abi.SectorNumber + Offset abi.PaddedPieceSize +} + +// DealInfo is a tuple of deal identity and its schedule +type PieceDealInfo struct { + PublishCid *cid.Cid + DealID abi.DealID + DealProposal *market.DealProposal + DealSchedule DealSchedule + KeepUnsealed bool +} + +// DealSchedule communicates the time interval of a storage deal. The deal must +// appear in a sealed (proven) sector no later than StartEpoch, otherwise it +// is invalid. +type DealSchedule struct { + StartEpoch abi.ChainEpoch + EndEpoch abi.ChainEpoch +} diff --git a/api/cbor_gen.go b/api/cbor_gen.go index 808e516ad..4434b45ed 100644 --- a/api/cbor_gen.go +++ b/api/cbor_gen.go @@ -8,6 +8,7 @@ import ( "sort" abi "github.com/filecoin-project/go-state-types/abi" + market "github.com/filecoin-project/specs-actors/actors/builtin/market" paych "github.com/filecoin-project/specs-actors/actors/builtin/paych" cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" @@ -738,3 +739,381 @@ func (t *SealSeed) UnmarshalCBOR(r io.Reader) error { return nil } +func (t *PieceDealInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{165}); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.PublishCid (cid.Cid) (struct) + if len("PublishCid") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PublishCid\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PublishCid"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PublishCid")); err != nil { + return err + } + + if t.PublishCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCidBuf(scratch, w, *t.PublishCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err) + } + } + + // t.DealID (abi.DealID) (uint64) + if len("DealID") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"DealID\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealID"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("DealID")); err != nil { + return err + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.DealID)); err != nil { + return err + } + + // t.DealProposal (market.DealProposal) (struct) + if len("DealProposal") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"DealProposal\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealProposal"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("DealProposal")); err != nil { + return err + } + + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + + // t.DealSchedule (api.DealSchedule) (struct) + if len("DealSchedule") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"DealSchedule\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealSchedule"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("DealSchedule")); err != nil { + return err + } + + if err := t.DealSchedule.MarshalCBOR(w); err != nil { + return err + } + + // t.KeepUnsealed (bool) (bool) + if len("KeepUnsealed") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"KeepUnsealed\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("KeepUnsealed"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("KeepUnsealed")); err != nil { + return err + } + + if err := cbg.WriteBool(w, t.KeepUnsealed); err != nil { + return err + } + return nil +} + +func (t *PieceDealInfo) UnmarshalCBOR(r io.Reader) error { + *t = PieceDealInfo{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("PieceDealInfo: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.PublishCid (cid.Cid) (struct) + case "PublishCid": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err) + } + + t.PublishCid = &c + } + + } + // t.DealID (abi.DealID) (uint64) + case "DealID": + + { + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = abi.DealID(extra) + + } + // t.DealProposal (market.DealProposal) (struct) + case "DealProposal": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + t.DealProposal = new(market.DealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) + } + } + + } + // t.DealSchedule (api.DealSchedule) (struct) + case "DealSchedule": + + { + + if err := t.DealSchedule.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.DealSchedule: %w", err) + } + + } + // t.KeepUnsealed (bool) (bool) + case "KeepUnsealed": + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.KeepUnsealed = false + case 21: + t.KeepUnsealed = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + + default: + // Field doesn't exist on this type, so ignore it + cbg.ScanForLinks(r, func(cid.Cid) {}) + } + } + + return nil +} +func (t *DealSchedule) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{162}); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.StartEpoch (abi.ChainEpoch) (int64) + if len("StartEpoch") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"StartEpoch\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("StartEpoch"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("StartEpoch")); err != nil { + return err + } + + if t.StartEpoch >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.StartEpoch)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.StartEpoch-1)); err != nil { + return err + } + } + + // t.EndEpoch (abi.ChainEpoch) (int64) + if len("EndEpoch") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"EndEpoch\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("EndEpoch"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("EndEpoch")); err != nil { + return err + } + + if t.EndEpoch >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.EndEpoch)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.EndEpoch-1)); err != nil { + return err + } + } + return nil +} + +func (t *DealSchedule) UnmarshalCBOR(r io.Reader) error { + *t = DealSchedule{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("DealSchedule: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.StartEpoch (abi.ChainEpoch) (int64) + case "StartEpoch": + { + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.StartEpoch = abi.ChainEpoch(extraI) + } + // t.EndEpoch (abi.ChainEpoch) (int64) + case "EndEpoch": + { + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.EndEpoch = abi.ChainEpoch(extraI) + } + + default: + // Field doesn't exist on this type, so ignore it + cbg.ScanForLinks(r, func(cid.Cid) {}) + } + } + + return nil +} diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 2d4d41503..eb96851a9 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -657,6 +657,8 @@ type StorageMinerStruct struct { SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"` + SectorAddPieceToAny func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) `perm:"admin"` + SectorGetExpectedSealDuration func(p0 context.Context) (time.Duration, error) `perm:"read"` SectorGetSealDelay func(p0 context.Context) (time.Duration, error) `perm:"read"` @@ -687,6 +689,8 @@ type StorageMinerStruct struct { SectorsSummary func(p0 context.Context) (map[SectorState]int, error) `perm:"read"` + SectorsUnsealPiece func(p0 context.Context, p1 storage.SectorRef, p2 storiface.UnpaddedByteIndex, p3 abi.UnpaddedPieceSize, p4 abi.SealRandomness, p5 *cid.Cid) error `perm:"admin"` + SectorsUpdate func(p0 context.Context, p1 abi.SectorNumber, p2 SectorState) error `perm:"admin"` StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"` @@ -3107,6 +3111,14 @@ func (s *StorageMinerStub) SealingSchedDiag(p0 context.Context, p1 bool) (interf return nil, xerrors.New("method not supported") } +func (s *StorageMinerStruct) SectorAddPieceToAny(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) { + return s.Internal.SectorAddPieceToAny(p0, p1, p2, p3) +} + +func (s *StorageMinerStub) SectorAddPieceToAny(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) { + return *new(SectorOffset), xerrors.New("method not supported") +} + func (s *StorageMinerStruct) SectorGetExpectedSealDuration(p0 context.Context) (time.Duration, error) { return s.Internal.SectorGetExpectedSealDuration(p0) } @@ -3227,6 +3239,14 @@ func (s *StorageMinerStub) SectorsSummary(p0 context.Context) (map[SectorState]i return *new(map[SectorState]int), xerrors.New("method not supported") } +func (s *StorageMinerStruct) SectorsUnsealPiece(p0 context.Context, p1 storage.SectorRef, p2 storiface.UnpaddedByteIndex, p3 abi.UnpaddedPieceSize, p4 abi.SealRandomness, p5 *cid.Cid) error { + return s.Internal.SectorsUnsealPiece(p0, p1, p2, p3, p4, p5) +} + +func (s *StorageMinerStub) SectorsUnsealPiece(p0 context.Context, p1 storage.SectorRef, p2 storiface.UnpaddedByteIndex, p3 abi.UnpaddedPieceSize, p4 abi.SealRandomness, p5 *cid.Cid) error { + return xerrors.New("method not supported") +} + func (s *StorageMinerStruct) SectorsUpdate(p0 context.Context, p1 abi.SectorNumber, p2 SectorState) error { return s.Internal.SectorsUpdate(p0, p1, p2) } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index a02520116..76132a7a2 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -120,7 +120,7 @@ var initCmd = &cli.Command{ }, }, Subcommands: []*cli.Command{ - initRestoreCmd, + restoreCmd, }, Action: func(cctx *cli.Context) error { log.Info("Initializing lotus miner") @@ -316,10 +316,10 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string Size: abi.PaddedPieceSize(meta.SectorSize), PieceCID: commD, }, - DealInfo: &sealing.DealInfo{ + DealInfo: &lapi.PieceDealInfo{ DealID: dealID, DealProposal: §or.Deal, - DealSchedule: sealing.DealSchedule{ + DealSchedule: lapi.DealSchedule{ StartEpoch: sector.Deal.StartEpoch, EndEpoch: sector.Deal.EndEpoch, }, diff --git a/cmd/lotus-storage-miner/init_restore.go b/cmd/lotus-storage-miner/init_restore.go index eec7b8413..c609fd2a4 100644 --- a/cmd/lotus-storage-miner/init_restore.go +++ b/cmd/lotus-storage-miner/init_restore.go @@ -30,7 +30,7 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) -var initRestoreCmd = &cli.Command{ +var restoreCmd = &cli.Command{ Name: "restore", Usage: "Initialize a lotus miner repo from a backup", Flags: []cli.Flag{ diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index d3fef8533..70d55ee2e 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -336,6 +336,49 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage. return nil } +func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil { + return xerrors.Errorf("acquiring unseal sector lock: %w", err) + } + + unsealFetch := func(ctx context.Context, worker Worker) error { + if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { + return xerrors.Errorf("copy sealed/cache sector data: %w", err) + } + + return nil + } + + if unsealed == nil { + return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size) + } + + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return xerrors.Errorf("getting sector size: %w", err) + } + + selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, false) + + err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { + // TODO: make restartable + + // NOTE: we're unsealing the whole sector here as with SDR we can't really + // unseal the sector partially. Requesting the whole sector here can + // save us some work in case another piece is requested from here + _, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, *unsealed)) + return err + }) + if err != nil { + return err + } + + return nil +} + func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error { log.Warnf("stub NewSector") return nil diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 9e12b8649..b71c2863c 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -8,7 +8,7 @@ import ( "sort" abi "github.com/filecoin-project/go-state-types/abi" - market "github.com/filecoin-project/specs-actors/actors/builtin/market" + api "github.com/filecoin-project/lotus/api" miner "github.com/filecoin-project/specs-actors/actors/builtin/miner" cid "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" @@ -46,7 +46,7 @@ func (t *Piece) MarshalCBOR(w io.Writer) error { return err } - // t.DealInfo (sealing.DealInfo) (struct) + // t.DealInfo (api.PieceDealInfo) (struct) if len("DealInfo") > cbg.MaxLength { return xerrors.Errorf("Value in field \"DealInfo\" was too long") } @@ -107,7 +107,7 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error { } } - // t.DealInfo (sealing.DealInfo) (struct) + // t.DealInfo (api.PieceDealInfo) (struct) case "DealInfo": { @@ -120,7 +120,7 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error { if err := br.UnreadByte(); err != nil { return err } - t.DealInfo = new(DealInfo) + t.DealInfo = new(api.PieceDealInfo) if err := t.DealInfo.UnmarshalCBOR(br); err != nil { return xerrors.Errorf("unmarshaling t.DealInfo pointer: %w", err) } @@ -136,384 +136,6 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *DealInfo) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{165}); err != nil { - return err - } - - scratch := make([]byte, 9) - - // t.PublishCid (cid.Cid) (struct) - if len("PublishCid") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"PublishCid\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PublishCid"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("PublishCid")); err != nil { - return err - } - - if t.PublishCid == nil { - if _, err := w.Write(cbg.CborNull); err != nil { - return err - } - } else { - if err := cbg.WriteCidBuf(scratch, w, *t.PublishCid); err != nil { - return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err) - } - } - - // t.DealID (abi.DealID) (uint64) - if len("DealID") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"DealID\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealID"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("DealID")); err != nil { - return err - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.DealID)); err != nil { - return err - } - - // t.DealProposal (market.DealProposal) (struct) - if len("DealProposal") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"DealProposal\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealProposal"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("DealProposal")); err != nil { - return err - } - - if err := t.DealProposal.MarshalCBOR(w); err != nil { - return err - } - - // t.DealSchedule (sealing.DealSchedule) (struct) - if len("DealSchedule") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"DealSchedule\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("DealSchedule"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("DealSchedule")); err != nil { - return err - } - - if err := t.DealSchedule.MarshalCBOR(w); err != nil { - return err - } - - // t.KeepUnsealed (bool) (bool) - if len("KeepUnsealed") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"KeepUnsealed\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("KeepUnsealed"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("KeepUnsealed")); err != nil { - return err - } - - if err := cbg.WriteBool(w, t.KeepUnsealed); err != nil { - return err - } - return nil -} - -func (t *DealInfo) UnmarshalCBOR(r io.Reader) error { - *t = DealInfo{} - - br := cbg.GetPeeker(r) - scratch := make([]byte, 8) - - maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("DealInfo: map struct too large (%d)", extra) - } - - var name string - n := extra - - for i := uint64(0); i < n; i++ { - - { - sval, err := cbg.ReadStringBuf(br, scratch) - if err != nil { - return err - } - - name = string(sval) - } - - switch name { - // t.PublishCid (cid.Cid) (struct) - case "PublishCid": - - { - - b, err := br.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := br.UnreadByte(); err != nil { - return err - } - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err) - } - - t.PublishCid = &c - } - - } - // t.DealID (abi.DealID) (uint64) - case "DealID": - - { - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.DealID = abi.DealID(extra) - - } - // t.DealProposal (market.DealProposal) (struct) - case "DealProposal": - - { - - b, err := br.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := br.UnreadByte(); err != nil { - return err - } - t.DealProposal = new(market.DealProposal) - if err := t.DealProposal.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) - } - } - - } - // t.DealSchedule (sealing.DealSchedule) (struct) - case "DealSchedule": - - { - - if err := t.DealSchedule.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.DealSchedule: %w", err) - } - - } - // t.KeepUnsealed (bool) (bool) - case "KeepUnsealed": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajOther { - return fmt.Errorf("booleans must be major type 7") - } - switch extra { - case 20: - t.KeepUnsealed = false - case 21: - t.KeepUnsealed = true - default: - return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) - } - - default: - // Field doesn't exist on this type, so ignore it - cbg.ScanForLinks(r, func(cid.Cid) {}) - } - } - - return nil -} -func (t *DealSchedule) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{162}); err != nil { - return err - } - - scratch := make([]byte, 9) - - // t.StartEpoch (abi.ChainEpoch) (int64) - if len("StartEpoch") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"StartEpoch\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("StartEpoch"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("StartEpoch")); err != nil { - return err - } - - if t.StartEpoch >= 0 { - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.StartEpoch)); err != nil { - return err - } - } else { - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.StartEpoch-1)); err != nil { - return err - } - } - - // t.EndEpoch (abi.ChainEpoch) (int64) - if len("EndEpoch") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"EndEpoch\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("EndEpoch"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("EndEpoch")); err != nil { - return err - } - - if t.EndEpoch >= 0 { - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.EndEpoch)); err != nil { - return err - } - } else { - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.EndEpoch-1)); err != nil { - return err - } - } - return nil -} - -func (t *DealSchedule) UnmarshalCBOR(r io.Reader) error { - *t = DealSchedule{} - - br := cbg.GetPeeker(r) - scratch := make([]byte, 8) - - maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("DealSchedule: map struct too large (%d)", extra) - } - - var name string - n := extra - - for i := uint64(0); i < n; i++ { - - { - sval, err := cbg.ReadStringBuf(br, scratch) - if err != nil { - return err - } - - name = string(sval) - } - - switch name { - // t.StartEpoch (abi.ChainEpoch) (int64) - case "StartEpoch": - { - maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) - var extraI int64 - if err != nil { - return err - } - switch maj { - case cbg.MajUnsignedInt: - extraI = int64(extra) - if extraI < 0 { - return fmt.Errorf("int64 positive overflow") - } - case cbg.MajNegativeInt: - extraI = int64(extra) - if extraI < 0 { - return fmt.Errorf("int64 negative oveflow") - } - extraI = -1 - extraI - default: - return fmt.Errorf("wrong type for int64 field: %d", maj) - } - - t.StartEpoch = abi.ChainEpoch(extraI) - } - // t.EndEpoch (abi.ChainEpoch) (int64) - case "EndEpoch": - { - maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) - var extraI int64 - if err != nil { - return err - } - switch maj { - case cbg.MajUnsignedInt: - extraI = int64(extra) - if extraI < 0 { - return fmt.Errorf("int64 positive overflow") - } - case cbg.MajNegativeInt: - extraI = int64(extra) - if extraI < 0 { - return fmt.Errorf("int64 negative oveflow") - } - extraI = -1 - extraI - default: - return fmt.Errorf("wrong type for int64 field: %d", maj) - } - - t.EndEpoch = abi.ChainEpoch(extraI) - } - - default: - // Field doesn't exist on this type, so ignore it - cbg.ScanForLinks(r, func(cid.Cid) {}) - } - } - - return nil -} func (t *SectorInfo) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 44d2e8275..8f80b257f 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/api" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" @@ -224,34 +225,34 @@ func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorIn return nil } -func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { +func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid) if (padreader.PaddedSize(uint64(size))) != size { - return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") + return api.SectorOffset{}, xerrors.Errorf("cannot allocate unpadded piece") } sp, err := m.currentSealProof(ctx) if err != nil { - return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err) + return api.SectorOffset{}, xerrors.Errorf("getting current seal proof type: %w", err) } ssize, err := sp.SectorSize() if err != nil { - return 0, 0, err + return api.SectorOffset{}, err } if size > abi.PaddedPieceSize(ssize).Unpadded() { - return 0, 0, xerrors.Errorf("piece cannot fit into a sector") + return api.SectorOffset{}, xerrors.Errorf("piece cannot fit into a sector") } if _, err := deal.DealProposal.Cid(); err != nil { - return 0, 0, xerrors.Errorf("getting proposal CID: %w", err) + return api.SectorOffset{}, xerrors.Errorf("getting proposal CID: %w", err) } m.inputLk.Lock() if _, exist := m.pendingPieces[proposalCID(deal)]; exist { m.inputLk.Unlock() - return 0, 0, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) + return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) } resCh := make(chan struct { @@ -283,7 +284,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec res := <-resCh - return res.sn, res.offset.Padded(), res.err + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err } // called with m.inputLk @@ -425,7 +426,7 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorStartPacking{}) } -func proposalCID(deal DealInfo) cid.Cid { +func proposalCID(deal api.PieceDealInfo) cid.Cid { pc, err := deal.DealProposal.Cid() if err != nil { log.Errorf("DealProposal.Cid error: %+v", err) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 8feca3b7b..8556a4902 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -122,7 +122,7 @@ type openSector struct { type pendingPiece struct { size abi.UnpaddedPieceSize - deal DealInfo + deal api.PieceDealInfo data storage.Data diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 58c35cf36..c5aed505a 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -11,39 +11,22 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" - "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" ) // Piece is a tuple of piece and deal info type PieceWithDealInfo struct { Piece abi.PieceInfo - DealInfo DealInfo + DealInfo api.PieceDealInfo } // Piece is a tuple of piece info and optional deal type Piece struct { Piece abi.PieceInfo - DealInfo *DealInfo // nil for pieces which do not appear in deals (e.g. filler pieces) -} - -// DealInfo is a tuple of deal identity and its schedule -type DealInfo struct { - PublishCid *cid.Cid - DealID abi.DealID - DealProposal *market.DealProposal - DealSchedule DealSchedule - KeepUnsealed bool -} - -// DealSchedule communicates the time interval of a storage deal. The deal must -// appear in a sealed (proven) sector no later than StartEpoch, otherwise it -// is invalid. -type DealSchedule struct { - StartEpoch abi.ChainEpoch - EndEpoch abi.ChainEpoch + DealInfo *api.PieceDealInfo // nil for pieces which do not appear in deals (e.g. filler pieces) } type Log struct { diff --git a/gen/main.go b/gen/main.go index 9548344fd..0018b241d 100644 --- a/gen/main.go +++ b/gen/main.go @@ -53,6 +53,8 @@ func main() { api.SealedRefs{}, api.SealTicket{}, api.SealSeed{}, + api.PieceDealInfo{}, + api.DealSchedule{}, ) if err != nil { fmt.Println(err) diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index fbeaf3b3d..b899c0810 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -95,11 +95,11 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema return nil, xerrors.Errorf("deal.PublishCid can't be nil") } - sdInfo := sealing.DealInfo{ + sdInfo := api.PieceDealInfo{ DealID: deal.DealID, DealProposal: &deal.Proposal, PublishCid: deal.PublishCid, - DealSchedule: sealing.DealSchedule{ + DealSchedule: api.DealSchedule{ StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch, EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch, }, @@ -240,19 +240,19 @@ func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context // TODO: better strategy (e.g. look for already unsealed) var best api.SealedRef - var bestSi sealing.SectorInfo + var bestSi api.SectorInfo for _, r := range refs { - si, err := n.secb.Miner.GetSectorInfo(r.SectorID) + si, err := n.secb.SectorBuilder.SectorsStatus(ctx, r.SectorID, false) if err != nil { return 0, 0, 0, xerrors.Errorf("getting sector info: %w", err) } - if si.State == sealing.Proving { + if si.State == api.SectorState(sealing.Proving) { best = r bestSi = si break } } - if bestSi.State == sealing.UndefinedSectorState { + if bestSi.State == api.SectorState(sealing.UndefinedSectorState) { return 0, 0, 0, xerrors.New("no sealed sector found") } return best.SectorID, best.Offset, best.Size.Padded(), nil diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 27ab1af5f..bf1c44fc2 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -237,6 +237,14 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb return sInfo, nil } +func (sm *StorageMinerAPI) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r sto.Data, d api.PieceDealInfo) (api.SectorOffset, error) { + return sm.Miner.SectorAddPieceToAny(ctx, size, r, d) +} + +func (sm *StorageMinerAPI) SectorsUnsealPiece(ctx context.Context, sector sto.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error { + return sm.StorageMgr.SectorsUnsealPiece(ctx, sector, offset, size, randomness, commd) +} + // List all staged sectors func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, error) { sectors, err := sm.Miner.ListSectors() diff --git a/storage/sealing.go b/storage/sealing.go index 8981c3738..fdf8f25fa 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -2,14 +2,16 @@ package storage import ( "context" - "io" "github.com/ipfs/go-cid" + "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/api" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" ) @@ -19,10 +21,6 @@ func (m *Miner) Address() address.Address { return m.sealing.Address() } -func (m *Miner) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d sealing.DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - return m.sealing.AddPieceToAnySector(ctx, size, r, d) -} - func (m *Miner) StartPackingSector(sectorNum abi.SectorNumber) error { return m.sealing.StartPacking(sectorNum) } @@ -66,3 +64,71 @@ func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error { func (m *Miner) IsMarkedForUpgrade(id abi.SectorNumber) bool { return m.sealing.IsMarkedForUpgrade(id) } + +func (m *Miner) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error) { + return m.sealing.SectorAddPieceToAny(ctx, size, r, d) +} + +func (m *Miner) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) { + if showOnChainInfo { + return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported") + } + + info, err := m.sealing.GetSectorInfo(sid) + if err != nil { + return api.SectorInfo{}, err + } + + deals := make([]abi.DealID, len(info.Pieces)) + for i, piece := range info.Pieces { + if piece.DealInfo == nil { + continue + } + deals[i] = piece.DealInfo.DealID + } + + log := make([]api.SectorLog, len(info.Log)) + for i, l := range info.Log { + log[i] = api.SectorLog{ + Kind: l.Kind, + Timestamp: l.Timestamp, + Trace: l.Trace, + Message: l.Message, + } + } + + sInfo := api.SectorInfo{ + SectorID: sid, + State: api.SectorState(info.State), + CommD: info.CommD, + CommR: info.CommR, + Proof: info.Proof, + Deals: deals, + Ticket: api.SealTicket{ + Value: info.TicketValue, + Epoch: info.TicketEpoch, + }, + Seed: api.SealSeed{ + Value: info.SeedValue, + Epoch: info.SeedEpoch, + }, + PreCommitMsg: info.PreCommitMessage, + CommitMsg: info.CommitMessage, + Retries: info.InvalidProofs, + ToUpgrade: m.IsMarkedForUpgrade(sid), + + LastErr: info.LastErr, + Log: log, + // on chain info + SealProof: 0, + Activation: 0, + Expiration: 0, + DealWeight: big.Zero(), + VerifiedDealWeight: big.Zero(), + InitialPledge: big.Zero(), + OnTime: 0, + Early: 0, + } + + return sInfo, nil +} diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index bc8456a1f..ccf2c67d2 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -16,11 +16,10 @@ import ( cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" - sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/storage" ) type SealSerialization uint8 @@ -48,17 +47,22 @@ func DsKeyToDealID(key datastore.Key) (uint64, error) { return dealID, nil } +type SectorBuilder interface { // todo: apify, make work remote + SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error) + SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) +} + type SectorBlocks struct { - *storage.Miner + SectorBuilder keys datastore.Batching keyLk sync.Mutex } -func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks { +func NewSectorBlocks(sb SectorBuilder, ds dtypes.MetadataDS) *SectorBlocks { sbc := &SectorBlocks{ - Miner: miner, - keys: namespace.Wrap(ds, dsPrefix), + SectorBuilder: sb, + keys: namespace.Wrap(ds, dsPrefix), } return sbc @@ -96,19 +100,19 @@ func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, o return st.keys.Put(DealIDToDsKey(dealID), newRef) // TODO: batch somehow } -func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d sealing.DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - sn, offset, err := st.Miner.AddPieceToAnySector(ctx, size, r, d) +func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { + so, err := st.SectorBuilder.SectorAddPieceToAny(ctx, size, r, d) if err != nil { return 0, 0, err } // TODO: DealID has very low finality here - err = st.writeRef(d.DealID, sn, offset, size) + err = st.writeRef(d.DealID, so.Sector, so.Offset, size) if err != nil { return 0, 0, xerrors.Errorf("writeRef: %w", err) } - return sn, offset, nil + return so.Sector, so.Offset, nil } func (st *SectorBlocks) List() (map[uint64][]api.SealedRef, error) {