diff --git a/cbor_gen.go b/cbor_gen.go index fe25daa2d..3cd9b6451 100644 --- a/cbor_gen.go +++ b/cbor_gen.go @@ -310,8 +310,14 @@ func (t *Piece) MarshalCBOR(w io.Writer) error { return err } - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { - return err + if t.DealID == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(*t.DealID))); err != nil { + return err + } } // t.Size (abi.UnpaddedPieceSize) (uint64) @@ -388,25 +394,45 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error { // t.DealID (abi.DealID) (uint64) case "DealID": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + typed := abi.DealID(extra) + t.DealID = &typed + } + } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.DealID = abi.DealID(extra) // t.Size (abi.UnpaddedPieceSize) (uint64) case "Size": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Size = abi.UnpaddedPieceSize(extra) + } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Size = abi.UnpaddedPieceSize(extra) // t.CommP ([]uint8) (slice) case "CommP": @@ -765,36 +791,48 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { // t.State (uint64) (uint64) case "State": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) // t.SectorID (abi.SectorNumber) (uint64) case "SectorID": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.SectorID = abi.SectorNumber(extra) + } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.SectorID = abi.SectorNumber(extra) // t.Nonce (uint64) (uint64) case "Nonce": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Nonce = uint64(extra) + } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Nonce = uint64(extra) // t.Pieces ([]sealing.Piece) (slice) case "Pieces": @@ -1147,14 +1185,18 @@ func (t *Log) UnmarshalCBOR(r io.Reader) error { // t.Timestamp (uint64) (uint64) case "Timestamp": - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Timestamp = uint64(extra) + } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Timestamp = uint64(extra) // t.Trace (string) (string) case "Trace": diff --git a/checks.go b/checks.go index 6cfddee26..1c5df91bc 100644 --- a/checks.go +++ b/checks.go @@ -1,7 +1,12 @@ package sealing import ( + "bytes" "context" + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/filecoin-project/lotus/lib/zerocomm" + "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/multiformats/go-multihash" @@ -19,6 +24,7 @@ import ( type ErrApi struct{ error } type ErrInvalidDeals struct{ error } +type ErrInvalidPiece struct{ error } type ErrExpiredDeals struct{ error } type ErrBadCommD struct{ error } @@ -36,7 +42,14 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error { } for i, piece := range si.Pieces { - deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil) + if piece.DealID == nil { + exp := zerocomm.ForSize(piece.Size) + if string(piece.CommP) != string(exp[:]) { + return &ErrInvalidPiece{xerrors.Errorf("deal %d piece %d had non-zero CommP %+v", piece.DealID, i, piece.CommP)} + } + continue + } + deal, err := api.StateMarketStorageDeal(ctx, *piece.DealID, nil) if err != nil { return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)} } @@ -99,7 +112,17 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se if r.ExitCode != 0 { return &ErrBadCommD{xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)} } - if string(r.Return) != string(si.CommD) { + + var c cbg.CborCid + if err := c.UnmarshalCBOR(bytes.NewReader(r.Return)); err != nil { + return err + } + cd, err := commcid.CIDToDataCommitmentV1(cid.Cid(c)) + if err != nil { + return err + } + + if string(cd) != string(si.CommD) { return &ErrBadCommD{xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)} } diff --git a/fsm.go b/fsm.go index 59449303b..06ba11354 100644 --- a/fsm.go +++ b/fsm.go @@ -228,7 +228,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error { } for _, sector := range trackedSectors { - if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil { + if err := m.sectors.Send(uint64(sector.SectorID), SectorRestart{}); err != nil { log.Errorf("restarting sector %d: %+v", sector.SectorID, err) } } diff --git a/garbage.go b/garbage.go index cb68eda48..ecf0a1179 100644 --- a/garbage.go +++ b/garbage.go @@ -1,37 +1,25 @@ package sealing import ( - "bytes" "context" "io" - "math" - "math/bits" - "math/rand" - commcid "github.com/filecoin-project/go-fil-commcid" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/market" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" ) -func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize, parts uint64) io.Reader { - parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - if uint64(size)/parts < 127 { - parts = uint64(size) / 127 +type nullReader struct {} + +func (nullReader) Read(out []byte) (int, error) { + for i := range out { + out[i] = 0 } + return len(out), nil +} - piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded() - - readers := make([]io.Reader, parts) - for i := range readers { - readers[i] = io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)) - } - - return io.MultiReader(readers...) +func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader { + return io.LimitReader(&nullReader{}, int64(size)) } func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) { @@ -41,70 +29,9 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes) - deals := make([]market.ClientDealProposal, len(sizes)) - for i, size := range sizes { - commP, err := m.fastPledgeCommitment(size, uint64(1)) - if err != nil { - return nil, err - } - - sdp := market.DealProposal{ - PieceCID: commcid.PieceCommitmentV1ToCID(commP[:]), - PieceSize: size.Padded(), - Client: m.worker, - Provider: m.maddr, - StartEpoch: math.MaxInt64, - EndEpoch: math.MaxInt64, - StoragePricePerEpoch: types.NewInt(0), - ProviderCollateral: types.NewInt(0), - } - - deals[i] = market.ClientDealProposal{ - Proposal: sdp, - } - } - - log.Infof("Publishing deals for %d", sectorID) - - params, aerr := actors.SerializeParams(&market.PublishStorageDealsParams{ - Deals: deals, - }) - if aerr != nil { - return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", aerr) - } - - smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{ - To: actors.StorageMarketAddress, - From: m.worker, - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: types.NewInt(1000000), - Method: builtin.MethodsMarket.PublishStorageDeals, - Params: params, - }) - if err != nil { - return nil, err - } - r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) // TODO: more finality - if err != nil { - return nil, err - } - if r.Receipt.ExitCode != 0 { - log.Error(xerrors.Errorf("publishing deal (ts %s) %s failed: exit %d", r.TipSet.Key(), smsg.Cid(), r.Receipt.ExitCode)) - } - var resp actors.PublishStorageDealResponse - if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { - return nil, err - } - if len(resp.IDs) != len(sizes) { - return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") - } - - log.Infof("Deals for sector %d: %+v", sectorID, resp.IDs) - out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(1)), existingPieceSizes) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes) if err != nil { return nil, xerrors.Errorf("add piece: %w", err) } @@ -112,9 +39,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e existingPieceSizes = append(existingPieceSizes, size) out[i] = Piece{ - DealID: resp.IDs[i], - Size: abi.UnpaddedPieceSize(ppi.Size), - CommP: ppi.CommP[:], + Size: ppi.Size, + CommP: ppi.CommP[:], } } @@ -141,7 +67,7 @@ func (m *Sealing) PledgeSector() error { return } - if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil { + if err := m.newSector(sid, pieces); err != nil { log.Errorf("%+v", err) return } diff --git a/sealing.go b/sealing.go index 9f230d578..1ad733935 100644 --- a/sealing.go +++ b/sealing.go @@ -120,20 +120,20 @@ func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r i return xerrors.Errorf("adding piece to sector: %w", err) } - return m.newSector(ctx, sectorID, dealID, ppi) + return m.newSector(sectorID, []Piece{ + { + DealID: &dealID, + + Size: ppi.Size, + CommP: ppi.CommP[:], + }, + },) } -func (m *Sealing) newSector(ctx context.Context, sid abi.SectorNumber, dealID abi.DealID, ppi sectorbuilder.PublicPieceInfo) error { +func (m *Sealing) newSector(sid abi.SectorNumber, pieces []Piece) error { log.Infof("Start sealing %d", sid) - return m.sectors.Send(sid, SectorStart{ + return m.sectors.Send(uint64(sid), SectorStart{ id: sid, - pieces: []Piece{ - { - DealID: dealID, - - Size: abi.UnpaddedPieceSize(ppi.Size), - CommP: ppi.CommP[:], - }, - }, + pieces: pieces, }) } diff --git a/states.go b/states.go index 4cd6549a8..273f9fd4f 100644 --- a/states.go +++ b/states.go @@ -99,8 +99,9 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf } params := &miner.SectorPreCommitInfo{ - Expiration: 0, + Expiration: 10000000, // TODO: implement SectorNumber: sector.SectorID, + RegisteredProof: abi.RegisteredProof_StackedDRG32GiBSeal, SealedCID: commcid.ReplicaCommitmentV1ToCID(sector.CommR), SealRandEpoch: sector.Ticket.BlockHeight, diff --git a/types.go b/types.go index ccfe50427..e2a2c049d 100644 --- a/types.go +++ b/types.go @@ -34,7 +34,7 @@ func (t *SealSeed) Equals(o *SealSeed) bool { } type Piece struct { - DealID abi.DealID + DealID *abi.DealID Size abi.UnpaddedPieceSize CommP []byte @@ -97,9 +97,12 @@ func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { } func (t *SectorInfo) deals() []abi.DealID { - out := make([]abi.DealID, len(t.Pieces)) - for i, piece := range t.Pieces { - out[i] = piece.DealID + out := make([]abi.DealID, 0, len(t.Pieces)) + for _, piece := range t.Pieces { + if piece.DealID == nil { + continue + } + out = append(out, *piece.DealID) } return out } diff --git a/utils.go b/utils.go index 0bfbb30fe..cfc17734c 100644 --- a/utils.go +++ b/utils.go @@ -1,15 +1,8 @@ package sealing import ( - "io" - "math/bits" - "math/rand" - "sync" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/hashicorp/go-multierror" - - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "math/bits" ) func fillersFromRem(in abi.UnpaddedPieceSize) ([]abi.UnpaddedPieceSize, error) { @@ -48,44 +41,6 @@ func fillersFromRem(in abi.UnpaddedPieceSize) ([]abi.UnpaddedPieceSize, error) { return out, nil } -func (m *Sealing) fastPledgeCommitment(size abi.UnpaddedPieceSize, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { - parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - if uint64(size)/parts < 127 { - parts = uint64(size) / 127 - } - - piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded() - out := make([]sectorbuilder.PublicPieceInfo, parts) - var lk sync.Mutex - - var wg sync.WaitGroup - wg.Add(int(parts)) - for i := uint64(0); i < parts; i++ { - go func(i uint64) { - defer wg.Done() - - commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece) - - lk.Lock() - if perr != nil { - err = multierror.Append(err, perr) - } - out[i] = sectorbuilder.PublicPieceInfo{ - Size: piece, - CommP: commP, - } - lk.Unlock() - }(i) - } - wg.Wait() - - if err != nil { - return [32]byte{}, err - } - - return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out) -} - func (m *Sealing) ListSectors() ([]SectorInfo, error) { var sectors []SectorInfo if err := m.sectors.List(§ors); err != nil { @@ -96,6 +51,6 @@ func (m *Sealing) ListSectors() ([]SectorInfo, error) { func (m *Sealing) GetSectorInfo(sid abi.SectorNumber) (SectorInfo, error) { var out SectorInfo - err := m.sectors.Get(sid).Get(&out) + err := m.sectors.Get(uint64(sid)).Get(&out) return out, err }