Use state store for sectors

This commit is contained in:
Łukasz Magiera 2019-11-01 14:58:48 +01:00
parent 080a84970c
commit 82344649d3
14 changed files with 830 additions and 174 deletions

View File

@ -2,15 +2,17 @@ package api
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type SectorState int
// alias because cbor-gen doesn't like non-alias types
type SectorState = uint64
const (
Undefined SectorState = iota
UndefinedSectorState SectorState = iota
Empty // TODO: Is this useful
Packing // sector not in sealStore, and not on chain
@ -19,8 +21,33 @@ const (
PreCommitting // on chain pre-commit
PreCommitted // waiting for seed
Committing
Proving
SectorNoUpdate = UndefinedSectorState
)
func SectorStateStr(s SectorState) string {
switch s {
case UndefinedSectorState:
return "UndefinedSectorState"
case Empty:
return "Empty"
case Packing:
return "Packing"
case Unsealed:
return "Unsealed"
case PreCommitting:
return "PreCommitting"
case PreCommitted:
return "PreCommitted"
case Committing:
return "Committing"
case Proving:
return "Proving"
}
return fmt.Sprintf("<Unknown %d>", s)
}
// StorageMiner is a low-level interface to the Filecoin network storage miner node
type StorageMiner interface {
Common

View File

@ -1470,3 +1470,118 @@ func (t *StorageAsk) UnmarshalCBOR(r io.Reader) error {
t.SeqNo = extra
return nil
}
func (t *ExpTipSet) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{131}); err != nil {
return err
}
// t.t.Cids ([]cid.Cid)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Cids)))); err != nil {
return err
}
for _, v := range t.Cids {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.Cids: %w", err)
}
}
// t.t.Blocks ([]*types.BlockHeader)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Blocks)))); err != nil {
return err
}
for _, v := range t.Blocks {
if err := v.MarshalCBOR(w); err != nil {
return err
}
}
// t.t.Height (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.Height)); err != nil {
return err
}
return nil
}
func (t *ExpTipSet) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Cids ([]cid.Cid)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.Cids: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Cids = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("reading cid field t.Cids failed: %w", err)
}
t.Cids[i] = c
}
// t.t.Blocks ([]*types.BlockHeader)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.Blocks: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Blocks = make([]*BlockHeader, extra)
}
for i := 0; i < int(extra); i++ {
var v BlockHeader
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Blocks[i] = &v
}
// t.t.Height (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Height = extra
return nil
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"sort"
"github.com/ipfs/go-cid"
@ -20,14 +21,14 @@ type TipSet struct {
// why didnt i just export the fields? Because the struct has methods with the
// same names already
type expTipSet struct {
type ExpTipSet struct {
Cids []cid.Cid
Blocks []*BlockHeader
Height uint64
}
func (ts *TipSet) MarshalJSON() ([]byte, error) {
return json.Marshal(expTipSet{
return json.Marshal(ExpTipSet{
Cids: ts.cids,
Blocks: ts.blks,
Height: ts.height,
@ -35,7 +36,7 @@ func (ts *TipSet) MarshalJSON() ([]byte, error) {
}
func (ts *TipSet) UnmarshalJSON(b []byte) error {
var ets expTipSet
var ets ExpTipSet
if err := json.Unmarshal(b, &ets); err != nil {
return err
}
@ -50,6 +51,30 @@ func (ts *TipSet) UnmarshalJSON(b []byte) error {
return nil
}
func (ts *TipSet) MarshalCBOR(w io.Writer) error {
return (&ExpTipSet{
Cids: ts.cids,
Blocks: ts.blks,
Height: ts.height,
}).MarshalCBOR(w)
}
func (ts *TipSet) UnmarshalCBOR(r io.Reader) error {
var ets ExpTipSet
if err := ets.UnmarshalCBOR(r); err != nil {
return err
}
ots, err := NewTipSet(ets.Blocks)
if err != nil {
return err
}
*ts = *ots
return nil
}
func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool {
return func(i, j int) bool {
ti := blks[i].LastTicket()

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage"
)
func main() {
@ -26,6 +27,7 @@ func main() {
types.BlockMsg{},
types.SignedStorageAsk{},
types.StorageAsk{},
types.ExpTipSet{},
)
if err != nil {
fmt.Println(err)
@ -109,4 +111,13 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./storage/cbor_gen.go", "storage",
storage.SealTicket{},
storage.SectorInfo{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

View File

@ -3,11 +3,13 @@ package statestore
import (
"bytes"
"fmt"
"github.com/filecoin-project/lotus/lib/cborrpc"
"reflect"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
"reflect"
"github.com/filecoin-project/lotus/lib/cborrpc"
)
type StateStore struct {
@ -81,7 +83,7 @@ func cborMutator(mutator interface{}) func([]byte) ([]byte, error) {
}
// mutator func(*T) error
func (st *StateStore) Mutate(i fmt.Stringer, mutator interface{}) error {
func (st *StateStore) Mutate(i interface{}, mutator interface{}) error {
return st.mutate(i, cborMutator(mutator))
}

View File

@ -47,7 +47,7 @@ func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) error {
return
}
if err := sm.Miner.SealSector(ctx, sectorId); err != nil {
if err := sm.Miner.SealSector(context.TODO(), sectorId); err != nil {
log.Error(err)
return
}

View File

@ -90,6 +90,7 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
OnStart: func(context.Context) error {
return sm.Run(ctx)
},
OnStop: sm.Stop,
})
return sm, nil

319
storage/cbor_gen.go Normal file
View File

@ -0,0 +1,319 @@
package storage
import (
"fmt"
"github.com/filecoin-project/lotus/chain/types"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
"io"
)
/* This file was generated by github.com/whyrusleeping/cbor-gen */
var _ = xerrors.Errorf
func (t *SealTicket) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{130}); err != nil {
return err
}
// t.t.BlockHeight (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.BlockHeight)); err != nil {
return err
}
// t.t.TicketBytes ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.TicketBytes)))); err != nil {
return err
}
if _, err := w.Write(t.TicketBytes); err != nil {
return err
}
return nil
}
func (t *SealTicket) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.BlockHeight (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.BlockHeight = extra
// t.t.TicketBytes ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.TicketBytes: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.TicketBytes = make([]byte, extra)
if _, err := io.ReadFull(br, t.TicketBytes); err != nil {
return err
}
return nil
}
func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{137}); err != nil {
return err
}
// t.t.State (api.SectorState)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.State)); err != nil {
return err
}
// t.t.SectorID (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorID)); err != nil {
return err
}
// t.t.CommD ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommD)))); err != nil {
return err
}
if _, err := w.Write(t.CommD); err != nil {
return err
}
// t.t.CommR ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommR)))); err != nil {
return err
}
if _, err := w.Write(t.CommR); err != nil {
return err
}
// t.t.Ticket (storage.SealTicket)
if err := t.Ticket.MarshalCBOR(w); err != nil {
return err
}
// t.t.PreCommitMessage (cid.Cid)
if t.PreCommitMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.PreCommitMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.PreCommitMessage: %w", err)
}
}
// t.t.RandHeight (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.RandHeight)); err != nil {
return err
}
// t.t.RandTs (types.TipSet)
if err := t.RandTs.MarshalCBOR(w); err != nil {
return err
}
// t.t.CommitMessage (cid.Cid)
if t.CommitMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.CommitMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.CommitMessage: %w", err)
}
}
return nil
}
func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 9 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.State (api.SectorState)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = extra
// t.t.SectorID (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = extra
// t.t.CommD ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.CommD: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommD = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommD); err != nil {
return err
}
// t.t.CommR ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.CommR: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommR = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommR); err != nil {
return err
}
// t.t.Ticket (storage.SealTicket)
{
if err := t.Ticket.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.t.PreCommitMessage (cid.Cid)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PreCommitMessage: %w", err)
}
t.PreCommitMessage = &c
}
}
// t.t.RandHeight (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.RandHeight = extra
// t.t.RandTs (types.TipSet)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.RandTs = new(types.TipSet)
if err := t.RandTs.UnmarshalCBOR(br); err != nil {
return err
}
}
}
// t.t.CommitMessage (cid.Cid)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err)
}
t.CommitMessage = &c
}
}
return nil
}

View File

@ -2,6 +2,8 @@ package storage
import (
"context"
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/ipfs/go-datastore/namespace"
"sync"
"github.com/ipfs/go-cid"
@ -15,7 +17,6 @@ import (
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/storage/sector"
)
@ -26,19 +27,23 @@ const PoStConfidence = 3
type Miner struct {
api storageMinerApi
events *events.Events
h host.Host
secst *sector.Store
maddr address.Address
worker address.Address
h host.Host
ds datastore.Batching
schedLk sync.Mutex
// PoSt
postLk sync.Mutex
schedPost uint64
// Sealing
sectors *statestore.StateStore
sectorIncoming chan *SectorInfo
sectorUpdated chan sectorUpdate
stop chan struct{}
stopped chan struct{}
}
type storageMinerApi interface {
@ -72,8 +77,9 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto
maddr: addr,
h: h,
ds: ds,
secst: secst,
sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey("/sectors"))),
}, nil
}
@ -85,11 +91,18 @@ func (m *Miner) Run(ctx context.Context) error {
m.events = events.NewEvents(ctx, m.api)
go m.beginPosting(ctx)
go m.sectorStateLoop(ctx)
return nil
}
func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSealingStatus) error {
return m.SealSector(ctx, sinfo.SectorID)
func (m *Miner) Stop(ctx context.Context) error {
close(m.stop)
select {
case <-m.stopped:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (m *Miner) runPreflightChecks(ctx context.Context) error {

View File

@ -30,10 +30,10 @@ func (m *Miner) beginPosting(ctx context.Context) {
return
}
m.schedLk.Lock()
m.postLk.Lock()
if m.schedPost > 0 {
log.Warnf("PoSts already running %d", m.schedPost)
m.schedLk.Unlock()
m.postLk.Unlock()
return
}
@ -42,7 +42,7 @@ func (m *Miner) beginPosting(ctx context.Context) {
ppe, _ = actors.ProvingPeriodEnd(ppe, ts.Height()+1)
m.schedPost = ppe
m.schedLk.Unlock()
m.postLk.Unlock()
log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime)
err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert
@ -71,16 +71,16 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
ppe = headPPE
}
m.schedLk.Lock()
m.postLk.Lock()
if m.schedPost >= ppe {
// this probably can't happen
log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe)
m.schedLk.Unlock()
m.postLk.Unlock()
return
}
m.schedPost = ppe
m.schedLk.Unlock()
m.postLk.Unlock()
log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime,
"height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod)

View File

@ -2,154 +2,120 @@ package storage
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cid "github.com/ipfs/go-cid"
"github.com/pkg/errors"
"golang.org/x/xerrors"
)
func (m *Miner) SealSector(ctx context.Context, sid uint64) error {
log.Info("committing sector: ", sid)
ssize, err := m.SectorSize(ctx)
if err != nil {
return xerrors.Errorf("failed to check out own sector size: %w", err)
}
_ = ssize
log.Info("performing sector replication...")
if err := m.secst.SealPreCommit(ctx, sid); err != nil {
return xerrors.Errorf("seal pre commit failed: %w", err)
}
sinfo, err := m.secst.SectorStatus(sid)
if err != nil {
return xerrors.Errorf("failed to check status for sector %d: %w", sid, err)
}
params := &actors.SectorPreCommitInfo{
CommD: sinfo.CommD[:],
CommR: sinfo.CommR[:],
Epoch: sinfo.Ticket.BlockHeight,
//DealIDs: deals,
SectorNumber: sinfo.SectorID,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return errors.Wrap(aerr, "could not serialize commit sector parameters")
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.PreCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("submitting precommit for sector: ", sid)
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return errors.Wrap(err, "pushing message to mpool")
}
go m.waitForPreCommitMessage(context.TODO(), sinfo.SectorID, smsg.Cid())
// TODO: maybe return a wait channel?
return nil
type SealTicket struct {
BlockHeight uint64
TicketBytes []byte
}
func (m *Miner) waitForPreCommitMessage(ctx context.Context, sid uint64, mcid cid.Cid) {
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
mw, err := m.api.StateWaitMsg(ctx, mcid)
if err != nil {
return
}
type SectorInfo struct {
State api.SectorState
SectorID uint64
if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
return
}
// PreCommit
CommD []byte
CommR []byte
Ticket SealTicket
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay
log.Infof("precommit for sector %d made it on chain, will start post computation at height %d", sid, randHeight)
PreCommitMessage *cid.Cid
err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error {
return m.scheduleComputeProof(ctx, sid, ts, randHeight)
}, func(ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step")
return nil
}, 3, mw.TipSet.Height()+build.InteractivePoRepDelay)
if err != nil {
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
}
// PreCommitted
RandHeight uint64
RandTs *types.TipSet
// Committing
CommitMessage *cid.Cid
}
func (m *Miner) scheduleComputeProof(ctx context.Context, sid uint64, ts *types.TipSet, rheight uint64) error {
log.Info("scheduling post computation...")
type sectorUpdate struct {
newState api.SectorState
id uint64
err error
mut func(*SectorInfo)
}
func (m *Miner) sectorStateLoop(ctx context.Context) {
// TODO: restore state
go func() {
rand, err := m.api.ChainGetRandomness(ctx, ts, nil, int(ts.Height()-rheight))
if err != nil {
log.Error(fmt.Errorf("failed to get randomness for computing seal proof: %w", err))
defer log.Warn("quitting deal provider loop")
defer close(m.stopped)
for {
select {
case sector := <-m.sectorIncoming:
m.onSectorIncoming(sector)
case update := <-m.sectorUpdated:
m.onSectorUpdated(ctx, update)
case <-m.stop:
return
}
proof, err := m.secst.SealComputeProof(ctx, sid, rheight, rand)
if err != nil {
log.Error(fmt.Errorf("computing seal proof failed: %w", err))
return
}
params := &actors.SectorProveCommitInfo{
Proof: proof,
SectorID: sid,
//DealIDs: deals,
}
_ = params
enc, aerr := actors.SerializeParams(nil)
if aerr != nil {
log.Error(errors.Wrap(aerr, "could not serialize commit sector parameters"))
return
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.ProveCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
log.Error(errors.Wrap(err, "pushing message to mpool"))
}
// TODO: now wait for this to get included and handle errors?
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
log.Errorf("failed to wait for porep inclusion: %s", err)
return
}
if mw.Receipt.ExitCode != 0 {
log.Error("UNHANDLED: submitting sector proof failed")
return
}
m.beginPosting(ctx)
}()
return nil
}
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
m.failSector(sector.SectorID, err)
return
}
go func() {
m.sectorUpdated <- sectorUpdate{
newState: api.Unsealed,
id: sector.SectorID,
}
}()
}
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
log.Infof("Sector %s updated state to %s", update.id, api.SectorStateStr(update.newState))
var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState
sector = *s
return nil
})
if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err)
m.failSector(update.id, update.err)
return
}
if err != nil {
m.failSector(update.id, err)
return
}
switch update.newState {
case api.Unsealed:
m.handle(ctx, sector, m.sealPreCommit, api.PreCommitting)
case api.PreCommitting:
m.handle(ctx, sector, m.preCommit, api.PreCommitted)
case api.PreCommitted:
m.handle(ctx, sector, m.preCommitted, api.SectorNoUpdate)
case api.Committing:
m.handle(ctx, sector, m.committing, api.Proving)
}
}
func (m *Miner) failSector(id uint64, err error) {
panic(err) // todo: better error handling strategy
}
func (m *Miner) SealSector(ctx context.Context, sid uint64) error {
select {
case m.sectorIncoming <- &SectorInfo{
State: api.UndefinedSectorState,
SectorID: sid,
}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -131,19 +131,13 @@ func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) {
}
}
func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) error {
func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) (sectorbuilder.SealPreCommitOutput, error) {
tkt, err := s.tktFn(ctx)
if err != nil {
return err
return sectorbuilder.SealPreCommitOutput{}, err
}
// TODO: That's not async, is it?
// - If not then we probably can drop this wait-for-seal hack below
_, err = s.sb.SealPreCommit(sectorID, *tkt)
if err != nil {
return err
}
return nil
return s.sb.SealPreCommit(sectorID, *tkt)
}
func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height uint64, rand []byte) ([]byte, error) {
@ -160,7 +154,7 @@ func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height ui
return sco.Proof, nil
}
func (s *Store) Commited() ([]sectorbuilder.SectorSealingStatus, error) {
func (s *Store) Committed() ([]sectorbuilder.SectorSealingStatus, error) {
l, err := s.sb.GetAllStagedSectors()
if err != nil {
return nil, err

183
storage/sector_states.go Normal file
View File

@ -0,0 +1,183 @@
package storage
import (
"context"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/pkg/errors"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
)
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error)
func (m *Miner) handle(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) {
go func() {
mut, err := cb(ctx, sector)
if err == nil && next == api.SectorNoUpdate {
return
}
select {
case m.sectorUpdated <- sectorUpdate{
newState: next,
id: sector.SectorID,
err: err,
mut: mut,
}:
case <-m.stop:
}
}()
}
func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
log.Infow("performing sector replication...", "sector", sector.SectorID)
sinfo, err := m.secst.SealPreCommit(ctx, sector.SectorID)
if err != nil {
return nil, xerrors.Errorf("seal pre commit failed: %w", err)
}
return func(info *SectorInfo) {
info.CommD = sinfo.CommD[:]
info.CommR = sinfo.CommR[:]
info.Ticket = SealTicket{
BlockHeight: sinfo.Ticket.BlockHeight,
TicketBytes: sinfo.Ticket.TicketBytes[:],
}
}, nil
}
func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
params := &actors.SectorPreCommitInfo{
CommD: sector.CommD,
CommR: sector.CommR,
Epoch: sector.Ticket.BlockHeight,
SectorNumber: sector.SectorID,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.PreCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return nil, xerrors.Errorf("pushing message to mpool: %w", err)
}
return func(info *SectorInfo) {
mcid := smsg.Cid()
info.PreCommitMessage = &mcid
}, nil
}
func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
if err != nil {
return nil, err
}
if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
return nil, err
}
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay
log.Infof("precommit for sector %d made it on chain, will start post computation at height %d", sector.SectorID, randHeight)
err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error {
m.sectorUpdated <- sectorUpdate{
newState: api.Committing,
id: sector.SectorID,
mut: func(info *SectorInfo) {
info.RandHeight = randHeight
info.RandTs = ts
},
}
return nil
}, func(ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step")
return nil
}, 3, mw.TipSet.Height()+build.InteractivePoRepDelay)
if err != nil {
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
}
return nil, nil
}
func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
log.Info("scheduling seal proof computation...")
rand, err := m.api.ChainGetRandomness(ctx, sector.RandTs, nil, int(sector.RandTs.Height()-sector.RandHeight))
if err != nil {
return nil, xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
}
proof, err := m.secst.SealComputeProof(ctx, sector.SectorID, sector.RandHeight, rand)
if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err)
}
params := &actors.SectorProveCommitInfo{
Proof: proof,
SectorID: sector.SectorID,
//DealIDs: deals,
}
_ = params
enc, aerr := actors.SerializeParams(nil)
if aerr != nil {
return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.ProveCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
log.Error(errors.Wrap(err, "pushing message to mpool"))
}
// TODO: Separate state before this wait, so we persist message cid?
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, xerrors.Errorf("failed to wait for porep inclusion: %w", err)
}
if mw.Receipt.ExitCode != 0 {
log.Error("UNHANDLED: submitting sector proof failed")
return nil, xerrors.New("UNHANDLED: submitting sector proof failed")
}
m.beginPosting(ctx)
return func(info *SectorInfo) {
mcid := smsg.Cid()
info.CommitMessage = &mcid
}, nil
}