Implement committed capacity sectors

This commit is contained in:
Łukasz Magiera 2020-02-23 01:47:47 +01:00
parent da236a26fc
commit d04c304c93
8 changed files with 148 additions and 198 deletions

View File

@ -310,8 +310,14 @@ func (t *Piece) MarshalCBOR(w io.Writer) error {
return err
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil {
return err
if t.DealID == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(*t.DealID))); err != nil {
return err
}
}
// t.Size (abi.UnpaddedPieceSize) (uint64)
@ -388,25 +394,45 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error {
// t.DealID (abi.DealID) (uint64)
case "DealID":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
typed := abi.DealID(extra)
t.DealID = &typed
}
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = abi.DealID(extra)
// t.Size (abi.UnpaddedPieceSize) (uint64)
case "Size":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = abi.UnpaddedPieceSize(extra)
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = abi.UnpaddedPieceSize(extra)
// t.CommP ([]uint8) (slice)
case "CommP":
@ -765,36 +791,48 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
// t.State (uint64) (uint64)
case "State":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
// t.SectorID (abi.SectorNumber) (uint64)
case "SectorID":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = abi.SectorNumber(extra)
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = abi.SectorNumber(extra)
// t.Nonce (uint64) (uint64)
case "Nonce":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Nonce = uint64(extra)
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Nonce = uint64(extra)
// t.Pieces ([]sealing.Piece) (slice)
case "Pieces":
@ -1147,14 +1185,18 @@ func (t *Log) UnmarshalCBOR(r io.Reader) error {
// t.Timestamp (uint64) (uint64)
case "Timestamp":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Timestamp = uint64(extra)
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Timestamp = uint64(extra)
// t.Trace (string) (string)
case "Trace":

View File

@ -1,7 +1,12 @@
package sealing
import (
"bytes"
"context"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/lotus/lib/zerocomm"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/multiformats/go-multihash"
@ -19,6 +24,7 @@ import (
type ErrApi struct{ error }
type ErrInvalidDeals struct{ error }
type ErrInvalidPiece struct{ error }
type ErrExpiredDeals struct{ error }
type ErrBadCommD struct{ error }
@ -36,7 +42,14 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error {
}
for i, piece := range si.Pieces {
deal, err := api.StateMarketStorageDeal(ctx, piece.DealID, nil)
if piece.DealID == nil {
exp := zerocomm.ForSize(piece.Size)
if string(piece.CommP) != string(exp[:]) {
return &ErrInvalidPiece{xerrors.Errorf("deal %d piece %d had non-zero CommP %+v", piece.DealID, i, piece.CommP)}
}
continue
}
deal, err := api.StateMarketStorageDeal(ctx, *piece.DealID, nil)
if err != nil {
return &ErrApi{xerrors.Errorf("getting deal %d for piece %d: %w", piece.DealID, i, err)}
}
@ -99,7 +112,17 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se
if r.ExitCode != 0 {
return &ErrBadCommD{xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)}
}
if string(r.Return) != string(si.CommD) {
var c cbg.CborCid
if err := c.UnmarshalCBOR(bytes.NewReader(r.Return)); err != nil {
return err
}
cd, err := commcid.CIDToDataCommitmentV1(cid.Cid(c))
if err != nil {
return err
}
if string(cd) != string(si.CommD) {
return &ErrBadCommD{xerrors.Errorf("on chain CommD differs from sector: %x != %x", r.Return, si.CommD)}
}

2
fsm.go
View File

@ -228,7 +228,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
}
for _, sector := range trackedSectors {
if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil {
if err := m.sectors.Send(uint64(sector.SectorID), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorID, err)
}
}

View File

@ -1,37 +1,25 @@
package sealing
import (
"bytes"
"context"
"io"
"math"
"math/bits"
"math/rand"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
)
func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize, parts uint64) io.Reader {
parts = 1 << bits.Len64(parts) // round down to nearest power of 2
if uint64(size)/parts < 127 {
parts = uint64(size) / 127
type nullReader struct {}
func (nullReader) Read(out []byte) (int, error) {
for i := range out {
out[i] = 0
}
return len(out), nil
}
piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded()
readers := make([]io.Reader, parts)
for i := range readers {
readers[i] = io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece))
}
return io.MultiReader(readers...)
func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader {
return io.LimitReader(&nullReader{}, int64(size))
}
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) {
@ -41,70 +29,9 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
deals := make([]market.ClientDealProposal, len(sizes))
for i, size := range sizes {
commP, err := m.fastPledgeCommitment(size, uint64(1))
if err != nil {
return nil, err
}
sdp := market.DealProposal{
PieceCID: commcid.PieceCommitmentV1ToCID(commP[:]),
PieceSize: size.Padded(),
Client: m.worker,
Provider: m.maddr,
StartEpoch: math.MaxInt64,
EndEpoch: math.MaxInt64,
StoragePricePerEpoch: types.NewInt(0),
ProviderCollateral: types.NewInt(0),
}
deals[i] = market.ClientDealProposal{
Proposal: sdp,
}
}
log.Infof("Publishing deals for %d", sectorID)
params, aerr := actors.SerializeParams(&market.PublishStorageDealsParams{
Deals: deals,
})
if aerr != nil {
return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", aerr)
}
smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: m.worker,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: builtin.MethodsMarket.PublishStorageDeals,
Params: params,
})
if err != nil {
return nil, err
}
r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) // TODO: more finality
if err != nil {
return nil, err
}
if r.Receipt.ExitCode != 0 {
log.Error(xerrors.Errorf("publishing deal (ts %s) %s failed: exit %d", r.TipSet.Key(), smsg.Cid(), r.Receipt.ExitCode))
}
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return nil, err
}
if len(resp.IDs) != len(sizes) {
return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals")
}
log.Infof("Deals for sector %d: %+v", sectorID, resp.IDs)
out := make([]Piece, len(sizes))
for i, size := range sizes {
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(1)), existingPieceSizes)
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes)
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}
@ -112,9 +39,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e
existingPieceSizes = append(existingPieceSizes, size)
out[i] = Piece{
DealID: resp.IDs[i],
Size: abi.UnpaddedPieceSize(ppi.Size),
CommP: ppi.CommP[:],
Size: ppi.Size,
CommP: ppi.CommP[:],
}
}
@ -141,7 +67,7 @@ func (m *Sealing) PledgeSector() error {
return
}
if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil {
if err := m.newSector(sid, pieces); err != nil {
log.Errorf("%+v", err)
return
}

View File

@ -120,20 +120,20 @@ func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r i
return xerrors.Errorf("adding piece to sector: %w", err)
}
return m.newSector(ctx, sectorID, dealID, ppi)
return m.newSector(sectorID, []Piece{
{
DealID: &dealID,
Size: ppi.Size,
CommP: ppi.CommP[:],
},
},)
}
func (m *Sealing) newSector(ctx context.Context, sid abi.SectorNumber, dealID abi.DealID, ppi sectorbuilder.PublicPieceInfo) error {
func (m *Sealing) newSector(sid abi.SectorNumber, pieces []Piece) error {
log.Infof("Start sealing %d", sid)
return m.sectors.Send(sid, SectorStart{
return m.sectors.Send(uint64(sid), SectorStart{
id: sid,
pieces: []Piece{
{
DealID: dealID,
Size: abi.UnpaddedPieceSize(ppi.Size),
CommP: ppi.CommP[:],
},
},
pieces: pieces,
})
}

View File

@ -99,8 +99,9 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
}
params := &miner.SectorPreCommitInfo{
Expiration: 0,
Expiration: 10000000, // TODO: implement
SectorNumber: sector.SectorID,
RegisteredProof: abi.RegisteredProof_StackedDRG32GiBSeal,
SealedCID: commcid.ReplicaCommitmentV1ToCID(sector.CommR),
SealRandEpoch: sector.Ticket.BlockHeight,

View File

@ -34,7 +34,7 @@ func (t *SealSeed) Equals(o *SealSeed) bool {
}
type Piece struct {
DealID abi.DealID
DealID *abi.DealID
Size abi.UnpaddedPieceSize
CommP []byte
@ -97,9 +97,12 @@ func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
}
func (t *SectorInfo) deals() []abi.DealID {
out := make([]abi.DealID, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.DealID
out := make([]abi.DealID, 0, len(t.Pieces))
for _, piece := range t.Pieces {
if piece.DealID == nil {
continue
}
out = append(out, *piece.DealID)
}
return out
}

View File

@ -1,15 +1,8 @@
package sealing
import (
"io"
"math/bits"
"math/rand"
"sync"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/hashicorp/go-multierror"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"math/bits"
)
func fillersFromRem(in abi.UnpaddedPieceSize) ([]abi.UnpaddedPieceSize, error) {
@ -48,44 +41,6 @@ func fillersFromRem(in abi.UnpaddedPieceSize) ([]abi.UnpaddedPieceSize, error) {
return out, nil
}
func (m *Sealing) fastPledgeCommitment(size abi.UnpaddedPieceSize, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) {
parts = 1 << bits.Len64(parts) // round down to nearest power of 2
if uint64(size)/parts < 127 {
parts = uint64(size) / 127
}
piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded()
out := make([]sectorbuilder.PublicPieceInfo, parts)
var lk sync.Mutex
var wg sync.WaitGroup
wg.Add(int(parts))
for i := uint64(0); i < parts; i++ {
go func(i uint64) {
defer wg.Done()
commP, perr := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece)), piece)
lk.Lock()
if perr != nil {
err = multierror.Append(err, perr)
}
out[i] = sectorbuilder.PublicPieceInfo{
Size: piece,
CommP: commP,
}
lk.Unlock()
}(i)
}
wg.Wait()
if err != nil {
return [32]byte{}, err
}
return sectorbuilder.GenerateDataCommitment(m.sb.SectorSize(), out)
}
func (m *Sealing) ListSectors() ([]SectorInfo, error) {
var sectors []SectorInfo
if err := m.sectors.List(&sectors); err != nil {
@ -96,6 +51,6 @@ func (m *Sealing) ListSectors() ([]SectorInfo, error) {
func (m *Sealing) GetSectorInfo(sid abi.SectorNumber) (SectorInfo, error) {
var out SectorInfo
err := m.sectors.Get(sid).Get(&out)
err := m.sectors.Get(uint64(sid)).Get(&out)
return out, err
}