Merge pull request #1054 from filecoin-project/feat/event-states
Sector state machine changes
This commit is contained in:
commit
e94e64652b
@ -5,7 +5,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
xerrors "golang.org/x/xerrors"
|
xerrors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
xerrors "golang.org/x/xerrors"
|
xerrors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
@ -11,7 +11,12 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
|
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
paramfetch "github.com/filecoin-project/go-paramfetch"
|
paramfetch "github.com/filecoin-project/go-paramfetch"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
badger "github.com/ipfs/go-ds-badger2"
|
badger "github.com/ipfs/go-ds-badger2"
|
||||||
@ -21,11 +26,6 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
lapi "github.com/filecoin-project/lotus/api"
|
lapi "github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
@ -38,6 +38,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var initCmd = &cli.Command{
|
var initCmd = &cli.Command{
|
||||||
@ -242,17 +243,17 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, sector := range meta.Sectors {
|
for _, sector := range meta.Sectors {
|
||||||
sectorKey := datastore.NewKey(storage.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
|
sectorKey := datastore.NewKey(sealing.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
|
||||||
|
|
||||||
dealID, err := findMarketDealID(ctx, api, sector.Deal)
|
dealID, err := findMarketDealID(ctx, api, sector.Deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding storage deal for pre-sealed sector %d: %w", sector.SectorID, err)
|
return xerrors.Errorf("finding storage deal for pre-sealed sector %d: %w", sector.SectorID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
info := &storage.SectorInfo{
|
info := &sealing.SectorInfo{
|
||||||
State: lapi.Proving,
|
State: lapi.Proving,
|
||||||
SectorID: sector.SectorID,
|
SectorID: sector.SectorID,
|
||||||
Pieces: []storage.Piece{
|
Pieces: []sealing.Piece{
|
||||||
{
|
{
|
||||||
DealID: dealID,
|
DealID: dealID,
|
||||||
Size: meta.SectorSize,
|
Size: meta.SectorSize,
|
||||||
@ -262,9 +263,9 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
|
|||||||
CommD: sector.CommD[:],
|
CommD: sector.CommD[:],
|
||||||
CommR: sector.CommR[:],
|
CommR: sector.CommR[:],
|
||||||
Proof: nil,
|
Proof: nil,
|
||||||
Ticket: storage.SealTicket{},
|
Ticket: sealing.SealTicket{},
|
||||||
PreCommitMessage: nil,
|
PreCommitMessage: nil,
|
||||||
Seed: storage.SealSeed{},
|
Seed: sealing.SealSeed{},
|
||||||
CommitMessage: nil,
|
CommitMessage: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
22
gen/main.go
22
gen/main.go
@ -10,8 +10,9 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -121,14 +122,23 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = gen.WriteMapEncodersToFile("./storage/cbor_gen.go", "storage",
|
err = gen.WriteMapEncodersToFile("./storage/sectors/cbor_gen.go", "sectors",
|
||||||
storage.SealTicket{},
|
sealing.SealTicket{},
|
||||||
storage.SealSeed{},
|
sealing.SealSeed{},
|
||||||
storage.Piece{},
|
sealing.Piece{},
|
||||||
storage.SectorInfo{},
|
sealing.SectorInfo{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = gen.WriteMapEncodersToFile("./lib/statemachine/cbor_gen.go", "statemachine",
|
||||||
|
statemachine.TestState{},
|
||||||
|
statemachine.TestEvent{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("%+v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
18
go.sum
18
go.sum
@ -106,11 +106,27 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.0.0/go.mod h1:PAZ5tvSfMfWE327osqFX
|
|||||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
|
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
|
||||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
|
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
|
||||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||||
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY=
|
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY=
|
||||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww=
|
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww=
|
||||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 h1:g3oodvSz+Ou+ObwcVBB2wyt8SHdWpwzMiNJ19U1zZNA=
|
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 h1:g3oodvSz+Ou+ObwcVBB2wyt8SHdWpwzMiNJ19U1zZNA=
|
||||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
|
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||||
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
|
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
||||||
@ -118,6 +134,8 @@ github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4
|
|||||||
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200114015900-4103afa82689 h1:2cT5bhm/5I0RY+HBIPdRRrtjCwLj33Qx6DHRs9TCslY=
|
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200114015900-4103afa82689 h1:2cT5bhm/5I0RY+HBIPdRRrtjCwLj33Qx6DHRs9TCslY=
|
||||||
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200114015900-4103afa82689/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
|
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200114015900-4103afa82689/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
|
||||||
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
|
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
|
||||||
|
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
|
||||||
|
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||||
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||||
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
|
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
|
||||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||||
|
238
lib/statemachine/cbor_gen.go
Normal file
238
lib/statemachine/cbor_gen.go
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
package statemachine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
xerrors "golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
|
||||||
|
|
||||||
|
var _ = xerrors.Errorf
|
||||||
|
|
||||||
|
func (t *TestState) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte{162}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.A (uint64) (uint64)
|
||||||
|
if len("A") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"A\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("A")))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte("A")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.A))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.B (uint64) (uint64)
|
||||||
|
if len("B") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"B\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("B")))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte("B")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.B))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestState) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajMap {
|
||||||
|
return fmt.Errorf("cbor input should be of type map")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra != 2 {
|
||||||
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
|
}
|
||||||
|
|
||||||
|
var name string
|
||||||
|
|
||||||
|
// t.A (uint64) (uint64)
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadString(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if name != "A" {
|
||||||
|
return fmt.Errorf("expected struct map entry %s to be A", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajUnsignedInt {
|
||||||
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
|
}
|
||||||
|
t.A = uint64(extra)
|
||||||
|
// t.B (uint64) (uint64)
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadString(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if name != "B" {
|
||||||
|
return fmt.Errorf("expected struct map entry %s to be B", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajUnsignedInt {
|
||||||
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
|
}
|
||||||
|
t.B = uint64(extra)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestEvent) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte{162}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.A (string) (string)
|
||||||
|
if len("A") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"A\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("A")))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte("A")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(t.A) > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field t.A was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.A)))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte(t.A)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.Val (uint64) (uint64)
|
||||||
|
if len("Val") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"Val\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Val")))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte("Val")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Val))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestEvent) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajMap {
|
||||||
|
return fmt.Errorf("cbor input should be of type map")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra != 2 {
|
||||||
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
|
}
|
||||||
|
|
||||||
|
var name string
|
||||||
|
|
||||||
|
// t.A (string) (string)
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadString(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if name != "A" {
|
||||||
|
return fmt.Errorf("expected struct map entry %s to be A", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadString(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.A = string(sval)
|
||||||
|
}
|
||||||
|
// t.Val (uint64) (uint64)
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadString(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
if name != "Val" {
|
||||||
|
return fmt.Errorf("expected struct map entry %s to be Val", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajUnsignedInt {
|
||||||
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
|
}
|
||||||
|
t.Val = uint64(extra)
|
||||||
|
return nil
|
||||||
|
}
|
16
lib/statemachine/context.go
Normal file
16
lib/statemachine/context.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package statemachine
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type Context struct {
|
||||||
|
ctx context.Context
|
||||||
|
send func(evt interface{}) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *Context) Context() context.Context {
|
||||||
|
return ctx.ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ctx *Context) Send(evt interface{}) error {
|
||||||
|
return ctx.send(evt)
|
||||||
|
}
|
116
lib/statemachine/group.go
Normal file
116
lib/statemachine/group.go
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
package statemachine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-statestore"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StateHandler interface {
|
||||||
|
// returns
|
||||||
|
Plan(events []Event, user interface{}) (interface{}, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StateGroup manages a group of state machines sharing the same logic
|
||||||
|
type StateGroup struct {
|
||||||
|
sts *statestore.StateStore
|
||||||
|
hnd StateHandler
|
||||||
|
stateType reflect.Type
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
sms map[datastore.Key]*StateMachine
|
||||||
|
}
|
||||||
|
|
||||||
|
// stateType: T - (MyStateStruct{})
|
||||||
|
func New(ds datastore.Datastore, hnd StateHandler, stateType interface{}) *StateGroup {
|
||||||
|
return &StateGroup{
|
||||||
|
sts: statestore.New(ds),
|
||||||
|
hnd: hnd,
|
||||||
|
stateType: reflect.TypeOf(stateType),
|
||||||
|
|
||||||
|
sms: map[datastore.Key]*StateMachine{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send sends an event to machine identified by `id`.
|
||||||
|
// `evt` is going to be passed into StateHandler.Planner, in the events[].User param
|
||||||
|
//
|
||||||
|
// If a state machine with the specified id doesn't exits, it's created, and it's
|
||||||
|
// state is set to zero-value of stateType provided in group constructor
|
||||||
|
func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) {
|
||||||
|
s.lk.Lock()
|
||||||
|
defer s.lk.Unlock()
|
||||||
|
|
||||||
|
sm, exist := s.sms[statestore.ToKey(id)]
|
||||||
|
if !exist {
|
||||||
|
sm, err = s.loadOrCreate(id)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("loadOrCreate state: %w", err)
|
||||||
|
}
|
||||||
|
s.sms[statestore.ToKey(id)] = sm
|
||||||
|
}
|
||||||
|
|
||||||
|
return sm.send(Event{User: evt})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *StateGroup) loadOrCreate(name interface{}) (*StateMachine, error) {
|
||||||
|
exists, err := s.sts.Has(name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
userState := reflect.New(s.stateType).Interface()
|
||||||
|
|
||||||
|
err = s.sts.Begin(name, userState)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("saving initial state: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res := &StateMachine{
|
||||||
|
planner: s.hnd.Plan,
|
||||||
|
eventsIn: make(chan Event),
|
||||||
|
|
||||||
|
name: name,
|
||||||
|
st: s.sts.Get(name),
|
||||||
|
stateType: s.stateType,
|
||||||
|
|
||||||
|
stageDone: make(chan struct{}),
|
||||||
|
closing: make(chan struct{}),
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
go res.run()
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops all state machines in this group
|
||||||
|
func (s *StateGroup) Stop(ctx context.Context) error {
|
||||||
|
s.lk.Lock()
|
||||||
|
defer s.lk.Unlock()
|
||||||
|
|
||||||
|
for _, sm := range s.sms {
|
||||||
|
if err := sm.stop(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List outputs states of all state machines in this group
|
||||||
|
// out: *[]StateT
|
||||||
|
func (s *StateGroup) List(out interface{}) error {
|
||||||
|
return s.sts.List(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get gets state for a single state machine
|
||||||
|
func (s *StateGroup) Get(id interface{}) *statestore.StoredState {
|
||||||
|
return s.sts.Get(id)
|
||||||
|
}
|
127
lib/statemachine/machine.go
Normal file
127
lib/statemachine/machine.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package statemachine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-statestore"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("evtsm")
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
User interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: This probably should be returning an int indicating partial event processing
|
||||||
|
// (or something like errPartial(nEvents))
|
||||||
|
// returns func(ctx Context, st <T>) (func(*<T>), error), where <T> is the typeOf(User) param
|
||||||
|
type Planner func(events []Event, user interface{}) (interface{}, error)
|
||||||
|
|
||||||
|
type StateMachine struct {
|
||||||
|
planner Planner
|
||||||
|
eventsIn chan Event
|
||||||
|
|
||||||
|
name interface{}
|
||||||
|
st *statestore.StoredState
|
||||||
|
stateType reflect.Type
|
||||||
|
|
||||||
|
stageDone chan struct{}
|
||||||
|
closing chan struct{}
|
||||||
|
closed chan struct{}
|
||||||
|
|
||||||
|
busy int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsm *StateMachine) run() {
|
||||||
|
defer close(fsm.closed)
|
||||||
|
|
||||||
|
var pendingEvents []Event
|
||||||
|
|
||||||
|
for {
|
||||||
|
// NOTE: This requires at least one event to be sent to trigger a stage
|
||||||
|
// This means that after restarting the state machine users of this
|
||||||
|
// code must send a 'restart' event
|
||||||
|
select {
|
||||||
|
case evt := <-fsm.eventsIn:
|
||||||
|
pendingEvents = append(pendingEvents, evt)
|
||||||
|
case <-fsm.stageDone:
|
||||||
|
if len(pendingEvents) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
case <-fsm.closing:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if atomic.CompareAndSwapInt32(&fsm.busy, 0, 1) {
|
||||||
|
var nextStep interface{}
|
||||||
|
var ustate interface{}
|
||||||
|
|
||||||
|
err := fsm.mutateUser(func(user interface{}) (err error) {
|
||||||
|
nextStep, err = fsm.planner(pendingEvents, user)
|
||||||
|
ustate = user
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Executing event planner failed: %+v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pendingEvents = nil
|
||||||
|
|
||||||
|
if nextStep == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := Context{
|
||||||
|
ctx: context.TODO(),
|
||||||
|
send: func(evt interface{}) error {
|
||||||
|
return fsm.send(Event{User: evt})
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()})
|
||||||
|
|
||||||
|
if res[0].Interface() != nil {
|
||||||
|
log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.StoreInt32(&fsm.busy, 0)
|
||||||
|
fsm.stageDone <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsm *StateMachine) mutateUser(cb func(user interface{}) error) error {
|
||||||
|
mutt := reflect.FuncOf([]reflect.Type{reflect.PtrTo(fsm.stateType)}, []reflect.Type{reflect.TypeOf(new(error)).Elem()}, false)
|
||||||
|
|
||||||
|
mutf := reflect.MakeFunc(mutt, func(args []reflect.Value) (results []reflect.Value) {
|
||||||
|
err := cb(args[0].Interface())
|
||||||
|
return []reflect.Value{reflect.ValueOf(&err).Elem()}
|
||||||
|
})
|
||||||
|
|
||||||
|
return fsm.st.Mutate(mutf.Interface())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsm *StateMachine) send(evt Event) error {
|
||||||
|
fsm.eventsIn <- evt // TODO: ctx, at least
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsm *StateMachine) stop(ctx context.Context) error {
|
||||||
|
close(fsm.closing)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-fsm.closed:
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
105
lib/statemachine/machine_test.go
Normal file
105
lib/statemachine/machine_test.go
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
package statemachine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
"gotest.tools/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
logging.SetLogLevel("*", "INFO")
|
||||||
|
}
|
||||||
|
|
||||||
|
type testHandler struct {
|
||||||
|
t *testing.T
|
||||||
|
proceed chan struct{}
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHandler) Plan(events []Event, state interface{}) (interface{}, error) {
|
||||||
|
return t.plan(events, state.(*TestState))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHandler) plan(events []Event, state *TestState) (func(Context, TestState) error, error) {
|
||||||
|
for _, event := range events {
|
||||||
|
e := event.User.(*TestEvent)
|
||||||
|
switch e.A {
|
||||||
|
case "restart":
|
||||||
|
case "start":
|
||||||
|
state.A = 1
|
||||||
|
case "b":
|
||||||
|
state.A = 2
|
||||||
|
state.B = e.Val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch state.A {
|
||||||
|
case 1:
|
||||||
|
return t.step0, nil
|
||||||
|
case 2:
|
||||||
|
return t.step1, nil
|
||||||
|
default:
|
||||||
|
t.t.Fatal(state.A)
|
||||||
|
}
|
||||||
|
panic("how?")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHandler) step0(ctx Context, st TestState) error {
|
||||||
|
ctx.Send(&TestEvent{A: "b", Val: 55})
|
||||||
|
<-t.proceed
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testHandler) step1(ctx Context, st TestState) error {
|
||||||
|
assert.Equal(t.t, uint64(2), st.A)
|
||||||
|
|
||||||
|
close(t.done)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBasic(t *testing.T) {
|
||||||
|
for i := 0; i < 1000; i++ { // run a few times to expose any races
|
||||||
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
th := &testHandler{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
|
||||||
|
close(th.proceed)
|
||||||
|
smm := New(ds, th, TestState{})
|
||||||
|
|
||||||
|
if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil {
|
||||||
|
t.Fatalf("%+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-th.done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPersist(t *testing.T) {
|
||||||
|
for i := 0; i < 1000; i++ { // run a few times to expose any races
|
||||||
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
th := &testHandler{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
|
||||||
|
smm := New(ds, th, TestState{})
|
||||||
|
|
||||||
|
if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil {
|
||||||
|
t.Fatalf("%+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := smm.Stop(context.Background()); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
smm = New(ds, th, TestState{})
|
||||||
|
if err := smm.Send(uint64(2), &TestEvent{A: "restart"}); err != nil {
|
||||||
|
t.Fatalf("%+v", err)
|
||||||
|
}
|
||||||
|
close(th.proceed)
|
||||||
|
|
||||||
|
<-th.done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ StateHandler = &testHandler{}
|
11
lib/statemachine/testing.go
Normal file
11
lib/statemachine/testing.go
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package statemachine
|
||||||
|
|
||||||
|
type TestState struct {
|
||||||
|
A uint64
|
||||||
|
B uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestEvent struct {
|
||||||
|
A string
|
||||||
|
Val uint64
|
||||||
|
}
|
@ -50,6 +50,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
"github.com/filecoin-project/lotus/peermgr"
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -245,7 +246,7 @@ func Online() Option {
|
|||||||
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
|
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
|
||||||
Override(new(sectorbuilder.Interface), modules.SectorBuilder),
|
Override(new(sectorbuilder.Interface), modules.SectorBuilder),
|
||||||
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
||||||
Override(new(storage.TicketFn), modules.SealTicketGen),
|
Override(new(sealing.TicketFn), modules.SealTicketGen),
|
||||||
Override(new(*storage.Miner), modules.StorageMiner),
|
Override(new(*storage.Miner), modules.StorageMiner),
|
||||||
|
|
||||||
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
||||||
|
@ -206,7 +206,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error {
|
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
return sm.Miner.UpdateSectorState(ctx, id, storage.NonceIncrement, state)
|
return sm.Miner.ForceSectorState(ctx, id, state)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
|
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
|
||||||
|
@ -5,7 +5,16 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||||
|
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
|
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
|
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
paramfetch "github.com/filecoin-project/go-paramfetch"
|
paramfetch "github.com/filecoin-project/go-paramfetch"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/filecoin-project/go-statestore"
|
||||||
"github.com/ipfs/go-bitswap"
|
"github.com/ipfs/go-bitswap"
|
||||||
"github.com/ipfs/go-bitswap/network"
|
"github.com/ipfs/go-bitswap/network"
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
@ -23,25 +32,16 @@ import (
|
|||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
||||||
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
||||||
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
"github.com/filecoin-project/go-statestore"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/gen"
|
"github.com/filecoin-project/lotus/chain/gen"
|
||||||
|
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
"github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -98,7 +98,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn storage.TicketFn) (*storage.Miner, error) {
|
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*storage.Miner, error) {
|
||||||
maddr, err := minerAddrFromDS(ds)
|
maddr, err := minerAddrFromDS(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -241,7 +241,7 @@ func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuil
|
|||||||
return sb, nil
|
return sb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SealTicketGen(api api.FullNode) storage.TicketFn {
|
func SealTicketGen(api api.FullNode) sealing.TicketFn {
|
||||||
return func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
|
return func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
|
||||||
ts, err := api.ChainHead(ctx)
|
ts, err := api.ChainHead(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -5,45 +5,39 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
"github.com/filecoin-project/go-statestore"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/events"
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
"github.com/filecoin-project/lotus/chain/gen"
|
"github.com/filecoin-project/lotus/chain/gen"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("storageminer")
|
var log = logging.Logger("storageminer")
|
||||||
|
|
||||||
const SectorStorePrefix = "/sectors"
|
|
||||||
|
|
||||||
type Miner struct {
|
type Miner struct {
|
||||||
api storageMinerApi
|
api storageMinerApi
|
||||||
events *events.Events
|
h host.Host
|
||||||
h host.Host
|
sb sectorbuilder.Interface
|
||||||
|
ds datastore.Batching
|
||||||
|
tktFn sealing.TicketFn
|
||||||
|
|
||||||
maddr address.Address
|
maddr address.Address
|
||||||
worker address.Address
|
worker address.Address
|
||||||
|
|
||||||
// Sealing
|
sealing *sealing.Sealing
|
||||||
sb sectorbuilder.Interface
|
|
||||||
sectors *statestore.StateStore
|
|
||||||
tktFn TicketFn
|
|
||||||
|
|
||||||
sectorIncoming chan *SectorInfo
|
stop chan struct{}
|
||||||
sectorUpdated chan sectorUpdate
|
stopped chan struct{}
|
||||||
stop chan struct{}
|
|
||||||
stopped chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type storageMinerApi interface {
|
type storageMinerApi interface {
|
||||||
@ -71,22 +65,21 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) (*Miner, error) {
|
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) {
|
||||||
return &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
|
|
||||||
maddr: addr,
|
|
||||||
h: h,
|
h: h,
|
||||||
sb: sb,
|
sb: sb,
|
||||||
|
ds: ds,
|
||||||
tktFn: tktFn,
|
tktFn: tktFn,
|
||||||
|
|
||||||
sectors: statestore.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix))),
|
maddr: addr,
|
||||||
|
|
||||||
sectorIncoming: make(chan *SectorInfo),
|
stop: make(chan struct{}),
|
||||||
sectorUpdated: make(chan sectorUpdate),
|
stopped: make(chan struct{}),
|
||||||
stop: make(chan struct{}),
|
}
|
||||||
stopped: make(chan struct{}),
|
|
||||||
}, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Run(ctx context.Context) error {
|
func (m *Miner) Run(ctx context.Context) error {
|
||||||
@ -94,25 +87,25 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
return xerrors.Errorf("miner preflight checks failed: %w", err)
|
return xerrors.Errorf("miner preflight checks failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.events = events.NewEvents(ctx, m.api)
|
|
||||||
|
|
||||||
fps := &fpostScheduler{
|
fps := &fpostScheduler{
|
||||||
api: m.api,
|
api: m.api,
|
||||||
sb: m.sb,
|
sb: m.sb,
|
||||||
|
|
||||||
actor: m.maddr,
|
actor: m.maddr,
|
||||||
worker: m.worker,
|
worker: m.worker,
|
||||||
}
|
}
|
||||||
|
|
||||||
go fps.run(ctx)
|
go fps.run(ctx)
|
||||||
if err := m.sectorStateLoop(ctx); err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
evts := events.NewEvents(ctx, m.api)
|
||||||
return xerrors.Errorf("failed to startup sector state loop: %w", err)
|
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn)
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Stop(ctx context.Context) error {
|
func (m *Miner) Stop(ctx context.Context) error {
|
||||||
|
defer m.sealing.Stop(ctx)
|
||||||
|
|
||||||
close(m.stop)
|
close(m.stop)
|
||||||
select {
|
select {
|
||||||
case <-m.stopped:
|
case <-m.stopped:
|
||||||
|
35
storage/sealing.go
Normal file
35
storage/sealing.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: refactor this to be direct somehow
|
||||||
|
|
||||||
|
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
||||||
|
return m.sealing.AllocatePiece(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
||||||
|
return m.sealing.SealPiece(ctx, size, r, sectorID, dealID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) ListSectors() ([]sealing.SectorInfo, error) {
|
||||||
|
return m.sealing.ListSectors()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) GetSectorInfo(sid uint64) (sealing.SectorInfo, error) {
|
||||||
|
return m.sealing.GetSectorInfo(sid)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) PledgeSector() error {
|
||||||
|
return m.sealing.PledgeSector()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
|
return m.sealing.ForceSectorState(ctx, id, state)
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
243
storage/sealing/fsm.go
Normal file
243
storage/sealing/fsm.go
Normal file
@ -0,0 +1,243 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) {
|
||||||
|
next, err := m.plan(events, user.(*SectorInfo))
|
||||||
|
if err != nil || next == nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(ctx statemachine.Context, si SectorInfo) error {
|
||||||
|
err := next(ctx, si)
|
||||||
|
if err != nil {
|
||||||
|
if err := ctx.Send(SectorFatalError{error: err}); err != nil {
|
||||||
|
return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{
|
||||||
|
api.UndefinedSectorState: planOne(on(SectorStart{}, api.Packing)),
|
||||||
|
api.Packing: planOne(on(SectorPacked{}, api.Unsealed)),
|
||||||
|
api.Unsealed: planOne(
|
||||||
|
on(SectorSealed{}, api.PreCommitting),
|
||||||
|
on(SectorSealFailed{}, api.SealFailed),
|
||||||
|
),
|
||||||
|
api.PreCommitting: planOne(
|
||||||
|
on(SectorPreCommitted{}, api.PreCommitted),
|
||||||
|
on(SectorPreCommitFailed{}, api.PreCommitFailed),
|
||||||
|
),
|
||||||
|
api.PreCommitted: planOne(
|
||||||
|
on(SectorSeedReady{}, api.Committing),
|
||||||
|
on(SectorPreCommitFailed{}, api.PreCommitFailed),
|
||||||
|
),
|
||||||
|
api.Committing: planCommitting,
|
||||||
|
api.CommitWait: planOne(on(SectorProving{}, api.Proving)),
|
||||||
|
|
||||||
|
api.Proving: planOne(
|
||||||
|
on(SectorFaultReported{}, api.FaultReported),
|
||||||
|
on(SectorFaulty{}, api.Faulty),
|
||||||
|
),
|
||||||
|
|
||||||
|
api.Faulty: planOne(
|
||||||
|
on(SectorFaultReported{}, api.FaultReported),
|
||||||
|
),
|
||||||
|
api.FaultedFinal: final,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
||||||
|
/////
|
||||||
|
// First process all events
|
||||||
|
|
||||||
|
p := fsmPlanners[state.State]
|
||||||
|
if p == nil {
|
||||||
|
return nil, xerrors.Errorf("planner for state %d not found", state.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p(events, state); err != nil {
|
||||||
|
return nil, xerrors.Errorf("running planner for state %s failed: %w", api.SectorStates[state.State], err)
|
||||||
|
}
|
||||||
|
|
||||||
|
/////
|
||||||
|
// Now decide what to do next
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
* Empty
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Packing <- incoming
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Unsealed <--> SealFailed
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
* PreCommitting <--> PreCommitFailed
|
||||||
|
| | ^
|
||||||
|
| v |
|
||||||
|
*<- PreCommitted ------/
|
||||||
|
| |||
|
||||||
|
| vvv v--> SealCommitFailed
|
||||||
|
*<- Committing
|
||||||
|
| | ^--> CommitFailed
|
||||||
|
| v ^
|
||||||
|
*<- CommitWait ---/
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Proving
|
||||||
|
|
|
||||||
|
v
|
||||||
|
FailedUnrecoverable
|
||||||
|
|
||||||
|
UndefinedSectorState <- ¯\_(ツ)_/¯
|
||||||
|
| ^
|
||||||
|
*---------------------/
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
switch state.State {
|
||||||
|
// Happy path
|
||||||
|
case api.Packing:
|
||||||
|
return m.handlePacking, nil
|
||||||
|
case api.Unsealed:
|
||||||
|
return m.handleUnsealed, nil
|
||||||
|
case api.PreCommitting:
|
||||||
|
return m.handlePreCommitting, nil
|
||||||
|
case api.PreCommitted:
|
||||||
|
return m.handlePreCommitted, nil
|
||||||
|
case api.Committing:
|
||||||
|
return m.handleCommitting, nil
|
||||||
|
case api.CommitWait:
|
||||||
|
return m.handleCommitWait, nil
|
||||||
|
case api.Proving:
|
||||||
|
// TODO: track sector health / expiration
|
||||||
|
log.Infof("Proving sector %d", state.SectorID)
|
||||||
|
|
||||||
|
// Handled failure modes
|
||||||
|
case api.SealFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID)
|
||||||
|
case api.PreCommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID)
|
||||||
|
case api.SealCommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID)
|
||||||
|
case api.CommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", state.SectorID)
|
||||||
|
|
||||||
|
// Faults
|
||||||
|
case api.Faulty:
|
||||||
|
return m.handleFaulty, nil
|
||||||
|
case api.FaultReported:
|
||||||
|
return m.handleFaultReported, nil
|
||||||
|
|
||||||
|
// Fatal errors
|
||||||
|
case api.UndefinedSectorState:
|
||||||
|
log.Error("sector update with undefined state!")
|
||||||
|
case api.FailedUnrecoverable:
|
||||||
|
log.Errorf("sector %d failed unrecoverably", state.SectorID)
|
||||||
|
default:
|
||||||
|
log.Errorf("unexpected sector update state: %d", state.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
for _, event := range events {
|
||||||
|
switch e := event.User.(type) {
|
||||||
|
case globalMutator:
|
||||||
|
if e.applyGlobal(state) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case SectorCommitted: // the normal case
|
||||||
|
e.apply(state)
|
||||||
|
state.State = api.CommitWait
|
||||||
|
case SectorSeedReady: // seed changed :/
|
||||||
|
if e.seed.Equals(&state.Seed) {
|
||||||
|
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
|
||||||
|
continue // or it didn't!
|
||||||
|
}
|
||||||
|
log.Warnf("planCommitting: commit Seed changed")
|
||||||
|
e.apply(state)
|
||||||
|
state.State = api.Committing
|
||||||
|
return nil
|
||||||
|
case SectorSealCommitFailed:
|
||||||
|
state.State = api.SealCommitFailed
|
||||||
|
case SectorSealFailed:
|
||||||
|
state.State = api.CommitFailed
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) restartSectors(ctx context.Context) error {
|
||||||
|
trackedSectors, err := m.ListSectors()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("loading sector list: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sector := range trackedSectors {
|
||||||
|
if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil {
|
||||||
|
log.Errorf("restarting sector %d: %+v", sector.SectorID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Grab on-chain sector set and diff with trackedSectors
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
|
return m.sectors.Send(id, SectorForceState{state})
|
||||||
|
}
|
||||||
|
|
||||||
|
func final(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
return xerrors.Errorf("didn't expect any events in state %s, got %+v", api.SectorStates[state.State], events)
|
||||||
|
}
|
||||||
|
|
||||||
|
func on(mut mutator, next api.SectorState) func() (mutator, api.SectorState) {
|
||||||
|
return func() (mutator, api.SectorState) {
|
||||||
|
return mut, next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
return func(events []statemachine.Event, state *SectorInfo) error {
|
||||||
|
if len(events) != 1 {
|
||||||
|
for _, event := range events {
|
||||||
|
if gm, ok := event.User.(globalMutator); !ok {
|
||||||
|
gm.applyGlobal(state)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return xerrors.Errorf("planner for state %s only has a plan for a single event only, got %+v", api.SectorStates[state.State], events)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, t := range ts {
|
||||||
|
mut, next := t()
|
||||||
|
|
||||||
|
if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
events[0].User.(mutator).apply(state)
|
||||||
|
state.State = next
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return xerrors.Errorf("planner for state %s received unexpected event %+v", events[0])
|
||||||
|
}
|
||||||
|
}
|
125
storage/sealing/fsm_events.go
Normal file
125
storage/sealing/fsm_events.go
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mutator interface {
|
||||||
|
apply(state *SectorInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// globalMutator is an event which can apply in every state
|
||||||
|
type globalMutator interface {
|
||||||
|
// applyGlobal applies the event to the state. If if returns true,
|
||||||
|
// event processing should be interrupted
|
||||||
|
applyGlobal(state *SectorInfo) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Global events
|
||||||
|
|
||||||
|
type SectorRestart struct{}
|
||||||
|
|
||||||
|
func (evt SectorRestart) applyGlobal(*SectorInfo) bool { return false }
|
||||||
|
|
||||||
|
type SectorFatalError struct{ error }
|
||||||
|
|
||||||
|
func (evt SectorFatalError) applyGlobal(state *SectorInfo) bool {
|
||||||
|
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, evt.error)
|
||||||
|
// TODO: Do we want to mark the state as unrecoverable?
|
||||||
|
// I feel like this should be a softer error, where the user would
|
||||||
|
// be able to send a retry event of some kind
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorForceState struct {
|
||||||
|
state api.SectorState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorForceState) applyGlobal(state *SectorInfo) bool {
|
||||||
|
state.State = evt.state
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal path
|
||||||
|
|
||||||
|
type SectorStart struct {
|
||||||
|
id uint64
|
||||||
|
pieces []Piece
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorStart) apply(state *SectorInfo) {
|
||||||
|
state.SectorID = evt.id
|
||||||
|
state.Pieces = evt.pieces
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorPacked struct{ pieces []Piece }
|
||||||
|
|
||||||
|
func (evt SectorPacked) apply(state *SectorInfo) {
|
||||||
|
state.Pieces = append(state.Pieces, evt.pieces...)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSealed struct {
|
||||||
|
commR []byte
|
||||||
|
commD []byte
|
||||||
|
ticket SealTicket
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorSealed) apply(state *SectorInfo) {
|
||||||
|
state.CommD = evt.commD
|
||||||
|
state.CommR = evt.commR
|
||||||
|
state.Ticket = evt.ticket
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSealFailed struct{ error }
|
||||||
|
|
||||||
|
func (evt SectorSealFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
|
type SectorPreCommitFailed struct{ error }
|
||||||
|
|
||||||
|
func (evt SectorPreCommitFailed) apply(*SectorInfo) {}
|
||||||
|
|
||||||
|
type SectorPreCommitted struct {
|
||||||
|
message cid.Cid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorPreCommitted) apply(state *SectorInfo) {
|
||||||
|
state.PreCommitMessage = &evt.message
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSeedReady struct {
|
||||||
|
seed SealSeed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorSeedReady) apply(state *SectorInfo) {
|
||||||
|
state.Seed = evt.seed
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSealCommitFailed struct{ error }
|
||||||
|
type SectorCommitFailed struct{ error }
|
||||||
|
type SectorCommitted struct {
|
||||||
|
message cid.Cid
|
||||||
|
proof []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorCommitted) apply(state *SectorInfo) {
|
||||||
|
state.Proof = evt.proof
|
||||||
|
state.CommitMessage = &evt.message
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorProving struct{}
|
||||||
|
|
||||||
|
func (evt SectorProving) apply(*SectorInfo) {}
|
||||||
|
|
||||||
|
type SectorFaulty struct{}
|
||||||
|
|
||||||
|
func (evt SectorFaulty) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
type SectorFaultReported struct{ reportMsg cid.Cid }
|
||||||
|
|
||||||
|
func (evt SectorFaultReported) apply(state *SectorInfo) {
|
||||||
|
state.FaultReportMsg = &evt.reportMsg
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorFaultedFinal struct{}
|
85
storage/sealing/fsm_test.go
Normal file
85
storage/sealing/fsm_test.go
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
_ = logging.SetLogLevel("*", "INFO")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *test) planSingle(evt interface{}) {
|
||||||
|
_, err := t.s.plan([]statemachine.Event{{evt}}, t.state)
|
||||||
|
require.NoError(t.t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type test struct {
|
||||||
|
s *Sealing
|
||||||
|
t *testing.T
|
||||||
|
state *SectorInfo
|
||||||
|
|
||||||
|
next func(statemachine.Context, SectorInfo) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHappyPath(t *testing.T) {
|
||||||
|
m := test{
|
||||||
|
s: &Sealing{},
|
||||||
|
t: t,
|
||||||
|
state: &SectorInfo{State: api.Packing},
|
||||||
|
}
|
||||||
|
|
||||||
|
m.planSingle(SectorPacked{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Unsealed)
|
||||||
|
|
||||||
|
m.planSingle(SectorSealed{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitting)
|
||||||
|
|
||||||
|
m.planSingle(SectorPreCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitted)
|
||||||
|
|
||||||
|
m.planSingle(SectorSeedReady{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
|
m.planSingle(SectorCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||||
|
|
||||||
|
m.planSingle(SectorProving{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Proving)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSeedRevert(t *testing.T) {
|
||||||
|
m := test{
|
||||||
|
s: &Sealing{},
|
||||||
|
t: t,
|
||||||
|
state: &SectorInfo{State: api.Packing},
|
||||||
|
}
|
||||||
|
|
||||||
|
m.planSingle(SectorPacked{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Unsealed)
|
||||||
|
|
||||||
|
m.planSingle(SectorSealed{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitting)
|
||||||
|
|
||||||
|
m.planSingle(SectorPreCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitted)
|
||||||
|
|
||||||
|
m.planSingle(SectorSeedReady{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
|
_, err := m.s.plan([]statemachine.Event{{SectorSeedReady{seed: SealSeed{BlockHeight: 5}}}, {SectorCommitted{}}}, m.state)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
|
// not changing the seed this time
|
||||||
|
_, err = m.s.plan([]statemachine.Event{{SectorSeedReady{seed: SealSeed{BlockHeight: 5}}}, {SectorCommitted{}}}, m.state)
|
||||||
|
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||||
|
|
||||||
|
m.planSingle(SectorProving{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Proving)
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
||||||
if len(sizes) == 0 {
|
if len(sizes) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -98,7 +98,7 @@ func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPiece
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) PledgeSector() error {
|
func (m *Sealing) PledgeSector() error {
|
||||||
go func() {
|
go func() {
|
||||||
ctx := context.TODO() // we can't use the context from command which invokes
|
ctx := context.TODO() // we can't use the context from command which invokes
|
||||||
// this, as we run everything here async, and it's cancelled when the
|
// this, as we run everything here async, and it's cancelled when the
|
134
storage/sealing/sealing.go
Normal file
134
storage/sealing/sealing.go
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/filecoin-project/lotus/lib/padreader"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
const SectorStorePrefix = "/sectors"
|
||||||
|
|
||||||
|
var log = logging.Logger("sectors")
|
||||||
|
|
||||||
|
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
|
||||||
|
|
||||||
|
type sealingApi interface { // TODO: trim down
|
||||||
|
// Call a read only method on actors (no interaction with the chain required)
|
||||||
|
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
|
||||||
|
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
|
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
|
||||||
|
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
|
||||||
|
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
|
||||||
|
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
|
||||||
|
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
|
||||||
|
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
||||||
|
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
||||||
|
|
||||||
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||||
|
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
|
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
|
||||||
|
ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error)
|
||||||
|
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
|
||||||
|
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
||||||
|
|
||||||
|
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
|
||||||
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||||
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Sealing struct {
|
||||||
|
api sealingApi
|
||||||
|
events *events.Events
|
||||||
|
|
||||||
|
maddr address.Address
|
||||||
|
worker address.Address
|
||||||
|
|
||||||
|
sb sectorbuilder.Interface
|
||||||
|
sectors *statemachine.StateGroup
|
||||||
|
tktFn TicketFn
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing {
|
||||||
|
s := &Sealing{
|
||||||
|
api: api,
|
||||||
|
events: events,
|
||||||
|
|
||||||
|
maddr: maddr,
|
||||||
|
worker: worker,
|
||||||
|
sb: sb,
|
||||||
|
tktFn: tktFn,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Run(ctx context.Context) error {
|
||||||
|
m.events = events.NewEvents(ctx, m.api)
|
||||||
|
|
||||||
|
if err := m.restartSectors(ctx); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return xerrors.Errorf("failed load sector states: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Stop(ctx context.Context) error {
|
||||||
|
return m.sectors.Stop(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
||||||
|
if padreader.PaddedSize(size) != size {
|
||||||
|
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
||||||
|
}
|
||||||
|
|
||||||
|
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// offset hard-coded to 0 since we only put one thing in a sector for now
|
||||||
|
return sid, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
||||||
|
log.Infof("Seal piece for deal %d", dealID)
|
||||||
|
|
||||||
|
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("adding piece to sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.newSector(ctx, sectorID, dealID, ppi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
||||||
|
return m.sectors.Send(sid, SectorStart{
|
||||||
|
id: sid,
|
||||||
|
pieces: []Piece{
|
||||||
|
{
|
||||||
|
DealID: dealID,
|
||||||
|
|
||||||
|
Size: ppi.Size,
|
||||||
|
CommP: ppi.CommP[:],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
254
storage/sealing/states.go
Normal file
254
storage/sealing/states.go
Normal file
@ -0,0 +1,254 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
||||||
|
|
||||||
|
var allocated uint64
|
||||||
|
for _, piece := range sector.Pieces {
|
||||||
|
allocated += piece.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
|
||||||
|
|
||||||
|
if allocated > ubytes {
|
||||||
|
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fillerSizes) > 0 {
|
||||||
|
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
|
||||||
|
}
|
||||||
|
|
||||||
|
pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorPacked{pieces: pieces})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
||||||
|
ticket, err := m.tktFn(ctx.Context())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
rspco, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, *ticket, sector.pieceInfos())
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorSealed{
|
||||||
|
commD: rspco.CommD[:],
|
||||||
|
commR: rspco.CommR[:],
|
||||||
|
ticket: SealTicket{
|
||||||
|
BlockHeight: ticket.BlockHeight,
|
||||||
|
TicketBytes: ticket.TicketBytes[:],
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
params := &actors.SectorPreCommitInfo{
|
||||||
|
SectorNumber: sector.SectorID,
|
||||||
|
|
||||||
|
CommR: sector.CommR,
|
||||||
|
SealEpoch: sector.Ticket.BlockHeight,
|
||||||
|
DealIDs: sector.deals(),
|
||||||
|
}
|
||||||
|
enc, aerr := actors.SerializeParams(params)
|
||||||
|
if aerr != nil {
|
||||||
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: m.maddr,
|
||||||
|
From: m.worker,
|
||||||
|
Method: actors.MAMethods.PreCommitSector,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||||
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
||||||
|
GasPrice: types.NewInt(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("submitting precommit for sector: ", sector.SectorID)
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
||||||
|
log.Info("Sector precommitted: ", sector.SectorID)
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorPreCommitFailed{err})
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
||||||
|
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
||||||
|
return ctx.Send(SectorPreCommitFailed{err})
|
||||||
|
}
|
||||||
|
log.Info("precommit message landed on chain: ", sector.SectorID)
|
||||||
|
|
||||||
|
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
|
||||||
|
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
|
||||||
|
|
||||||
|
err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH uint64) error {
|
||||||
|
rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), int64(randHeight))
|
||||||
|
if err != nil {
|
||||||
|
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
|
||||||
|
|
||||||
|
ctx.Send(SectorFatalError{error: err})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.Send(SectorSeedReady{seed: SealSeed{
|
||||||
|
BlockHeight: randHeight,
|
||||||
|
TicketBytes: rand,
|
||||||
|
}})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
log.Warn("revert in interactive commit sector step")
|
||||||
|
// TODO: need to cancel running process and restart...
|
||||||
|
return nil
|
||||||
|
}, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
|
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Consider splitting states and persist proof for faster recovery
|
||||||
|
|
||||||
|
params := &actors.SectorProveCommitInfo{
|
||||||
|
Proof: proof,
|
||||||
|
SectorID: sector.SectorID,
|
||||||
|
DealIDs: sector.deals(),
|
||||||
|
}
|
||||||
|
|
||||||
|
enc, aerr := actors.SerializeParams(params)
|
||||||
|
if aerr != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: m.maddr,
|
||||||
|
From: m.worker,
|
||||||
|
Method: actors.MAMethods.ProveCommitSector,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||||
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
||||||
|
GasPrice: types.NewInt(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: check seed / ticket are up to date
|
||||||
|
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorCommitted{
|
||||||
|
proof: proof,
|
||||||
|
message: smsg.Cid(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.CommitMessage == nil {
|
||||||
|
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
||||||
|
}
|
||||||
|
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
|
||||||
|
return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorProving{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// TODO: check if the fault has already been reported, and that this sector is even valid
|
||||||
|
|
||||||
|
// TODO: coalesce faulty sector reporting
|
||||||
|
bf := types.NewBitField()
|
||||||
|
bf.Set(sector.SectorID)
|
||||||
|
|
||||||
|
enc, aerr := actors.SerializeParams(&actors.DeclareFaultsParams{bf})
|
||||||
|
if aerr != nil {
|
||||||
|
return xerrors.Errorf("failed to serialize declare fault params: %w", aerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: m.maddr,
|
||||||
|
From: m.worker,
|
||||||
|
Method: actors.MAMethods.DeclareFaults,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||||
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
||||||
|
GasPrice: types.NewInt(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.FaultReportMsg == nil {
|
||||||
|
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
||||||
|
}
|
||||||
|
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
|
||||||
|
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorFaultedFinal{})
|
||||||
|
}
|
@ -1,16 +1,12 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
|
|
||||||
|
|
||||||
type SealTicket struct {
|
type SealTicket struct {
|
||||||
BlockHeight uint64
|
BlockHeight uint64
|
||||||
TicketBytes []byte
|
TicketBytes []byte
|
||||||
@ -33,6 +29,10 @@ func (t *SealSeed) SB() sectorbuilder.SealSeed {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *SealSeed) Equals(o *SealSeed) bool {
|
||||||
|
return string(t.TicketBytes) == string(o.TicketBytes) && t.BlockHeight == o.BlockHeight
|
||||||
|
}
|
||||||
|
|
||||||
type Piece struct {
|
type Piece struct {
|
||||||
DealID uint64
|
DealID uint64
|
||||||
|
|
||||||
@ -74,10 +74,8 @@ type SectorInfo struct {
|
|||||||
|
|
||||||
// Debug
|
// Debug
|
||||||
LastErr string
|
LastErr string
|
||||||
}
|
|
||||||
|
|
||||||
func (t *SectorInfo) upd() *sectorUpdate {
|
// TODO: Log []struct{ts, msg, trace string}
|
||||||
return §orUpdate{id: t.SectorID, nonce: t.Nonce}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/bits"
|
"math/bits"
|
||||||
@ -42,7 +42,7 @@ func fillersFromRem(toFill uint64) ([]uint64, error) {
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) ListSectors() ([]SectorInfo, error) {
|
func (m *Sealing) ListSectors() ([]SectorInfo, error) {
|
||||||
var sectors []SectorInfo
|
var sectors []SectorInfo
|
||||||
if err := m.sectors.List(§ors); err != nil {
|
if err := m.sectors.List(§ors); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -50,7 +50,7 @@ func (m *Miner) ListSectors() ([]SectorInfo, error) {
|
|||||||
return sectors, nil
|
return sectors, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) GetSectorInfo(sid uint64) (SectorInfo, error) {
|
func (m *Sealing) GetSectorInfo(sid uint64) (SectorInfo, error) {
|
||||||
var out SectorInfo
|
var out SectorInfo
|
||||||
err := m.sectors.Get(sid).Get(&out)
|
err := m.sectors.Get(sid).Get(&out)
|
||||||
return out, err
|
return out, err
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
@ -1,288 +0,0 @@
|
|||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate
|
|
||||||
|
|
||||||
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) {
|
|
||||||
go func() {
|
|
||||||
update := cb(ctx, sector)
|
|
||||||
|
|
||||||
if update == nil {
|
|
||||||
return // async
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- *update:
|
|
||||||
case <-m.stop:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
|
||||||
|
|
||||||
var allocated uint64
|
|
||||||
for _, piece := range sector.Pieces {
|
|
||||||
allocated += piece.Size
|
|
||||||
}
|
|
||||||
|
|
||||||
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
|
|
||||||
|
|
||||||
if allocated > ubytes {
|
|
||||||
return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
|
|
||||||
}
|
|
||||||
|
|
||||||
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(fillerSizes) > 0 {
|
|
||||||
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
|
|
||||||
}
|
|
||||||
|
|
||||||
pieces, err := m.pledgeSector(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
|
|
||||||
info.Pieces = append(info.Pieces, pieces...)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
|
||||||
ticket, err := m.tktFn(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rspco, err := m.sb.SealPreCommit(ctx, sector.SectorID, *ticket, sector.pieceInfos())
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
|
|
||||||
info.CommD = rspco.CommD[:]
|
|
||||||
info.CommR = rspco.CommR[:]
|
|
||||||
info.Ticket = SealTicket{
|
|
||||||
BlockHeight: ticket.BlockHeight,
|
|
||||||
TicketBytes: ticket.TicketBytes[:],
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
params := &actors.SectorPreCommitInfo{
|
|
||||||
SectorNumber: sector.SectorID,
|
|
||||||
|
|
||||||
CommR: sector.CommR,
|
|
||||||
SealEpoch: sector.Ticket.BlockHeight,
|
|
||||||
DealIDs: sector.deals(),
|
|
||||||
}
|
|
||||||
enc, aerr := actors.SerializeParams(params)
|
|
||||||
if aerr != nil {
|
|
||||||
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
|
||||||
To: m.maddr,
|
|
||||||
From: m.worker,
|
|
||||||
Method: actors.MAMethods.PreCommitSector,
|
|
||||||
Params: enc,
|
|
||||||
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
|
||||||
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
|
||||||
GasPrice: types.NewInt(1),
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("submitting precommit for sector: ", sector.SectorID)
|
|
||||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
|
|
||||||
mcid := smsg.Cid()
|
|
||||||
info.PreCommitMessage = &mcid
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
|
||||||
log.Info("Sector precommitted: ", sector.SectorID)
|
|
||||||
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.PreCommitFailed).error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
|
||||||
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
|
||||||
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
|
||||||
return sector.upd().to(api.PreCommitFailed).error(err)
|
|
||||||
}
|
|
||||||
log.Info("precommit message landed on chain: ", sector.SectorID)
|
|
||||||
|
|
||||||
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
|
|
||||||
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
|
|
||||||
|
|
||||||
updateNonce := sector.Nonce
|
|
||||||
|
|
||||||
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
|
|
||||||
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
|
|
||||||
if err != nil {
|
|
||||||
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
|
|
||||||
|
|
||||||
m.sectorUpdated <- *sector.upd().fatal(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) {
|
|
||||||
info.Seed = SealSeed{
|
|
||||||
BlockHeight: randHeight,
|
|
||||||
TicketBytes: rand,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
updateNonce++
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}, func(ctx context.Context, ts *types.TipSet) error {
|
|
||||||
log.Warn("revert in interactive commit sector step")
|
|
||||||
// TODO: need to cancel running process and restart...
|
|
||||||
return nil
|
|
||||||
}, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
log.Info("scheduling seal proof computation...")
|
|
||||||
|
|
||||||
proof, err := m.sb.SealCommit(ctx, sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Consider splitting states and persist proof for faster recovery
|
|
||||||
|
|
||||||
params := &actors.SectorProveCommitInfo{
|
|
||||||
Proof: proof,
|
|
||||||
SectorID: sector.SectorID,
|
|
||||||
DealIDs: sector.deals(),
|
|
||||||
}
|
|
||||||
|
|
||||||
enc, aerr := actors.SerializeParams(params)
|
|
||||||
if aerr != nil {
|
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
|
||||||
To: m.maddr,
|
|
||||||
From: m.worker,
|
|
||||||
Method: actors.MAMethods.ProveCommitSector,
|
|
||||||
Params: enc,
|
|
||||||
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
|
||||||
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
|
||||||
GasPrice: types.NewInt(1),
|
|
||||||
}
|
|
||||||
|
|
||||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Separate state before this wait, so we persist message cid?
|
|
||||||
return sector.upd().to(api.CommitWait).state(func(info *SectorInfo) {
|
|
||||||
mcid := smsg.Cid()
|
|
||||||
info.CommitMessage = &mcid
|
|
||||||
info.Proof = proof
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
if sector.CommitMessage == nil {
|
|
||||||
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("entered commit wait with no commit cid"))
|
|
||||||
}
|
|
||||||
|
|
||||||
mw, err := m.api.StateWaitMsg(ctx, *sector.CommitMessage)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
|
||||||
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
|
|
||||||
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode))
|
|
||||||
}
|
|
||||||
|
|
||||||
return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
// TODO: check if the fault has already been reported, and that this sector is even valid
|
|
||||||
|
|
||||||
// TODO: coalesce faulty sector reporting
|
|
||||||
bf := types.NewBitField()
|
|
||||||
bf.Set(sector.SectorID)
|
|
||||||
|
|
||||||
fp := &actors.DeclareFaultsParams{bf}
|
|
||||||
enc, aerr := actors.SerializeParams(fp)
|
|
||||||
if aerr != nil {
|
|
||||||
return sector.upd().fatal(xerrors.Errorf("failed to serialize declare fault params: %w", aerr))
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
|
||||||
To: m.maddr,
|
|
||||||
From: m.worker,
|
|
||||||
Method: actors.MAMethods.DeclareFaults,
|
|
||||||
Params: enc,
|
|
||||||
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
|
||||||
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
|
||||||
GasPrice: types.NewInt(1),
|
|
||||||
}
|
|
||||||
|
|
||||||
smsg, err := m.api.MpoolPushMessage(ctx, msg)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to push declare faults message to network: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return sector.upd().to(api.FaultReported).state(func(info *SectorInfo) {
|
|
||||||
c := smsg.Cid()
|
|
||||||
info.FaultReportMsg = &c
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) handleFaultReported(ctx context.Context, sector SectorInfo) *sectorUpdate {
|
|
||||||
if sector.FaultReportMsg == nil {
|
|
||||||
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("entered fault reported state without a FaultReportMsg cid"))
|
|
||||||
}
|
|
||||||
|
|
||||||
mw, err := m.api.StateWaitMsg(ctx, *sector.FaultReportMsg)
|
|
||||||
if err != nil {
|
|
||||||
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for fault declaration: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
|
||||||
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
|
|
||||||
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode))
|
|
||||||
}
|
|
||||||
|
|
||||||
return sector.upd().to(api.FaultedFinal).state(func(info *SectorInfo) {})
|
|
||||||
|
|
||||||
}
|
|
@ -2,8 +2,6 @@ package sectorblocks
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/storage"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
blocks "github.com/ipfs/go-block-format"
|
||||||
@ -11,6 +9,9 @@ import (
|
|||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("sectorblocks")
|
var log = logging.Logger("sectorblocks")
|
||||||
@ -76,7 +77,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
|
|||||||
|
|
||||||
// TODO: better strategy (e.g. look for already unsealed)
|
// TODO: better strategy (e.g. look for already unsealed)
|
||||||
var best api.SealedRef
|
var best api.SealedRef
|
||||||
var bestSi storage.SectorInfo
|
var bestSi sealing.SectorInfo
|
||||||
for _, r := range refs {
|
for _, r := range refs {
|
||||||
si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID)
|
si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1,318 +0,0 @@
|
|||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
|
|
||||||
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
xerrors "golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/lib/padreader"
|
|
||||||
)
|
|
||||||
|
|
||||||
const NonceIncrement = math.MaxUint64
|
|
||||||
|
|
||||||
type sectorUpdate struct {
|
|
||||||
newState api.SectorState
|
|
||||||
id uint64
|
|
||||||
err error
|
|
||||||
nonce uint64
|
|
||||||
mut func(*SectorInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) fatal(err error) *sectorUpdate {
|
|
||||||
u.newState = api.FailedUnrecoverable
|
|
||||||
u.err = err
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) error(err error) *sectorUpdate {
|
|
||||||
u.err = err
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate {
|
|
||||||
u.mut = m
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate {
|
|
||||||
u.newState = newState
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u *sectorUpdate) setNonce(nc uint64) *sectorUpdate {
|
|
||||||
u.nonce = nc
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) UpdateSectorState(ctx context.Context, sector uint64, snonce uint64, state api.SectorState) error {
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- sectorUpdate{
|
|
||||||
newState: state,
|
|
||||||
nonce: snonce,
|
|
||||||
id: sector,
|
|
||||||
}:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) sectorStateLoop(ctx context.Context) error {
|
|
||||||
trackedSectors, err := m.ListSectors()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("loading sector list: %+v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for _, si := range trackedSectors {
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- sectorUpdate{
|
|
||||||
newState: si.State,
|
|
||||||
nonce: si.Nonce,
|
|
||||||
id: si.SectorID,
|
|
||||||
err: nil,
|
|
||||||
mut: nil,
|
|
||||||
}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
log.Warn("didn't restart processing for all sectors: ", ctx.Err())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
{
|
|
||||||
// verify on-chain state
|
|
||||||
trackedByID := map[uint64]*SectorInfo{}
|
|
||||||
for _, si := range trackedSectors {
|
|
||||||
i := si
|
|
||||||
trackedByID[si.SectorID] = &i
|
|
||||||
}
|
|
||||||
|
|
||||||
curTs, err := m.api.ChainHead(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting chain head: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ps, err := m.api.StateMinerProvingSet(ctx, m.maddr, curTs)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting miner proving set: %w", err)
|
|
||||||
}
|
|
||||||
for _, ocs := range ps {
|
|
||||||
if _, ok := trackedByID[ocs.SectorID]; ok {
|
|
||||||
continue // TODO: check state
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: attempt recovery
|
|
||||||
log.Warnf("untracked sector %d found on chain", ocs.SectorID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer log.Warn("quitting deal provider loop")
|
|
||||||
defer close(m.stopped)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case sector := <-m.sectorIncoming:
|
|
||||||
m.onSectorIncoming(sector)
|
|
||||||
case update := <-m.sectorUpdated:
|
|
||||||
m.onSectorUpdated(ctx, update)
|
|
||||||
case <-m.stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
|
|
||||||
has, err := m.sectors.Has(sector.SectorID)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if has {
|
|
||||||
log.Warnf("SealPiece called more than once for sector %d", sector.SectorID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
|
|
||||||
log.Errorf("sector tracking failed: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case m.sectorUpdated <- sectorUpdate{
|
|
||||||
newState: api.Packing,
|
|
||||||
id: sector.SectorID,
|
|
||||||
}:
|
|
||||||
case <-m.stop:
|
|
||||||
log.Warn("failed to send incoming sector update, miner shutting down")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
|
|
||||||
log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState])
|
|
||||||
var sector SectorInfo
|
|
||||||
err := m.sectors.Get(update.id).Mutate(func(s *SectorInfo) error {
|
|
||||||
if update.nonce < s.Nonce {
|
|
||||||
return xerrors.Errorf("update nonce too low, ignoring (%d < %d)", update.nonce, s.Nonce)
|
|
||||||
}
|
|
||||||
|
|
||||||
if update.nonce != NonceIncrement {
|
|
||||||
s.Nonce = update.nonce
|
|
||||||
} else {
|
|
||||||
s.Nonce++ // forced update
|
|
||||||
}
|
|
||||||
s.State = update.newState
|
|
||||||
if update.err != nil {
|
|
||||||
if s.LastErr != "" {
|
|
||||||
s.LastErr += "---------\n\n"
|
|
||||||
}
|
|
||||||
s.LastErr += fmt.Sprintf("entering state %s: %+v", api.SectorStates[update.newState], update.err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if update.mut != nil {
|
|
||||||
update.mut(s)
|
|
||||||
}
|
|
||||||
sector = *s
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if update.err != nil {
|
|
||||||
log.Errorf("sector %d failed: %+v", update.id, update.err)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("sector %d update error: %+v", update.id, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
|
|
||||||
* Empty
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
*<- Packing <- incoming
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
*<- Unsealed <--> SealFailed
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
* PreCommitting <--> PreCommitFailed
|
|
||||||
| | ^
|
|
||||||
| v |
|
|
||||||
*<- PreCommitted ------/
|
|
||||||
| |||
|
|
||||||
| vvv v--> SealCommitFailed
|
|
||||||
*<- Committing
|
|
||||||
| | ^--> CommitFailed
|
|
||||||
| v ^
|
|
||||||
*<- CommitWait ---/
|
|
||||||
| |
|
|
||||||
| v
|
|
||||||
*<- Proving
|
|
||||||
|
|
|
||||||
v
|
|
||||||
FailedUnrecoverable
|
|
||||||
|
|
||||||
UndefinedSectorState <- ¯\_(ツ)_/¯
|
|
||||||
| ^
|
|
||||||
*---------------------/
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
switch update.newState {
|
|
||||||
// Happy path
|
|
||||||
case api.Packing:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handlePacking)
|
|
||||||
case api.Unsealed:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleUnsealed)
|
|
||||||
case api.PreCommitting:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handlePreCommitting)
|
|
||||||
case api.PreCommitted:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handlePreCommitted)
|
|
||||||
case api.Committing:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleCommitting)
|
|
||||||
case api.CommitWait:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleCommitWait)
|
|
||||||
case api.Proving:
|
|
||||||
// TODO: track sector health / expiration
|
|
||||||
log.Infof("Proving sector %d", update.id)
|
|
||||||
|
|
||||||
// Handled failure modes
|
|
||||||
case api.SealFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'SealFailed'", update.id)
|
|
||||||
case api.PreCommitFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", update.id)
|
|
||||||
case api.SealCommitFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", update.id)
|
|
||||||
case api.CommitFailed:
|
|
||||||
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", update.id)
|
|
||||||
|
|
||||||
// Faults
|
|
||||||
case api.Faulty:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleFaulty)
|
|
||||||
case api.FaultReported:
|
|
||||||
m.handleSectorUpdate(ctx, sector, m.handleFaultReported)
|
|
||||||
|
|
||||||
// Fatal errors
|
|
||||||
case api.UndefinedSectorState:
|
|
||||||
log.Error("sector update with undefined state!")
|
|
||||||
case api.FailedUnrecoverable:
|
|
||||||
log.Errorf("sector %d failed unrecoverably", update.id)
|
|
||||||
default:
|
|
||||||
log.Errorf("unexpected sector update state: %d", update.newState)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
|
||||||
if padreader.PaddedSize(size) != size {
|
|
||||||
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
|
||||||
}
|
|
||||||
|
|
||||||
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// offset hard-coded to 0 since we only put one thing in a sector for now
|
|
||||||
return sid, 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
|
||||||
log.Infof("Seal piece for deal %d", dealID)
|
|
||||||
|
|
||||||
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("adding piece to sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return m.newSector(ctx, sectorID, dealID, ppi)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
|
||||||
si := &SectorInfo{
|
|
||||||
SectorID: sid,
|
|
||||||
|
|
||||||
Pieces: []Piece{
|
|
||||||
{
|
|
||||||
DealID: dealID,
|
|
||||||
|
|
||||||
Size: ppi.Size,
|
|
||||||
CommP: ppi.CommP[:],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case m.sectorIncoming <- si:
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return xerrors.Errorf("failed to submit sector for sealing, queue full: %w", ctx.Err())
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user