Put WorkerThreads on sectorbuilder.Config

This commit is contained in:
Łukasz Magiera 2019-11-04 17:47:08 +01:00
parent 3ff9b2fab4
commit 13da5a5966
7 changed files with 33 additions and 21 deletions

View File

@ -77,7 +77,7 @@ var runCmd = &cli.Command{
} }
return lr.SetAPIEndpoint(apima) return lr.SetAPIEndpoint(apima)
}), }),
node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(storageRepoPath)), node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath)),
node.Override(new(api.FullNode), nodeApi), node.Override(new(api.FullNode), nodeApi),
) )
if err != nil { if err != nil {

View File

@ -40,20 +40,22 @@ type SectorBuilder struct {
handle unsafe.Pointer handle unsafe.Pointer
} }
type SectorBuilderConfig struct { type Config struct {
SectorSize uint64 SectorSize uint64
Miner address.Address Miner address.Address
WorkerThreads uint8
CacheDir string CacheDir string
SealedDir string SealedDir string
StagedDir string StagedDir string
MetadataDir string MetadataDir string
} }
func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) { func New(cfg *Config) (*SectorBuilder, error) {
proverId := addressToProverID(cfg.Miner) proverId := addressToProverID(cfg.Miner)
nemWorkerThreads := uint8(5) // TODO: from config
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, nemWorkerThreads) sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -37,13 +37,16 @@ func TestSealAndVerify(t *testing.T) {
sealed := filepath.Join(dir, "sealed") sealed := filepath.Join(dir, "sealed")
staging := filepath.Join(dir, "staging") staging := filepath.Join(dir, "staging")
sb, err := sectorbuilder.New(&sectorbuilder.SectorBuilderConfig{ sb, err := sectorbuilder.New(&sectorbuilder.Config{
SectorSize: sectorSize, SectorSize: sectorSize,
Miner: addr,
WorkerThreads: 1,
CacheDir: cache, CacheDir: cache,
SealedDir: sealed, SealedDir: sealed,
StagedDir: staging, StagedDir: staging,
MetadataDir: metadata, MetadataDir: metadata,
Miner: addr,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -69,7 +72,7 @@ func TestSealAndVerify(t *testing.T) {
seed := sectorbuilder.SealSeed{ seed := sectorbuilder.SealSeed{
BlockHeight: 15, BlockHeight: 15,
TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, }, TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9},
} }
sco, err := sb.SealCommit(sid, seed) sco, err := sb.SealCommit(sid, seed)

View File

@ -49,7 +49,7 @@ func (api *api) Spawn() (nodeInfo, error) {
cmd := exec.Command("./lotus", "daemon", "--bootstrap=false", genParam, "--api", fmt.Sprintf("%d", 2500+id)) cmd := exec.Command("./lotus", "daemon", "--bootstrap=false", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = append(os.Environ(), "LOTUS_PATH=" + dir) cmd.Env = append(os.Environ(), "LOTUS_PATH="+dir)
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return nodeInfo{}, err return nodeInfo{}, err
} }
@ -112,7 +112,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
cmd := exec.Command("./lotus-storage-miner", initArgs...) cmd := exec.Command("./lotus-storage-miner", initArgs...)
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile) cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile) cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo) cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo)
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return nodeInfo{}, err return nodeInfo{}, err
} }
@ -124,7 +124,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo) cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo)
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return nodeInfo{}, err return nodeInfo{}, err
} }

View File

@ -14,7 +14,7 @@ import (
type StorageMinerAPI struct { type StorageMinerAPI struct {
CommonAPI CommonAPI
SectorBuilderConfig *sectorbuilder.SectorBuilderConfig SectorBuilderConfig *sectorbuilder.Config
SectorBuilder *sectorbuilder.SectorBuilder SectorBuilder *sectorbuilder.SectorBuilder
Sectors *sector.Store Sectors *sector.Store
SectorBlocks *sectorblocks.SectorBlocks SectorBlocks *sectorblocks.SectorBlocks

View File

@ -2,6 +2,7 @@ package modules
import ( import (
"context" "context"
"math"
"path/filepath" "path/filepath"
"github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap"
@ -38,8 +39,8 @@ func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
return address.NewFromBytes(maddrb) return address.NewFromBytes(maddrb)
} }
func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.SectorBuilderConfig, error) { func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) {
return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.SectorBuilderConfig, error) { return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) {
minerAddr, err := minerAddrFromDS(ds) minerAddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
@ -55,14 +56,20 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS, api.FullNod
return nil, err return nil, err
} }
if threads > math.MaxUint8 {
return nil, xerrors.Errorf("too many sectorbuilder threads specified: %d, max allowed: %d", threads, math.MaxUint8)
}
cache := filepath.Join(sp, "cache") cache := filepath.Join(sp, "cache")
metadata := filepath.Join(sp, "meta") metadata := filepath.Join(sp, "meta")
sealed := filepath.Join(sp, "sealed") sealed := filepath.Join(sp, "sealed")
staging := filepath.Join(sp, "staging") staging := filepath.Join(sp, "staging")
sb := &sectorbuilder.SectorBuilderConfig{ sb := &sectorbuilder.Config{
Miner: minerAddr, Miner: minerAddr,
SectorSize: ssize, SectorSize: ssize,
WorkerThreads: uint8(threads),
CacheDir: cache, CacheDir: cache,
MetadataDir: metadata, MetadataDir: metadata,
SealedDir: sealed, SealedDir: sealed,

View File

@ -93,7 +93,7 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a
node.Repo(r), node.Repo(r),
node.Test(), node.Test(),
node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(secbpath)), node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath)),
node.Override(new(api.FullNode), tnd), node.Override(new(api.FullNode), tnd),
) )
require.NoError(t, err) require.NoError(t, err)