diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index b5bf0c6dc..0d400453f 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -11,7 +11,12 @@ import ( "path/filepath" "strconv" + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/storagemarket" + deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl" paramfetch "github.com/filecoin-project/go-paramfetch" + "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" badger "github.com/ipfs/go-ds-badger2" @@ -21,11 +26,6 @@ import ( "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" - "github.com/filecoin-project/go-address" - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-fil-markets/storagemarket" - deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl" - "github.com/filecoin-project/go-sectorbuilder" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" @@ -38,6 +38,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sealing" ) var initCmd = &cli.Command{ @@ -242,17 +243,17 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin } for _, sector := range meta.Sectors { - sectorKey := datastore.NewKey(storage.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID)) + sectorKey := datastore.NewKey(sealing.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID)) dealID, err := findMarketDealID(ctx, api, sector.Deal) if err != nil { return xerrors.Errorf("finding storage deal for pre-sealed sector %d: %w", sector.SectorID, err) } - info := &storage.SectorInfo{ + info := &sealing.SectorInfo{ State: lapi.Proving, SectorID: sector.SectorID, - Pieces: []storage.Piece{ + Pieces: []sealing.Piece{ { DealID: dealID, Size: meta.SectorSize, @@ -262,9 +263,9 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin CommD: sector.CommD[:], CommR: sector.CommR[:], Proof: nil, - Ticket: storage.SealTicket{}, + Ticket: sealing.SealTicket{}, PreCommitMessage: nil, - Seed: storage.SealSeed{}, + Seed: sealing.SealSeed{}, CommitMessage: nil, } diff --git a/gen/main.go b/gen/main.go index a5b2a6ce6..7c5a0e1e4 100644 --- a/gen/main.go +++ b/gen/main.go @@ -12,7 +12,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/statemachine" "github.com/filecoin-project/lotus/paych" - "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sealing" ) func main() { @@ -122,11 +122,11 @@ func main() { os.Exit(1) } - err = gen.WriteMapEncodersToFile("./storage/cbor_gen.go", "storage", - storage.SealTicket{}, - storage.SealSeed{}, - storage.Piece{}, - storage.SectorInfo{}, + err = gen.WriteMapEncodersToFile("./storage/sectors/cbor_gen.go", "sectors", + sealing.SealTicket{}, + sealing.SealSeed{}, + sealing.Piece{}, + sealing.SectorInfo{}, ) if err != nil { fmt.Println(err) diff --git a/node/builder.go b/node/builder.go index e2563c81e..d17d6beb4 100644 --- a/node/builder.go +++ b/node/builder.go @@ -51,6 +51,7 @@ import ( "github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storage/sealing" ) // special is a type used to give keys to modules which @@ -245,7 +246,7 @@ func Online() Option { ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, Override(new(sectorbuilder.Interface), modules.SectorBuilder), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), - Override(new(storage.TicketFn), modules.SealTicketGen), + Override(new(sealing.TicketFn), modules.SealTicketGen), Override(new(*storage.Miner), modules.StorageMiner), Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index ae9780560..1d2b1d7ce 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -5,7 +5,16 @@ import ( "math" "reflect" + "github.com/filecoin-project/go-address" + dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" + "github.com/filecoin-project/go-fil-markets/storagemarket" + deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl" + storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" paramfetch "github.com/filecoin-project/go-paramfetch" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-statestore" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" @@ -23,26 +32,17 @@ import ( "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" - dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" - "github.com/filecoin-project/go-fil-markets/storagemarket" - deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl" - storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" - "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/gen" + "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" - - "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storage/sealing" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -98,7 +98,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit } } -func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn storage.TicketFn) (*storage.Miner, error) { +func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*storage.Miner, error) { maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err @@ -241,7 +241,7 @@ func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuil return sb, nil } -func SealTicketGen(api api.FullNode) storage.TicketFn { +func SealTicketGen(api api.FullNode) sealing.TicketFn { return func(ctx context.Context) (*sectorbuilder.SealTicket, error) { ts, err := api.ChainHead(ctx) if err != nil { diff --git a/storage/miner.go b/storage/miner.go index 5b116feec..01cb26b51 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -3,11 +3,11 @@ package storage import ( "context" "errors" + "github.com/filecoin-project/lotus/chain/events" "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" "golang.org/x/xerrors" @@ -16,31 +16,26 @@ import ( "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statemachine" + "github.com/filecoin-project/lotus/storage/sealing" ) var log = logging.Logger("storageminer") -const SectorStorePrefix = "/sectors" - type Miner struct { - api storageMinerApi - events *events.Events - h host.Host + api storageMinerApi + h host.Host + sb sectorbuilder.Interface + ds datastore.Batching + tktFn sealing.TicketFn maddr address.Address worker address.Address - // Sealing - sb sectorbuilder.Interface - sectors *statemachine.StateGroup - tktFn TicketFn + sealing *sealing.Sealing - sectorIncoming chan *SectorInfo stop chan struct{} stopped chan struct{} } @@ -70,23 +65,20 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) (*Miner, error) { +func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) { m := &Miner{ api: api, - - maddr: addr, h: h, sb: sb, + ds: ds, tktFn: tktFn, - sectorIncoming: make(chan *SectorInfo), + maddr: addr, + stop: make(chan struct{}), stopped: make(chan struct{}), } - // TODO: separate sector stuff from miner struct - m.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), m, SectorInfo{}) - return m, nil } @@ -95,26 +87,24 @@ func (m *Miner) Run(ctx context.Context) error { return xerrors.Errorf("miner preflight checks failed: %w", err) } - m.events = events.NewEvents(ctx, m.api) - fps := &fpostScheduler{ api: m.api, sb: m.sb, + actor: m.maddr, worker: m.worker, } go fps.run(ctx) - if err := m.restartSectors(ctx); err != nil { - log.Errorf("%+v", err) - return xerrors.Errorf("failed load sector states: %w", err) - } + + evts := events.NewEvents(ctx, m.api) + m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn) return nil } func (m *Miner) Stop(ctx context.Context) error { - defer m.sectors.Stop(ctx) + defer m.sealing.Stop(ctx) close(m.stop) select { diff --git a/storage/sealing.go b/storage/sealing.go new file mode 100644 index 000000000..d6d27b429 --- /dev/null +++ b/storage/sealing.go @@ -0,0 +1,35 @@ +package storage + +import ( + "context" + "io" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/sealing" +) + +// TODO: refactor this to be direct somehow + +func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { + return m.sealing.AllocatePiece(size) +} + +func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { + return m.sealing.SealPiece(ctx, size, r, sectorID, dealID) +} + +func (m *Miner) ListSectors() ([]sealing.SectorInfo, error) { + return m.sealing.ListSectors() +} + +func (m *Miner) GetSectorInfo(sid uint64) (sealing.SectorInfo, error) { + return m.sealing.GetSectorInfo(sid) +} + +func (m *Miner) PledgeSector() error { + return m.sealing.PledgeSector() +} + +func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error { + return m.sealing.ForceSectorState(ctx, id, state) +} diff --git a/storage/cbor_gen.go b/storage/sealing/cbor_gen.go similarity index 99% rename from storage/cbor_gen.go rename to storage/sealing/cbor_gen.go index d2a6b9ee3..62c6241dc 100644 --- a/storage/cbor_gen.go +++ b/storage/sealing/cbor_gen.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "fmt" diff --git a/storage/garbage.go b/storage/sealing/garbage.go similarity index 94% rename from storage/garbage.go rename to storage/sealing/garbage.go index a5470cdaa..1c3925671 100644 --- a/storage/garbage.go +++ b/storage/sealing/garbage.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "bytes" @@ -14,7 +14,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { +func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) { if len(sizes) == 0 { return nil, nil } @@ -98,7 +98,7 @@ func (m *Miner) pledgeSector(ctx context.Context, sectorID uint64, existingPiece return out, nil } -func (m *Miner) PledgeSector() error { +func (m *Sealing) PledgeSector() error { go func() { ctx := context.TODO() // we can't use the context from command which invokes // this, as we run everything here async, and it's cancelled when the diff --git a/storage/sector_fsm.go b/storage/sealing/sector_fsm.go similarity index 93% rename from storage/sector_fsm.go rename to storage/sealing/sector_fsm.go index 506cd1672..50b62844a 100644 --- a/storage/sector_fsm.go +++ b/storage/sealing/sector_fsm.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "context" @@ -53,7 +53,7 @@ type SectorForceState struct { state api.SectorState } -func (m *Miner) Plan(events []statemachine.Event, user interface{}) (interface{}, error) { +func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) { next, err := m.plan(events, user.(*SectorInfo)) if err != nil || next == nil { return nil, err @@ -71,7 +71,7 @@ func (m *Miner) Plan(events []statemachine.Event, user interface{}) (interface{} }, nil } -func (m *Miner) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { +func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) { ///// // First process all events @@ -242,7 +242,7 @@ func (m *Miner) plan(events []statemachine.Event, state *SectorInfo) (func(state return nil, nil } -func (m *Miner) restartSectors(ctx context.Context) error { +func (m *Sealing) restartSectors(ctx context.Context) error { trackedSectors, err := m.ListSectors() if err != nil { log.Errorf("loading sector list: %+v", err) @@ -259,6 +259,6 @@ func (m *Miner) restartSectors(ctx context.Context) error { return nil } -func (m *Miner) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error { +func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error { return m.sectors.Send(id, SectorForceState{state}) } diff --git a/storage/sector_states.go b/storage/sealing/sector_states.go similarity index 92% rename from storage/sector_states.go rename to storage/sealing/sector_states.go index 78752b383..1dbc6e4d4 100644 --- a/storage/sector_states.go +++ b/storage/sealing/sector_states.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "context" @@ -12,7 +12,7 @@ import ( "github.com/filecoin-project/lotus/lib/statemachine" ) -func (m *Miner) handlePacking(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) var allocated uint64 @@ -43,7 +43,7 @@ func (m *Miner) handlePacking(ctx statemachine.Context, sector SectorInfo) error return ctx.Send(SectorPacked{pieces: pieces}) } -func (m *Miner) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { log.Infow("performing sector replication...", "sector", sector.SectorID) ticket, err := m.tktFn(ctx.Context()) if err != nil { @@ -65,7 +65,7 @@ func (m *Miner) handleUnsealed(ctx statemachine.Context, sector SectorInfo) erro }) } -func (m *Miner) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { params := &actors.SectorPreCommitInfo{ SectorNumber: sector.SectorID, @@ -97,7 +97,7 @@ func (m *Miner) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorPreCommitted{message: smsg.Cid()}) } -func (m *Miner) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error { // would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts log.Info("Sector precommitted: ", sector.SectorID) mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) @@ -142,7 +142,7 @@ func (m *Miner) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) return nil } -func (m *Miner) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { log.Info("scheduling seal proof computation...") proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco()) @@ -184,7 +184,7 @@ func (m *Miner) handleCommitting(ctx statemachine.Context, sector SectorInfo) er }) } -func (m *Miner) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error { if sector.CommitMessage == nil { log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID) return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")}) @@ -203,7 +203,7 @@ func (m *Miner) handleCommitWait(ctx statemachine.Context, sector SectorInfo) er return ctx.Send(SectorProving{}) } -func (m *Miner) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { // TODO: check if the fault has already been reported, and that this sector is even valid // TODO: coalesce faulty sector reporting @@ -235,7 +235,7 @@ func (m *Miner) handleFaulty(ctx statemachine.Context, sector SectorInfo) error return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()}) } -func (m *Miner) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error { +func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error { if sector.FaultReportMsg == nil { return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid") } diff --git a/storage/sector_types.go b/storage/sealing/sector_types.go similarity index 94% rename from storage/sector_types.go rename to storage/sealing/sector_types.go index d3730bcfe..c2d6b3359 100644 --- a/storage/sector_types.go +++ b/storage/sealing/sector_types.go @@ -1,16 +1,12 @@ -package storage +package sealing import ( - "context" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/api" ) -type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) - type SealTicket struct { BlockHeight uint64 TicketBytes []byte diff --git a/storage/sector_utils.go b/storage/sealing/sector_utils.go similarity index 91% rename from storage/sector_utils.go rename to storage/sealing/sector_utils.go index 96fcbf891..8fa887d3c 100644 --- a/storage/sector_utils.go +++ b/storage/sealing/sector_utils.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "math/bits" @@ -42,7 +42,7 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { return out, nil } -func (m *Miner) ListSectors() ([]SectorInfo, error) { +func (m *Sealing) ListSectors() ([]SectorInfo, error) { var sectors []SectorInfo if err := m.sectors.List(§ors); err != nil { return nil, err @@ -50,7 +50,7 @@ func (m *Miner) ListSectors() ([]SectorInfo, error) { return sectors, nil } -func (m *Miner) GetSectorInfo(sid uint64) (SectorInfo, error) { +func (m *Sealing) GetSectorInfo(sid uint64) (SectorInfo, error) { var out SectorInfo err := m.sectors.Get(sid).Get(&out) return out, err diff --git a/storage/sector_utils_test.go b/storage/sealing/sector_utils_test.go similarity index 98% rename from storage/sector_utils_test.go rename to storage/sealing/sector_utils_test.go index 9ab38241c..02746a3d8 100644 --- a/storage/sector_utils_test.go +++ b/storage/sealing/sector_utils_test.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "testing" diff --git a/storage/sealing/sectors.go b/storage/sealing/sectors.go new file mode 100644 index 000000000..c915c27f1 --- /dev/null +++ b/storage/sealing/sectors.go @@ -0,0 +1,136 @@ +package sealing + +import ( + "context" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "io" + + "github.com/filecoin-project/lotus/lib/padreader" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +const SectorStorePrefix = "/sectors" + +var log = logging.Logger("sectors") + +type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error) + +type sectorsApi interface { // TODO: trim down + // Call a read only method on actors (no interaction with the chain required) + StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) + StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) + StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) + StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) + StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) + StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually + StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) + StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) + + MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) + + ChainHead(context.Context) (*types.TipSet, error) + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) + ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + + WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) +} + +type Sealing struct { + api sectorsApi + events *events.Events + + maddr address.Address + worker address.Address + + sb sectorbuilder.Interface + sectors *statemachine.StateGroup + tktFn TicketFn +} + +func New(api sectorsApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing { + s := &Sealing{ + api: api, + events: events, + + maddr: maddr, + worker: worker, + sb: sb, + tktFn: tktFn, + } + + s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) + + return s +} + +func (m *Sealing) Run(ctx context.Context) error { + m.events = events.NewEvents(ctx, m.api) + + if err := m.restartSectors(ctx); err != nil { + log.Errorf("%+v", err) + return xerrors.Errorf("failed load sector states: %w", err) + } + + return nil +} + +func (m *Sealing) Stop(ctx context.Context) error { + return m.sectors.Stop(ctx) +} + +func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { + if padreader.PaddedSize(size) != size { + return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") + } + + sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector + if err != nil { + return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) + } + + // offset hard-coded to 0 since we only put one thing in a sector for now + return sid, 0, nil +} + +func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { + log.Infof("Seal piece for deal %d", dealID) + + ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{}) + if err != nil { + return xerrors.Errorf("adding piece to sector: %w", err) + } + + return m.newSector(ctx, sectorID, dealID, ppi) +} + +func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { + return m.sectors.Send(sid, SectorStart{ + id: sid, + pieces: []Piece{ + { + DealID: dealID, + + Size: ppi.Size, + CommP: ppi.CommP[:], + }, + }, + }) +} + diff --git a/storage/sectors_fsm_test.go b/storage/sealing/sectors_fsm_test.go similarity index 99% rename from storage/sectors_fsm_test.go rename to storage/sealing/sectors_fsm_test.go index 115a928b5..3eda54860 100644 --- a/storage/sectors_fsm_test.go +++ b/storage/sealing/sectors_fsm_test.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "testing" diff --git a/storage/sectors_test.go b/storage/sealing/sectors_test.go similarity index 98% rename from storage/sectors_test.go rename to storage/sealing/sectors_test.go index 111a00aed..f14051a15 100644 --- a/storage/sectors_test.go +++ b/storage/sealing/sectors_test.go @@ -1,4 +1,4 @@ -package storage +package sealing import ( "bytes" diff --git a/storage/sectorblocks/blockstore.go b/storage/sectorblocks/blockstore.go index 3cf497d59..36394c1c8 100644 --- a/storage/sectorblocks/blockstore.go +++ b/storage/sectorblocks/blockstore.go @@ -2,8 +2,6 @@ package sectorblocks import ( "context" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/storage" "io/ioutil" blocks "github.com/ipfs/go-block-format" @@ -11,6 +9,9 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/sealing" ) var log = logging.Logger("sectorblocks") @@ -76,7 +77,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) { // TODO: better strategy (e.g. look for already unsealed) var best api.SealedRef - var bestSi storage.SectorInfo + var bestSi sealing.SectorInfo for _, r := range refs { si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID) if err != nil { diff --git a/storage/sectors.go b/storage/sectors.go deleted file mode 100644 index 5f0cf32d3..000000000 --- a/storage/sectors.go +++ /dev/null @@ -1,50 +0,0 @@ -package storage - -import ( - "context" - "io" - - "github.com/filecoin-project/go-sectorbuilder" - xerrors "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/lib/padreader" -) - -func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) { - if padreader.PaddedSize(size) != size { - return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") - } - - sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector - if err != nil { - return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) - } - - // offset hard-coded to 0 since we only put one thing in a sector for now - return sid, 0, nil -} - -func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { - log.Infof("Seal piece for deal %d", dealID) - - ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{}) - if err != nil { - return xerrors.Errorf("adding piece to sector: %w", err) - } - - return m.newSector(ctx, sectorID, dealID, ppi) -} - -func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { - return m.sectors.Send(sid, SectorStart{ - id: sid, - pieces: []Piece{ - { - DealID: dealID, - - Size: ppi.Size, - CommP: ppi.CommP[:], - }, - }, - }) -}