worker: Prefetch data in background

This commit is contained in:
Łukasz Magiera 2020-02-04 20:04:49 +01:00
parent 6eb342b8ee
commit 1a9c775407
3 changed files with 76 additions and 24 deletions

View File

@ -2,7 +2,9 @@ package main
import (
"os"
"sync"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/mitchellh/go-homedir"
logging "github.com/ipfs/go-log/v2"
@ -19,6 +21,11 @@ import (
var log = logging.Logger("main")
const (
workers = 1 // TODO: Configurability
transfers = 1
)
func main() {
lotuslog.SetupLogLevels()
@ -67,6 +74,11 @@ func main() {
}
}
type limits struct {
workLimit chan struct{}
transferLimit chan struct{}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
@ -106,6 +118,46 @@ var runCmd = &cli.Command{
log.Warn("Shutting down..")
}()
return acceptJobs(ctx, nodeApi, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit"))
limiter := &limits{
workLimit: make(chan struct{}, workers),
transferLimit: make(chan struct{}, transfers),
}
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: workers,
Paths: sectorbuilder.SimplePath(r),
})
if err != nil {
return err
}
nQueues := workers + transfers
var wg sync.WaitGroup
wg.Add(nQueues)
for i := 0; i < nQueues; i++ {
go func() {
defer wg.Done()
if err := acceptJobs(ctx, nodeApi, sb, limiter, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")); err != nil {
log.Warnf("%+v", err)
return
}
}()
}
wg.Wait()
return nil
},
}

View File

@ -18,30 +18,12 @@ type worker struct {
repo string
auth http.Header
sb *sectorbuilder.SectorBuilder
limiter *limits
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
act, err := api.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := api.ActorSectorSize(ctx, act)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: 1,
Paths: sectorbuilder.SimplePath(repo),
})
if err != nil {
return err
}
if err := paramfetch.GetParams(build.ParametersJson(), ssize); err != nil {
func acceptJobs(ctx context.Context, api lapi.StorageMiner, sb *sectorbuilder.SectorBuilder, limiter *limits, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
if err := paramfetch.GetParams(build.ParametersJson(), sb.SectorSize()); err != nil {
return xerrors.Errorf("get params: %w", err)
}
@ -50,7 +32,9 @@ func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, aut
minerEndpoint: endpoint,
auth: auth,
repo: repo,
sb: sb,
limiter: limiter,
sb: sb,
}
tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{
@ -103,7 +87,10 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
switch task.Type {
case sectorbuilder.WorkerPreCommit:
w.limiter.workLimit <- struct{}{}
rspco, err := w.sb.SealPreCommit(ctx, task.SectorID, task.SealTicket, task.Pieces)
<-w.limiter.workLimit
if err != nil {
return errRes(xerrors.Errorf("precomitting: %w", err))
}
@ -121,7 +108,10 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
return errRes(xerrors.Errorf("cleaning up staged sector: %w", err))
}
case sectorbuilder.WorkerCommit:
w.limiter.workLimit <- struct{}{}
proof, err := w.sb.SealCommit(ctx, task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
<-w.limiter.workLimit
if err != nil {
return errRes(xerrors.Errorf("comitting: %w", err))
}

View File

@ -78,6 +78,11 @@ func (w *worker) fetch(typ string, sectorID uint64) error {
}
func (w *worker) push(typ string, sectorID uint64) error {
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID)
if err != nil {
return err
@ -147,6 +152,11 @@ func (w *worker) remove(typ string, sectorID uint64) error {
}
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
var err error
switch typ {
case sectorbuilder.WorkerPreCommit: