diff --git a/api/api_common.go b/api/api_common.go index 376aef86d..c83a4c260 100644 --- a/api/api_common.go +++ b/api/api_common.go @@ -49,4 +49,4 @@ type Version struct { func (v Version) String() string { vM, vm, vp := build.VersionInts(v.APIVersion) return fmt.Sprintf("%s+api%d.%d.%d", v.Version, vM, vm, vp) -} \ No newline at end of file +} diff --git a/api/api_storage.go b/api/api_storage.go index 166f33b4a..7a392455f 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -2,15 +2,17 @@ package api import ( "context" + "fmt" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -type SectorState int +// alias because cbor-gen doesn't like non-alias types +type SectorState = uint64 const ( - Undefined SectorState = iota + UndefinedSectorState SectorState = iota Empty // TODO: Is this useful Packing // sector not in sealStore, and not on chain @@ -19,8 +21,33 @@ const ( PreCommitting // on chain pre-commit PreCommitted // waiting for seed Committing + Proving + + SectorNoUpdate = UndefinedSectorState ) +func SectorStateStr(s SectorState) string { + switch s { + case UndefinedSectorState: + return "UndefinedSectorState" + case Empty: + return "Empty" + case Packing: + return "Packing" + case Unsealed: + return "Unsealed" + case PreCommitting: + return "PreCommitting" + case PreCommitted: + return "PreCommitted" + case Committing: + return "Committing" + case Proving: + return "Proving" + } + return fmt.Sprintf("", s) +} + // StorageMiner is a low-level interface to the Filecoin network storage miner node type StorageMiner interface { Common diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index ab63b28a1..cb9c85959 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -1470,3 +1470,118 @@ func (t *StorageAsk) UnmarshalCBOR(r io.Reader) error { t.SeqNo = extra return nil } + +func (t *ExpTipSet) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{131}); err != nil { + return err + } + + // t.t.Cids ([]cid.Cid) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Cids)))); err != nil { + return err + } + for _, v := range t.Cids { + if err := cbg.WriteCid(w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.Cids: %w", err) + } + } + + // t.t.Blocks ([]*types.BlockHeader) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Blocks)))); err != nil { + return err + } + for _, v := range t.Blocks { + if err := v.MarshalCBOR(w); err != nil { + return err + } + } + + // t.t.Height (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.Height)); err != nil { + return err + } + return nil +} + +func (t *ExpTipSet) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.Cids ([]cid.Cid) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.Cids: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.Cids = make([]cid.Cid, extra) + } + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("reading cid field t.Cids failed: %w", err) + } + t.Cids[i] = c + } + + // t.t.Blocks ([]*types.BlockHeader) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.Blocks: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.Blocks = make([]*BlockHeader, extra) + } + for i := 0; i < int(extra); i++ { + + var v BlockHeader + if err := v.UnmarshalCBOR(br); err != nil { + return err + } + + t.Blocks[i] = &v + } + + // t.t.Height (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Height = extra + return nil +} diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 3733a6f09..0939fef1c 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "sort" "github.com/ipfs/go-cid" @@ -20,14 +21,14 @@ type TipSet struct { // why didnt i just export the fields? Because the struct has methods with the // same names already -type expTipSet struct { +type ExpTipSet struct { Cids []cid.Cid Blocks []*BlockHeader Height uint64 } func (ts *TipSet) MarshalJSON() ([]byte, error) { - return json.Marshal(expTipSet{ + return json.Marshal(ExpTipSet{ Cids: ts.cids, Blocks: ts.blks, Height: ts.height, @@ -35,7 +36,7 @@ func (ts *TipSet) MarshalJSON() ([]byte, error) { } func (ts *TipSet) UnmarshalJSON(b []byte) error { - var ets expTipSet + var ets ExpTipSet if err := json.Unmarshal(b, &ets); err != nil { return err } @@ -50,6 +51,30 @@ func (ts *TipSet) UnmarshalJSON(b []byte) error { return nil } +func (ts *TipSet) MarshalCBOR(w io.Writer) error { + return (&ExpTipSet{ + Cids: ts.cids, + Blocks: ts.blks, + Height: ts.height, + }).MarshalCBOR(w) +} + +func (ts *TipSet) UnmarshalCBOR(r io.Reader) error { + var ets ExpTipSet + if err := ets.UnmarshalCBOR(r); err != nil { + return err + } + + ots, err := NewTipSet(ets.Blocks) + if err != nil { + return err + } + + *ts = *ots + + return nil +} + func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool { return func(i, j int) bool { ti := blks[i].LastTicket() diff --git a/gen/main.go b/gen/main.go index a588dfadb..335aa0584 100644 --- a/gen/main.go +++ b/gen/main.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage" ) func main() { @@ -26,6 +27,7 @@ func main() { types.BlockMsg{}, types.SignedStorageAsk{}, types.StorageAsk{}, + types.ExpTipSet{}, ) if err != nil { fmt.Println(err) @@ -109,4 +111,13 @@ func main() { fmt.Println(err) os.Exit(1) } + + err = gen.WriteTupleEncodersToFile("./storage/cbor_gen.go", "storage", + storage.SealTicket{}, + storage.SectorInfo{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } } diff --git a/lib/statestore/store.go b/lib/statestore/store.go index 2fd4a4384..52734bd2b 100644 --- a/lib/statestore/store.go +++ b/lib/statestore/store.go @@ -3,11 +3,13 @@ package statestore import ( "bytes" "fmt" - "github.com/filecoin-project/lotus/lib/cborrpc" + "reflect" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" "golang.org/x/xerrors" - "reflect" + + "github.com/filecoin-project/lotus/lib/cborrpc" ) type StateStore struct { @@ -81,7 +83,7 @@ func cborMutator(mutator interface{}) func([]byte) ([]byte, error) { } // mutator func(*T) error -func (st *StateStore) Mutate(i fmt.Stringer, mutator interface{}) error { +func (st *StateStore) Mutate(i interface{}, mutator interface{}) error { return st.mutate(i, cborMutator(mutator)) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b8b101ec3..d9ee34463 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -47,7 +47,7 @@ func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) error { return } - if err := sm.Miner.SealSector(ctx, sectorId); err != nil { + if err := sm.Miner.SealSector(context.TODO(), sectorId); err != nil { log.Error(err) return } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 73d55c5d1..a7580f84c 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -90,6 +90,7 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h OnStart: func(context.Context) error { return sm.Run(ctx) }, + OnStop: sm.Stop, }) return sm, nil diff --git a/storage/cbor_gen.go b/storage/cbor_gen.go new file mode 100644 index 000000000..518db6692 --- /dev/null +++ b/storage/cbor_gen.go @@ -0,0 +1,319 @@ +package storage + +import ( + "fmt" + "github.com/filecoin-project/lotus/chain/types" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" + "io" +) + +/* This file was generated by github.com/whyrusleeping/cbor-gen */ + +var _ = xerrors.Errorf + +func (t *SealTicket) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.t.BlockHeight (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.BlockHeight)); err != nil { + return err + } + + // t.t.TicketBytes ([]uint8) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.TicketBytes)))); err != nil { + return err + } + if _, err := w.Write(t.TicketBytes); err != nil { + return err + } + return nil +} + +func (t *SealTicket) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.BlockHeight (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.BlockHeight = extra + // t.t.TicketBytes ([]uint8) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.TicketBytes: array too large (%d)", extra) + } + + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.TicketBytes = make([]byte, extra) + if _, err := io.ReadFull(br, t.TicketBytes); err != nil { + return err + } + return nil +} + +func (t *SectorInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{137}); err != nil { + return err + } + + // t.t.State (api.SectorState) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil { + return err + } + + // t.t.SectorID (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorID)); err != nil { + return err + } + + // t.t.CommD ([]uint8) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommD)))); err != nil { + return err + } + if _, err := w.Write(t.CommD); err != nil { + return err + } + + // t.t.CommR ([]uint8) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommR)))); err != nil { + return err + } + if _, err := w.Write(t.CommR); err != nil { + return err + } + + // t.t.Ticket (storage.SealTicket) + if err := t.Ticket.MarshalCBOR(w); err != nil { + return err + } + + // t.t.PreCommitMessage (cid.Cid) + + if t.PreCommitMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PreCommitMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PreCommitMessage: %w", err) + } + } + + // t.t.RandHeight (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.RandHeight)); err != nil { + return err + } + + // t.t.RandTs (types.TipSet) + if err := t.RandTs.MarshalCBOR(w); err != nil { + return err + } + + // t.t.CommitMessage (cid.Cid) + + if t.CommitMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.CommitMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err) + } + } + + return nil +} + +func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 9 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.State (api.SectorState) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = extra + // t.t.SectorID (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.SectorID = extra + // t.t.CommD ([]uint8) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.CommD: array too large (%d)", extra) + } + + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.CommD = make([]byte, extra) + if _, err := io.ReadFull(br, t.CommD); err != nil { + return err + } + // t.t.CommR ([]uint8) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.CommR: array too large (%d)", extra) + } + + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.CommR = make([]byte, extra) + if _, err := io.ReadFull(br, t.CommR); err != nil { + return err + } + // t.t.Ticket (storage.SealTicket) + + { + + if err := t.Ticket.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.t.PreCommitMessage (cid.Cid) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PreCommitMessage: %w", err) + } + + t.PreCommitMessage = &c + } + + } + // t.t.RandHeight (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.RandHeight = extra + // t.t.RandTs (types.TipSet) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.RandTs = new(types.TipSet) + if err := t.RandTs.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + // t.t.CommitMessage (cid.Cid) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err) + } + + t.CommitMessage = &c + } + + } + return nil +} diff --git a/storage/miner.go b/storage/miner.go index bb3be8f8c..18449a42c 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,6 +2,8 @@ package storage import ( "context" + "github.com/filecoin-project/lotus/lib/statestore" + "github.com/ipfs/go-datastore/namespace" "sync" "github.com/ipfs/go-cid" @@ -15,7 +17,6 @@ import ( "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/storage/sector" ) @@ -26,19 +27,23 @@ const PoStConfidence = 3 type Miner struct { api storageMinerApi events *events.Events + h host.Host + secst *sector.Store - secst *sector.Store - - maddr address.Address - + maddr address.Address worker address.Address - h host.Host - - ds datastore.Batching - - schedLk sync.Mutex + // PoSt + postLk sync.Mutex schedPost uint64 + + // Sealing + sectors *statestore.StateStore + + sectorIncoming chan *SectorInfo + sectorUpdated chan sectorUpdate + stop chan struct{} + stopped chan struct{} } type storageMinerApi interface { @@ -72,8 +77,9 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto maddr: addr, h: h, - ds: ds, secst: secst, + + sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey("/sectors"))), }, nil } @@ -85,11 +91,18 @@ func (m *Miner) Run(ctx context.Context) error { m.events = events.NewEvents(ctx, m.api) go m.beginPosting(ctx) + go m.sectorStateLoop(ctx) return nil } -func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSealingStatus) error { - return m.SealSector(ctx, sinfo.SectorID) +func (m *Miner) Stop(ctx context.Context) error { + close(m.stop) + select { + case <-m.stopped: + return nil + case <-ctx.Done(): + return ctx.Err() + } } func (m *Miner) runPreflightChecks(ctx context.Context) error { diff --git a/storage/post.go b/storage/post.go index ee024acbb..9d3f892a7 100644 --- a/storage/post.go +++ b/storage/post.go @@ -30,10 +30,10 @@ func (m *Miner) beginPosting(ctx context.Context) { return } - m.schedLk.Lock() + m.postLk.Lock() if m.schedPost > 0 { log.Warnf("PoSts already running %d", m.schedPost) - m.schedLk.Unlock() + m.postLk.Unlock() return } @@ -42,7 +42,7 @@ func (m *Miner) beginPosting(ctx context.Context) { ppe, _ = actors.ProvingPeriodEnd(ppe, ts.Height()+1) m.schedPost = ppe - m.schedLk.Unlock() + m.postLk.Unlock() log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime) err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert @@ -71,16 +71,16 @@ func (m *Miner) scheduleNextPost(ppe uint64) { ppe = headPPE } - m.schedLk.Lock() + m.postLk.Lock() if m.schedPost >= ppe { // this probably can't happen log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe) - m.schedLk.Unlock() + m.postLk.Unlock() return } m.schedPost = ppe - m.schedLk.Unlock() + m.postLk.Unlock() log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime, "height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod) diff --git a/storage/sealing.go b/storage/sealing.go index 2a3d02d7f..651dbf6ca 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -2,154 +2,120 @@ package storage import ( "context" - "fmt" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" cid "github.com/ipfs/go-cid" - "github.com/pkg/errors" - "golang.org/x/xerrors" ) -func (m *Miner) SealSector(ctx context.Context, sid uint64) error { - log.Info("committing sector: ", sid) - - ssize, err := m.SectorSize(ctx) - if err != nil { - return xerrors.Errorf("failed to check out own sector size: %w", err) - } - - _ = ssize - - log.Info("performing sector replication...") - if err := m.secst.SealPreCommit(ctx, sid); err != nil { - return xerrors.Errorf("seal pre commit failed: %w", err) - } - - sinfo, err := m.secst.SectorStatus(sid) - if err != nil { - return xerrors.Errorf("failed to check status for sector %d: %w", sid, err) - } - - params := &actors.SectorPreCommitInfo{ - CommD: sinfo.CommD[:], - CommR: sinfo.CommR[:], - Epoch: sinfo.Ticket.BlockHeight, - - //DealIDs: deals, - SectorNumber: sinfo.SectorID, - } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return errors.Wrap(aerr, "could not serialize commit sector parameters") - } - - msg := &types.Message{ - To: m.maddr, - From: m.worker, - Method: actors.MAMethods.PreCommitSector, - Params: enc, - Value: types.NewInt(0), // TODO: need to ensure sufficient collateral - GasLimit: types.NewInt(1000000 /* i dont know help */), - GasPrice: types.NewInt(1), - } - - log.Info("submitting precommit for sector: ", sid) - smsg, err := m.api.MpoolPushMessage(ctx, msg) - if err != nil { - return errors.Wrap(err, "pushing message to mpool") - } - - go m.waitForPreCommitMessage(context.TODO(), sinfo.SectorID, smsg.Cid()) - - // TODO: maybe return a wait channel? - return nil +type SealTicket struct { + BlockHeight uint64 + TicketBytes []byte } -func (m *Miner) waitForPreCommitMessage(ctx context.Context, sid uint64, mcid cid.Cid) { - // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts - mw, err := m.api.StateWaitMsg(ctx, mcid) - if err != nil { - return - } +type SectorInfo struct { + State api.SectorState + SectorID uint64 - if mw.Receipt.ExitCode != 0 { - log.Error("sector precommit failed: ", mw.Receipt.ExitCode) - return - } + // PreCommit + CommD []byte + CommR []byte + Ticket SealTicket - randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - log.Infof("precommit for sector %d made it on chain, will start post computation at height %d", sid, randHeight) + PreCommitMessage *cid.Cid - err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error { - return m.scheduleComputeProof(ctx, sid, ts, randHeight) - }, func(ts *types.TipSet) error { - log.Warn("revert in interactive commit sector step") - return nil - }, 3, mw.TipSet.Height()+build.InteractivePoRepDelay) - if err != nil { - log.Warn("waitForPreCommitMessage ChainAt errored: ", err) - } + // PreCommitted + RandHeight uint64 + RandTs *types.TipSet + + // Committing + CommitMessage *cid.Cid } -func (m *Miner) scheduleComputeProof(ctx context.Context, sid uint64, ts *types.TipSet, rheight uint64) error { - log.Info("scheduling post computation...") +type sectorUpdate struct { + newState api.SectorState + id uint64 + err error + mut func(*SectorInfo) +} + +func (m *Miner) sectorStateLoop(ctx context.Context) { + // TODO: restore state + go func() { - rand, err := m.api.ChainGetRandomness(ctx, ts, nil, int(ts.Height()-rheight)) - if err != nil { - log.Error(fmt.Errorf("failed to get randomness for computing seal proof: %w", err)) - return - } + defer log.Warn("quitting deal provider loop") + defer close(m.stopped) - proof, err := m.secst.SealComputeProof(ctx, sid, rheight, rand) - if err != nil { - log.Error(fmt.Errorf("computing seal proof failed: %w", err)) - return + for { + select { + case sector := <-m.sectorIncoming: + m.onSectorIncoming(sector) + case update := <-m.sectorUpdated: + m.onSectorUpdated(ctx, update) + case <-m.stop: + return + } } - - params := &actors.SectorProveCommitInfo{ - Proof: proof, - SectorID: sid, - //DealIDs: deals, - } - - _ = params - enc, aerr := actors.SerializeParams(nil) - if aerr != nil { - log.Error(errors.Wrap(aerr, "could not serialize commit sector parameters")) - return - } - - msg := &types.Message{ - To: m.maddr, - From: m.worker, - Method: actors.MAMethods.ProveCommitSector, - Params: enc, - Value: types.NewInt(0), // TODO: need to ensure sufficient collateral - GasLimit: types.NewInt(1000000 /* i dont know help */), - GasPrice: types.NewInt(1), - } - - smsg, err := m.api.MpoolPushMessage(ctx, msg) - if err != nil { - log.Error(errors.Wrap(err, "pushing message to mpool")) - } - - // TODO: now wait for this to get included and handle errors? - mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) - if err != nil { - log.Errorf("failed to wait for porep inclusion: %s", err) - return - } - - if mw.Receipt.ExitCode != 0 { - log.Error("UNHANDLED: submitting sector proof failed") - return - } - - m.beginPosting(ctx) }() - - return nil +} + +func (m *Miner) onSectorIncoming(sector *SectorInfo) { + if err := m.sectors.Begin(sector.SectorID, sector); err != nil { + // We may have re-sent the proposal + log.Errorf("deal tracking failed: %s", err) + m.failSector(sector.SectorID, err) + return + } + + go func() { + m.sectorUpdated <- sectorUpdate{ + newState: api.Unsealed, + id: sector.SectorID, + } + }() +} + +func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { + log.Infof("Sector %s updated state to %s", update.id, api.SectorStateStr(update.newState)) + var sector SectorInfo + err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { + s.State = update.newState + sector = *s + return nil + }) + if update.err != nil { + log.Errorf("deal %s failed: %s", update.id, update.err) + m.failSector(update.id, update.err) + return + } + if err != nil { + m.failSector(update.id, err) + return + } + + switch update.newState { + case api.Unsealed: + m.handle(ctx, sector, m.sealPreCommit, api.PreCommitting) + case api.PreCommitting: + m.handle(ctx, sector, m.preCommit, api.PreCommitted) + case api.PreCommitted: + m.handle(ctx, sector, m.preCommitted, api.SectorNoUpdate) + case api.Committing: + m.handle(ctx, sector, m.committing, api.Proving) + } +} + +func (m *Miner) failSector(id uint64, err error) { + panic(err) // todo: better error handling strategy +} + +func (m *Miner) SealSector(ctx context.Context, sid uint64) error { + select { + case m.sectorIncoming <- &SectorInfo{ + State: api.UndefinedSectorState, + SectorID: sid, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/storage/sector/store.go b/storage/sector/store.go index 855fe2c39..52c45deef 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -131,19 +131,13 @@ func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) { } } -func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) error { +func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) (sectorbuilder.SealPreCommitOutput, error) { tkt, err := s.tktFn(ctx) if err != nil { - return err + return sectorbuilder.SealPreCommitOutput{}, err } - // TODO: That's not async, is it? - // - If not then we probably can drop this wait-for-seal hack below - _, err = s.sb.SealPreCommit(sectorID, *tkt) - if err != nil { - return err - } - return nil + return s.sb.SealPreCommit(sectorID, *tkt) } func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height uint64, rand []byte) ([]byte, error) { @@ -160,7 +154,7 @@ func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height ui return sco.Proof, nil } -func (s *Store) Commited() ([]sectorbuilder.SectorSealingStatus, error) { +func (s *Store) Committed() ([]sectorbuilder.SectorSealingStatus, error) { l, err := s.sb.GetAllStagedSectors() if err != nil { return nil, err diff --git a/storage/sector_states.go b/storage/sector_states.go new file mode 100644 index 000000000..ce4a8b28c --- /dev/null +++ b/storage/sector_states.go @@ -0,0 +1,183 @@ +package storage + +import ( + "context" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/pkg/errors" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" +) + +type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error) + +func (m *Miner) handle(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) { + go func() { + mut, err := cb(ctx, sector) + + if err == nil && next == api.SectorNoUpdate { + return + } + + select { + case m.sectorUpdated <- sectorUpdate{ + newState: next, + id: sector.SectorID, + err: err, + mut: mut, + }: + case <-m.stop: + } + }() +} + +func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { + log.Infow("performing sector replication...", "sector", sector.SectorID) + sinfo, err := m.secst.SealPreCommit(ctx, sector.SectorID) + if err != nil { + return nil, xerrors.Errorf("seal pre commit failed: %w", err) + } + + return func(info *SectorInfo) { + info.CommD = sinfo.CommD[:] + info.CommR = sinfo.CommR[:] + info.Ticket = SealTicket{ + BlockHeight: sinfo.Ticket.BlockHeight, + TicketBytes: sinfo.Ticket.TicketBytes[:], + } + }, nil +} + +func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { + params := &actors.SectorPreCommitInfo{ + CommD: sector.CommD, + CommR: sector.CommR, + Epoch: sector.Ticket.BlockHeight, + + SectorNumber: sector.SectorID, + } + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.PreCommitSector, + Params: enc, + Value: types.NewInt(0), // TODO: need to ensure sufficient collateral + GasLimit: types.NewInt(1000000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + log.Info("submitting precommit for sector: ", sector.SectorID) + smsg, err := m.api.MpoolPushMessage(ctx, msg) + if err != nil { + return nil, xerrors.Errorf("pushing message to mpool: %w", err) + } + + return func(info *SectorInfo) { + mcid := smsg.Cid() + info.PreCommitMessage = &mcid + }, nil +} + +func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { + // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts + mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage) + if err != nil { + return nil, err + } + + if mw.Receipt.ExitCode != 0 { + log.Error("sector precommit failed: ", mw.Receipt.ExitCode) + return nil, err + } + + randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay + log.Infof("precommit for sector %d made it on chain, will start post computation at height %d", sector.SectorID, randHeight) + + err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error { + m.sectorUpdated <- sectorUpdate{ + newState: api.Committing, + id: sector.SectorID, + mut: func(info *SectorInfo) { + info.RandHeight = randHeight + info.RandTs = ts + }, + } + + return nil + }, func(ts *types.TipSet) error { + log.Warn("revert in interactive commit sector step") + return nil + }, 3, mw.TipSet.Height()+build.InteractivePoRepDelay) + if err != nil { + log.Warn("waitForPreCommitMessage ChainAt errored: ", err) + } + + return nil, nil +} + +func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { + log.Info("scheduling seal proof computation...") + + rand, err := m.api.ChainGetRandomness(ctx, sector.RandTs, nil, int(sector.RandTs.Height()-sector.RandHeight)) + if err != nil { + return nil, xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + } + + proof, err := m.secst.SealComputeProof(ctx, sector.SectorID, sector.RandHeight, rand) + if err != nil { + return nil, xerrors.Errorf("computing seal proof failed: %w", err) + } + + params := &actors.SectorProveCommitInfo{ + Proof: proof, + SectorID: sector.SectorID, + //DealIDs: deals, + } + + _ = params + enc, aerr := actors.SerializeParams(nil) + if aerr != nil { + return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.ProveCommitSector, + Params: enc, + Value: types.NewInt(0), // TODO: need to ensure sufficient collateral + GasLimit: types.NewInt(1000000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + smsg, err := m.api.MpoolPushMessage(ctx, msg) + if err != nil { + log.Error(errors.Wrap(err, "pushing message to mpool")) + } + + // TODO: Separate state before this wait, so we persist message cid? + + mw, err := m.api.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return nil, xerrors.Errorf("failed to wait for porep inclusion: %w", err) + } + + if mw.Receipt.ExitCode != 0 { + log.Error("UNHANDLED: submitting sector proof failed") + return nil, xerrors.New("UNHANDLED: submitting sector proof failed") + } + + m.beginPosting(ctx) + + return func(info *SectorInfo) { + mcid := smsg.Cid() + info.CommitMessage = &mcid + }, nil +}