Merge pull request #584 from filecoin-project/feat/worker-cfg

WorkerCount on storageminer config
This commit is contained in:
Łukasz Magiera 2019-11-12 20:00:52 +01:00 committed by GitHub
commit 036bc578ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 88 additions and 57 deletions

View File

@ -75,7 +75,7 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) {
func NewGenerator() (*ChainGen, error) { func NewGenerator() (*ChainGen, error) {
mr := repo.NewMemory(nil) mr := repo.NewMemory(nil)
lr, err := mr.Lock(repo.RepoStorageMiner) lr, err := mr.Lock(repo.StorageMiner)
if err != nil { if err != nil {
return nil, xerrors.Errorf("taking mem-repo lock failed: %w", err) return nil, xerrors.Errorf("taking mem-repo lock failed: %w", err)
} }

View File

@ -101,7 +101,7 @@ var initCmd = &cli.Command{
log.Info("Initializing repo") log.Info("Initializing repo")
if err := r.Init(repo.RepoStorageMiner); err != nil { if err := r.Init(repo.StorageMiner); err != nil {
return err return err
} }
@ -122,7 +122,7 @@ var initCmd = &cli.Command{
} }
func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, r repo.Repo) error { func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, r repo.Repo) error {
lr, err := r.Lock(repo.RepoStorageMiner) lr, err := r.Lock(repo.StorageMiner)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,9 +17,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/auth" "github.com/filecoin-project/lotus/lib/auth"
"github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/lib/jsonrpc"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -84,7 +82,6 @@ var runCmd = &cli.Command{
} }
return lr.SetAPIEndpoint(apima) return lr.SetAPIEndpoint(apima)
}), }),
node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath, 5)), // TODO: grab worker count from config
node.Override(new(api.FullNode), nodeApi), node.Override(new(api.FullNode), nodeApi),
) )
if err != nil { if err != nil {

View File

@ -54,7 +54,7 @@ var DaemonCmd = &cli.Command{
return err return err
} }
if err := r.Init(repo.RepoFullNode); err != nil && err != repo.ErrRepoExists { if err := r.Init(repo.FullNode); err != nil && err != repo.ErrRepoExists {
return err return err
} }

View File

@ -127,7 +127,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
return sb, nil return sb, nil
} }
func (sb *SectorBuilder) rlimit() func() { func (sb *SectorBuilder) RateLimit() func() {
if cap(sb.rateLimit) == len(sb.rateLimit) { if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting sectorbuilder call") log.Warn("rate-limiting sectorbuilder call")
} }
@ -173,7 +173,7 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
return PublicPieceInfo{}, err return PublicPieceInfo{}, err
} }
ret := sb.rlimit() ret := sb.RateLimit()
defer ret() defer ret()
stagedFile, err := sb.stagedSectorFile(sectorId) stagedFile, err := sb.stagedSectorFile(sectorId)
@ -202,14 +202,14 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
// TODO: should *really really* return an io.ReadCloser // TODO: should *really really* return an io.ReadCloser
func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) { func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) {
ret := sb.rlimit() ret := sb.RateLimit()
defer ret() defer ret()
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
} }
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
ret := sb.rlimit() ret := sb.RateLimit()
defer ret() defer ret()
cacheDir, err := sb.sectorCacheDir(sectorID) cacheDir, err := sb.sectorCacheDir(sectorID)
@ -252,7 +252,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
} }
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) { func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) {
ret := sb.rlimit() ret := sb.RateLimit()
defer ret() defer ret()
cacheDir, err := sb.sectorCacheDir(sectorID) cacheDir, err := sb.sectorCacheDir(sectorID)

View File

@ -1,16 +1,15 @@
package sectorbuilder_test package sectorbuilder_test
import ( import (
"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"os" "os"
"testing" "testing"
"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
@ -20,6 +19,7 @@ const sectorSize = 1024
func TestSealAndVerify(t *testing.T) { func TestSealAndVerify(t *testing.T) {
//t.Skip("this is slow") //t.Skip("this is slow")
os.Setenv("BELLMAN_NO_GPU", "1") os.Setenv("BELLMAN_NO_GPU", "1")
os.Setenv("RUST_LOG", "info")
build.SectorSizes = []uint64{sectorSize} build.SectorSizes = []uint64{sectorSize}

View File

@ -181,7 +181,7 @@ func Online() Option {
// Full node // Full node
ApplyIf(isType(repo.RepoFullNode), ApplyIf(isType(repo.FullNode),
// TODO: Fix offline mode // TODO: Fix offline mode
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
@ -235,7 +235,7 @@ func Online() Option {
), ),
// Storage miner // Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoStorageMiner }, ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(storage.TicketFn), modules.SealTicketGen), Override(new(storage.TicketFn), modules.SealTicketGen),
@ -266,7 +266,7 @@ func StorageMiner(out *api.StorageMiner) Option {
), ),
func(s *Settings) error { func(s *Settings) error {
s.nodeType = repo.RepoStorageMiner s.nodeType = repo.StorageMiner
return nil return nil
}, },
@ -294,30 +294,34 @@ func ConfigCommon(cfg *config.Common) Option {
) )
} }
func ConfigFullNode(cfg *config.FullNode) Option { func ConfigFullNode(c interface{}) Option {
//ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoFullNode }), cfg, ok := c.(*config.FullNode)
if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
}
return Options( return Options(
ConfigCommon(&cfg.Common), ConfigCommon(&cfg.Common),
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
) )
} }
func configFull(c interface{}) Option { func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option {
cfg, ok := c.(*config.FullNode)
if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
}
return ConfigFullNode(cfg)
}
func configMiner(c interface{}) Option {
cfg, ok := c.(*config.StorageMiner) cfg, ok := c.(*config.StorageMiner)
if !ok { if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
} }
return ConfigCommon(&cfg.Common) path := cfg.SectorBuilder.Path
if path == "" {
path = lr.Path()
}
return Options(
ConfigCommon(&cfg.Common),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, cfg.SectorBuilder.WorkerCount)),
)
} }
func Repo(r repo.Repo) Option { func Repo(r repo.Repo) Option {
@ -334,8 +338,8 @@ func Repo(r repo.Repo) Option {
return Options( return Options(
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
ApplyIf(isType(repo.RepoFullNode), configFull(c)), ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(isType(repo.RepoStorageMiner), configMiner(c)), ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c, lr)),
Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
@ -371,7 +375,7 @@ func New(ctx context.Context, opts ...Option) (StopFunc, error) {
settings := Settings{ settings := Settings{
modules: map[interface{}]fx.Option{}, modules: map[interface{}]fx.Option{},
invokes: make([]fx.Option, _nInvokes), invokes: make([]fx.Option, _nInvokes),
nodeType: repo.RepoFullNode, nodeType: repo.FullNode,
} }
// apply module options in the right order // apply module options in the right order

View File

@ -17,9 +17,13 @@ type FullNode struct {
Metrics Metrics Metrics Metrics
} }
// // Common
// StorageMiner is a storage miner config // StorageMiner is a storage miner config
type StorageMiner struct { type StorageMiner struct {
Common Common
SectorBuilder SectorBuilder
} }
// API contains configs for API endpoint // API contains configs for API endpoint
@ -34,10 +38,19 @@ type Libp2p struct {
BootstrapPeers []string BootstrapPeers []string
} }
// // Full Node
type Metrics struct { type Metrics struct {
Nickname string Nickname string
} }
// // Storage Miner
type SectorBuilder struct {
Path string
WorkerCount uint
}
func defCommon() Common { func defCommon() Common {
return Common{ return Common{
API: API{ API: API{
@ -64,6 +77,10 @@ func DefaultFullNode() *FullNode {
func DefaultStorageMiner() *StorageMiner { func DefaultStorageMiner() *StorageMiner {
return &StorageMiner{ return &StorageMiner{
Common: defCommon(), Common: defCommon(),
SectorBuilder: SectorBuilder{
WorkerCount: 5,
},
} }
} }

View File

@ -4,9 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"crypto/rand" "crypto/rand"
"io/ioutil"
"net/http/httptest" "net/http/httptest"
"os"
"testing" "testing"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
@ -23,7 +21,6 @@ import (
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/lib/jsonrpc"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
@ -34,7 +31,7 @@ import (
func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet) test.TestStorageNode { func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet) test.TestStorageNode {
r := repo.NewMemory(nil) r := repo.NewMemory(nil)
lr, err := r.Lock(repo.RepoStorageMiner) lr, err := r.Lock(repo.StorageMiner)
require.NoError(t, err) require.NoError(t, err)
ks, err := lr.KeyStore() ks, err := lr.KeyStore()
@ -77,10 +74,6 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a
require.NoError(t, err) require.NoError(t, err)
// start node // start node
secbpath, err := ioutil.TempDir(os.TempDir(), "lotust-stortest-sb-")
require.NoError(t, err)
var minerapi api.StorageMiner var minerapi api.StorageMiner
// TODO: use stop // TODO: use stop
@ -92,7 +85,6 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a
node.MockHost(mn), node.MockHost(mn),
node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath, 2)),
node.Override(new(api.FullNode), tnd), node.Override(new(api.FullNode), tnd),
) )
require.NoError(t, err) require.NoError(t, err)

View File

@ -38,15 +38,15 @@ type RepoType int
const ( const (
_ = iota // Default is invalid _ = iota // Default is invalid
RepoFullNode RepoType = iota FullNode RepoType = iota
RepoStorageMiner StorageMiner
) )
func defConfForType(t RepoType) interface{} { func defConfForType(t RepoType) interface{} {
switch t { switch t {
case RepoFullNode: case FullNode:
return config.DefaultFullNode() return config.DefaultFullNode()
case RepoStorageMiner: case StorageMiner:
return config.DefaultStorageMiner() return config.DefaultStorageMiner()
default: default:
panic(fmt.Sprintf("unknown RepoType(%d)", int(t))) panic(fmt.Sprintf("unknown RepoType(%d)", int(t)))

View File

@ -17,7 +17,7 @@ func genFsRepo(t *testing.T) (*FsRepo, func()) {
t.Fatal(err) t.Fatal(err)
} }
err = repo.Init(RepoFullNode) err = repo.Init(FullNode)
if err != ErrRepoExists && err != nil { if err != ErrRepoExists && err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -1,6 +1,8 @@
package repo package repo
import ( import (
"io/ioutil"
"os"
"sync" "sync"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
@ -32,11 +34,20 @@ type lockedMemRepo struct {
t RepoType t RepoType
sync.RWMutex sync.RWMutex
tempDir string
token *byte token *byte
} }
func (lmem *lockedMemRepo) Path() string { func (lmem *lockedMemRepo) Path() string {
return "" t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
if err != nil {
panic(err) // only used in tests, probably fine
}
lmem.Lock()
lmem.tempDir = t
lmem.Unlock()
return t
} }
var _ Repo = &MemRepo{} var _ Repo = &MemRepo{}
@ -127,6 +138,13 @@ func (lmem *lockedMemRepo) Close() error {
return ErrClosedRepo return ErrClosedRepo
} }
if lmem.tempDir != "" {
if err := os.RemoveAll(lmem.tempDir); err != nil {
return err
}
lmem.tempDir = ""
}
lmem.mem.token = nil lmem.mem.token = nil
lmem.mem.api.Lock() lmem.mem.api.Lock()
lmem.mem.api.ma = nil lmem.mem.api.ma = nil

View File

@ -18,12 +18,12 @@ func basicTest(t *testing.T, repo Repo) {
} }
assert.Nil(t, apima, "with no api endpoint, return should be nil") assert.Nil(t, apima, "with no api endpoint, return should be nil")
lrepo, err := repo.Lock(RepoFullNode) lrepo, err := repo.Lock(FullNode)
assert.NoError(t, err, "should be able to lock once") assert.NoError(t, err, "should be able to lock once")
assert.NotNil(t, lrepo, "locked repo shouldn't be nil") assert.NotNil(t, lrepo, "locked repo shouldn't be nil")
{ {
lrepo2, err := repo.Lock(RepoFullNode) lrepo2, err := repo.Lock(FullNode)
if assert.Error(t, err) { if assert.Error(t, err) {
assert.Equal(t, ErrRepoAlreadyLocked, err) assert.Equal(t, ErrRepoAlreadyLocked, err)
} }
@ -33,7 +33,7 @@ func basicTest(t *testing.T, repo Repo) {
err = lrepo.Close() err = lrepo.Close()
assert.NoError(t, err, "should be able to unlock") assert.NoError(t, err, "should be able to unlock")
lrepo, err = repo.Lock(RepoFullNode) lrepo, err = repo.Lock(FullNode)
assert.NoError(t, err, "should be able to relock") assert.NoError(t, err, "should be able to relock")
assert.NotNil(t, lrepo, "locked repo shouldn't be nil") assert.NotNil(t, lrepo, "locked repo shouldn't be nil")
@ -64,7 +64,7 @@ func basicTest(t *testing.T, repo Repo) {
k1 := types.KeyInfo{Type: "foo"} k1 := types.KeyInfo{Type: "foo"}
k2 := types.KeyInfo{Type: "bar"} k2 := types.KeyInfo{Type: "bar"}
lrepo, err = repo.Lock(RepoFullNode) lrepo, err = repo.Lock(FullNode)
assert.NoError(t, err, "should be able to relock") assert.NoError(t, err, "should be able to relock")
assert.NotNil(t, lrepo, "locked repo shouldn't be nil") assert.NotNil(t, lrepo, "locked repo shouldn't be nil")

View File

@ -23,7 +23,10 @@ func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, existingPiece
deals := make([]actors.StorageDeal, len(sizes)) deals := make([]actors.StorageDeal, len(sizes))
for i, size := range sizes { for i, size := range sizes {
release := m.sb.RateLimit()
commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size)
release()
if err != nil { if err != nil {
return nil, err return nil, err
} }