wip remote sectorbuilder workers

This commit is contained in:
Łukasz Magiera 2019-11-21 01:52:59 +01:00
parent 6b36f19a9d
commit 9725eb78bf
9 changed files with 485 additions and 65 deletions

View File

@ -66,6 +66,11 @@ type StorageMiner interface {
SectorsRefs(context.Context) (map[string][]SealedRef, error)
WorkerStats(context.Context) (WorkerStats, error)
// WorkerQueue registers a remote worker
WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
}
type WorkerStats struct {

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network"
@ -141,6 +142,9 @@ type StorageMinerStruct struct {
SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"`
WorkerStats func(context.Context) (WorkerStats, error) `perm:"read"`
WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
}
}
@ -522,6 +526,14 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (WorkerStats, erro
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx)
}
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return c.Internal.WorkerDone(ctx, task, res)
}
var _ Common = &CommonStruct{}
var _ FullNode = &FullNodeStruct{}
var _ StorageMiner = &StorageMinerStruct{}

85
cmd/lotus-worker/main.go Normal file
View File

@ -0,0 +1,85 @@
package lotus_worker
import (
"net/http"
"os"
"github.com/filecoin-project/lotus/api"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
)
var log = logging.Logger("main")
func main() {
logging.SetLogLevel("*", "INFO")
log.Info("Starting lotus worker")
local := []*cli.Command{
runCmd,
}
app := &cli.App{
Name: "lotus-worker",
Usage: "Remote storage miner worker",
Version: build.Version,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
},
&cli.StringFlag{
Name: "minerrepo",
EnvVars: []string{"LOTUS_MINER_PATH"},
Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME
},
},
Commands: local,
}
if err := app.Run(os.Args); err != nil {
log.Warn(err)
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus fountain",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "pullEndpoint",
Value: "127.0.0.1:30003",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
go func() {
<-ctx.Done()
os.Exit(0)
}()
return http.ListenAndServe(cctx.String("pullEndpoint"), nil)
},
}

133
cmd/lotus-worker/sub.go Normal file
View File

@ -0,0 +1,133 @@
package lotus_worker
import (
"context"
"golang.org/x/xerrors"
"io"
"net/http"
"os"
"path/filepath"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type worker struct {
api api.StorageMiner
minerEndpoint string
repo string
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api api.StorageMiner) error {
w := &worker{
api: api,
}
tasks, err := api.WorkerQueue(ctx)
if err != nil {
return err
}
for task := range tasks {
res := w.processTask(ctx, task)
api.WorkerDone(ctx)
}
}
func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes {
switch task.Type {
case sectorbuilder.WorkerPreCommit:
case sectorbuilder.WorkerCommit:
default:
return errRes(xerrors.Errorf("unknown task type %d", task.Type))
}
if err := w.fetchSector(task.SectorID, task.Type); err != nil {
return errRes(err)
}
var res sectorbuilder.SealRes
switch task.Type {
case sectorbuilder.WorkerPreCommit:
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.PublicPieceInfo)
if err != nil {
return errRes(err)
}
res.Rspco = rspco
if err := w.push("sealed", task.SectorID); err != nil {
return errRes(err)
}
case sectorbuilder.WorkerCommit:
}
return res
}
func (w *worker) fetch(typ string, sectorID uint64) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
resp, err := http.Get(w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID))
if err != nil {
return err
}
defer resp.Body.Close()
out, err := os.Create(outname)
if err != nil {
return err
}
defer out.Close()
// TODO: progress bar
_, err = io.Copy(out, resp.Body)
return err
}
func (w *worker) push(typ string, sectorID uint64) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
f, err := os.OpenFile(outname, os.O_RDONLY, 0644)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", w.minerEndpoint+"/remote/"+typ+"/"+w.sb.SectorName(sectorID), f)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return resp.Body.Close()
}
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
var err error
switch typ {
case sectorbuilder.WorkerPreCommit:
err = w.fetch("staged", sectorID)
case sectorbuilder.WorkerCommit:
panic("todo")
}
if err != nil {
return xerrors.Errorf("fetch failed: %w", err)
}
return nil
}
func errRes(err error) sectorbuilder.SealRes {
return sectorbuilder.SealRes{Err: err}
}

View File

@ -10,20 +10,20 @@ import (
"golang.org/x/xerrors"
)
func (sb *SectorBuilder) sectorName(sectorID uint64) string {
func (sb *SectorBuilder) SectorName(sectorID uint64) string {
return fmt.Sprintf("s-%s-%d", sb.Miner, sectorID)
}
func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.stagedDir, sb.sectorName(sectorID))
func (sb *SectorBuilder) StagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.stagedDir, sb.SectorName(sectorID))
}
func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
return os.OpenFile(sb.StagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
}
func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) {
path := filepath.Join(sb.sealedDir, sb.sectorName(sectorID))
func (sb *SectorBuilder) SealedSectorPath(sectorID uint64) (string, error) {
path := filepath.Join(sb.sealedDir, sb.SectorName(sectorID))
e, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
@ -34,7 +34,7 @@ func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) {
}
func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
dir := filepath.Join(sb.cacheDir, sb.sectorName(sectorID))
dir := filepath.Join(sb.cacheDir, sb.SectorName(sectorID))
err := os.Mkdir(dir, 0755)
if os.IsExist(err) {

128
lib/sectorbuilder/remote.go Normal file
View File

@ -0,0 +1,128 @@
package sectorbuilder
import (
"context"
"golang.org/x/xerrors"
)
type WorkerTaskType int
const (
WorkerPreCommit WorkerTaskType = iota
WorkerCommit
)
type WorkerTask struct {
Type WorkerTaskType
TaskID uint64
SectorID uint64
// preCommit
SealTicket SealTicket
PublicPieceInfo []PublicPieceInfo
}
type workerCall struct {
task WorkerTask
ret chan<- SealRes
}
func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) {
sb.remoteLk.Lock()
taskCh := make(chan WorkerTask)
r := &remote{
sealTasks: taskCh,
busy: 0,
}
sb.remotes = append(sb.remotes, r)
go sb.remoteWorker(ctx, r)
sb.remoteLk.Unlock()
return taskCh, nil
}
func (sb *SectorBuilder) returnTask(task workerCall) {
go func() {
select {
case sb.sealTasks <- task:
case <-sb.stopping:
return
}
}()
}
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
defer log.Warn("Remote worker disconnected")
for {
select {
case task := <-sb.sealTasks:
resCh := make(chan SealRes)
sb.remoteLk.Lock()
sb.remoteResults[task.task.TaskID] = resCh
sb.remoteLk.Unlock()
// send the task
select {
case r.sealTasks <- task.task:
case <-ctx.Done():
sb.returnTask(task)
return
}
r.lk.Lock()
r.busy = task.task.TaskID
r.lk.Unlock()
// wait for the result
select {
case res := <-resCh:
// send the result back to the caller
select {
case task.ret <- res:
case <-ctx.Done():
return
case <-sb.stopping:
return
}
case <-ctx.Done():
log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
return
case <-sb.stopping:
return
}
case <-ctx.Done():
return
case <-sb.stopping:
return
}
}
}
func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error {
sb.remoteLk.Lock()
rres, ok := sb.remoteResults[task]
if ok {
delete(sb.remoteResults, task)
}
sb.remoteLk.Unlock()
if !ok {
return xerrors.Errorf("task %d not found", task)
}
select {
case rres <- res:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -1,7 +1,6 @@
package sectorbuilder
import (
"context"
"fmt"
"io"
"os"
@ -12,7 +11,6 @@ import (
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/address"
@ -61,7 +59,30 @@ type SectorBuilder struct {
sealedDir string
cacheDir string
sealLocal bool
rateLimit chan struct{}
sealTasks chan workerCall
remoteLk sync.Mutex
remotes []*remote
remoteResults map[uint64]chan<- SealRes
stopping chan struct{}
}
type SealRes struct {
Err error `json:"omitempty"`
Proof []byte `json:"omitempty"`
Rspco RawSealPreCommitOutput `json:"omitempty"`
}
type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64
}
type Config struct {
@ -77,8 +98,8 @@ type Config struct {
}
func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
if cfg.WorkerThreads <= PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
if cfg.WorkerThreads < PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads)
}
proverId := addressToProverID(cfg.Miner)
@ -111,6 +132,14 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
return nil, err
}
rlimit := cfg.WorkerThreads - PoStReservedWorkers
sealLocal := rlimit > 0
if rlimit == 0 {
rlimit = 1
}
sb := &SectorBuilder{
handle: sbp,
ds: ds,
@ -121,8 +150,13 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
sealedDir: cfg.SealedDir,
cacheDir: cfg.CacheDir,
Miner: cfg.Miner,
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
Miner: cfg.Miner,
sealLocal: sealLocal,
rateLimit: make(chan struct{}, rlimit),
sealTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
}
return sb, nil
@ -218,7 +252,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, err
}
sealedPath, err := sb.sealedSectorPath(sectorID)
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
}
@ -232,7 +266,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
stagedPath := sb.stagedSectorPath(sectorID)
stagedPath := sb.StagedSectorPath(sectorID)
rspco, err := sectorbuilder.StandaloneSealPreCommit(
sb.ssize,
@ -285,7 +319,7 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
}
}
sealedPath, err := sb.sealedSectorPath(sectorID)
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return nil, err
}
@ -310,53 +344,6 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
return proof, nil
}
func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
// Wait, this is a blocking method with no way of interrupting it?
// does it checkpoint itself?
return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults)
}
func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
copy(ticketa[:], ticket)
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo {
return sectorbuilder.NewSortedSectorInfo(sectors...)
}
func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, proof []byte, faults []uint64) (bool, error) {
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeed, proof, faults)
}
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
f, werr, err := toReadableFile(piece, int64(pieceSize))
if err != nil {
return [32]byte{}, err
}
commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize)
if err != nil {
return [32]byte{}, err
}
return commP, werr()
}
func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
}

View File

@ -0,0 +1,62 @@
package sectorbuilder
import (
"context"
"io"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"go.opencensus.io/trace"
"github.com/filecoin-project/lotus/chain/address"
)
func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
// Wait, this is a blocking method with no way of interrupting it?
// does it checkpoint itself?
return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults)
}
func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
copy(ticketa[:], ticket)
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo {
return sectorbuilder.NewSortedSectorInfo(sectors...)
}
func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, proof []byte, faults []uint64) (bool, error) {
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeed, proof, faults)
}
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
f, werr, err := toReadableFile(piece, int64(pieceSize))
if err != nil {
return [32]byte{}, err
}
commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize)
if err != nil {
return [32]byte{}, err
}
return commP, werr()
}
func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
}

View File

@ -91,4 +91,12 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
return out, nil
}
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx)
}
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return sm.SectorBuilder.TaskDone(task, res)
}
var _ api.StorageMiner = &StorageMinerAPI{}