From 13da5a596625319db2addefbeebab362b29d2053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 4 Nov 2019 17:47:08 +0100 Subject: [PATCH] Put WorkerThreads on sectorbuilder.Config --- cmd/lotus-storage-miner/run.go | 2 +- lib/sectorbuilder/sectorbuilder.go | 14 ++++++++------ lib/sectorbuilder/sectorbuilder_test.go | 11 +++++++---- lotuspond/spawn.go | 6 +++--- node/impl/storminer.go | 2 +- node/modules/storageminer.go | 17 ++++++++++++----- node/node_test.go | 2 +- 7 files changed, 33 insertions(+), 21 deletions(-) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index d0806eb16..eff0cbe10 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -77,7 +77,7 @@ var runCmd = &cli.Command{ } return lr.SetAPIEndpoint(apima) }), - node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(storageRepoPath)), + node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath)), node.Override(new(api.FullNode), nodeApi), ) if err != nil { diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index bd57a7bda..40e88ccc6 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -40,20 +40,22 @@ type SectorBuilder struct { handle unsafe.Pointer } -type SectorBuilderConfig struct { - SectorSize uint64 - Miner address.Address +type Config struct { + SectorSize uint64 + Miner address.Address + + WorkerThreads uint8 + CacheDir string SealedDir string StagedDir string MetadataDir string } -func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) { +func New(cfg *Config) (*SectorBuilder, error) { proverId := addressToProverID(cfg.Miner) - nemWorkerThreads := uint8(5) // TODO: from config - sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, nemWorkerThreads) + sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) if err != nil { return nil, err } diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index d17285776..ad2d8ebc2 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -37,13 +37,16 @@ func TestSealAndVerify(t *testing.T) { sealed := filepath.Join(dir, "sealed") staging := filepath.Join(dir, "staging") - sb, err := sectorbuilder.New(§orbuilder.SectorBuilderConfig{ - SectorSize: sectorSize, + sb, err := sectorbuilder.New(§orbuilder.Config{ + SectorSize: sectorSize, + Miner: addr, + + WorkerThreads: 1, + CacheDir: cache, SealedDir: sealed, StagedDir: staging, MetadataDir: metadata, - Miner: addr, }) if err != nil { t.Fatal(err) @@ -69,7 +72,7 @@ func TestSealAndVerify(t *testing.T) { seed := sectorbuilder.SealSeed{ BlockHeight: 15, - TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, }, + TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}, } sco, err := sb.SealCommit(sid, seed) diff --git a/lotuspond/spawn.go b/lotuspond/spawn.go index b58e0e7db..eac9ea418 100644 --- a/lotuspond/spawn.go +++ b/lotuspond/spawn.go @@ -49,7 +49,7 @@ func (api *api) Spawn() (nodeInfo, error) { cmd := exec.Command("./lotus", "daemon", "--bootstrap=false", genParam, "--api", fmt.Sprintf("%d", 2500+id)) cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) - cmd.Env = append(os.Environ(), "LOTUS_PATH=" + dir) + cmd.Env = append(os.Environ(), "LOTUS_PATH="+dir) if err := cmd.Start(); err != nil { return nodeInfo{}, err } @@ -112,7 +112,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { cmd := exec.Command("./lotus-storage-miner", initArgs...) cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile) cmd.Stdout = io.MultiWriter(os.Stdout, logfile) - cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo) + cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo) if err := cmd.Run(); err != nil { return nodeInfo{}, err } @@ -124,7 +124,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) - cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo) + cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo) if err := cmd.Start(); err != nil { return nodeInfo{}, err } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index dbbee1ca1..b9efe9f2f 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -14,7 +14,7 @@ import ( type StorageMinerAPI struct { CommonAPI - SectorBuilderConfig *sectorbuilder.SectorBuilderConfig + SectorBuilderConfig *sectorbuilder.Config SectorBuilder *sectorbuilder.SectorBuilder Sectors *sector.Store SectorBlocks *sectorblocks.SectorBlocks diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index b95500305..26f70cdb6 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -2,6 +2,7 @@ package modules import ( "context" + "math" "path/filepath" "github.com/ipfs/go-bitswap" @@ -38,8 +39,8 @@ func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { return address.NewFromBytes(maddrb) } -func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.SectorBuilderConfig, error) { - return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.SectorBuilderConfig, error) { +func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { + return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) { minerAddr, err := minerAddrFromDS(ds) if err != nil { return nil, err @@ -55,14 +56,20 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS, api.FullNod return nil, err } + if threads > math.MaxUint8 { + return nil, xerrors.Errorf("too many sectorbuilder threads specified: %d, max allowed: %d", threads, math.MaxUint8) + } + cache := filepath.Join(sp, "cache") metadata := filepath.Join(sp, "meta") sealed := filepath.Join(sp, "sealed") staging := filepath.Join(sp, "staging") - sb := §orbuilder.SectorBuilderConfig{ - Miner: minerAddr, - SectorSize: ssize, + sb := §orbuilder.Config{ + Miner: minerAddr, + SectorSize: ssize, + WorkerThreads: uint8(threads), + CacheDir: cache, MetadataDir: metadata, SealedDir: sealed, diff --git a/node/node_test.go b/node/node_test.go index 06f1fc1c6..e361bb5fa 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -93,7 +93,7 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a node.Repo(r), node.Test(), - node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(secbpath)), + node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath)), node.Override(new(api.FullNode), tnd), ) require.NoError(t, err)