diff --git a/build/params.go b/build/params.go index 0c4908b74..2545bd33a 100644 --- a/build/params.go +++ b/build/params.go @@ -125,5 +125,5 @@ func init() { const BadBlockCacheSize = 8192 // assuming 4000 blocks per round, this lets us not lose any messages across a -// 10 block reorg. +// 10 block reorg. const BlsSignatureCacheSize = 40000 diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 0d951f1fe..c7866c81e 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -75,7 +75,7 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) { func NewGenerator() (*ChainGen, error) { mr := repo.NewMemory(nil) - lr, err := mr.Lock(repo.RepoStorageMiner) + lr, err := mr.Lock(repo.StorageMiner) if err != nil { return nil, xerrors.Errorf("taking mem-repo lock failed: %w", err) } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 5943d6dac..3cddbf866 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -101,7 +101,7 @@ var initCmd = &cli.Command{ log.Info("Initializing repo") - if err := r.Init(repo.RepoStorageMiner); err != nil { + if err := r.Init(repo.StorageMiner); err != nil { return err } @@ -122,7 +122,7 @@ var initCmd = &cli.Command{ } func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, r repo.Repo) error { - lr, err := r.Lock(repo.RepoStorageMiner) + lr, err := r.Lock(repo.StorageMiner) if err != nil { return err } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index c8a10baac..7f978806e 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -17,9 +17,7 @@ import ( lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/auth" "github.com/filecoin-project/lotus/lib/jsonrpc" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/repo" ) @@ -84,7 +82,6 @@ var runCmd = &cli.Command{ } return lr.SetAPIEndpoint(apima) }), - node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath, 5)), // TODO: grab worker count from config node.Override(new(api.FullNode), nodeApi), ) if err != nil { diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 4921f3930..5a978a105 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -54,7 +54,7 @@ var DaemonCmd = &cli.Command{ return err } - if err := r.Init(repo.RepoFullNode); err != nil && err != repo.ErrRepoExists { + if err := r.Init(repo.FullNode); err != nil && err != repo.ErrRepoExists { return err } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 35f37ac2d..716cefe15 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -127,7 +127,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { return sb, nil } -func (sb *SectorBuilder) rlimit() func() { +func (sb *SectorBuilder) RateLimit() func() { if cap(sb.rateLimit) == len(sb.rateLimit) { log.Warn("rate-limiting sectorbuilder call") } @@ -173,7 +173,7 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea return PublicPieceInfo{}, err } - ret := sb.rlimit() + ret := sb.RateLimit() defer ret() stagedFile, err := sb.stagedSectorFile(sectorId) @@ -202,14 +202,14 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea // TODO: should *really really* return an io.ReadCloser func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) { - ret := sb.rlimit() + ret := sb.RateLimit() defer ret() return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) } func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { - ret := sb.rlimit() + ret := sb.RateLimit() defer ret() cacheDir, err := sb.sectorCacheDir(sectorID) @@ -252,7 +252,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece } func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) { - ret := sb.rlimit() + ret := sb.RateLimit() defer ret() cacheDir, err := sb.sectorCacheDir(sectorID) diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index bb965c2bb..a5e394b36 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -1,16 +1,15 @@ package sectorbuilder_test import ( + "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "io" "io/ioutil" "math/rand" "os" "testing" - "github.com/ipfs/go-datastore" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) @@ -20,6 +19,7 @@ const sectorSize = 1024 func TestSealAndVerify(t *testing.T) { //t.Skip("this is slow") os.Setenv("BELLMAN_NO_GPU", "1") + os.Setenv("RUST_LOG", "info") build.SectorSizes = []uint64{sectorSize} diff --git a/node/builder.go b/node/builder.go index 12a2f7e79..1bf6881fc 100644 --- a/node/builder.go +++ b/node/builder.go @@ -181,7 +181,7 @@ func Online() Option { // Full node - ApplyIf(isType(repo.RepoFullNode), + ApplyIf(isType(repo.FullNode), // TODO: Fix offline mode Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), @@ -235,7 +235,7 @@ func Online() Option { ), // Storage miner - ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoStorageMiner }, + ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(storage.TicketFn), modules.SealTicketGen), @@ -266,7 +266,7 @@ func StorageMiner(out *api.StorageMiner) Option { ), func(s *Settings) error { - s.nodeType = repo.RepoStorageMiner + s.nodeType = repo.StorageMiner return nil }, @@ -294,30 +294,34 @@ func ConfigCommon(cfg *config.Common) Option { ) } -func ConfigFullNode(cfg *config.FullNode) Option { - //ApplyIf(func(s *Settings) bool { return s.nodeType == repo.RepoFullNode }), +func ConfigFullNode(c interface{}) Option { + cfg, ok := c.(*config.FullNode) + if !ok { + return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) + } + return Options( ConfigCommon(&cfg.Common), Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), ) } -func configFull(c interface{}) Option { - cfg, ok := c.(*config.FullNode) - if !ok { - return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) - } - - return ConfigFullNode(cfg) -} - -func configMiner(c interface{}) Option { +func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option { cfg, ok := c.(*config.StorageMiner) if !ok { return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) } - return ConfigCommon(&cfg.Common) + path := cfg.SectorBuilder.Path + if path == "" { + path = lr.Path() + } + + return Options( + ConfigCommon(&cfg.Common), + + Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, cfg.SectorBuilder.WorkerCount)), + ) } func Repo(r repo.Repo) Option { @@ -334,8 +338,8 @@ func Repo(r repo.Repo) Option { return Options( Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing - ApplyIf(isType(repo.RepoFullNode), configFull(c)), - ApplyIf(isType(repo.RepoStorageMiner), configMiner(c)), + ApplyIf(isType(repo.FullNode), ConfigFullNode(c)), + ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c, lr)), Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), @@ -371,7 +375,7 @@ func New(ctx context.Context, opts ...Option) (StopFunc, error) { settings := Settings{ modules: map[interface{}]fx.Option{}, invokes: make([]fx.Option, _nInvokes), - nodeType: repo.RepoFullNode, + nodeType: repo.FullNode, } // apply module options in the right order diff --git a/node/config/def.go b/node/config/def.go index 93a876656..928ea1d6d 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -17,9 +17,13 @@ type FullNode struct { Metrics Metrics } +// // Common + // StorageMiner is a storage miner config type StorageMiner struct { Common + + SectorBuilder SectorBuilder } // API contains configs for API endpoint @@ -34,10 +38,19 @@ type Libp2p struct { BootstrapPeers []string } +// // Full Node + type Metrics struct { Nickname string } +// // Storage Miner + +type SectorBuilder struct { + Path string + WorkerCount uint +} + func defCommon() Common { return Common{ API: API{ @@ -64,6 +77,10 @@ func DefaultFullNode() *FullNode { func DefaultStorageMiner() *StorageMiner { return &StorageMiner{ Common: defCommon(), + + SectorBuilder: SectorBuilder{ + WorkerCount: 5, + }, } } diff --git a/node/node_test.go b/node/node_test.go index 1d8c20b03..a1ee38c75 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,9 +4,7 @@ import ( "bytes" "context" "crypto/rand" - "io/ioutil" "net/http/httptest" - "os" "testing" "github.com/libp2p/go-libp2p-core/crypto" @@ -23,7 +21,6 @@ import ( "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/jsonrpc" - "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" @@ -34,7 +31,7 @@ import ( func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet) test.TestStorageNode { r := repo.NewMemory(nil) - lr, err := r.Lock(repo.RepoStorageMiner) + lr, err := r.Lock(repo.StorageMiner) require.NoError(t, err) ks, err := lr.KeyStore() @@ -77,10 +74,6 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a require.NoError(t, err) // start node - - secbpath, err := ioutil.TempDir(os.TempDir(), "lotust-stortest-sb-") - require.NoError(t, err) - var minerapi api.StorageMiner // TODO: use stop @@ -92,7 +85,6 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a node.MockHost(mn), - node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath, 2)), node.Override(new(api.FullNode), tnd), ) require.NoError(t, err) diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 0a81e6513..cce67ba8a 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -37,16 +37,16 @@ const ( type RepoType int const ( - _ = iota // Default is invalid - RepoFullNode RepoType = iota - RepoStorageMiner + _ = iota // Default is invalid + FullNode RepoType = iota + StorageMiner ) func defConfForType(t RepoType) interface{} { switch t { - case RepoFullNode: + case FullNode: return config.DefaultFullNode() - case RepoStorageMiner: + case StorageMiner: return config.DefaultStorageMiner() default: panic(fmt.Sprintf("unknown RepoType(%d)", int(t))) diff --git a/node/repo/fsrepo_test.go b/node/repo/fsrepo_test.go index de55eb351..bd03cc084 100644 --- a/node/repo/fsrepo_test.go +++ b/node/repo/fsrepo_test.go @@ -17,7 +17,7 @@ func genFsRepo(t *testing.T) (*FsRepo, func()) { t.Fatal(err) } - err = repo.Init(RepoFullNode) + err = repo.Init(FullNode) if err != ErrRepoExists && err != nil { t.Fatal(err) } diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 00f6de424..6cc54ddb9 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -1,6 +1,8 @@ package repo import ( + "io/ioutil" + "os" "sync" "github.com/ipfs/go-datastore" @@ -32,11 +34,20 @@ type lockedMemRepo struct { t RepoType sync.RWMutex - token *byte + tempDir string + token *byte } func (lmem *lockedMemRepo) Path() string { - return "" + t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-") + if err != nil { + panic(err) // only used in tests, probably fine + } + + lmem.Lock() + lmem.tempDir = t + lmem.Unlock() + return t } var _ Repo = &MemRepo{} @@ -127,6 +138,13 @@ func (lmem *lockedMemRepo) Close() error { return ErrClosedRepo } + if lmem.tempDir != "" { + if err := os.RemoveAll(lmem.tempDir); err != nil { + return err + } + lmem.tempDir = "" + } + lmem.mem.token = nil lmem.mem.api.Lock() lmem.mem.api.ma = nil diff --git a/node/repo/repo_test.go b/node/repo/repo_test.go index d9cdc5dc5..9c43e8f4b 100644 --- a/node/repo/repo_test.go +++ b/node/repo/repo_test.go @@ -18,12 +18,12 @@ func basicTest(t *testing.T, repo Repo) { } assert.Nil(t, apima, "with no api endpoint, return should be nil") - lrepo, err := repo.Lock(RepoFullNode) + lrepo, err := repo.Lock(FullNode) assert.NoError(t, err, "should be able to lock once") assert.NotNil(t, lrepo, "locked repo shouldn't be nil") { - lrepo2, err := repo.Lock(RepoFullNode) + lrepo2, err := repo.Lock(FullNode) if assert.Error(t, err) { assert.Equal(t, ErrRepoAlreadyLocked, err) } @@ -33,7 +33,7 @@ func basicTest(t *testing.T, repo Repo) { err = lrepo.Close() assert.NoError(t, err, "should be able to unlock") - lrepo, err = repo.Lock(RepoFullNode) + lrepo, err = repo.Lock(FullNode) assert.NoError(t, err, "should be able to relock") assert.NotNil(t, lrepo, "locked repo shouldn't be nil") @@ -64,7 +64,7 @@ func basicTest(t *testing.T, repo Repo) { k1 := types.KeyInfo{Type: "foo"} k2 := types.KeyInfo{Type: "bar"} - lrepo, err = repo.Lock(RepoFullNode) + lrepo, err = repo.Lock(FullNode) assert.NoError(t, err, "should be able to relock") assert.NotNil(t, lrepo, "locked repo shouldn't be nil") diff --git a/storage/garbage.go b/storage/garbage.go index f61ff50cc..ab41d6ff0 100644 --- a/storage/garbage.go +++ b/storage/garbage.go @@ -23,7 +23,10 @@ func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, existingPiece deals := make([]actors.StorageDeal, len(sizes)) for i, size := range sizes { + release := m.sb.RateLimit() commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) + release() + if err != nil { return nil, err }