From 03f31d74f18b61ea13292e7f81bdf5af2dd6e314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 15 Jan 2020 21:49:11 +0100 Subject: [PATCH] Move miner sealing logic into a separate package --- cbor_gen.go | 1092 ++++++++++++++++++++++++++++++++++++++++++ garbage.go | 127 +++++ sector_fsm.go | 264 ++++++++++ sector_states.go | 254 ++++++++++ sector_types.go | 106 ++++ sector_utils.go | 57 +++ sector_utils_test.go | 46 ++ sectors.go | 136 ++++++ sectors_fsm_test.go | 79 +++ sectors_test.go | 56 +++ 10 files changed, 2217 insertions(+) create mode 100644 cbor_gen.go create mode 100644 garbage.go create mode 100644 sector_fsm.go create mode 100644 sector_states.go create mode 100644 sector_types.go create mode 100644 sector_utils.go create mode 100644 sector_utils_test.go create mode 100644 sectors.go create mode 100644 sectors_fsm_test.go create mode 100644 sectors_test.go diff --git a/cbor_gen.go b/cbor_gen.go new file mode 100644 index 000000000..62c6241dc --- /dev/null +++ b/cbor_gen.go @@ -0,0 +1,1092 @@ +package sealing + +import ( + "fmt" + "io" + + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +var _ = xerrors.Errorf + +func (t *SealTicket) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{162}); err != nil { + return err + } + + // t.BlockHeight (uint64) (uint64) + if len("BlockHeight") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"BlockHeight\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("BlockHeight")))); err != nil { + return err + } + if _, err := w.Write([]byte("BlockHeight")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.BlockHeight))); err != nil { + return err + } + + // t.TicketBytes ([]uint8) (slice) + if len("TicketBytes") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TicketBytes\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("TicketBytes")))); err != nil { + return err + } + if _, err := w.Write([]byte("TicketBytes")); err != nil { + return err + } + + if len(t.TicketBytes) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.TicketBytes was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.TicketBytes)))); err != nil { + return err + } + if _, err := w.Write(t.TicketBytes); err != nil { + return err + } + return nil +} + +func (t *SealTicket) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + var name string + + // t.BlockHeight (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "BlockHeight" { + return fmt.Errorf("expected struct map entry %s to be BlockHeight", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.BlockHeight = uint64(extra) + // t.TicketBytes ([]uint8) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "TicketBytes" { + return fmt.Errorf("expected struct map entry %s to be TicketBytes", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.TicketBytes: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.TicketBytes = make([]byte, extra) + if _, err := io.ReadFull(br, t.TicketBytes); err != nil { + return err + } + return nil +} + +func (t *SealSeed) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{162}); err != nil { + return err + } + + // t.BlockHeight (uint64) (uint64) + if len("BlockHeight") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"BlockHeight\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("BlockHeight")))); err != nil { + return err + } + if _, err := w.Write([]byte("BlockHeight")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.BlockHeight))); err != nil { + return err + } + + // t.TicketBytes ([]uint8) (slice) + if len("TicketBytes") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TicketBytes\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("TicketBytes")))); err != nil { + return err + } + if _, err := w.Write([]byte("TicketBytes")); err != nil { + return err + } + + if len(t.TicketBytes) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.TicketBytes was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.TicketBytes)))); err != nil { + return err + } + if _, err := w.Write(t.TicketBytes); err != nil { + return err + } + return nil +} + +func (t *SealSeed) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + var name string + + // t.BlockHeight (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "BlockHeight" { + return fmt.Errorf("expected struct map entry %s to be BlockHeight", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.BlockHeight = uint64(extra) + // t.TicketBytes ([]uint8) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "TicketBytes" { + return fmt.Errorf("expected struct map entry %s to be TicketBytes", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.TicketBytes: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.TicketBytes = make([]byte, extra) + if _, err := io.ReadFull(br, t.TicketBytes); err != nil { + return err + } + return nil +} + +func (t *Piece) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{163}); err != nil { + return err + } + + // t.DealID (uint64) (uint64) + if len("DealID") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"DealID\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("DealID")))); err != nil { + return err + } + if _, err := w.Write([]byte("DealID")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { + return err + } + + // t.Size (uint64) (uint64) + if len("Size") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Size\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Size")))); err != nil { + return err + } + if _, err := w.Write([]byte("Size")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil { + return err + } + + // t.CommP ([]uint8) (slice) + if len("CommP") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"CommP\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("CommP")))); err != nil { + return err + } + if _, err := w.Write([]byte("CommP")); err != nil { + return err + } + + if len(t.CommP) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.CommP was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommP)))); err != nil { + return err + } + if _, err := w.Write(t.CommP); err != nil { + return err + } + return nil +} + +func (t *Piece) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + var name string + + // t.DealID (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "DealID" { + return fmt.Errorf("expected struct map entry %s to be DealID", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = uint64(extra) + // t.Size (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "Size" { + return fmt.Errorf("expected struct map entry %s to be Size", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Size = uint64(extra) + // t.CommP ([]uint8) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "CommP" { + return fmt.Errorf("expected struct map entry %s to be CommP", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.CommP: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.CommP = make([]byte, extra) + if _, err := io.ReadFull(br, t.CommP); err != nil { + return err + } + return nil +} + +func (t *SectorInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{173}); err != nil { + return err + } + + // t.State (uint64) (uint64) + if len("State") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"State\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("State")))); err != nil { + return err + } + if _, err := w.Write([]byte("State")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.SectorID (uint64) (uint64) + if len("SectorID") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"SectorID\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("SectorID")))); err != nil { + return err + } + if _, err := w.Write([]byte("SectorID")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.SectorID))); err != nil { + return err + } + + // t.Nonce (uint64) (uint64) + if len("Nonce") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Nonce\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Nonce")))); err != nil { + return err + } + if _, err := w.Write([]byte("Nonce")); err != nil { + return err + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Nonce))); err != nil { + return err + } + + // t.Pieces ([]storage.Piece) (slice) + if len("Pieces") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Pieces\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Pieces")))); err != nil { + return err + } + if _, err := w.Write([]byte("Pieces")); err != nil { + return err + } + + if len(t.Pieces) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Pieces was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Pieces)))); err != nil { + return err + } + for _, v := range t.Pieces { + if err := v.MarshalCBOR(w); err != nil { + return err + } + } + + // t.CommD ([]uint8) (slice) + if len("CommD") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"CommD\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("CommD")))); err != nil { + return err + } + if _, err := w.Write([]byte("CommD")); err != nil { + return err + } + + if len(t.CommD) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.CommD was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommD)))); err != nil { + return err + } + if _, err := w.Write(t.CommD); err != nil { + return err + } + + // t.CommR ([]uint8) (slice) + if len("CommR") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"CommR\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("CommR")))); err != nil { + return err + } + if _, err := w.Write([]byte("CommR")); err != nil { + return err + } + + if len(t.CommR) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.CommR was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommR)))); err != nil { + return err + } + if _, err := w.Write(t.CommR); err != nil { + return err + } + + // t.Proof ([]uint8) (slice) + if len("Proof") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Proof\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Proof")))); err != nil { + return err + } + if _, err := w.Write([]byte("Proof")); err != nil { + return err + } + + if len(t.Proof) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Proof was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Proof)))); err != nil { + return err + } + if _, err := w.Write(t.Proof); err != nil { + return err + } + + // t.Ticket (storage.SealTicket) (struct) + if len("Ticket") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Ticket\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Ticket")))); err != nil { + return err + } + if _, err := w.Write([]byte("Ticket")); err != nil { + return err + } + + if err := t.Ticket.MarshalCBOR(w); err != nil { + return err + } + + // t.PreCommitMessage (cid.Cid) (struct) + if len("PreCommitMessage") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PreCommitMessage\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("PreCommitMessage")))); err != nil { + return err + } + if _, err := w.Write([]byte("PreCommitMessage")); err != nil { + return err + } + + if t.PreCommitMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PreCommitMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PreCommitMessage: %w", err) + } + } + + // t.Seed (storage.SealSeed) (struct) + if len("Seed") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Seed\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Seed")))); err != nil { + return err + } + if _, err := w.Write([]byte("Seed")); err != nil { + return err + } + + if err := t.Seed.MarshalCBOR(w); err != nil { + return err + } + + // t.CommitMessage (cid.Cid) (struct) + if len("CommitMessage") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"CommitMessage\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("CommitMessage")))); err != nil { + return err + } + if _, err := w.Write([]byte("CommitMessage")); err != nil { + return err + } + + if t.CommitMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.CommitMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err) + } + } + + // t.FaultReportMsg (cid.Cid) (struct) + if len("FaultReportMsg") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"FaultReportMsg\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("FaultReportMsg")))); err != nil { + return err + } + if _, err := w.Write([]byte("FaultReportMsg")); err != nil { + return err + } + + if t.FaultReportMsg == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.FaultReportMsg); err != nil { + return xerrors.Errorf("failed to write cid field t.FaultReportMsg: %w", err) + } + } + + // t.LastErr (string) (string) + if len("LastErr") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"LastErr\" was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("LastErr")))); err != nil { + return err + } + if _, err := w.Write([]byte("LastErr")); err != nil { + return err + } + + if len(t.LastErr) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.LastErr was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.LastErr)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.LastErr)); err != nil { + return err + } + return nil +} + +func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra != 13 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + var name string + + // t.State (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "State" { + return fmt.Errorf("expected struct map entry %s to be State", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.SectorID (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "SectorID" { + return fmt.Errorf("expected struct map entry %s to be SectorID", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.SectorID = uint64(extra) + // t.Nonce (uint64) (uint64) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "Nonce" { + return fmt.Errorf("expected struct map entry %s to be Nonce", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Nonce = uint64(extra) + // t.Pieces ([]storage.Piece) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "Pieces" { + return fmt.Errorf("expected struct map entry %s to be Pieces", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Pieces: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.Pieces = make([]Piece, extra) + } + for i := 0; i < int(extra); i++ { + + var v Piece + if err := v.UnmarshalCBOR(br); err != nil { + return err + } + + t.Pieces[i] = v + } + + // t.CommD ([]uint8) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "CommD" { + return fmt.Errorf("expected struct map entry %s to be CommD", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.CommD: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.CommD = make([]byte, extra) + if _, err := io.ReadFull(br, t.CommD); err != nil { + return err + } + // t.CommR ([]uint8) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "CommR" { + return fmt.Errorf("expected struct map entry %s to be CommR", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.CommR: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.CommR = make([]byte, extra) + if _, err := io.ReadFull(br, t.CommR); err != nil { + return err + } + // t.Proof ([]uint8) (slice) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "Proof" { + return fmt.Errorf("expected struct map entry %s to be Proof", name) + } + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Proof: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.Proof = make([]byte, extra) + if _, err := io.ReadFull(br, t.Proof); err != nil { + return err + } + // t.Ticket (storage.SealTicket) (struct) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "Ticket" { + return fmt.Errorf("expected struct map entry %s to be Ticket", name) + } + + { + + if err := t.Ticket.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.PreCommitMessage (cid.Cid) (struct) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "PreCommitMessage" { + return fmt.Errorf("expected struct map entry %s to be PreCommitMessage", name) + } + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PreCommitMessage: %w", err) + } + + t.PreCommitMessage = &c + } + + } + // t.Seed (storage.SealSeed) (struct) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "Seed" { + return fmt.Errorf("expected struct map entry %s to be Seed", name) + } + + { + + if err := t.Seed.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.CommitMessage (cid.Cid) (struct) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "CommitMessage" { + return fmt.Errorf("expected struct map entry %s to be CommitMessage", name) + } + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err) + } + + t.CommitMessage = &c + } + + } + // t.FaultReportMsg (cid.Cid) (struct) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "FaultReportMsg" { + return fmt.Errorf("expected struct map entry %s to be FaultReportMsg", name) + } + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.FaultReportMsg: %w", err) + } + + t.FaultReportMsg = &c + } + + } + // t.LastErr (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + name = string(sval) + } + + if name != "LastErr" { + return fmt.Errorf("expected struct map entry %s to be LastErr", name) + } + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.LastErr = string(sval) + } + return nil +} diff --git a/garbage.go b/garbage.go new file mode 100644 index 000000000..1c3925671 --- /dev/null +++ b/garbage.go @@ -0,0 +1,127 @@ +package sealing + +import ( + "bytes" + "context" + "io" + "math" + "math/rand" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" +) + +func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { + if len(sizes) == 0 { + return nil, nil + } + + deals := make([]actors.StorageDealProposal, len(sizes)) + for i, size := range sizes { + release := m.sb.RateLimit() + commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) + release() + + if err != nil { + return nil, err + } + + sdp := actors.StorageDealProposal{ + PieceRef: commP[:], + PieceSize: size, + Client: m.worker, + Provider: m.maddr, + ProposalExpiration: math.MaxUint64, + Duration: math.MaxUint64 / 2, // /2 because overflows + StoragePricePerEpoch: types.NewInt(0), + StorageCollateral: types.NewInt(0), + ProposerSignature: nil, // nil because self dealing + } + + deals[i] = sdp + } + + params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{ + Deals: deals, + }) + if aerr != nil { + return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", aerr) + } + + smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: m.worker, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.PublishStorageDeals, + Params: params, + }) + if err != nil { + return nil, err + } + r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return nil, err + } + if r.Receipt.ExitCode != 0 { + log.Error(xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)) + } + var resp actors.PublishStorageDealResponse + if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { + return nil, err + } + if len(resp.DealIDs) != len(sizes) { + return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") + } + + out := make([]Piece, len(sizes)) + + for i, size := range sizes { + ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) + if err != nil { + return nil, err + } + + existingPieceSizes = append(existingPieceSizes, size) + + out[i] = Piece{ + DealID: resp.DealIDs[i], + Size: ppi.Size, + CommP: ppi.CommP[:], + } + } + + return out, nil +} + +func (m *Sealing) PledgeSector() error { + go func() { + ctx := context.TODO() // we can't use the context from command which invokes + // this, as we run everything here async, and it's cancelled when the + // command exits + + size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) + + sid, err := m.sb.AcquireSectorId() + if err != nil { + log.Errorf("%+v", err) + return + } + + pieces, err := m.pledgeSector(ctx, sid, []uint64{}, size) + if err != nil { + log.Errorf("%+v", err) + return + } + + if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil { + log.Errorf("%+v", err) + return + } + }() + return nil +} diff --git a/sector_fsm.go b/sector_fsm.go new file mode 100644 index 000000000..50b62844a --- /dev/null +++ b/sector_fsm.go @@ -0,0 +1,264 @@ +package sealing + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +type SectorStart struct { + id uint64 + pieces []Piece +} +type SectorRestart struct{} + +type SectorFatalError struct{ error } + +type SectorPacked struct{ pieces []Piece } + +type SectorSealed struct { + commR []byte + commD []byte + ticket SealTicket +} +type SectorSealFailed struct{ error } + +type SectorPreCommitFailed struct{ error } +type SectorPreCommitted struct { + message cid.Cid +} + +type SectorSeedReady struct { + seed SealSeed +} + +type SectorSealCommitFailed struct{ error } +type SectorCommitFailed struct{ error } +type SectorCommitted struct { + message cid.Cid + proof []byte +} + +type SectorProving struct{} + +type SectorFaultReported struct{ reportMsg cid.Cid } +type SectorFaultedFinal struct{} + +type SectorForceState struct { + state api.SectorState +} + +func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) { + next, err := m.plan(events, user.(*SectorInfo)) + if err != nil || next == nil { + return nil, err + } + + return func(ctx statemachine.Context, si SectorInfo) error { + err := next(ctx, si) + if err != nil { + if err := ctx.Send(SectorFatalError{error: err}); err != nil { + return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err) + } + } + + return nil + }, nil +} + +func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { + ///// + // First process all events + + for _, event := range events { + if err, ok := event.User.(error); ok { + state.LastErr = fmt.Sprintf("%+v", err) + } + + switch event := event.User.(type) { + case SectorStart: + // TODO: check if state is clean + state.SectorID = event.id + state.Pieces = event.pieces + state.State = api.Packing + + case SectorRestart: + // noop + case SectorFatalError: + log.Errorf("Fatal error on sector %d: %+v", state.SectorID, event.error) + // TODO: Do we want to mark the state as unrecoverable? + // I feel like this should be a softer error, where the user would + // be able to send a retry event of some kind + return nil, nil + + // // TODO: Incoming + // TODO: for those - look at dealIDs matching chain + + // // + // Packing + + case SectorPacked: + // TODO: assert state + state.Pieces = append(state.Pieces, event.pieces...) + state.State = api.Unsealed + + // // Unsealed + + case SectorSealFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.SealFailed + + case SectorSealed: + state.CommD = event.commD + state.CommR = event.commR + state.Ticket = event.ticket + state.State = api.PreCommitting + + // // PreCommit + + case SectorPreCommitFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.PreCommitFailed + case SectorPreCommitted: + state.PreCommitMessage = &event.message + state.State = api.PreCommitted + + case SectorSeedReady: + state.Seed = event.seed + state.State = api.Committing + + // // Commit + + case SectorSealCommitFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.SealCommitFailed + case SectorCommitFailed: + // TODO: try to find out the reason, maybe retry + state.State = api.SealFailed + case SectorCommitted: + state.Proof = event.proof + state.CommitMessage = &event.message + state.State = api.CommitWait + case SectorProving: + state.State = api.Proving + + case SectorFaultReported: + state.FaultReportMsg = &event.reportMsg + state.State = api.FaultReported + case SectorFaultedFinal: + state.State = api.FaultedFinal + + // // Debug triggers + case SectorForceState: + state.State = event.state + } + } + + ///// + // Now decide what to do next + + /* + + * Empty + | | + | v + *<- Packing <- incoming + | | + | v + *<- Unsealed <--> SealFailed + | | + | v + * PreCommitting <--> PreCommitFailed + | | ^ + | v | + *<- PreCommitted ------/ + | ||| + | vvv v--> SealCommitFailed + *<- Committing + | | ^--> CommitFailed + | v ^ + *<- CommitWait ---/ + | | + | v + *<- Proving + | + v + FailedUnrecoverable + + UndefinedSectorState <- ¯\_(ツ)_/¯ + | ^ + *---------------------/ + + */ + + switch state.State { + // Happy path + case api.Packing: + return m.handlePacking, nil + case api.Unsealed: + return m.handleUnsealed, nil + case api.PreCommitting: + return m.handlePreCommitting, nil + case api.PreCommitted: + return m.handlePreCommitted, nil + case api.Committing: + return m.handleCommitting, nil + case api.CommitWait: + return m.handleCommitWait, nil + case api.Proving: + // TODO: track sector health / expiration + log.Infof("Proving sector %d", state.SectorID) + + // Handled failure modes + case api.SealFailed: + log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID) + case api.PreCommitFailed: + log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID) + case api.SealCommitFailed: + log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID) + case api.CommitFailed: + log.Warnf("sector %d entered unimplemented state 'CommitFailed'", state.SectorID) + + // Faults + case api.Faulty: + return m.handleFaulty, nil + case api.FaultReported: + return m.handleFaultReported, nil + + // Fatal errors + case api.UndefinedSectorState: + log.Error("sector update with undefined state!") + case api.FailedUnrecoverable: + log.Errorf("sector %d failed unrecoverably", state.SectorID) + default: + log.Errorf("unexpected sector update state: %d", state.State) + } + + return nil, nil +} + +func (m *Sealing) restartSectors(ctx context.Context) error { + trackedSectors, err := m.ListSectors() + if err != nil { + log.Errorf("loading sector list: %+v", err) + } + + for _, sector := range trackedSectors { + if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil { + log.Errorf("restarting sector %d: %+v", sector.SectorID, err) + } + } + + // TODO: Grab on-chain sector set and diff with trackedSectors + + return nil +} + +func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error { + return m.sectors.Send(id, SectorForceState{state}) +} diff --git a/sector_states.go b/sector_states.go new file mode 100644 index 000000000..1dbc6e4d4 --- /dev/null +++ b/sector_states.go @@ -0,0 +1,254 @@ +package sealing + +import ( + "context" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { + log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) + + var allocated uint64 + for _, piece := range sector.Pieces { + allocated += piece.Size + } + + ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize()) + + if allocated > ubytes { + return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) + } + + fillerSizes, err := fillersFromRem(ubytes - allocated) + if err != nil { + return err + } + + if len(fillerSizes) > 0 { + log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) + } + + pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...) + if err != nil { + return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) + } + + return ctx.Send(SectorPacked{pieces: pieces}) +} + +func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { + log.Infow("performing sector replication...", "sector", sector.SectorID) + ticket, err := m.tktFn(ctx.Context()) + if err != nil { + return err + } + + rspco, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, *ticket, sector.pieceInfos()) + if err != nil { + return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) + } + + return ctx.Send(SectorSealed{ + commD: rspco.CommD[:], + commR: rspco.CommR[:], + ticket: SealTicket{ + BlockHeight: ticket.BlockHeight, + TicketBytes: ticket.TicketBytes[:], + }, + }) +} + +func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { + params := &actors.SectorPreCommitInfo{ + SectorNumber: sector.SectorID, + + CommR: sector.CommR, + SealEpoch: sector.Ticket.BlockHeight, + DealIDs: sector.deals(), + } + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.PreCommitSector, + Params: enc, + Value: types.NewInt(0), // TODO: need to ensure sufficient collateral + GasLimit: types.NewInt(1000000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + log.Info("submitting precommit for sector: ", sector.SectorID) + smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) + if err != nil { + return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) + } + + return ctx.Send(SectorPreCommitted{message: smsg.Cid()}) +} + +func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error { + // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts + log.Info("Sector precommitted: ", sector.SectorID) + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) + if err != nil { + return ctx.Send(SectorPreCommitFailed{err}) + } + + if mw.Receipt.ExitCode != 0 { + log.Error("sector precommit failed: ", mw.Receipt.ExitCode) + err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode) + return ctx.Send(SectorPreCommitFailed{err}) + } + log.Info("precommit message landed on chain: ", sector.SectorID) + + randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied + log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight) + + err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH uint64) error { + rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), int64(randHeight)) + if err != nil { + err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + + ctx.Send(SectorFatalError{error: err}) + return err + } + + ctx.Send(SectorSeedReady{seed: SealSeed{ + BlockHeight: randHeight, + TicketBytes: rand, + }}) + + return nil + }, func(ctx context.Context, ts *types.TipSet) error { + log.Warn("revert in interactive commit sector step") + // TODO: need to cancel running process and restart... + return nil + }, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay) + if err != nil { + log.Warn("waitForPreCommitMessage ChainAt errored: ", err) + } + + return nil +} + +func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { + log.Info("scheduling seal proof computation...") + + proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) + if err != nil { + return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) + } + + // TODO: Consider splitting states and persist proof for faster recovery + + params := &actors.SectorProveCommitInfo{ + Proof: proof, + SectorID: sector.SectorID, + DealIDs: sector.deals(), + } + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)}) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.ProveCommitSector, + Params: enc, + Value: types.NewInt(0), // TODO: need to ensure sufficient collateral + GasLimit: types.NewInt(1000000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) + if err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) + } + + return ctx.Send(SectorCommitted{ + proof: proof, + message: smsg.Cid(), + }) +} + +func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error { + if sector.CommitMessage == nil { + log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")}) + } + + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage) + if err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)}) + } + + if mw.Receipt.ExitCode != 0 { + log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof) + return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode) + } + + return ctx.Send(SectorProving{}) +} + +func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { + // TODO: check if the fault has already been reported, and that this sector is even valid + + // TODO: coalesce faulty sector reporting + bf := types.NewBitField() + bf.Set(sector.SectorID) + + fp := &actors.DeclareFaultsParams{bf} + _ = fp + enc, aerr := actors.SerializeParams(nil) + if aerr != nil { + return xerrors.Errorf("failed to serialize declare fault params: %w", aerr) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.DeclareFaults, + Params: enc, + Value: types.NewInt(0), // TODO: need to ensure sufficient collateral + GasLimit: types.NewInt(1000000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg) + if err != nil { + return xerrors.Errorf("failed to push declare faults message to network: %w", err) + } + + return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()}) +} + +func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error { + if sector.FaultReportMsg == nil { + return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid") + } + + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg) + if err != nil { + return xerrors.Errorf("failed to wait for fault declaration: %w", err) + } + + if mw.Receipt.ExitCode != 0 { + log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID) + return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode) + } + + return ctx.Send(SectorFaultedFinal{}) +} diff --git a/sector_types.go b/sector_types.go new file mode 100644 index 000000000..c2d6b3359 --- /dev/null +++ b/sector_types.go @@ -0,0 +1,106 @@ +package sealing + +import ( + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" +) + +type SealTicket struct { + BlockHeight uint64 + TicketBytes []byte +} + +func (t *SealTicket) SB() sectorbuilder.SealTicket { + out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight} + copy(out.TicketBytes[:], t.TicketBytes) + return out +} + +type SealSeed struct { + BlockHeight uint64 + TicketBytes []byte +} + +func (t *SealSeed) SB() sectorbuilder.SealSeed { + out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight} + copy(out.TicketBytes[:], t.TicketBytes) + return out +} + +type Piece struct { + DealID uint64 + + Size uint64 + CommP []byte +} + +func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) { + out.Size = p.Size + copy(out.CommP[:], p.CommP) + return out +} + +type SectorInfo struct { + State api.SectorState + SectorID uint64 + Nonce uint64 + + // Packing + + Pieces []Piece + + // PreCommit + CommD []byte + CommR []byte + Proof []byte + Ticket SealTicket + + PreCommitMessage *cid.Cid + + // PreCommitted + Seed SealSeed + + // Committing + CommitMessage *cid.Cid + + // Faults + FaultReportMsg *cid.Cid + + // Debug + LastErr string +} + +func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo { + out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.ppi() + } + return out +} + +func (t *SectorInfo) deals() []uint64 { + out := make([]uint64, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.DealID + } + return out +} + +func (t *SectorInfo) existingPieces() []uint64 { + out := make([]uint64, len(t.Pieces)) + for i, piece := range t.Pieces { + out[i] = piece.Size + } + return out +} + +func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { + var out sectorbuilder.RawSealPreCommitOutput + + copy(out.CommD[:], t.CommD) + copy(out.CommR[:], t.CommR) + + return out +} diff --git a/sector_utils.go b/sector_utils.go new file mode 100644 index 000000000..8fa887d3c --- /dev/null +++ b/sector_utils.go @@ -0,0 +1,57 @@ +package sealing + +import ( + "math/bits" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" +) + +func fillersFromRem(toFill uint64) ([]uint64, error) { + // Convert to in-sector bytes for easier math: + // + // Sector size to user bytes ratio is constant, e.g. for 1024B we have 1016B + // of user-usable data. + // + // (1024/1016 = 128/127) + // + // Given that we can get sector size by simply adding 1/127 of the user + // bytes + // + // (we convert to sector bytes as they are nice round binary numbers) + + toFill += toFill / 127 + + // We need to fill the sector with pieces that are powers of 2. Conveniently + // computers store numbers in binary, which means we can look at 1s to get + // all the piece sizes we need to fill the sector. It also means that number + // of pieces is the number of 1s in the number of remaining bytes to fill + out := make([]uint64, bits.OnesCount64(toFill)) + for i := range out { + // Extract the next lowest non-zero bit + next := bits.TrailingZeros64(toFill) + psize := uint64(1) << next + // e.g: if the number is 0b010100, psize will be 0b000100 + + // set that bit to 0 by XORing it, so the next iteration looks at the + // next bit + toFill ^= psize + + // Add the piece size to the list of pieces we need to create + out[i] = sectorbuilder.UserBytesForSectorSize(psize) + } + return out, nil +} + +func (m *Sealing) ListSectors() ([]SectorInfo, error) { + var sectors []SectorInfo + if err := m.sectors.List(§ors); err != nil { + return nil, err + } + return sectors, nil +} + +func (m *Sealing) GetSectorInfo(sid uint64) (SectorInfo, error) { + var out SectorInfo + err := m.sectors.Get(sid).Get(&out) + return out, err +} diff --git a/sector_utils_test.go b/sector_utils_test.go new file mode 100644 index 000000000..02746a3d8 --- /dev/null +++ b/sector_utils_test.go @@ -0,0 +1,46 @@ +package sealing + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" +) + +func testFill(t *testing.T, n uint64, exp []uint64) { + f, err := fillersFromRem(n) + assert.NoError(t, err) + assert.Equal(t, exp, f) + + var sum uint64 + for _, u := range f { + sum += u + } + assert.Equal(t, n, sum) +} + +func TestFillersFromRem(t *testing.T) { + for i := 8; i < 32; i++ { + // single + ub := sectorbuilder.UserBytesForSectorSize(uint64(1) << i) + testFill(t, ub, []uint64{ub}) + + // 2 + ub = sectorbuilder.UserBytesForSectorSize(uint64(5) << i) + ub1 := sectorbuilder.UserBytesForSectorSize(uint64(1) << i) + ub3 := sectorbuilder.UserBytesForSectorSize(uint64(4) << i) + testFill(t, ub, []uint64{ub1, ub3}) + + // 4 + ub = sectorbuilder.UserBytesForSectorSize(uint64(15) << i) + ub2 := sectorbuilder.UserBytesForSectorSize(uint64(2) << i) + ub4 := sectorbuilder.UserBytesForSectorSize(uint64(8) << i) + testFill(t, ub, []uint64{ub1, ub2, ub3, ub4}) + + // different 2 + ub = sectorbuilder.UserBytesForSectorSize(uint64(9) << i) + testFill(t, ub, []uint64{ub1, ub4}) + } + +} diff --git a/sectors.go b/sectors.go new file mode 100644 index 000000000..c915c27f1 --- /dev/null +++ b/sectors.go @@ -0,0 +1,136 @@ +package sealing + +import ( + "context" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "io" + + "github.com/filecoin-project/lotus/lib/padreader" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +const SectorStorePrefix = "/sectors" + +var log = logging.Logger("sectors") + +type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) + +type sectorsApi interface { // TODO: trim down + // Call a read only method on actors (no interaction with the chain required) + StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) + StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) + StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) + StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) + StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) + StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually + StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) + StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) + + MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) + + ChainHead(context.Context) (*types.TipSet, error) + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) + ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + + WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) +} + +type Sealing struct { + api sectorsApi + events *events.Events + + maddr address.Address + worker address.Address + + sb sectorbuilder.Interface + sectors *statemachine.StateGroup + tktFn TicketFn +} + +func New(api sectorsApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing { + s := &Sealing{ + api: api, + events: events, + + maddr: maddr, + worker: worker, + sb: sb, + tktFn: tktFn, + } + + s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) + + return s +} + +func (m *Sealing) Run(ctx context.Context) error { + m.events = events.NewEvents(ctx, m.api) + + if err := m.restartSectors(ctx); err != nil { + log.Errorf("%+v", err) + return xerrors.Errorf("failed load sector states: %w", err) + } + + return nil +} + +func (m *Sealing) Stop(ctx context.Context) error { + return m.sectors.Stop(ctx) +} + +func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { + if padreader.PaddedSize(size) != size { + return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") + } + + sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector + if err != nil { + return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) + } + + // offset hard-coded to 0 since we only put one thing in a sector for now + return sid, 0, nil +} + +func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { + log.Infof("Seal piece for deal %d", dealID) + + ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{}) + if err != nil { + return xerrors.Errorf("adding piece to sector: %w", err) + } + + return m.newSector(ctx, sectorID, dealID, ppi) +} + +func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { + return m.sectors.Send(sid, SectorStart{ + id: sid, + pieces: []Piece{ + { + DealID: dealID, + + Size: ppi.Size, + CommP: ppi.CommP[:], + }, + }, + }) +} + diff --git a/sectors_fsm_test.go b/sectors_fsm_test.go new file mode 100644 index 000000000..3eda54860 --- /dev/null +++ b/sectors_fsm_test.go @@ -0,0 +1,79 @@ +package sealing + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +func (t *test) planSingle(evt interface{}) { + _, err := t.m.plan([]statemachine.Event{{evt}}, t.state) + require.NoError(t.t, err) +} + +type test struct { + m *Miner + t *testing.T + state *SectorInfo + + next func(statemachine.Context, SectorInfo) error +} + +func TestHappyPath(t *testing.T) { + m := test{ + m: &Miner{}, + t: t, + state: &SectorInfo{State: api.Packing}, + } + + m.planSingle(SectorPacked{}) + require.Equal(m.t, m.state.State, api.Unsealed) + + m.planSingle(SectorSealed{}) + require.Equal(m.t, m.state.State, api.PreCommitting) + + m.planSingle(SectorPreCommitted{}) + require.Equal(m.t, m.state.State, api.PreCommitted) + + m.planSingle(SectorSeedReady{}) + require.Equal(m.t, m.state.State, api.Committing) + + m.planSingle(SectorCommitted{}) + require.Equal(m.t, m.state.State, api.CommitWait) + + m.planSingle(SectorProving{}) + require.Equal(m.t, m.state.State, api.Proving) +} + +func TestSeedRevert(t *testing.T) { + m := test{ + m: &Miner{}, + t: t, + state: &SectorInfo{State: api.Packing}, + } + + m.planSingle(SectorPacked{}) + require.Equal(m.t, m.state.State, api.Unsealed) + + m.planSingle(SectorSealed{}) + require.Equal(m.t, m.state.State, api.PreCommitting) + + m.planSingle(SectorPreCommitted{}) + require.Equal(m.t, m.state.State, api.PreCommitted) + + m.planSingle(SectorSeedReady{}) + require.Equal(m.t, m.state.State, api.Committing) + + _, err := m.m.plan([]statemachine.Event{{SectorSeedReady{}}, {SectorCommitted{}}}, m.state) + require.NoError(t, err) + require.Equal(m.t, m.state.State, api.Committing) + + m.planSingle(SectorCommitted{}) + require.Equal(m.t, m.state.State, api.CommitWait) + + m.planSingle(SectorProving{}) + require.Equal(m.t, m.state.State, api.Proving) +} diff --git a/sectors_test.go b/sectors_test.go new file mode 100644 index 000000000..f14051a15 --- /dev/null +++ b/sectors_test.go @@ -0,0 +1,56 @@ +package sealing + +import ( + "bytes" + "testing" + + "gotest.tools/assert" + + "github.com/filecoin-project/go-cbor-util" +) + +func TestSectorInfoSelialization(t *testing.T) { + si := &SectorInfo{ + State: 123, + SectorID: 234, + Nonce: 345, + Pieces: []Piece{{ + DealID: 1234, + Size: 5, + CommP: []byte{3}, + }}, + CommD: []byte{32, 4}, + CommR: nil, + Proof: nil, + Ticket: SealTicket{ + BlockHeight: 345, + TicketBytes: []byte{87, 78, 7, 87}, + }, + PreCommitMessage: nil, + Seed: SealSeed{}, + CommitMessage: nil, + FaultReportMsg: nil, + LastErr: "hi", + } + + b, err := cborutil.Dump(si) + if err != nil { + t.Fatal(err) + } + + var si2 SectorInfo + if err := cborutil.ReadCborRPC(bytes.NewReader(b), &si); err != nil { + return + } + + assert.Equal(t, si.State, si2.State) + assert.Equal(t, si.Nonce, si2.Nonce) + assert.Equal(t, si.SectorID, si2.SectorID) + + assert.Equal(t, si.Pieces, si2.Pieces) + assert.Equal(t, si.CommD, si2.CommD) + assert.Equal(t, si.Ticket, si2.Ticket) + + assert.Equal(t, si, si2) + +}