diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 6170f8239..208d64eb5 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -2,23 +2,32 @@ package main import ( "os" + "sync" + paramfetch "github.com/filecoin-project/go-paramfetch" + "github.com/filecoin-project/go-sectorbuilder" "github.com/mitchellh/go-homedir" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + manet "github.com/multiformats/go-multiaddr-net" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/node/repo" - manet "github.com/multiformats/go-multiaddr-net" ) var log = logging.Logger("main") +const ( + workers = 1 // TODO: Configurability + transfers = 1 +) + func main() { lotuslog.SetupLogLevels() @@ -67,6 +76,11 @@ func main() { } } +type limits struct { + workLimit chan struct{} + transferLimit chan struct{} +} + var runCmd = &cli.Command{ Name: "run", Usage: "Start lotus worker", @@ -106,6 +120,50 @@ var runCmd = &cli.Command{ log.Warn("Shutting down..") }() - return acceptJobs(ctx, nodeApi, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")) + limiter := &limits{ + workLimit: make(chan struct{}, workers), + transferLimit: make(chan struct{}, transfers), + } + + act, err := nodeApi.ActorAddress(ctx) + if err != nil { + return err + } + ssize, err := nodeApi.ActorSectorSize(ctx, act) + if err != nil { + return err + } + + if err := paramfetch.GetParams(build.ParametersJson(), ssize); err != nil { + return xerrors.Errorf("get params: %w", err) + } + + sb, err := sectorbuilder.NewStandalone(§orbuilder.Config{ + SectorSize: ssize, + Miner: act, + WorkerThreads: workers, + Paths: sectorbuilder.SimplePath(r), + }) + if err != nil { + return err + } + + nQueues := workers + transfers + var wg sync.WaitGroup + wg.Add(nQueues) + + for i := 0; i < nQueues; i++ { + go func() { + defer wg.Done() + + if err := acceptJobs(ctx, nodeApi, sb, limiter, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")); err != nil { + log.Warnf("%+v", err) + return + } + }() + } + + wg.Wait() + return nil }, } diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index ec5d88870..0c2d233de 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -4,12 +4,10 @@ import ( "context" "net/http" - paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-sectorbuilder" "golang.org/x/xerrors" lapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" ) type worker struct { @@ -18,39 +16,19 @@ type worker struct { repo string auth http.Header - sb *sectorbuilder.SectorBuilder + limiter *limits + sb *sectorbuilder.SectorBuilder } -func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error { - act, err := api.ActorAddress(ctx) - if err != nil { - return err - } - ssize, err := api.ActorSectorSize(ctx, act) - if err != nil { - return err - } - - sb, err := sectorbuilder.NewStandalone(§orbuilder.Config{ - SectorSize: ssize, - Miner: act, - WorkerThreads: 1, - Paths: sectorbuilder.SimplePath(repo), - }) - if err != nil { - return err - } - - if err := paramfetch.GetParams(build.ParametersJson(), ssize); err != nil { - return xerrors.Errorf("get params: %w", err) - } - +func acceptJobs(ctx context.Context, api lapi.StorageMiner, sb *sectorbuilder.SectorBuilder, limiter *limits, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error { w := &worker{ api: api, minerEndpoint: endpoint, auth: auth, repo: repo, - sb: sb, + + limiter: limiter, + sb: sb, } tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{ @@ -103,7 +81,10 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) switch task.Type { case sectorbuilder.WorkerPreCommit: + w.limiter.workLimit <- struct{}{} rspco, err := w.sb.SealPreCommit(ctx, task.SectorID, task.SealTicket, task.Pieces) + <-w.limiter.workLimit + if err != nil { return errRes(xerrors.Errorf("precomitting: %w", err)) } @@ -121,7 +102,10 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return errRes(xerrors.Errorf("cleaning up staged sector: %w", err)) } case sectorbuilder.WorkerCommit: + w.limiter.workLimit <- struct{}{} proof, err := w.sb.SealCommit(ctx, task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco) + <-w.limiter.workLimit + if err != nil { return errRes(xerrors.Errorf("comitting: %w", err)) } diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index e9e8e760f..6091a628f 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -78,6 +78,11 @@ func (w *worker) fetch(typ string, sectorID uint64) error { } func (w *worker) push(typ string, sectorID uint64) error { + w.limiter.transferLimit <- struct{}{} + defer func() { + <-w.limiter.transferLimit + }() + filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID) if err != nil { return err @@ -147,6 +152,11 @@ func (w *worker) remove(typ string, sectorID uint64) error { } func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { + w.limiter.transferLimit <- struct{}{} + defer func() { + <-w.limiter.transferLimit + }() + var err error switch typ { case sectorbuilder.WorkerPreCommit: diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 3274f6aec..ca8c1e454 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -7,7 +7,6 @@ import ( "math" "math/bits" "math/rand" - "runtime" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "golang.org/x/xerrors" @@ -41,7 +40,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie deals := make([]actors.StorageDealProposal, len(sizes)) for i, size := range sizes { - commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU())) + commP, err := m.fastPledgeCommitment(size, uint64(1)) if err != nil { return nil, err } @@ -101,7 +100,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(1)), existingPieceSizes) if err != nil { return nil, xerrors.Errorf("add piece: %w", err) }