Merge pull request #1217 from filecoin-project/feat/worker-prefetch

worker: Prefetch data in background
This commit is contained in:
Łukasz Magiera 2020-02-05 03:09:24 +01:00 committed by GitHub
commit e7a1be4dde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 33 deletions

View File

@ -2,23 +2,32 @@ package main
import (
"os"
"sync"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/mitchellh/go-homedir"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/repo"
manet "github.com/multiformats/go-multiaddr-net"
)
var log = logging.Logger("main")
const (
workers = 1 // TODO: Configurability
transfers = 1
)
func main() {
lotuslog.SetupLogLevels()
@ -67,6 +76,11 @@ func main() {
}
}
type limits struct {
workLimit chan struct{}
transferLimit chan struct{}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
@ -106,6 +120,50 @@ var runCmd = &cli.Command{
log.Warn("Shutting down..")
}()
return acceptJobs(ctx, nodeApi, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit"))
limiter := &limits{
workLimit: make(chan struct{}, workers),
transferLimit: make(chan struct{}, transfers),
}
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
if err := paramfetch.GetParams(build.ParametersJson(), ssize); err != nil {
return xerrors.Errorf("get params: %w", err)
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: workers,
Paths: sectorbuilder.SimplePath(r),
})
if err != nil {
return err
}
nQueues := workers + transfers
var wg sync.WaitGroup
wg.Add(nQueues)
for i := 0; i < nQueues; i++ {
go func() {
defer wg.Done()
if err := acceptJobs(ctx, nodeApi, sb, limiter, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")); err != nil {
log.Warnf("%+v", err)
return
}
}()
}
wg.Wait()
return nil
},
}

View File

@ -4,12 +4,10 @@ import (
"context"
"net/http"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"golang.org/x/xerrors"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
)
type worker struct {
@ -18,39 +16,19 @@ type worker struct {
repo string
auth http.Header
sb *sectorbuilder.SectorBuilder
limiter *limits
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
act, err := api.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := api.ActorSectorSize(ctx, act)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: 1,
Paths: sectorbuilder.SimplePath(repo),
})
if err != nil {
return err
}
if err := paramfetch.GetParams(build.ParametersJson(), ssize); err != nil {
return xerrors.Errorf("get params: %w", err)
}
func acceptJobs(ctx context.Context, api lapi.StorageMiner, sb *sectorbuilder.SectorBuilder, limiter *limits, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
w := &worker{
api: api,
minerEndpoint: endpoint,
auth: auth,
repo: repo,
sb: sb,
limiter: limiter,
sb: sb,
}
tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{
@ -103,7 +81,10 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
switch task.Type {
case sectorbuilder.WorkerPreCommit:
w.limiter.workLimit <- struct{}{}
rspco, err := w.sb.SealPreCommit(ctx, task.SectorID, task.SealTicket, task.Pieces)
<-w.limiter.workLimit
if err != nil {
return errRes(xerrors.Errorf("precomitting: %w", err))
}
@ -121,7 +102,10 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
return errRes(xerrors.Errorf("cleaning up staged sector: %w", err))
}
case sectorbuilder.WorkerCommit:
w.limiter.workLimit <- struct{}{}
proof, err := w.sb.SealCommit(ctx, task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
<-w.limiter.workLimit
if err != nil {
return errRes(xerrors.Errorf("comitting: %w", err))
}

View File

@ -78,6 +78,11 @@ func (w *worker) fetch(typ string, sectorID uint64) error {
}
func (w *worker) push(typ string, sectorID uint64) error {
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID)
if err != nil {
return err
@ -147,6 +152,11 @@ func (w *worker) remove(typ string, sectorID uint64) error {
}
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
var err error
switch typ {
case sectorbuilder.WorkerPreCommit:

View File

@ -7,7 +7,6 @@ import (
"math"
"math/bits"
"math/rand"
"runtime"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"golang.org/x/xerrors"
@ -41,7 +40,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
deals := make([]actors.StorageDealProposal, len(sizes))
for i, size := range sizes {
commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU()))
commP, err := m.fastPledgeCommitment(size, uint64(1))
if err != nil {
return nil, err
}
@ -101,7 +100,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie
out := make([]Piece, len(sizes))
for i, size := range sizes {
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes)
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(1)), existingPieceSizes)
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}