support remote SealPreCommit

This commit is contained in:
Łukasz Magiera 2019-11-21 17:10:04 +01:00
parent ba3ad75670
commit 88bbcd80ea
6 changed files with 98 additions and 15 deletions

View File

@@ -73,6 +73,11 @@ lotus-storage-miner: $(BUILD_DEPS)
go build -o lotus-storage-miner ./cmd/lotus-storage-miner
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
lotus-worker: $(BUILD_DEPS)
rm -f lotus-worker
go build -o lotus-worker ./cmd/lotus-worker
go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build
.PHONY: lotus-storage-miner
CLEAN+=lotus-storage-miner

View File

@@ -8,6 +8,7 @@ import (
"syscall"
logging "github.com/ipfs/go-log"
"github.com/mitchellh/go-homedir"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
@@ -27,20 +28,40 @@ const (
// ApiConnector returns API instance
type ApiConnector func() api.FullNode
func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) {
r, err := repo.NewFS(ctx.String(repoFlag))
// RepoInfo resolves the repo directory named by repoFlag and returns the
// expanded filesystem path together with the dial address (host:port) of
// the API endpoint recorded in that repo.
//
// NOTE(review): this span contained leftover pre-change lines from the
// diff rendering; reconstructed here as the post-commit version.
func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) {
	// Expand a leading "~" so repos under the home directory resolve.
	p, err := homedir.Expand(ctx.String(repoFlag))
	if err != nil {
		return "", "", err
	}

	r, err := repo.NewFS(p)
	if err != nil {
		return "", "", err
	}

	ma, err := r.APIEndpoint()
	if err != nil {
		return "", "", xerrors.Errorf("failed to get api endpoint: %w", err)
	}

	// Convert the multiaddr into a plain network address usable by net/http.
	_, addr, err := manet.DialArgs(ma)
	if err != nil {
		return "", "", err
	}

	return p, addr, nil
}
func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) {
rdir, addr, err := RepoInfo(ctx, repoFlag)
if err != nil {
return "", nil, err
}
r, err := repo.NewFS(rdir)
if err != nil {
return "", nil, err
}
var headers http.Header
token, err := r.APIToken()
if err != nil {

View File

@@ -34,8 +34,8 @@ func main() {
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
},
&cli.StringFlag{
Name: "minerrepo",
EnvVars: []string{"LOTUS_MINER_PATH"},
Name: "storagerepo",
EnvVars: []string{"LOTUS_STORAGE_PATH"},
Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME
},
},
@@ -66,6 +66,16 @@ var runCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)
_, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo")
if err != nil {
return err
}
r, _, err := lcli.RepoInfo(cctx, "repo")
if err != nil {
return err
}
v, err := nodeApi.Version(ctx)
if err != nil {
return err
@@ -79,6 +89,6 @@ var runCmd = &cli.Command{
os.Exit(0)
}()
return acceptJobs(ctx, nodeApi)
return acceptJobs(ctx, nodeApi, storageAddr, r)
},
}

View File

@@ -80,7 +80,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
switch task.Type {
case sectorbuilder.WorkerPreCommit:
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.PublicPieceInfo)
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces)
if err != nil {
return errRes(err)
}

View File

@@ -20,13 +20,13 @@ type WorkerTask struct {
SectorID uint64
// preCommit
SealTicket SealTicket
PublicPieceInfo []PublicPieceInfo
SealTicket SealTicket
Pieces []PublicPieceInfo
}
// workerCall pairs a queued WorkerTask with the channel on which the
// remote worker's result is delivered back to the submitting goroutine.
type workerCall struct {
	task WorkerTask
	// ret must be bidirectional (not chan<-): sealPreCommitRemote
	// receives from it, while the RPC layer sends into it.
	ret chan SealRes
}
func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) {

View File

@@ -6,6 +6,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"unsafe"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
@@ -64,6 +65,7 @@ type SectorBuilder struct {
sealTasks chan workerCall
taskCtr uint64
remoteLk sync.Mutex
remotes []*remote
remoteResults map[uint64]chan<- SealRes
@@ -155,6 +157,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
sealLocal: sealLocal,
rateLimit: make(chan struct{}, rlimit),
taskCtr: 1,
sealTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
@@ -182,16 +185,23 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) {
stagedDir: cfg.StagedDir,
sealedDir: cfg.SealedDir,
cacheDir: cfg.CacheDir,
sealLocal: true,
taskCtr: 1,
rateLimit: make(chan struct{}, cfg.WorkerThreads),
stopping: make(chan struct{}),
}, nil
}
func (sb *SectorBuilder) RateLimit() func() {
// checkRateLimit warns when every local worker slot is occupied, i.e. the
// next send into sb.rateLimit will block the caller.
// NOTE(review): the diff rendering duplicated the old and new log lines
// here; only the post-commit message is kept.
func (sb *SectorBuilder) checkRateLimit() {
	if cap(sb.rateLimit) == len(sb.rateLimit) {
		log.Warn("rate-limiting local sectorbuilder call")
	}
}
func (sb *SectorBuilder) RateLimit() func() {
sb.checkRateLimit()
sb.rateLimit <- struct{}{}
return func() {
@@ -269,9 +279,46 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
}
// sealPreCommitRemote blocks until the remote worker handling call posts
// its result, and returns it. If the sectorbuilder is shut down first, it
// returns an error instead. (Case order within select carries no meaning:
// Go picks uniformly among ready cases.)
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
	select {
	case <-sb.stopping:
		return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped")
	case res := <-call.ret:
		return res.Rspco, res.Err
	}
}
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
ret := sb.RateLimit()
defer ret()
call := workerCall{
task: WorkerTask{
Type: WorkerPreCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
},
ret: make(chan SealRes),
}
select { // prefer remote
case sb.sealTasks <- call:
return sb.sealPreCommitRemote(call)
default:
}
sb.checkRateLimit()
select { // use whichever is available
case sb.sealTasks <- call:
return sb.sealPreCommitRemote(call)
case sb.rateLimit <- struct{}{}:
}
// local
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {