From 9b5968f1705a31c689d5b8ad41a85ebf18adb32e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 8 Feb 2020 03:18:32 +0100 Subject: [PATCH] Spec Actors integration --- cbor_gen.go | 14 +------------- checks.go | 16 +++++++++++----- fsm_events.go | 3 ++- garbage.go | 45 ++++++++++++++++++++++++--------------------- sealing.go | 21 +++++++++++---------- states.go | 18 +++++++++--------- types.go | 21 +++++++++++---------- utils.go | 19 ++++++++++--------- 8 files changed, 79 insertions(+), 78 deletions(-) diff --git a/cbor_gen.go b/cbor_gen.go index af1a18da2..21e60741f 100644 --- a/cbor_gen.go +++ b/cbor_gen.go @@ -218,9 +218,7 @@ func (t *SealSeed) UnmarshalCBOR(r io.Reader) error { } if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") - } - t.BlockHeight = uint64(extra) - // t.TicketBytes ([]uint8) (slice) + } // t.TicketBytes ([]uint8) (slice) case "TicketBytes": maj, extra, err = cbg.CborReadHeader(br) @@ -352,18 +350,9 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.DealID = uint64(extra) // t.Size (uint64) (uint64) case "Size": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Size = uint64(extra) // t.CommP ([]uint8) (slice) case "CommP": @@ -740,7 +729,6 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.SectorID = uint64(extra) // t.Nonce (uint64) (uint64) case "Nonce": diff --git a/checks.go b/checks.go index 9baf2d993..a94cd20da 100644 --- a/checks.go +++ b/checks.go @@ -3,6 +3,7 @@ package sealing import ( "context" + "github.com/multiformats/go-multihash" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -39,16 +40,21 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)} } - if string(deal.PieceRef) != string(piece.CommP) { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with wrong CommP: %x != %x", i, len(si.Pieces), si.SectorID, piece.DealID, piece.CommP, deal.PieceRef)} + h, err := multihash.Decode(deal.PieceCID.Hash()) + if err != nil { + return &ErrInvalidDeals{xerrors.Errorf("decoding piece CID: %w", err)} } - if piece.Size != deal.PieceSize { + if string(h.Digest) != string(piece.CommP) { + return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with wrong CommP: %x != %x", i, len(si.Pieces), si.SectorID, piece.DealID, piece.CommP, h.Digest)} + } + + if piece.Size != deal.PieceSize.Unpadded() { return &ErrInvalidDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorID, piece.DealID, piece.Size, deal.PieceSize)} } - if head.Height() >= deal.ProposalExpiration { - return &ErrExpiredDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers expired deal %d - expires %d, head %d", i, len(si.Pieces), si.SectorID, piece.DealID, deal.ProposalExpiration, head.Height())} + if head.Height() >= deal.StartEpoch { + return &ErrExpiredDeals{xerrors.Errorf("piece %d (or %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorID, piece.DealID, deal.StartEpoch, head.Height())} } } diff --git a/fsm_events.go b/fsm_events.go index 84b1120c8..528da1234 100644 --- a/fsm_events.go +++ b/fsm_events.go @@ -1,6 +1,7 @@ package sealing import ( + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/api" @@ -45,7 +46,7 @@ func (evt SectorForceState) applyGlobal(state *SectorInfo) bool { // Normal path type SectorStart struct { - id uint64 + id abi.SectorNumber pieces []Piece } diff --git a/garbage.go b/garbage.go index ca8c1e454..52f2eb9cb 100644 --- a/garbage.go +++ b/garbage.go @@ -8,20 +8,22 @@ import ( "math/bits" "math/rand" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) -func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { +func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize, parts uint64) io.Reader { parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - if size/parts < 127 { - parts = size / 127 + if uint64(size)/parts < 127 { + parts = uint64(size) / 127 } - piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) + piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded() readers := make([]io.Reader, parts) for i := range readers { @@ -31,33 +33,34 @@ func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { return io.MultiReader(readers...) } -func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { +func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) { if len(sizes) == 0 { return nil, nil } log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes) - deals := make([]actors.StorageDealProposal, len(sizes)) + deals := make([]market.ClientDealProposal, len(sizes)) for i, size := range sizes { commP, err := m.fastPledgeCommitment(size, uint64(1)) if err != nil { return nil, err } - sdp := actors.StorageDealProposal{ - PieceRef: commP[:], - PieceSize: size, + sdp := market.DealProposal{ + PieceCID: commcid.PieceCommitmentV1ToCID(commP[:]), + PieceSize: size.Padded(), Client: m.worker, Provider: m.maddr, - ProposalExpiration: math.MaxUint64, - Duration: math.MaxUint64 / 2, // /2 because overflows + StartEpoch: math.MaxInt64, + EndEpoch: math.MaxInt64, StoragePricePerEpoch: types.NewInt(0), - StorageCollateral: types.NewInt(0), - ProposerSignature: nil, // nil because self dealing + ProviderCollateral: types.NewInt(0), } - deals[i] = sdp + deals[i] = market.ClientDealProposal{ + Proposal: sdp, + } } log.Infof("Publishing deals for %d", sectorID) @@ -92,11 +95,11 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { return nil, err } - if len(resp.DealIDs) != len(sizes) { + if len(resp.IDs) != len(sizes) { return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") } - log.Infof("Deals for sector %d: %+v", sectorID, resp.DealIDs) + log.Infof("Deals for sector %d: %+v", sectorID, resp.IDs) out := make([]Piece, len(sizes)) for i, size := range sizes { @@ -108,8 +111,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie existingPieceSizes = append(existingPieceSizes, size) out[i] = Piece{ - DealID: resp.DealIDs[i], - Size: ppi.Size, + DealID: resp.IDs[i], + Size: abi.UnpaddedPieceSize(ppi.Size), CommP: ppi.CommP[:], } } @@ -123,7 +126,7 @@ func (m *Sealing) PledgeSector() error { // this, as we run everything here async, and it's cancelled when the // command exits - size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) + size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() sid, err := m.sb.AcquireSectorId() if err != nil { @@ -131,7 +134,7 @@ func (m *Sealing) PledgeSector() error { return } - pieces, err := m.pledgeSector(ctx, sid, []uint64{}, size) + pieces, err := m.pledgeSector(ctx, sid, []abi.UnpaddedPieceSize{}, abi.UnpaddedPieceSize(size)) if err != nil { log.Errorf("%+v", err) return diff --git a/sealing.go b/sealing.go index 6e562001d..a9e080328 100644 --- a/sealing.go +++ b/sealing.go @@ -6,6 +6,8 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -13,7 +15,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -31,21 +32,21 @@ type sealingApi interface { // TODO: trim down // Call a read only method on actors (no interaction with the chain required) StateCall(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) - StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (abi.ChainEpoch, error) StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) - StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (abi.SectorSize, error) StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) - StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateMarketStorageDeal(context.Context, abi.DealID, *types.TipSet) (*market.DealProposal, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) ChainHead(context.Context) (*types.TipSet, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error) - ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, *types.TipSet) (*types.TipSet, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) @@ -95,7 +96,7 @@ func (m *Sealing) Stop(ctx context.Context) error { return m.sectors.Stop(ctx) } -func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { +func (m *Sealing) AllocatePiece(size uint64) (sectorID abi.SectorNumber, offset uint64, err error) { if padreader.PaddedSize(size) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } @@ -109,10 +110,10 @@ func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, er return sid, 0, nil } -func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { +func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error { log.Infof("Seal piece for deal %d", dealID) - ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []uint64{}) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{}) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } @@ -120,7 +121,7 @@ func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, secto return m.newSector(ctx, sectorID, dealID, ppi) } -func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { +func (m *Sealing) newSector(ctx context.Context, sid abi.SectorNumber, dealID abi.DealID, ppi sectorbuilder.PublicPieceInfo) error { log.Infof("Start sealing %d", sid) return m.sectors.Send(sid, SectorStart{ id: sid, @@ -128,7 +129,7 @@ func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi { DealID: dealID, - Size: ppi.Size, + Size: abi.UnpaddedPieceSize(ppi.Size), CommP: ppi.CommP[:], }, }, diff --git a/states.go b/states.go index 723a2d843..7288770cc 100644 --- a/states.go +++ b/states.go @@ -3,8 +3,8 @@ package sealing import ( "context" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder/fs" + "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" @@ -16,12 +16,12 @@ import ( func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) - var allocated uint64 + var allocated abi.UnpaddedPieceSize for _, piece := range sector.Pieces { allocated += piece.Size } - ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) + ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() if allocated > ubytes { return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) @@ -100,7 +100,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf CommR: sector.CommR, SealEpoch: sector.Ticket.BlockHeight, - DealIDs: sector.deals(), + DealIDs: nil, // sector.deals(), // TODO: REFACTOR } enc, aerr := actors.SerializeParams(params) if aerr != nil { @@ -144,7 +144,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight) - err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH uint64) error { + err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), int64(randHeight)) if err != nil { err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) @@ -181,13 +181,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) // TODO: Consider splitting states and persist proof for faster recovery - params := &actors.SectorProveCommitInfo{ + /*params := &actors.SectorProveCommitInfo{ Proof: proof, SectorID: sector.SectorID, DealIDs: sector.deals(), - } + }*/ - enc, aerr := actors.SerializeParams(params) + enc, aerr := actors.SerializeParams(nil) // TODO: REFACTOR: Fix if aerr != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) } @@ -255,7 +255,7 @@ func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) erro // TODO: coalesce faulty sector reporting bf := types.NewBitField() - bf.Set(sector.SectorID) + bf.Set(uint64(sector.SectorID)) enc, aerr := actors.SerializeParams(&actors.DeclareFaultsParams{bf}) if aerr != nil { diff --git a/types.go b/types.go index f0fbe09a4..ebec1ee1c 100644 --- a/types.go +++ b/types.go @@ -3,6 +3,7 @@ package sealing import ( sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-cid" ) @@ -18,12 +19,12 @@ func (t *SealTicket) SB() sectorbuilder.SealTicket { } type SealSeed struct { - BlockHeight uint64 + BlockHeight abi.ChainEpoch TicketBytes []byte } func (t *SealSeed) SB() sectorbuilder.SealSeed { - out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight} + out := sectorbuilder.SealSeed{BlockHeight: uint64(t.BlockHeight)} copy(out.TicketBytes[:], t.TicketBytes) return out } @@ -33,14 +34,14 @@ func (t *SealSeed) Equals(o *SealSeed) bool { } type Piece struct { - DealID uint64 + DealID abi.DealID - Size uint64 + Size abi.UnpaddedPieceSize CommP []byte } func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { - out.Size = p.Size + out.Size = uint64(p.Size) copy(out.CommP[:], p.CommP) return out } @@ -57,7 +58,7 @@ type Log struct { type SectorInfo struct { State api.SectorState - SectorID uint64 + SectorID abi.SectorNumber Nonce uint64 // TODO: remove // Packing @@ -95,16 +96,16 @@ func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { return out } -func (t *SectorInfo) deals() []uint64 { - out := make([]uint64, len(t.Pieces)) +func (t *SectorInfo) deals() []abi.DealID { + out := make([]abi.DealID, len(t.Pieces)) for i, piece := range t.Pieces { out[i] = piece.DealID } return out } -func (t *SectorInfo) existingPieces() []uint64 { - out := make([]uint64, len(t.Pieces)) +func (t *SectorInfo) existingPieces() []abi.UnpaddedPieceSize { + out := make([]abi.UnpaddedPieceSize, len(t.Pieces)) for i, piece := range t.Pieces { out[i] = piece.Size } diff --git a/utils.go b/utils.go index b2221b278..d03393405 100644 --- a/utils.go +++ b/utils.go @@ -6,12 +6,13 @@ import ( "math/rand" "sync" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/hashicorp/go-multierror" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" ) -func fillersFromRem(toFill uint64) ([]uint64, error) { +func fillersFromRem(in abi.UnpaddedPieceSize) ([]abi.UnpaddedPieceSize, error) { // Convert to in-sector bytes for easier math: // // Sector size to user bytes ratio is constant, e.g. for 1024B we have 1016B @@ -24,13 +25,13 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { // // (we convert to sector bytes as they are nice round binary numbers) - toFill += toFill / 127 + toFill := uint64(in + (in / 127)) // We need to fill the sector with pieces that are powers of 2. Conveniently // computers store numbers in binary, which means we can look at 1s to get // all the piece sizes we need to fill the sector. It also means that number // of pieces is the number of 1s in the number of remaining bytes to fill - out := make([]uint64, bits.OnesCount64(toFill)) + out := make([]abi.UnpaddedPieceSize, bits.OnesCount64(toFill)) for i := range out { // Extract the next lowest non-zero bit next := bits.TrailingZeros64(toFill) @@ -42,18 +43,18 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { toFill ^= psize // Add the piece size to the list of pieces we need to create - out[i] = sectorbuilder.UserBytesForSectorSize(psize) + out[i] = abi.PaddedPieceSize(psize).Unpadded() } return out, nil } -func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { +func (m *Sealing) fastPledgeCommitment(size abi.UnpaddedPieceSize, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - if size/parts < 127 { - parts = size / 127 + if uint64(size)/parts < 127 { + parts = uint64(size) / 127 } - piece := sectorbuilder.UserBytesForSectorSize((size + size/127) / parts) + piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded() out := make([]sectorbuilder.PublicPieceInfo, parts) var lk sync.Mutex @@ -70,7 +71,7 @@ func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sector err = multierror.Append(err, perr) } out[i] = sectorbuilder.PublicPieceInfo{ - Size: piece, + Size: uint64(piece), CommP: commP, } lk.Unlock()