Move miner sealing logic into a separate package
This commit is contained in:
parent
925e057acb
commit
604bf64ef6
@ -11,7 +11,12 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
|
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
paramfetch "github.com/filecoin-project/go-paramfetch"
|
paramfetch "github.com/filecoin-project/go-paramfetch"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
badger "github.com/ipfs/go-ds-badger2"
|
badger "github.com/ipfs/go-ds-badger2"
|
||||||
@ -21,11 +26,6 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
lapi "github.com/filecoin-project/lotus/api"
|
lapi "github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
@ -38,6 +38,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var initCmd = &cli.Command{
|
var initCmd = &cli.Command{
|
||||||
@ -242,17 +243,17 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, sector := range meta.Sectors {
|
for _, sector := range meta.Sectors {
|
||||||
sectorKey := datastore.NewKey(storage.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
|
sectorKey := datastore.NewKey(sealing.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
|
||||||
|
|
||||||
dealID, err := findMarketDealID(ctx, api, sector.Deal)
|
dealID, err := findMarketDealID(ctx, api, sector.Deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("finding storage deal for pre-sealed sector %d: %w", sector.SectorID, err)
|
return xerrors.Errorf("finding storage deal for pre-sealed sector %d: %w", sector.SectorID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
info := &storage.SectorInfo{
|
info := &sealing.SectorInfo{
|
||||||
State: lapi.Proving,
|
State: lapi.Proving,
|
||||||
SectorID: sector.SectorID,
|
SectorID: sector.SectorID,
|
||||||
Pieces: []storage.Piece{
|
Pieces: []sealing.Piece{
|
||||||
{
|
{
|
||||||
DealID: dealID,
|
DealID: dealID,
|
||||||
Size: meta.SectorSize,
|
Size: meta.SectorSize,
|
||||||
@ -262,9 +263,9 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
|
|||||||
CommD: sector.CommD[:],
|
CommD: sector.CommD[:],
|
||||||
CommR: sector.CommR[:],
|
CommR: sector.CommR[:],
|
||||||
Proof: nil,
|
Proof: nil,
|
||||||
Ticket: storage.SealTicket{},
|
Ticket: sealing.SealTicket{},
|
||||||
PreCommitMessage: nil,
|
PreCommitMessage: nil,
|
||||||
Seed: storage.SealSeed{},
|
Seed: sealing.SealSeed{},
|
||||||
CommitMessage: nil,
|
CommitMessage: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
12
gen/main.go
12
gen/main.go
@ -12,7 +12,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/statemachine"
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -122,11 +122,11 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = gen.WriteMapEncodersToFile("./storage/cbor_gen.go", "storage",
|
err = gen.WriteMapEncodersToFile("./storage/sectors/cbor_gen.go", "sectors",
|
||||||
storage.SealTicket{},
|
sealing.SealTicket{},
|
||||||
storage.SealSeed{},
|
sealing.SealSeed{},
|
||||||
storage.Piece{},
|
sealing.Piece{},
|
||||||
storage.SectorInfo{},
|
sealing.SectorInfo{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
@ -51,6 +51,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/peermgr"
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
// special is a type used to give keys to modules which
|
// special is a type used to give keys to modules which
|
||||||
@ -245,7 +246,7 @@ func Online() Option {
|
|||||||
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
|
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
|
||||||
Override(new(sectorbuilder.Interface), modules.SectorBuilder),
|
Override(new(sectorbuilder.Interface), modules.SectorBuilder),
|
||||||
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
||||||
Override(new(storage.TicketFn), modules.SealTicketGen),
|
Override(new(sealing.TicketFn), modules.SealTicketGen),
|
||||||
Override(new(*storage.Miner), modules.StorageMiner),
|
Override(new(*storage.Miner), modules.StorageMiner),
|
||||||
|
|
||||||
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
||||||
|
@ -5,7 +5,16 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||||
|
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
||||||
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
|
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
|
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
paramfetch "github.com/filecoin-project/go-paramfetch"
|
paramfetch "github.com/filecoin-project/go-paramfetch"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/filecoin-project/go-statestore"
|
||||||
"github.com/ipfs/go-bitswap"
|
"github.com/ipfs/go-bitswap"
|
||||||
"github.com/ipfs/go-bitswap/network"
|
"github.com/ipfs/go-bitswap/network"
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
@ -23,26 +32,17 @@ import (
|
|||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
||||||
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
||||||
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
||||||
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
"github.com/filecoin-project/go-statestore"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/gen"
|
"github.com/filecoin-project/lotus/chain/gen"
|
||||||
|
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
"github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||||
@ -98,7 +98,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn storage.TicketFn) (*storage.Miner, error) {
|
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*storage.Miner, error) {
|
||||||
maddr, err := minerAddrFromDS(ds)
|
maddr, err := minerAddrFromDS(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -241,7 +241,7 @@ func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuil
|
|||||||
return sb, nil
|
return sb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SealTicketGen(api api.FullNode) storage.TicketFn {
|
func SealTicketGen(api api.FullNode) sealing.TicketFn {
|
||||||
return func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
|
return func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
|
||||||
ts, err := api.ChainHead(ctx)
|
ts, err := api.ChainHead(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3,11 +3,11 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -16,31 +16,26 @@ import (
|
|||||||
"github.com/filecoin-project/go-sectorbuilder"
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/events"
|
|
||||||
"github.com/filecoin-project/lotus/chain/gen"
|
"github.com/filecoin-project/lotus/chain/gen"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/statemachine"
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("storageminer")
|
var log = logging.Logger("storageminer")
|
||||||
|
|
||||||
const SectorStorePrefix = "/sectors"
|
|
||||||
|
|
||||||
type Miner struct {
|
type Miner struct {
|
||||||
api storageMinerApi
|
api storageMinerApi
|
||||||
events *events.Events
|
|
||||||
h host.Host
|
h host.Host
|
||||||
|
sb sectorbuilder.Interface
|
||||||
|
ds datastore.Batching
|
||||||
|
tktFn sealing.TicketFn
|
||||||
|
|
||||||
maddr address.Address
|
maddr address.Address
|
||||||
worker address.Address
|
worker address.Address
|
||||||
|
|
||||||
// Sealing
|
sealing *sealing.Sealing
|
||||||
sb sectorbuilder.Interface
|
|
||||||
sectors *statemachine.StateGroup
|
|
||||||
tktFn TicketFn
|
|
||||||
|
|
||||||
sectorIncoming chan *SectorInfo
|
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
}
|
}
|
||||||
@ -70,23 +65,20 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) (*Miner, error) {
|
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) {
|
||||||
m := &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
|
|
||||||
maddr: addr,
|
|
||||||
h: h,
|
h: h,
|
||||||
sb: sb,
|
sb: sb,
|
||||||
|
ds: ds,
|
||||||
tktFn: tktFn,
|
tktFn: tktFn,
|
||||||
|
|
||||||
sectorIncoming: make(chan *SectorInfo),
|
maddr: addr,
|
||||||
|
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: separate sector stuff from miner struct
|
|
||||||
m.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), m, SectorInfo{})
|
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,26 +87,24 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
return xerrors.Errorf("miner preflight checks failed: %w", err)
|
return xerrors.Errorf("miner preflight checks failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.events = events.NewEvents(ctx, m.api)
|
|
||||||
|
|
||||||
fps := &fpostScheduler{
|
fps := &fpostScheduler{
|
||||||
api: m.api,
|
api: m.api,
|
||||||
sb: m.sb,
|
sb: m.sb,
|
||||||
|
|
||||||
actor: m.maddr,
|
actor: m.maddr,
|
||||||
worker: m.worker,
|
worker: m.worker,
|
||||||
}
|
}
|
||||||
|
|
||||||
go fps.run(ctx)
|
go fps.run(ctx)
|
||||||
if err := m.restartSectors(ctx); err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
evts := events.NewEvents(ctx, m.api)
|
||||||
return xerrors.Errorf("failed load sector states: %w", err)
|
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn)
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Stop(ctx context.Context) error {
|
func (m *Miner) Stop(ctx context.Context) error {
|
||||||
defer m.sectors.Stop(ctx)
|
defer m.sealing.Stop(ctx)
|
||||||
|
|
||||||
close(m.stop)
|
close(m.stop)
|
||||||
select {
|
select {
|
||||||
|
35
storage/sealing.go
Normal file
35
storage/sealing.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: refactor this to be direct somehow
|
||||||
|
|
||||||
|
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
||||||
|
return m.sealing.AllocatePiece(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
||||||
|
return m.sealing.SealPiece(ctx, size, r, sectorID, dealID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) ListSectors() ([]sealing.SectorInfo, error) {
|
||||||
|
return m.sealing.ListSectors()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) GetSectorInfo(sid uint64) (sealing.SectorInfo, error) {
|
||||||
|
return m.sealing.GetSectorInfo(sid)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) PledgeSector() error {
|
||||||
|
return m.sealing.PledgeSector()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
|
return m.sealing.ForceSectorState(ctx, id, state)
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
||||||
if len(sizes) == 0 {
|
if len(sizes) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -98,7 +98,7 @@ func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPiece
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) PledgeSector() error {
|
func (m *Sealing) PledgeSector() error {
|
||||||
go func() {
|
go func() {
|
||||||
ctx := context.TODO() // we can't use the context from command which invokes
|
ctx := context.TODO() // we can't use the context from command which invokes
|
||||||
// this, as we run everything here async, and it's cancelled when the
|
// this, as we run everything here async, and it's cancelled when the
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -53,7 +53,7 @@ type SectorForceState struct {
|
|||||||
state api.SectorState
|
state api.SectorState
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) Plan(events []statemachine.Event, user interface{}) (interface{}, error) {
|
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) {
|
||||||
next, err := m.plan(events, user.(*SectorInfo))
|
next, err := m.plan(events, user.(*SectorInfo))
|
||||||
if err != nil || next == nil {
|
if err != nil || next == nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -71,7 +71,7 @@ func (m *Miner) Plan(events []statemachine.Event, user interface{}) (interface{}
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
||||||
/////
|
/////
|
||||||
// First process all events
|
// First process all events
|
||||||
|
|
||||||
@ -242,7 +242,7 @@ func (m *Miner) plan(events []statemachine.Event, state *SectorInfo) (func(state
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) restartSectors(ctx context.Context) error {
|
func (m *Sealing) restartSectors(ctx context.Context) error {
|
||||||
trackedSectors, err := m.ListSectors()
|
trackedSectors, err := m.ListSectors()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("loading sector list: %+v", err)
|
log.Errorf("loading sector list: %+v", err)
|
||||||
@ -259,6 +259,6 @@ func (m *Miner) restartSectors(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
return m.sectors.Send(id, SectorForceState{state})
|
return m.sectors.Send(id, SectorForceState{state})
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -12,7 +12,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/lib/statemachine"
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *Miner) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
||||||
|
|
||||||
var allocated uint64
|
var allocated uint64
|
||||||
@ -43,7 +43,7 @@ func (m *Miner) handlePacking(ctx statemachine.Context, sector SectorInfo) error
|
|||||||
return ctx.Send(SectorPacked{pieces: pieces})
|
return ctx.Send(SectorPacked{pieces: pieces})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
||||||
ticket, err := m.tktFn(ctx.Context())
|
ticket, err := m.tktFn(ctx.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -65,7 +65,7 @@ func (m *Miner) handleUnsealed(ctx statemachine.Context, sector SectorInfo) erro
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
params := &actors.SectorPreCommitInfo{
|
params := &actors.SectorPreCommitInfo{
|
||||||
SectorNumber: sector.SectorID,
|
SectorNumber: sector.SectorID,
|
||||||
|
|
||||||
@ -97,7 +97,7 @@ func (m *Miner) handlePreCommitting(ctx statemachine.Context, sector SectorInfo)
|
|||||||
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
||||||
log.Info("Sector precommitted: ", sector.SectorID)
|
log.Info("Sector precommitted: ", sector.SectorID)
|
||||||
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
|
||||||
@ -142,7 +142,7 @@ func (m *Miner) handlePreCommitted(ctx statemachine.Context, sector SectorInfo)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
log.Info("scheduling seal proof computation...")
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
||||||
@ -184,7 +184,7 @@ func (m *Miner) handleCommitting(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if sector.CommitMessage == nil {
|
if sector.CommitMessage == nil {
|
||||||
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
||||||
@ -203,7 +203,7 @@ func (m *Miner) handleCommitWait(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
return ctx.Send(SectorProving{})
|
return ctx.Send(SectorProving{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
// TODO: check if the fault has already been reported, and that this sector is even valid
|
// TODO: check if the fault has already been reported, and that this sector is even valid
|
||||||
|
|
||||||
// TODO: coalesce faulty sector reporting
|
// TODO: coalesce faulty sector reporting
|
||||||
@ -235,7 +235,7 @@ func (m *Miner) handleFaulty(ctx statemachine.Context, sector SectorInfo) error
|
|||||||
return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()})
|
return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if sector.FaultReportMsg == nil {
|
if sector.FaultReportMsg == nil {
|
||||||
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
||||||
}
|
}
|
@ -1,16 +1,12 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
|
|
||||||
|
|
||||||
type SealTicket struct {
|
type SealTicket struct {
|
||||||
BlockHeight uint64
|
BlockHeight uint64
|
||||||
TicketBytes []byte
|
TicketBytes []byte
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math/bits"
|
"math/bits"
|
||||||
@ -42,7 +42,7 @@ func fillersFromRem(toFill uint64) ([]uint64, error) {
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) ListSectors() ([]SectorInfo, error) {
|
func (m *Sealing) ListSectors() ([]SectorInfo, error) {
|
||||||
var sectors []SectorInfo
|
var sectors []SectorInfo
|
||||||
if err := m.sectors.List(§ors); err != nil {
|
if err := m.sectors.List(§ors); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -50,7 +50,7 @@ func (m *Miner) ListSectors() ([]SectorInfo, error) {
|
|||||||
return sectors, nil
|
return sectors, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) GetSectorInfo(sid uint64) (SectorInfo, error) {
|
func (m *Sealing) GetSectorInfo(sid uint64) (SectorInfo, error) {
|
||||||
var out SectorInfo
|
var out SectorInfo
|
||||||
err := m.sectors.Get(sid).Get(&out)
|
err := m.sectors.Get(sid).Get(&out)
|
||||||
return out, err
|
return out, err
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
136
storage/sealing/sectors.go
Normal file
136
storage/sealing/sectors.go
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/lib/padreader"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
const SectorStorePrefix = "/sectors"
|
||||||
|
|
||||||
|
var log = logging.Logger("sectors")
|
||||||
|
|
||||||
|
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
|
||||||
|
|
||||||
|
type sectorsApi interface { // TODO: trim down
|
||||||
|
// Call a read only method on actors (no interaction with the chain required)
|
||||||
|
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
|
||||||
|
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
|
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
|
||||||
|
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
|
||||||
|
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
|
||||||
|
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
|
||||||
|
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
|
||||||
|
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
||||||
|
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
||||||
|
|
||||||
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||||
|
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
|
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
|
||||||
|
ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error)
|
||||||
|
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
|
||||||
|
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
||||||
|
|
||||||
|
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
|
||||||
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||||
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Sealing struct {
|
||||||
|
api sectorsApi
|
||||||
|
events *events.Events
|
||||||
|
|
||||||
|
maddr address.Address
|
||||||
|
worker address.Address
|
||||||
|
|
||||||
|
sb sectorbuilder.Interface
|
||||||
|
sectors *statemachine.StateGroup
|
||||||
|
tktFn TicketFn
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(api sectorsApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing {
|
||||||
|
s := &Sealing{
|
||||||
|
api: api,
|
||||||
|
events: events,
|
||||||
|
|
||||||
|
maddr: maddr,
|
||||||
|
worker: worker,
|
||||||
|
sb: sb,
|
||||||
|
tktFn: tktFn,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Run(ctx context.Context) error {
|
||||||
|
m.events = events.NewEvents(ctx, m.api)
|
||||||
|
|
||||||
|
if err := m.restartSectors(ctx); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return xerrors.Errorf("failed load sector states: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Stop(ctx context.Context) error {
|
||||||
|
return m.sectors.Stop(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
||||||
|
if padreader.PaddedSize(size) != size {
|
||||||
|
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
||||||
|
}
|
||||||
|
|
||||||
|
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// offset hard-coded to 0 since we only put one thing in a sector for now
|
||||||
|
return sid, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
||||||
|
log.Infof("Seal piece for deal %d", dealID)
|
||||||
|
|
||||||
|
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("adding piece to sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.newSector(ctx, sectorID, dealID, ppi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
||||||
|
return m.sectors.Send(sid, SectorStart{
|
||||||
|
id: sid,
|
||||||
|
pieces: []Piece{
|
||||||
|
{
|
||||||
|
DealID: dealID,
|
||||||
|
|
||||||
|
Size: ppi.Size,
|
||||||
|
CommP: ppi.CommP[:],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
@ -1,4 +1,4 @@
|
|||||||
package storage
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
@ -2,8 +2,6 @@ package sectorblocks
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/storage"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
blocks "github.com/ipfs/go-block-format"
|
||||||
@ -11,6 +9,9 @@ import (
|
|||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("sectorblocks")
|
var log = logging.Logger("sectorblocks")
|
||||||
@ -76,7 +77,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
|
|||||||
|
|
||||||
// TODO: better strategy (e.g. look for already unsealed)
|
// TODO: better strategy (e.g. look for already unsealed)
|
||||||
var best api.SealedRef
|
var best api.SealedRef
|
||||||
var bestSi storage.SectorInfo
|
var bestSi sealing.SectorInfo
|
||||||
for _, r := range refs {
|
for _, r := range refs {
|
||||||
si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID)
|
si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1,50 +0,0 @@
|
|||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder"
|
|
||||||
xerrors "golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/lib/padreader"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
|
||||||
if padreader.PaddedSize(size) != size {
|
|
||||||
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
|
||||||
}
|
|
||||||
|
|
||||||
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// offset hard-coded to 0 since we only put one thing in a sector for now
|
|
||||||
return sid, 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
|
||||||
log.Infof("Seal piece for deal %d", dealID)
|
|
||||||
|
|
||||||
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("adding piece to sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return m.newSector(ctx, sectorID, dealID, ppi)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
|
||||||
return m.sectors.Send(sid, SectorStart{
|
|
||||||
id: sid,
|
|
||||||
pieces: []Piece{
|
|
||||||
{
|
|
||||||
DealID: dealID,
|
|
||||||
|
|
||||||
Size: ppi.Size,
|
|
||||||
CommP: ppi.CommP[:],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user