diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index eff0cbe10..1b2a53a80 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -77,7 +77,7 @@ var runCmd = &cli.Command{ } return lr.SetAPIEndpoint(apima) }), - node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath)), + node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(storageRepoPath, 5)), // TODO: grab worker count from config node.Override(new(api.FullNode), nodeApi), ) if err != nil { diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 40e88ccc6..2e6c67ba6 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -8,12 +8,14 @@ import ( "unsafe" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/address" ) +const PoStReservedWorkers = 1 + var log = logging.Logger("sectorbuilder") type SectorSealingStatus = sectorbuilder.SectorSealingStatus @@ -38,6 +40,8 @@ const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { handle unsafe.Pointer + + rateLimit chan struct{} } type Config struct { @@ -53,6 +57,10 @@ type Config struct { } func New(cfg *Config) (*SectorBuilder, error) { + if cfg.WorkerThreads <= PoStReservedWorkers { + return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers + 1, cfg.WorkerThreads) + } + proverId := addressToProverID(cfg.Miner) sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) @@ -62,9 +70,21 @@ func New(cfg *Config) (*SectorBuilder, error) { return &SectorBuilder{ handle: sbp, + rateLimit: make(chan struct{}, cfg.WorkerThreads - PoStReservedWorkers), }, nil } +func (sb *SectorBuilder) rlimit() func() { + if cap(sb.rateLimit) == len(sb.rateLimit) { + log.Warn("rate-limiting sectorbuilder call") + } + sb.rateLimit <- struct{}{} + + return func() { + <-sb.rateLimit + } +} + func addressToProverID(a address.Address) [32]byte { var proverId [32]byte copy(proverId[:], a.Payload()) @@ -81,6 +101,9 @@ func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Rea return 0, err } + ret := sb.rlimit() + defer ret() + sectorID, err := sectorbuilder.AddPieceFromFile(sb.handle, pieceKey, pieceSize, f) if err != nil { return 0, err @@ -91,18 +114,30 @@ func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Rea // TODO: should *really really* return an io.ReadCloser func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) { + ret := sb.rlimit() + defer ret() + return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) } func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket) (SealPreCommitOutput, error) { + ret := sb.rlimit() + defer ret() + return sectorbuilder.SealPreCommit(sb.handle, sectorID, ticket) } func (sb *SectorBuilder) SealCommit(sectorID uint64, seed SealSeed) (SealCommitOutput, error) { + ret := sb.rlimit() + defer ret() + return sectorbuilder.SealCommit(sb.handle, sectorID, seed) } func (sb *SectorBuilder) ResumeSealCommit(sectorID uint64) (SealCommitOutput, error) { + ret := sb.rlimit() + defer ret() + return sectorbuilder.ResumeSealCommit(sb.handle, sectorID) } diff --git a/node/node_test.go b/node/node_test.go index e361bb5fa..60ffc59fc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -93,7 +93,7 @@ func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, a node.Repo(r), node.Test(), - node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath)), + node.Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(secbpath, 2)), node.Override(new(api.FullNode), tnd), ) require.NoError(t, err)