Rename agvmgr+sealmgr to sectorstorage

This commit is contained in:
Łukasz Magiera 2020-03-23 12:40:02 +01:00
parent cd618dfdbd
commit 5e4a7e54df
36 changed files with 150 additions and 285 deletions

View File

@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
// alias because cbor-gen doesn't like non-alias types // alias because cbor-gen doesn't like non-alias types

View File

@ -6,15 +6,15 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
type WorkerApi interface { type WorkerApi interface {
Version(context.Context) (build.Version, error) Version(context.Context) (build.Version, error)
// TODO: Info() (name, ...) ? // TODO: Info() (name, ...) ?
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) // TaskType -> Weight
Paths(context.Context) ([]stores.StoragePath, error) Paths(context.Context) ([]stores.StoragePath, error)
Info(context.Context) (WorkerInfo, error) Info(context.Context) (WorkerInfo, error)

View File

@ -2,28 +2,28 @@ package apistruct
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
// All permissions are listed in permissioned.go // All permissions are listed in permissioned.go
@ -204,9 +204,9 @@ type WorkerStruct struct {
Version func(context.Context) (build.Version, error) `perm:"admin"` Version func(context.Context) (build.Version, error) `perm:"admin"`
TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"` TaskTypes func(context.Context) (map[sealtasks.TaskType]struct{}, error) `perm:"admin"`
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (api.WorkerInfo, error) `perm:"admin"` Info func(context.Context) (api.WorkerInfo, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"` SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
@ -722,7 +722,7 @@ func (w *WorkerStruct) Version(ctx context.Context) (build.Version, error) {
return w.Internal.Version(ctx) return w.Internal.Version(ctx)
} }
func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealmgr.TaskType]struct{}, error) { func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return w.Internal.TaskTypes(ctx) return w.Internal.TaskTypes(ctx)
} }

View File

@ -36,7 +36,7 @@ import (
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sbmock" "github.com/filecoin-project/lotus/storage/sectorstorage/mock"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -690,7 +690,7 @@ func (m genFakeVerifier) VerifyElectionPost(ctx context.Context, pvi abi.PoStVer
panic("nyi") panic("nyi")
} }
func (m genFakeVerifier) GenerateDataCommitment(ssize abi.PaddedPieceSize, pieces []abi.PieceInfo) (cid.Cid, error) { func (m genFakeVerifier) GenerateDataCommitment(ssize abi.PaddedPieceSize, pieces []abi.PieceInfo) (cid.Cid, error) {
return sbmock.MockVerifier.GenerateDataCommitment(ssize, pieces) return mock.MockVerifier.GenerateDataCommitment(ssize, pieces)
} }
func (m genFakeVerifier) VerifySeal(svi abi.SealVerifyInfo) (bool, error) { func (m genFakeVerifier) VerifySeal(svi abi.SealVerifyInfo) (bool, error) {

View File

@ -16,6 +16,7 @@ import (
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
@ -25,9 +26,9 @@ import (
"github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
var log = logging.Logger("main") var log = logging.Logger("main")
@ -223,9 +224,9 @@ var runCmd = &cli.Command{
// Create / expose the worker // Create / expose the worker
workerApi := &worker{ workerApi := &worker{
LocalWorker: advmgr.NewLocalWorker(advmgr.WorkerConfig{ LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt, SealProof: spt,
TaskTypes: []sealmgr.TaskType{sealmgr.TTPreCommit1, sealmgr.TTPreCommit2, sealmgr.TTCommit2}, TaskTypes: []sealtasks.TaskType{sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2},
}, remote, localStore, nodeApi), }, remote, localStore, nodeApi),
} }

View File

@ -3,13 +3,14 @@ package main
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/storage/sectorstorage"
) )
type worker struct { // TODO: use advmgr.LocalWorker here type worker struct { // TODO: use advmgr.LocalWorker here
*advmgr.LocalWorker *sectorstorage.LocalWorker
} }
func (w *worker) Version(context.Context) (build.Version, error) { func (w *worker) Version(context.Context) (build.Version, error) {

View File

@ -7,7 +7,6 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -27,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
var log = logging.Logger("preseal") var log = logging.Logger("preseal")

View File

@ -43,8 +43,8 @@ import (
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing" "github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr" "github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
var initCmd = &cli.Command{ var initCmd = &cli.Command{
@ -404,7 +404,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return xerrors.Errorf("getting id address: %w", err) return xerrors.Errorf("getting id address: %w", err)
} }
smgr, err := advmgr.New(lr, stores.NewIndex(), &sectorbuilder.Config{ smgr, err := sectorstorage.New(lr, stores.NewIndex(), &sectorbuilder.Config{
SealProofType: spt, SealProofType: spt,
PoStProofType: ppt, PoStProofType: ppt,
}, nil, api) }, nil, api)

View File

@ -19,7 +19,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
const metaFile = "sectorstore.json" const metaFile = "sectorstore.json"

View File

@ -14,18 +14,18 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage"
) )
type retrievalProviderNode struct { type retrievalProviderNode struct {
miner *storage.Miner miner *storage.Miner
sealer sealmgr.Manager sealer sectorstorage.SectorManager
full api.FullNode full api.FullNode
} }
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the // NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node // Lotus Node
func NewRetrievalProviderNode(miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode) retrievalmarket.RetrievalProviderNode { func NewRetrievalProviderNode(miner *storage.Miner, sealer sectorstorage.SectorManager, full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sealer, full} return &retrievalProviderNode{miner, sealer, full}
} }

View File

@ -26,7 +26,6 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder" sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-actors/actors/runtime" "github.com/filecoin-project/specs-actors/actors/runtime"
storage2 "github.com/filecoin-project/specs-storage/storage" storage2 "github.com/filecoin-project/specs-storage/storage"
@ -61,9 +60,9 @@ import (
"github.com/filecoin-project/lotus/paychmgr" "github.com/filecoin-project/lotus/paychmgr"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing" "github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
var log = logging.Logger("builder") var log = logging.Logger("builder")
@ -273,10 +272,10 @@ func Online() Option {
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig), Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))), Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
Override(new(sealing.SectorIDCounter), modules.SectorIDCounter), Override(new(sealing.SectorIDCounter), modules.SectorIDCounter),
Override(new(*advmgr.Manager), advmgr.New), Override(new(*sectorstorage.Manager), sectorstorage.New),
Override(new(sealmgr.Manager), From(new(*advmgr.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Override(new(storage2.Prover), From(new(sealmgr.Manager))), Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(sealing.TicketFn), modules.SealTicketGen), Override(new(sealing.TicketFn), modules.SealTicketGen),
@ -335,13 +334,13 @@ func ConfigCommon(cfg *config.Common) Option {
Override(SetApiEndpointKey, func(lr repo.LockedRepo, e dtypes.APIEndpoint) error { Override(SetApiEndpointKey, func(lr repo.LockedRepo, e dtypes.APIEndpoint) error {
return lr.SetAPIEndpoint(e) return lr.SetAPIEndpoint(e)
}), }),
Override(new(advmgr.URLs), func(e dtypes.APIEndpoint) (advmgr.URLs, error) { Override(new(sectorstorage.URLs), func(e dtypes.APIEndpoint) (sectorstorage.URLs, error) {
_, ip, err := manet.DialArgs(e) _, ip, err := manet.DialArgs(e)
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting api endpoint dial args: %w", err) return nil, xerrors.Errorf("getting api endpoint dial args: %w", err)
} }
var urls advmgr.URLs var urls sectorstorage.URLs
urls = append(urls, "http://"+ip+"/remote") // TODO: This makes assumptions, and probably bad ones too urls = append(urls, "http://"+ip+"/remote") // TODO: This makes assumptions, and probably bad ones too
return urls, nil return urls, nil
}), }),

View File

@ -21,9 +21,9 @@ import (
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
type StorageMinerAPI struct { type StorageMinerAPI struct {
@ -37,7 +37,7 @@ type StorageMinerAPI struct {
Miner *storage.Miner Miner *storage.Miner
BlockMiner *miner.Miner BlockMiner *miner.Miner
Full api.FullNode Full api.FullNode
StorageMgr *advmgr.Manager `optional:"true"` StorageMgr *sectorstorage.Manager `optional:"true"`
*stores.Index *stores.Index
} }
@ -148,7 +148,7 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe
} }
func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error { func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
w, err := advmgr.ConnectRemote(ctx, sm, url) w, err := sectorstorage.ConnectRemote(ctx, sm, url)
if err != nil { if err != nil {
return xerrors.Errorf("connecting remote storage failed: %w", err) return xerrors.Errorf("connecting remote storage failed: %w", err)
} }

View File

@ -49,7 +49,7 @@ import (
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing" "github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage"
) )
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
@ -117,7 +117,7 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
return &sidsc{sc} return &sidsc{sc}
} }
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sealmgr.Manager, sc sealing.SectorIDCounter, tktFn sealing.TicketFn) (*storage.Miner, error) { func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, tktFn sealing.TicketFn) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
@ -327,7 +327,7 @@ func StorageProvider(ctx helpers.MetricsCtx, fapi lapi.FullNode, h host.Host, ds
} }
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore // RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sealmgr.Manager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) { func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
address, err := minerAddrFromDS(ds) address, err := minerAddrFromDS(ds)
if err != nil { if err != nil {

View File

@ -41,9 +41,8 @@ import (
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
modtest "github.com/filecoin-project/lotus/node/modules/testing" modtest "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sbmock" "github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/mock"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
) )
func init() { func init() {
@ -310,7 +309,7 @@ func mockSbBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []t
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
genm, k, err := sbmock.PreSeal(2048, maddr, nPreseal) genm, k, err := mock.PreSeal(2048, maddr, nPreseal)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -356,7 +355,7 @@ func mockSbBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []t
node.MockHost(mn), node.MockHost(mn),
node.Test(), node.Test(),
node.Override(new(sectorbuilder.Verifier), sbmock.MockVerifier), node.Override(new(sectorbuilder.Verifier), mock.MockVerifier),
genesis, genesis,
) )
@ -386,10 +385,10 @@ func mockSbBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []t
wa := genms[i].Worker wa := genms[i].Worker
storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options( storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options(
node.Override(new(sealmgr.Manager), func() (sealmgr.Manager, error) { node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
return sealmgr.NewSimpleManager(genMiner, sbmock.NewMockSectorBuilder(5, build.SectorSizes[0])) return mock.NewMockSectorMgr(5, build.SectorSizes[0]), nil
}), }),
node.Unset(new(*advmgr.Manager)), node.Unset(new(*sectorstorage.Manager)),
)) ))
} }

View File

@ -2,7 +2,6 @@ package repo
import ( import (
"encoding/json" "encoding/json"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -17,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
type MemRepo struct { type MemRepo struct {

View File

@ -14,7 +14,7 @@ import (
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -34,7 +34,7 @@ var log = logging.Logger("storageminer")
type Miner struct { type Miner struct {
api storageMinerApi api storageMinerApi
h host.Host h host.Host
sealer sealmgr.Manager sealer sectorstorage.SectorManager
ds datastore.Batching ds datastore.Batching
tktFn sealing.TicketFn tktFn sealing.TicketFn
sc sealing.SectorIDCounter sc sealing.SectorIDCounter
@ -74,7 +74,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sealmgr.Manager, sc sealing.SectorIDCounter, tktFn sealing.TicketFn) (*Miner, error) { func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, tktFn sealing.TicketFn) (*Miner, error) {
m := &Miner{ m := &Miner{
api: api, api: api,
h: h, h: h,

View File

@ -21,7 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage"
) )
const SectorStorePrefix = "/sectors" const SectorStorePrefix = "/sectors"
@ -69,13 +69,13 @@ type Sealing struct {
maddr address.Address maddr address.Address
worker address.Address worker address.Address
sealer sealmgr.Manager sealer sectorstorage.SectorManager
sectors *statemachine.StateGroup sectors *statemachine.StateGroup
tktFn TicketFn tktFn TicketFn
sc SectorIDCounter sc SectorIDCounter
} }
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, sc SectorIDCounter, tktFn TicketFn) *Sealing { func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, tktFn TicketFn) *Sealing {
s := &Sealing{ s := &Sealing{
api: api, api: api,
events: events, events: events,

View File

@ -1,115 +0,0 @@
package sealmgr
import (
"context"
"io"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
)
type LocalWorker struct {
sectorbuilder.Basic
}
var _ Worker = &LocalWorker{}
// Simple implements a very basic storage manager which has one local worker,
// running one thing locally
type Simple struct {
maddr address.Address
rateLimiter sync.Mutex
worker Worker
}
type sszgetter interface {
SectorSize() abi.SectorSize
}
func (s *LocalWorker) SectorSize() abi.SectorSize {
return s.Basic.(sszgetter).SectorSize()
}
func (s *Simple) SectorSize() abi.SectorSize {
return s.worker.(sszgetter).SectorSize()
}
func NewSimpleManager(maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) {
w := &LocalWorker{
sb,
}
return &Simple{
maddr: maddr,
worker: w,
}, nil
}
func (s *Simple) NewSector(ctx context.Context, id abi.SectorID) error {
return s.worker.NewSector(ctx, id)
}
func (s *Simple) AddPiece(ctx context.Context, id abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r storage.Data) (abi.PieceInfo, error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.AddPiece(ctx, id, existingPieces, sz, r)
}
func (s *Simple) SealPreCommit1(ctx context.Context, id abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit1(ctx, id, ticket, pieces)
}
func (s *Simple) SealPreCommit2(ctx context.Context, id abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit2(ctx, id, phase1Out)
}
func (s *Simple) SealCommit1(ctx context.Context, id abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit1(ctx, id, ticket, seed, pieces, cids)
}
func (s *Simple) SealCommit2(ctx context.Context, id abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit2(ctx, id, phase1Out)
}
func (s *Simple) FinalizeSector(ctx context.Context, id abi.SectorID) error {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.FinalizeSector(ctx, id)
}
func (s *Simple) GenerateEPostCandidates(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
return s.worker.GenerateEPostCandidates(ctx, miner, sectorInfo, challengeSeed, faults)
}
func (s *Simple) GenerateFallbackPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) (storage.FallbackPostOut, error) {
return s.worker.GenerateFallbackPoSt(ctx, miner, sectorInfo, challengeSeed, faults)
}
func (s *Simple) ComputeElectionPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
return s.worker.ComputeElectionPoSt(ctx, miner, sectorInfo, challengeSeed, winners)
}
func (s *Simple) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("todo")
}
var _ Manager = &Simple{}

View File

@ -1,30 +0,0 @@
package sealmgr
import (
"context"
"io"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
)
type Worker interface {
sectorbuilder.Sealer
storage.Prover
}
type Manager interface {
SectorSize() abi.SectorSize
// TODO: Can[Pre]Commit[1,2]
// TODO: Scrub() []Faults
// TODO: Separate iface
ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
sectorbuilder.Sealer
storage.Prover
}

View File

@ -1,4 +1,4 @@
package advmgr package sectorstorage
import ( import (
"container/list" "container/list"
@ -14,12 +14,12 @@ import (
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
var log = logging.Logger("advmgr") var log = logging.Logger("advmgr")
@ -29,7 +29,7 @@ type URLs []string
type Worker interface { type Worker interface {
sectorbuilder.Sealer sectorbuilder.Sealer
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
// Returns paths accessible to the worker // Returns paths accessible to the worker
Paths(context.Context) ([]stores.StoragePath, error) Paths(context.Context) ([]stores.StoragePath, error)
@ -37,6 +37,15 @@ type Worker interface {
Info(context.Context) (api.WorkerInfo, error) Info(context.Context) (api.WorkerInfo, error)
} }
type SectorManager interface {
SectorSize() abi.SectorSize
ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
sectorbuilder.Sealer
storage.Prover
}
type workerID uint64 type workerID uint64
type Manager struct { type Manager struct {
@ -48,7 +57,7 @@ type Manager struct {
remoteHnd *stores.FetchHandler remoteHnd *stores.FetchHandler
index stores.SectorIndex index stores.SectorIndex
storage2.Prover storage.Prover
workersLk sync.Mutex workersLk sync.Mutex
nextWorker workerID nextWorker workerID
@ -106,7 +115,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType, SealProof: cfg.SealProofType,
TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize}, TaskTypes: []sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize},
}, stor, lstor, si)) }, stor, lstor, si))
if err != nil { if err != nil {
return nil, xerrors.Errorf("adding local worker: %w", err) return nil, xerrors.Errorf("adding local worker: %w", err)
@ -159,7 +168,7 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, secto
panic("implement me") panic("implement me")
} }
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) { func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) {
m.workersLk.Lock() m.workersLk.Lock()
defer m.workersLk.Unlock() defer m.workersLk.Unlock()
@ -210,7 +219,7 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
return workers, paths return workers, paths
} }
func (m *Manager) getWorker(ctx context.Context, taskType sealmgr.TaskType, accept []workerID) (Worker, func(), error) { func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []workerID) (Worker, func(), error) {
ret := make(chan workerResponse) ret := make(chan workerResponse)
select { select {
@ -257,13 +266,13 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
} }
log.Debugf("find workers for %v", best) log.Debugf("find workers for %v", best)
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTAddPiece, best) candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)
if len(candidateWorkers) == 0 { if len(candidateWorkers) == 0 {
return abi.PieceInfo{}, xerrors.New("no worker found") return abi.PieceInfo{}, xerrors.New("no worker found")
} }
worker, done, err := m.getWorker(ctx, sealmgr.TTAddPiece, candidateWorkers) worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err) return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err)
} }
@ -274,7 +283,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return worker.AddPiece(ctx, sector, existingPieces, sz, r) return worker.AddPiece(ctx, sector, existingPieces, sz, r)
} }
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) { func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
// TODO: also consider where the unsealed data sits // TODO: also consider where the unsealed data sits
best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
@ -282,12 +291,12 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return nil, xerrors.Errorf("finding path for sector sealing: %w", err) return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
} }
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best) candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
if len(candidateWorkers) == 0 { if len(candidateWorkers) == 0 {
return nil, xerrors.New("no suitable workers found") return nil, xerrors.New("no suitable workers found")
} }
worker, done, err := m.getWorker(ctx, sealmgr.TTPreCommit1, candidateWorkers) worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
if err != nil { if err != nil {
return nil, xerrors.Errorf("scheduling worker: %w", err) return nil, xerrors.Errorf("scheduling worker: %w", err)
} }
@ -298,22 +307,22 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return worker.SealPreCommit1(ctx, sector, ticket, pieces) return worker.SealPreCommit1(ctx, sector, ticket, pieces)
} }
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) { func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
// TODO: allow workers to fetch the sectors // TODO: allow workers to fetch the sectors
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil { if err != nil {
return storage2.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err) return storage.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
} }
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
if len(candidateWorkers) == 0 { if len(candidateWorkers) == 0 {
return storage2.SectorCids{}, xerrors.New("no suitable workers found") return storage.SectorCids{}, xerrors.New("no suitable workers found")
} }
worker, done, err := m.getWorker(ctx, sealmgr.TTPreCommit2, candidateWorkers) worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
if err != nil { if err != nil {
return storage2.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err) return storage.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err)
} }
defer done() defer done()
@ -322,19 +331,19 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return worker.SealPreCommit2(ctx, sector, phase1Out) return worker.SealPreCommit2(ctx, sector, phase1Out)
} }
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) { func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil { if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err) return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
} }
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best) candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
if len(candidateWorkers) == 0 { if len(candidateWorkers) == 0 {
return nil, xerrors.New("no suitable workers found") // TODO: wait? return nil, xerrors.New("no suitable workers found") // TODO: wait?
} }
// TODO: Try very hard to execute on worker with access to the sectors // TODO: Try very hard to execute on worker with access to the sectors
worker, done, err := m.getWorker(ctx, sealmgr.TTCommit1, candidateWorkers) worker, done, err := m.getWorker(ctx, sealtasks.TTCommit1, candidateWorkers)
if err != nil { if err != nil {
return nil, xerrors.Errorf("scheduling worker: %w", err) return nil, xerrors.Errorf("scheduling worker: %w", err)
} }
@ -345,7 +354,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
} }
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) { func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
var candidateWorkers []workerID var candidateWorkers []workerID
m.workersLk.Lock() m.workersLk.Lock()
@ -355,14 +364,14 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
log.Errorf("error getting supported worker task types: %+v", err) log.Errorf("error getting supported worker task types: %+v", err)
continue continue
} }
if _, ok := tt[sealmgr.TTCommit2]; !ok { if _, ok := tt[sealtasks.TTCommit2]; !ok {
continue continue
} }
candidateWorkers = append(candidateWorkers, id) candidateWorkers = append(candidateWorkers, id)
} }
m.workersLk.Unlock() m.workersLk.Unlock()
worker, done, err := m.getWorker(ctx, sealmgr.TTCommit2, candidateWorkers) worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
if err != nil { if err != nil {
return nil, xerrors.Errorf("scheduling worker: %w", err) return nil, xerrors.Errorf("scheduling worker: %w", err)
} }
@ -377,7 +386,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
return xerrors.Errorf("finding sealed sector: %w", err) return xerrors.Errorf("finding sealed sector: %w", err)
} }
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTFinalize, best) candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)
// TODO: Remove sector from sealing stores // TODO: Remove sector from sealing stores
// TODO: Move the sector to long-term storage // TODO: Move the sector to long-term storage
@ -398,4 +407,4 @@ func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error
return out, nil return out, nil
} }
var _ sealmgr.Manager = &Manager{} var _ SectorManager = &Manager{}

View File

@ -1,9 +1,10 @@
package sbmock package mock
import ( import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/lotus/storage/sectorstorage"
"io" "io"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
@ -23,7 +24,7 @@ import (
var log = logging.Logger("sbmock") var log = logging.Logger("sbmock")
type SBMock struct { type SectorMgr struct {
sectors map[abi.SectorID]*sectorState sectors map[abi.SectorID]*sectorState
sectorSize abi.SectorSize sectorSize abi.SectorSize
nextSectorID abi.SectorNumber nextSectorID abi.SectorNumber
@ -35,13 +36,13 @@ type SBMock struct {
type mockVerif struct{} type mockVerif struct{}
func NewMockSectorBuilder(threads int, ssize abi.SectorSize) *SBMock { func NewMockSectorMgr(threads int, ssize abi.SectorSize) *SectorMgr {
rt, _, err := api.ProofTypeFromSectorSize(ssize) rt, _, err := api.ProofTypeFromSectorSize(ssize)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &SBMock{ return &SectorMgr{
sectors: make(map[abi.SectorID]*sectorState), sectors: make(map[abi.SectorID]*sectorState),
sectorSize: ssize, sectorSize: ssize,
nextSectorID: 5, nextSectorID: 5,
@ -65,7 +66,7 @@ type sectorState struct {
lk sync.Mutex lk sync.Mutex
} }
func (sb *SBMock) RateLimit() func() { func (sb *SectorMgr) RateLimit() func() {
sb.rateLimit <- struct{}{} sb.rateLimit <- struct{}{}
// TODO: probably want to copy over rate limit code // TODO: probably want to copy over rate limit code
@ -74,11 +75,11 @@ func (sb *SBMock) RateLimit() func() {
} }
} }
func (sb *SBMock) NewSector(ctx context.Context, sector abi.SectorID) error { func (sb *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error {
return nil return nil
} }
func (sb *SBMock) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { func (sb *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorId, size, sb.proofType) log.Warn("Add piece: ", sectorId, size, sb.proofType)
sb.lk.Lock() sb.lk.Lock()
ss, ok := sb.sectors[sectorId] ss, ok := sb.sectors[sectorId]
@ -106,11 +107,11 @@ func (sb *SBMock) AddPiece(ctx context.Context, sectorId abi.SectorID, existingP
}, nil }, nil
} }
func (sb *SBMock) SectorSize() abi.SectorSize { func (sb *SectorMgr) SectorSize() abi.SectorSize {
return sb.sectorSize return sb.sectorSize
} }
func (sb *SBMock) AcquireSectorNumber() (abi.SectorNumber, error) { func (sb *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
sb.lk.Lock() sb.lk.Lock()
defer sb.lk.Unlock() defer sb.lk.Unlock()
id := sb.nextSectorID id := sb.nextSectorID
@ -118,7 +119,7 @@ func (sb *SBMock) AcquireSectorNumber() (abi.SectorNumber, error) {
return id, nil return id, nil
} }
func (sb *SBMock) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { func (sb *SectorMgr) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
sb.lk.Lock() sb.lk.Lock()
ss, ok := sb.sectors[sid] ss, ok := sb.sectors[sid]
sb.lk.Unlock() sb.lk.Unlock()
@ -173,7 +174,7 @@ func (sb *SBMock) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket a
return cc, nil return cc, nil
} }
func (sb *SBMock) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) { func (sb *SectorMgr) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
db := []byte(string(phase1Out)) db := []byte(string(phase1Out))
db[0] ^= 'd' db[0] ^= 'd'
@ -192,7 +193,7 @@ func (sb *SBMock) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Ou
}, nil }, nil
} }
func (sb *SBMock) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) { func (sb *SectorMgr) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
sb.lk.Lock() sb.lk.Lock()
ss, ok := sb.sectors[sid] ss, ok := sb.sectors[sid]
sb.lk.Unlock() sb.lk.Unlock()
@ -220,7 +221,7 @@ func (sb *SBMock) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.
return out[:], nil return out[:], nil
} }
func (sb *SBMock) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) { func (sb *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
var out [32]byte var out [32]byte
for i := range out { for i := range out {
out[i] = phase1Out[i] ^ byte(sid.Number&0xff) out[i] = phase1Out[i] ^ byte(sid.Number&0xff)
@ -231,7 +232,7 @@ func (sb *SBMock) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out s
// Test Instrumentation Methods // Test Instrumentation Methods
func (sb *SBMock) FailSector(sid abi.SectorID) error { func (sb *SectorMgr) FailSector(sid abi.SectorID) error {
sb.lk.Lock() sb.lk.Lock()
defer sb.lk.Unlock() defer sb.lk.Unlock()
ss, ok := sb.sectors[sid] ss, ok := sb.sectors[sid]
@ -259,15 +260,15 @@ func AddOpFinish(ctx context.Context) (context.Context, func()) {
} }
} }
func (sb *SBMock) GenerateFallbackPoSt(context.Context, abi.ActorID, []abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) (storage.FallbackPostOut, error) { func (sb *SectorMgr) GenerateFallbackPoSt(context.Context, abi.ActorID, []abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) (storage.FallbackPostOut, error) {
panic("implement me") panic("implement me")
} }
func (sb *SBMock) ComputeElectionPoSt(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { func (sb *SectorMgr) ComputeElectionPoSt(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
panic("implement me") panic("implement me")
} }
func (sb *SBMock) GenerateEPostCandidates(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) { func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
if len(faults) > 0 { if len(faults) > 0 {
panic("todo") panic("todo")
} }
@ -297,14 +298,14 @@ func (sb *SBMock) GenerateEPostCandidates(ctx context.Context, mid abi.ActorID,
return out, nil return out, nil
} }
func (sb *SBMock) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) { func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) {
if len(sb.sectors[sectorID].pieces) > 1 { if len(sb.sectors[sectorID].pieces) > 1 {
panic("implme") panic("implme")
} }
return ioutil.NopCloser(io.LimitReader(bytes.NewReader(sb.sectors[sectorID].pieces[0].Bytes()[offset:]), int64(size))), nil return ioutil.NopCloser(io.LimitReader(bytes.NewReader(sb.sectors[sectorID].pieces[0].Bytes()[offset:]), int64(size))), nil
} }
func (sb *SBMock) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) { func (sb *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) {
usize := abi.PaddedPieceSize(sb.sectorSize).Unpadded() usize := abi.PaddedPieceSize(sb.sectorSize).Unpadded()
sid, err := sb.AcquireSectorNumber() sid, err := sb.AcquireSectorNumber()
if err != nil { if err != nil {
@ -327,7 +328,7 @@ func (sb *SBMock) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo,
return id, []abi.PieceInfo{pi}, nil return id, []abi.PieceInfo{pi}, nil
} }
func (sb *SBMock) FinalizeSector(context.Context, abi.SectorID) error { func (sb *SectorMgr) FinalizeSector(context.Context, abi.SectorID) error {
return nil return nil
} }
@ -367,4 +368,4 @@ func (m mockVerif) GenerateDataCommitment(ssize abi.PaddedPieceSize, pieces []ab
var MockVerifier = mockVerif{} var MockVerifier = mockVerif{}
var _ sectorbuilder.Verifier = MockVerifier var _ sectorbuilder.Verifier = MockVerifier
var _ sectorbuilder.Basic = &SBMock{} var _ sectorstorage.SectorManager = &SectorMgr{}

View File

@ -1,4 +1,4 @@
package sbmock package mock
import ( import (
"context" "context"
@ -9,7 +9,7 @@ import (
) )
func TestOpFinish(t *testing.T) { func TestOpFinish(t *testing.T) {
sb := NewMockSectorBuilder(1, 2048) sb := NewMockSectorMgr(1, 2048)
sid, pieces, err := sb.StageFakeData(123) sid, pieces, err := sb.StageFakeData(123)
if err != nil { if err != nil {

View File

@ -1,4 +1,4 @@
package sbmock package mock
import ( import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"

View File

@ -1,4 +1,4 @@
package sbmock package mock
import ( import (
"crypto/rand" "crypto/rand"

View File

@ -1,8 +1,8 @@
package advmgr package sectorstorage
import ( import (
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
) )
@ -30,8 +30,8 @@ type Resources struct {
const MaxCachingOverhead = 32 << 30 const MaxCachingOverhead = 32 << 30
var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
sealmgr.TTAddPiece: { sealtasks.TTAddPiece: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ // This is probably a bit conservative abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ // This is probably a bit conservative
MaxMemory: 32 << 30, MaxMemory: 32 << 30,
MinMemory: 32 << 30, MinMemory: 32 << 30,
@ -49,7 +49,7 @@ var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{
BaseMinMemory: 1 << 30, BaseMinMemory: 1 << 30,
}, },
}, },
sealmgr.TTPreCommit1: { sealtasks.TTPreCommit1: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 64 << 30, MaxMemory: 64 << 30,
MinMemory: 32 << 30, MinMemory: 32 << 30,
@ -67,7 +67,7 @@ var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{
BaseMinMemory: 1 << 30, BaseMinMemory: 1 << 30,
}, },
}, },
sealmgr.TTPreCommit2: { sealtasks.TTPreCommit2: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 96 << 30, MaxMemory: 96 << 30,
MinMemory: 64 << 30, MinMemory: 64 << 30,
@ -85,7 +85,7 @@ var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{
BaseMinMemory: 1 << 30, BaseMinMemory: 1 << 30,
}, },
}, },
sealmgr.TTCommit1: { // Very short (~100ms), so params are very light sealtasks.TTCommit1: { // Very short (~100ms), so params are very light
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 1 << 30, MaxMemory: 1 << 30,
MinMemory: 1 << 30, MinMemory: 1 << 30,
@ -103,7 +103,7 @@ var ResourceTable = map[sealmgr.TaskType]map[abi.RegisteredProof]Resources{
BaseMinMemory: 1 << 30, BaseMinMemory: 1 << 30,
}, },
}, },
sealmgr.TTCommit2: { // TODO: Measure more accurately sealtasks.TTCommit2: { // TODO: Measure more accurately
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 110 << 30, MaxMemory: 110 << 30,
MinMemory: 60 << 30, MinMemory: 60 << 30,

View File

@ -1,10 +1,10 @@
package advmgr package sectorstorage
import ( import (
"context" "context"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors" "golang.org/x/xerrors"

View File

@ -1,17 +1,17 @@
package advmgr package sectorstorage
import ( import (
"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr"
) )
const mib = 1 << 20 const mib = 1 << 20
type workerRequest struct { type workerRequest struct {
taskType sealmgr.TaskType taskType sealtasks.TaskType
accept []workerID // ordered by preference accept []workerID // ordered by preference
ret chan<- workerResponse ret chan<- workerResponse

View File

@ -1,4 +1,4 @@
package sealmgr package sealtasks
type TaskType string type TaskType string

View File

@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
) )
var log = logging.Logger("stores") var log = logging.Logger("stores")

View File

@ -13,7 +13,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
) )
// ID identifies sector storage by UUID. One sector storage should map to one // ID identifies sector storage by UUID. One sector storage should map to one

View File

@ -14,7 +14,7 @@ import (
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
) )
type StoragePath struct { type StoragePath struct {

View File

@ -16,7 +16,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
) )
type Remote struct { type Remote struct {

View File

@ -1,4 +1,4 @@
package advmgr package sectorstorage
import ( import (
"context" "context"
@ -14,16 +14,16 @@ import (
storage2 "github.com/filecoin-project/specs-storage/storage" storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil" "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
"github.com/filecoin-project/lotus/storage/sealmgr/stores" "github.com/filecoin-project/lotus/storage/sectorstorage/stores"
) )
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache} var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type WorkerConfig struct { type WorkerConfig struct {
SealProof abi.RegisteredProof SealProof abi.RegisteredProof
TaskTypes []sealmgr.TaskType TaskTypes []sealtasks.TaskType
} }
type LocalWorker struct { type LocalWorker struct {
@ -32,7 +32,7 @@ type LocalWorker struct {
localStore *stores.Local localStore *stores.Local
sindex stores.SectorIndex sindex stores.SectorIndex
acceptTasks map[sealmgr.TaskType]struct{} acceptTasks map[sealtasks.TaskType]struct{}
} }
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker { func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
@ -41,7 +41,7 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
panic(err) panic(err)
} }
acceptTasks := map[sealmgr.TaskType]struct{}{} acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes { for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{} acceptTasks[taskType] = struct{}{}
} }
@ -155,7 +155,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return sb.FinalizeSector(ctx, sector) return sb.FinalizeSector(ctx, sector)
} }
func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) { func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
return l.acceptTasks, nil return l.acceptTasks, nil
} }

View File

@ -1,4 +1,4 @@
package advmgr package sectorstorage
import ( import (
"context" "context"