Merge pull request #654 from filecoin-project/feat/remote-workers

Remote sectorbuilder workers
This commit is contained in:
Łukasz Magiera 2019-12-05 00:42:12 +01:00 committed by GitHub
commit 7323831320
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1195 additions and 179 deletions

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
/lotus
/lotus-storage-miner
/lotus-seal-worker
/lotus-seed
/pond
/townhall

View File

@ -46,7 +46,7 @@ deps: $(BUILD_DEPS)
.PHONY: deps
debug: GOFLAGS=-tags=debug
debug: lotus lotus-storage-miner
debug: lotus lotus-storage-miner lotus-seal-worker lotus-seed
lotus: $(BUILD_DEPS)
rm -f lotus
@ -60,10 +60,16 @@ lotus-storage-miner: $(BUILD_DEPS)
rm -f lotus-storage-miner
go build $(GOFLAGS) -o lotus-storage-miner ./cmd/lotus-storage-miner
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
.PHONY: lotus-storage-miner
BINS+=lotus-storage-miner
lotus-seal-worker: $(BUILD_DEPS)
rm -f lotus-seal-worker
go build $(GOFLAGS) -o lotus-seal-worker ./cmd/lotus-seal-worker
go run github.com/GeertJohan/go.rice/rice append --exec lotus-seal-worker -i ./build
.PHONY: lotus-seal-worker
BINS+=lotus-seal-worker
build: lotus lotus-storage-miner
.PHONY: build
@ -75,7 +81,7 @@ install:
lotus-seed: $(BUILD_DEPS)
rm -f lotus-seed
go build -o lotus-seed ./cmd/lotus-seed
go build $(GOFLAGS) -o lotus-seed ./cmd/lotus-seed
go run github.com/GeertJohan/go.rice/rice append --exec lotus-seed -i ./build
.PHONY: lotus-seed

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
@ -53,6 +54,8 @@ type StorageMiner interface {
ActorAddress(context.Context) (address.Address, error)
ActorSectorSize(context.Context, address.Address) (uint64, error)
// Temp api for testing
StoreGarbageData(context.Context) error
@ -64,13 +67,12 @@ type StorageMiner interface {
SectorsRefs(context.Context) (map[string][]SealedRef, error)
WorkerStats(context.Context) (WorkerStats, error)
}
WorkerStats(context.Context) (sectorbuilder.WorkerStats, error)
type WorkerStats struct {
Free int
Reserved int // for PoSt
Total int
// WorkerQueue registers a remote worker
WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
}
type SectorInfo struct {

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network"
@ -130,6 +131,7 @@ type StorageMinerStruct struct {
Internal struct {
ActorAddress func(context.Context) (address.Address, error) `perm:"read"`
ActorSectorSize func(context.Context, address.Address) (uint64, error) `perm:"read"`
StoreGarbageData func(context.Context) error `perm:"write"`
@ -137,7 +139,10 @@ type StorageMinerStruct struct {
SectorsList func(context.Context) ([]uint64, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"`
WorkerStats func(context.Context) (WorkerStats, error) `perm:"read"`
WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"`
WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
}
}
@ -485,6 +490,10 @@ func (c *StorageMinerStruct) ActorAddress(ctx context.Context) (address.Address,
return c.Internal.ActorAddress(ctx)
}
func (c *StorageMinerStruct) ActorSectorSize(ctx context.Context, addr address.Address) (uint64, error) {
return c.Internal.ActorSectorSize(ctx, addr)
}
func (c *StorageMinerStruct) StoreGarbageData(ctx context.Context) error {
return c.Internal.StoreGarbageData(ctx)
}
@ -503,10 +512,18 @@ func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]Seal
return c.Internal.SectorsRefs(ctx)
}
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (WorkerStats, error) {
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.WorkerStats, error) {
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx)
}
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return c.Internal.WorkerDone(ctx, task, res)
}
var _ Common = &CommonStruct{}
var _ FullNode = &FullNodeStruct{}
var _ StorageMiner = &StorageMinerStruct{}

View File

@ -2,6 +2,12 @@
package build
var SectorSizes = []uint64{
16 << 20,
256 << 20,
1 << 30,
}
// Seconds
const BlockDelay = 30

View File

@ -12,12 +12,6 @@ import (
const UnixfsChunkSize uint64 = 1 << 20
const UnixfsLinksPerLevel = 1024
var SectorSizes = []uint64{
16 << 20,
256 << 20,
1 << 30,
}
func SupportedSectorSize(ssize uint64) bool {
for _, ss := range SectorSizes {
if ssize == ss {

View File

@ -8,6 +8,7 @@ import (
"syscall"
logging "github.com/ipfs/go-log"
"github.com/mitchellh/go-homedir"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
@ -27,20 +28,40 @@ const (
// ApiConnector returns API instance
type ApiConnector func() api.FullNode
func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) {
r, err := repo.NewFS(ctx.String(repoFlag))
func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) {
p, err := homedir.Expand(ctx.String(repoFlag))
if err != nil {
return "", nil, err
return "", "", err
}
r, err := repo.NewFS(p)
if err != nil {
return "", "", err
}
ma, err := r.APIEndpoint()
if err != nil {
return "", nil, xerrors.Errorf("failed to get api endpoint: %w", err)
return "", "", xerrors.Errorf("failed to get api endpoint: (%s) %w", p, err)
}
_, addr, err := manet.DialArgs(ma)
if err != nil {
return "", "", err
}
return p, addr, nil
}
func GetRawAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) {
rdir, addr, err := RepoInfo(ctx, repoFlag)
if err != nil {
return "", nil, err
}
r, err := repo.NewFS(rdir)
if err != nil {
return "", nil, err
}
var headers http.Header
token, err := r.APIToken()
if err != nil {
@ -59,7 +80,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) {
f = "storagerepo"
}
addr, headers, err := getAPI(ctx, f)
addr, headers, err := GetRawAPI(ctx, f)
if err != nil {
return nil, nil, err
}
@ -68,7 +89,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) {
}
func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(ctx, "repo")
addr, headers, err := GetRawAPI(ctx, "repo")
if err != nil {
return nil, nil, err
}
@ -77,7 +98,7 @@ func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error
}
func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(ctx, "storagerepo")
addr, headers, err := GetRawAPI(ctx, "storagerepo")
if err != nil {
return nil, nil, err
}

View File

@ -0,0 +1,91 @@
package main
import (
"github.com/mitchellh/go-homedir"
"os"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
)
var log = logging.Logger("main")
func main() {
logging.SetLogLevel("*", "INFO")
log.Info("Starting lotus worker")
local := []*cli.Command{
runCmd,
}
app := &cli.App{
Name: "lotus-seal-worker",
Usage: "Remote storage miner worker",
Version: build.Version,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
},
&cli.StringFlag{
Name: "storagerepo",
EnvVars: []string{"LOTUS_STORAGE_PATH"},
Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME
},
},
Commands: local,
}
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return xerrors.Errorf("getting miner api: %w", err)
}
defer closer()
ctx := lcli.ReqContext(cctx)
_, auth, err := lcli.GetRawAPI(cctx, "storagerepo")
_, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo")
if err != nil {
return xerrors.Errorf("getting miner repo: %w", err)
}
r, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
go func() {
<-ctx.Done()
log.Warn("Shutting down..")
}()
return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r)
},
}

View File

@ -0,0 +1,128 @@
package main
import (
"context"
"net/http"
"path/filepath"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type worker struct {
api api.StorageMiner
minerEndpoint string
repo string
auth http.Header
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error {
act, err := api.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := api.ActorSectorSize(ctx, act)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: 1,
CacheDir: filepath.Join(repo, "cache"),
SealedDir: filepath.Join(repo, "sealed"),
StagedDir: filepath.Join(repo, "staged"),
UnsealedDir: filepath.Join(repo, "unsealed"),
})
if err != nil {
return err
}
w := &worker{
api: api,
minerEndpoint: endpoint,
auth: auth,
repo: repo,
sb: sb,
}
tasks, err := api.WorkerQueue(ctx)
if err != nil {
return err
}
loop:
for {
select {
case task := <-tasks:
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type)
res := w.processTask(ctx, task)
log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr)
if err := api.WorkerDone(ctx, task.TaskID, res); err != nil {
log.Error(err)
}
case <-ctx.Done():
break loop
}
}
log.Warn("acceptJobs exit")
return nil
}
func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes {
switch task.Type {
case sectorbuilder.WorkerPreCommit:
case sectorbuilder.WorkerCommit:
default:
return errRes(xerrors.Errorf("unknown task type %d", task.Type))
}
if err := w.fetchSector(task.SectorID, task.Type); err != nil {
return errRes(xerrors.Errorf("fetching sector: %w", err))
}
var res sectorbuilder.SealRes
switch task.Type {
case sectorbuilder.WorkerPreCommit:
rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces)
if err != nil {
return errRes(xerrors.Errorf("precomitting: %w", err))
}
res.Rspco = rspco.ToJson()
if err := w.push("sealed", task.SectorID); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
if err := w.push("cache", task.SectorID); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
case sectorbuilder.WorkerCommit:
proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco)
if err != nil {
return errRes(xerrors.Errorf("comitting: %w", err))
}
res.Proof = proof
if err := w.push("cache", task.SectorID); err != nil {
return errRes(xerrors.Errorf("pushing precommited data: %w", err))
}
}
return res
}
func errRes(err error) sectorbuilder.SealRes {
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
}

View File

@ -0,0 +1,147 @@
package main
import (
"io"
"mime"
"net/http"
"os"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"gopkg.in/cheggaaa/pb.v1"
"path/filepath"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/lib/systar"
)
func (w *worker) fetch(typ string, sectorID uint64) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
log.Infof("Fetch %s %s", typ, url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = w.auth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
bar := pb.New64(resp.ContentLength)
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}
if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}
switch mediatype {
case "application/x-tar":
return systar.ExtractTar(barreader, filepath.Dir(outname))
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(barreader), outname)
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}
func (w *worker) push(typ string, sectorID uint64) error {
filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
log.Infof("Push %s %s", typ, url)
stat, err := os.Stat(filename)
if err != nil {
return err
}
var r io.Reader
if stat.IsDir() {
r, err = systar.TarDirectory(filename)
} else {
r, err = os.OpenFile(filename, os.O_RDONLY, 0644)
}
if err != nil {
return xerrors.Errorf("opening push reader: %w", err)
}
bar := pb.New64(0)
bar.ShowPercent = true
bar.ShowSpeed = true
bar.ShowCounters = true
bar.Units = pb.U_BYTES
bar.Start()
defer bar.Finish()
//todo set content size
header := w.auth
if stat.IsDir() {
header.Set("Content-Type", "application/x-tar")
} else {
header.Set("Content-Type", "application/octet-stream")
}
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r))
if err != nil {
return err
}
req.Header = header
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
}
if err := resp.Body.Close(); err != nil {
return err
}
// TODO: keep files around for later stages of sealing
return os.RemoveAll(filename)
}
func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error {
var err error
switch typ {
case sectorbuilder.WorkerPreCommit:
err = w.fetch("staged", sectorID)
case sectorbuilder.WorkerCommit:
err = w.fetch("sealed", sectorID)
if err != nil {
return xerrors.Errorf("fetch sealed: %w", err)
}
err = w.fetch("cache", sectorID)
}
if err != nil {
return xerrors.Errorf("fetch failed: %w", err)
}
return nil
}

View File

@ -58,7 +58,10 @@ var infoCmd = &cli.Command{
if err != nil {
return err
}
fmt.Printf("Worker use: %d / %d (+%d)\n", wstat.Total-wstat.Reserved-wstat.Free, wstat.Total, wstat.Reserved)
fmt.Printf("Worker use:\n")
fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved)
fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal)
eps, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil)
if err != nil {

View File

@ -71,6 +71,10 @@ var initCmd = &cli.Command{
Name: "pre-sealed-sectors",
Usage: "specify set of presealed sectors for starting as a genesis miner",
},
&cli.BoolFlag{
Name: "nosync",
Usage: "don't check full-node sync status",
},
},
Action: func(cctx *cli.Context) error {
log.Info("Initializing lotus storage miner")
@ -91,7 +95,7 @@ var initCmd = &cli.Command{
log.Info("Checking full node sync status")
if !cctx.Bool("genesis-miner") {
if !cctx.Bool("genesis-miner") && !cctx.Bool("nosync") {
if err := lcli.SyncWait(ctx, api); err != nil {
return xerrors.Errorf("sync wait: %w", err)
}
@ -204,6 +208,11 @@ var initCmd = &cli.Command{
}
func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir string, maddr address.Address, mds dtypes.MetadataDS) error {
presealDir, err := homedir.Expand(presealDir)
if err != nil {
return xerrors.Errorf("expanding preseal dir: %w", err)
}
b, err := ioutil.ReadFile(filepath.Join(presealDir, "pre-seal-"+maddr.String()+".json"))
if err != nil {
return xerrors.Errorf("reading preseal metadata: %w", err)

View File

@ -8,6 +8,7 @@ import (
"os/signal"
"syscall"
mux "github.com/gorilla/mux"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
@ -19,25 +20,28 @@ import (
"github.com/filecoin-project/lotus/lib/auth"
"github.com/filecoin-project/lotus/lib/jsonrpc"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/repo"
)
const defaultListen = "/ip4/127.0.0.1/tcp/"
var runCmd = &cli.Command{
Name: "run",
Usage: "Start a lotus storage miner process",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "api",
Value: "",
Value: "2345",
},
&cli.BoolFlag{
Name: "enable-gpu-proving",
Usage: "Enable use of GPU for mining operations",
Usage: "enable use of GPU for mining operations",
Value: true,
},
&cli.BoolFlag{
Name: "nosync",
Usage: "Don't check full-node sync status",
Usage: "don't check full-node sync status",
},
},
Action: func(cctx *cli.Context) error {
@ -93,15 +97,13 @@ var runCmd = &cli.Command{
node.Online(),
node.Repo(r),
node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") },
node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error {
apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" +
cctx.String("api"))
apima, err := parseApi(cctx.String("api"))
if err != nil {
return err
}
return lr.SetAPIEndpoint(apima)
})),
}),
node.Override(new(api.FullNode), nodeApi),
)
@ -131,17 +133,21 @@ var runCmd = &cli.Command{
return xerrors.Errorf("could not listen: %w", err)
}
mux := mux.NewRouter()
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi))
mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: minerapi.AuthVerify,
Next: rpcServer.ServeHTTP,
Next: mux.ServeHTTP,
}
http.Handle("/rpc/v0", ah)
srv := &http.Server{Handler: http.DefaultServeMux}
srv := &http.Server{Handler: ah}
sigChan := make(chan os.Signal, 2)
go func() {
@ -160,3 +166,15 @@ var runCmd = &cli.Command{
return srv.Serve(manet.NetListener(lst))
},
}
func parseApi(api string) (multiaddr.Multiaddr, error) {
if api == "" {
return nil, xerrors.New("empty --api")
}
if api[0] != '/' {
api = defaultListen + api
}
return multiaddr.NewMultiaddr(api)
}

1
go.mod
View File

@ -16,6 +16,7 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.1
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.3

2
go.sum
View File

@ -128,6 +128,8 @@ github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=

View File

@ -10,24 +10,24 @@ import (
"golang.org/x/xerrors"
)
func (sb *SectorBuilder) sectorName(sectorID uint64) string {
func (sb *SectorBuilder) SectorName(sectorID uint64) string {
return fmt.Sprintf("s-%s-%d", sb.Miner, sectorID)
}
func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.stagedDir, sb.sectorName(sectorID))
func (sb *SectorBuilder) StagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.stagedDir, sb.SectorName(sectorID))
}
func (sb *SectorBuilder) unsealedSectorPath(sectorID uint64) string {
return filepath.Join(sb.unsealedDir, sb.sectorName(sectorID))
return filepath.Join(sb.unsealedDir, sb.SectorName(sectorID))
}
func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
return os.OpenFile(sb.StagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
}
func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) {
path := filepath.Join(sb.sealedDir, sb.sectorName(sectorID))
func (sb *SectorBuilder) SealedSectorPath(sectorID uint64) (string, error) {
path := filepath.Join(sb.sealedDir, sb.SectorName(sectorID))
e, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
@ -38,7 +38,7 @@ func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) {
}
func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
dir := filepath.Join(sb.cacheDir, sb.sectorName(sectorID))
dir := filepath.Join(sb.cacheDir, sb.SectorName(sectorID))
err := os.Mkdir(dir, 0755)
if os.IsExist(err) {
@ -48,6 +48,19 @@ func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
return dir, err
}
func (sb *SectorBuilder) GetPath(typ string, sectorName string) (string, error) {
switch typ {
case "staged":
return filepath.Join(sb.stagedDir, sectorName), nil
case "sealed":
return filepath.Join(sb.sealedDir, sectorName), nil
case "cache":
return filepath.Join(sb.cacheDir, sectorName), nil
default:
return "", xerrors.Errorf("unknown sector type for write: %s", typ)
}
}
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {

View File

@ -1,28 +1,12 @@
package sectorbuilder
import (
"io/ioutil"
"os"
"path/filepath"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func TempSectorbuilder(sectorSize uint64, ds dtypes.MetadataDS) (*SectorBuilder, func(), error) {
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
return nil, nil, err
}
sb, err := TempSectorbuilderDir(dir, sectorSize, ds)
return sb, func() {
if err := os.RemoveAll(dir); err != nil {
log.Warn("failed to clean up temp sectorbuilder: ", err)
}
}, err
}
func TempSectorbuilderDir(dir string, sectorSize uint64, ds dtypes.MetadataDS) (*SectorBuilder, error) {
addr, err := address.NewFromString("t3vfxagwiegrywptkbmyohqqbfzd7xzbryjydmxso4hfhgsnv6apddyihltsbiikjf3lm7x2myiaxhuc77capq")
if err != nil {

150
lib/sectorbuilder/remote.go Normal file
View File

@ -0,0 +1,150 @@
package sectorbuilder
import (
"context"
"golang.org/x/xerrors"
)
type WorkerTaskType int
const (
WorkerPreCommit WorkerTaskType = iota
WorkerCommit
)
type WorkerTask struct {
Type WorkerTaskType
TaskID uint64
SectorID uint64
// preCommit
SealTicket SealTicket
Pieces []PublicPieceInfo
// commit
SealSeed SealSeed
Rspco RawSealPreCommitOutput
}
type workerCall struct {
task WorkerTask
ret chan SealRes
}
func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
taskCh := make(chan WorkerTask)
r := &remote{
sealTasks: taskCh,
busy: 0,
}
sb.remoteCtr++
sb.remotes[sb.remoteCtr] = r
go sb.remoteWorker(ctx, r)
return taskCh, nil
}
func (sb *SectorBuilder) returnTask(task workerCall) {
go func() {
select {
case sb.sealTasks <- task:
case <-sb.stopping:
return
}
}()
}
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
defer log.Warn("Remote worker disconnected")
defer func() {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
for i, vr := range sb.remotes {
if vr == r {
delete(sb.remotes, i)
return
}
}
}()
for {
select {
case task := <-sb.sealTasks:
resCh := make(chan SealRes)
sb.remoteLk.Lock()
sb.remoteResults[task.task.TaskID] = resCh
sb.remoteLk.Unlock()
// send the task
select {
case r.sealTasks <- task.task:
case <-ctx.Done():
sb.returnTask(task)
return
}
r.lk.Lock()
r.busy = task.task.TaskID
r.lk.Unlock()
// wait for the result
select {
case res := <-resCh:
// send the result back to the caller
select {
case task.ret <- res:
case <-ctx.Done():
return
case <-sb.stopping:
return
}
case <-ctx.Done():
log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
return
case <-sb.stopping:
return
}
case <-ctx.Done():
return
case <-sb.stopping:
return
}
r.lk.Lock()
r.busy = 0
r.lk.Unlock()
}
}
func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error {
sb.remoteLk.Lock()
rres, ok := sb.remoteResults[task]
if ok {
delete(sb.remoteResults, task)
}
sb.remoteLk.Unlock()
if !ok {
return xerrors.Errorf("task %d not found", task)
}
select {
case rres <- res:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -1,18 +1,17 @@
package sectorbuilder
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
@ -40,7 +39,7 @@ type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput
type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput
type EPostCandidate = sectorbuilder.Candidate
@ -62,7 +61,58 @@ type SectorBuilder struct {
unsealLk sync.Mutex
sealLocal bool
rateLimit chan struct{}
sealTasks chan workerCall
taskCtr uint64
remoteLk sync.Mutex
remoteCtr int
remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes
stopping chan struct{}
}
type JsonRSPCO struct {
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
}
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{
CommC: rspco.CommC[:],
CommD: rspco.CommD[:],
CommR: rspco.CommR[:],
CommRLast: rspco.CommRLast[:],
}
}
func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
var out RawSealPreCommitOutput
copy(out.CommC[:], rspco.CommC)
copy(out.CommD[:], rspco.CommD)
copy(out.CommR[:], rspco.CommR)
copy(out.CommRLast[:], rspco.CommRLast)
return out
}
type SealRes struct {
Err string
GoErr error `json:"-"`
Proof []byte
Rspco JsonRSPCO
}
type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64 // only for metrics
}
type Config struct {
@ -78,8 +128,8 @@ type Config struct {
}
func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
if cfg.WorkerThreads <= PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
if cfg.WorkerThreads < PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads)
}
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.UnsealedDir} {
@ -105,6 +155,14 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
return nil, err
}
rlimit := cfg.WorkerThreads - PoStReservedWorkers
sealLocal := rlimit > 0
if rlimit == 0 {
rlimit = 1
}
sb := &SectorBuilder{
ds: ds,
@ -117,16 +175,59 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
unsealedDir: cfg.UnsealedDir,
Miner: cfg.Miner,
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
sealLocal: sealLocal,
rateLimit: make(chan struct{}, rlimit),
taskCtr: 1,
sealTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{},
stopping: make(chan struct{}),
}
return sb, nil
}
func (sb *SectorBuilder) RateLimit() func() {
if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting sectorbuilder call")
func NewStandalone(cfg *Config) (*SectorBuilder, error) {
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.UnsealedDir} {
if err := os.MkdirAll(dir, 0755); err != nil {
if os.IsExist(err) {
continue
}
return nil, err
}
}
return &SectorBuilder{
ds: nil,
ssize: cfg.SectorSize,
Miner: cfg.Miner,
stagedDir: cfg.StagedDir,
sealedDir: cfg.SealedDir,
cacheDir: cfg.CacheDir,
unsealedDir: cfg.UnsealedDir,
sealLocal: true,
taskCtr: 1,
remotes: map[int]*remote{},
rateLimit: make(chan struct{}, cfg.WorkerThreads),
stopping: make(chan struct{}),
}, nil
}
func (sb *SectorBuilder) checkRateLimit() {
if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting local sectorbuilder call")
}
}
func (sb *SectorBuilder) RateLimit() func() {
sb.checkRateLimit()
sb.rateLimit <- struct{}{}
return func() {
@ -134,8 +235,33 @@ func (sb *SectorBuilder) RateLimit() func() {
}
}
func (sb *SectorBuilder) WorkerStats() (free, reserved, total int) {
return cap(sb.rateLimit) - len(sb.rateLimit), PoStReservedWorkers, cap(sb.rateLimit) + PoStReservedWorkers
type WorkerStats struct {
LocalFree int
LocalReserved int
LocalTotal int
// todo: post in progress
RemotesTotal int
RemotesFree int
}
func (sb *SectorBuilder) WorkerStats() WorkerStats {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
remoteFree := len(sb.remotes)
for _, r := range sb.remotes {
if r.busy > 0 {
remoteFree--
}
}
return WorkerStats{
LocalFree: cap(sb.rateLimit) - len(sb.rateLimit),
LocalReserved: PoStReservedWorkers,
LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers,
RemotesTotal: len(sb.remotes),
RemotesFree: remoteFree,
}
}
func addressToProverID(a address.Address) [32]byte {
@ -203,7 +329,7 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
return nil, err
}
sealedPath, err := sb.sealedSectorPath(sectorID)
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return nil, err
}
@ -259,18 +385,59 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
}, nil
}
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
select {
case ret := <-call.ret:
var err error
if ret.Err != "" {
err = xerrors.New(ret.Err)
}
return ret.Rspco.rspco(), err
case <-sb.stopping:
return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped")
}
}
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
ret := sb.RateLimit()
defer ret()
call := workerCall{
task: WorkerTask{
Type: WorkerPreCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
},
ret: make(chan SealRes),
}
select { // prefer remote
case sb.sealTasks <- call:
return sb.sealPreCommitRemote(call)
default:
}
sb.checkRateLimit()
select { // use whichever is available
case sb.sealTasks <- call:
return sb.sealPreCommitRemote(call)
case sb.rateLimit <- struct{}{}:
}
// local
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
}
sealedPath, err := sb.sealedSectorPath(sectorID)
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
}
var sum uint64
@ -282,7 +449,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
stagedPath := sb.stagedSectorPath(sectorID)
stagedPath := sb.StagedSectorPath(sectorID)
rspco, err := sectorbuilder.SealPreCommit(
sb.ssize,
@ -299,12 +466,25 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
return rspco, nil
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
ret := sb.RateLimit()
defer ret()
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
select {
case ret := <-call.ret:
if ret.Err != "" {
err = xerrors.New(ret.Err)
}
return ret.Proof, err
case <-sb.stopping:
return nil, xerrors.New("sectorbuilder stopped")
}
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
@ -320,17 +500,51 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
ticket.TicketBytes,
seed.TicketBytes,
pieces,
rspco,
sectorbuilder.RawSealPreCommitOutput(rspco),
)
if err != nil {
return nil, xerrors.Errorf("SealCommit: %w", err)
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
select { // prefer remote
case sb.sealTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
select { // use whichever is available
case sb.sealTasks <- call:
proof, err = sb.sealCommitRemote(call)
case sb.rateLimit <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) {
@ -370,7 +584,7 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (Sor
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache path for sector %d: %w", s.SectorID, err)
}
sealedPath, err := sb.sealedSectorPath(s.SectorID)
sealedPath, err := sb.SealedSectorPath(s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err)
}
@ -403,68 +617,8 @@ func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo,
return candidates, proof, err
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
copy(ticketa[:], ticket)
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
func NewSortedPrivateSectorInfo(sectors []sectorbuilder.PrivateSectorInfo) SortedPrivateSectorInfo {
return sectorbuilder.NewSortedPrivateSectorInfo(sectors...)
}
func NewSortedPublicSectorInfo(sectors []sectorbuilder.PublicSectorInfo) SortedPublicSectorInfo {
return sectorbuilder.NewSortedPublicSectorInfo(sectors...)
}
func VerifyElectionPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())))
return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID)
}
func VerifyFallbackPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())))
return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID)
}
func verifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeCount uint64, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
if challengeCount != uint64(len(candidates)) {
log.Warnf("verifyPost with wrong candidate count: expected %d, got %d", challengeCount, len(candidates))
return false, nil // user input, dont't error
}
var challengeSeeda [CommLen]byte
copy(challengeSeeda[:], challengeSeed)
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
prover := addressToProverID(proverID)
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeeda, challengeCount, proof, candidates, prover)
}
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
f, werr, err := toReadableFile(piece, int64(pieceSize))
if err != nil {
return [32]byte{}, err
}
commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize)
if err != nil {
return [32]byte{}, err
}
return commP, werr()
}
func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
}
func ElectionPostChallengeCount(sectors uint64) uint64 {

View File

@ -0,0 +1,79 @@
package sectorbuilder
import (
"context"
"io"
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"go.opencensus.io/trace"
"github.com/filecoin-project/lotus/chain/address"
)
func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
copy(ticketa[:], ticket)
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
func NewSortedPrivateSectorInfo(sectors []sectorbuilder.PrivateSectorInfo) SortedPrivateSectorInfo {
return sectorbuilder.NewSortedPrivateSectorInfo(sectors...)
}
func NewSortedPublicSectorInfo(sectors []sectorbuilder.PublicSectorInfo) SortedPublicSectorInfo {
return sectorbuilder.NewSortedPublicSectorInfo(sectors...)
}
func VerifyElectionPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())))
return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID)
}
func VerifyFallbackPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())))
return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID)
}
func verifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeCount uint64, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
if challengeCount != uint64(len(candidates)) {
log.Warnf("verifyPost with wrong candidate count: expected %d, got %d", challengeCount, len(candidates))
return false, nil // user input, dont't error
}
var challengeSeeda [CommLen]byte
copy(challengeSeeda[:], challengeSeed)
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
prover := addressToProverID(proverID)
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeeda, challengeCount, proof, candidates, prover)
}
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
f, werr, err := toReadableFile(piece, int64(pieceSize))
if err != nil {
return [32]byte{}, err
}
commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize)
if err != nil {
return [32]byte{}, err
}
return commP, werr()
}
func GenerateDataCommitment(ssize uint64, pieces []sectorbuilder.PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
}

47
lib/systar/systar.go Normal file
View File

@ -0,0 +1,47 @@
package systar
import (
"golang.org/x/xerrors"
"io"
"os"
"os/exec"
"path/filepath"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("systar")
func ExtractTar(body io.Reader, dest string) error {
if err := os.MkdirAll(dest, 0755); err != nil {
return xerrors.Errorf("creating dest directory: %w", err)
}
cmd := exec.Command("tar", "-xS", "-C", dest)
cmd.Stdin = body
return cmd.Run()
}
func TarDirectory(file string) (io.ReadCloser, error) {
// use system builtin tar, golang one doesn't support sparse files
dir := filepath.Dir(file)
base := filepath.Base(file)
i, o := io.Pipe()
// don't bother with compression, it's mostly random data
cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base)
cmd.Stdout = o
if err := cmd.Start(); err != nil {
return nil, err
}
go func() {
if err := o.CloseWithError(cmd.Wait()); err != nil {
log.Error(err)
}
}()
return i, nil
}

View File

@ -16,6 +16,11 @@ let sealCodes = [
"PreCommitted",
"Committing",
"Proving",
"SealFailed",
"PreCommitFailed",
"SealCommitFailed",
"CommitFailed",
]
class StorageNode extends React.Component {

View File

@ -136,7 +136,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
return nodeInfo{}, err
}
initArgs := []string{"init"}
initArgs := []string{"init", "--nosync"}
if fullNodeRepo == api.running[1].meta.Repo {
initArgs = []string{"init", "--actor=t0101", "--genesis-miner", "--pre-sealed-sectors=" + filepath.Join(fullNodeRepo, "preseal")}
}
@ -154,7 +154,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
mux := newWsMux()
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id), "--nosync")
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo)
@ -214,7 +214,7 @@ func (api *api) RestartNode(id int32) (nodeInfo, error) {
var cmd *exec.Cmd
if nd.meta.Storage {
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id), "--nosync")
} else {
cmd = exec.Command("./lotus", "daemon", "--api", fmt.Sprintf("%d", 2500+id))
}

View File

@ -2,13 +2,21 @@ package impl
import (
"context"
"encoding/json"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/lib/systar"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"
"io"
"mime"
"net/http"
"os"
"path/filepath"
)
type StorageMinerAPI struct {
@ -23,19 +31,117 @@ type StorageMinerAPI struct {
Full api.FullNode
}
func (sm *StorageMinerAPI) WorkerStats(context.Context) (api.WorkerStats, error) {
free, reserved, total := sm.SectorBuilder.WorkerStats()
return api.WorkerStats{
Free: free,
Reserved: reserved,
Total: total,
}, nil
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
if !api.HasPerm(r.Context(), api.PermAdmin) {
w.WriteHeader(401)
json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"})
return
}
mux := mux.NewRouter()
mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT")
log.Infof("SERVEGETREMOTE %s", r.URL)
mux.ServeHTTP(w, r)
}
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
stat, err := os.Stat(path)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = systar.TarDirectory(path)
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil {
log.Error(err)
return
}
}
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
if err := os.RemoveAll(path); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
switch mediatype {
case "application/x-tar":
if err := systar.ExtractTar(r.Body, filepath.Dir(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
default:
if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
w.WriteHeader(200)
log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength)
}
func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) {
stat := sm.SectorBuilder.WorkerStats()
return stat, nil
}
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
return sm.SectorBuilderConfig.Miner, nil
}
func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Address) (uint64, error) {
return sm.Full.StateMinerSectorSize(ctx, addr, nil)
}
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) error {
return sm.Miner.StoreGarbageData()
}
@ -95,4 +201,12 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
return out, nil
}
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx)
}
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return sm.SectorBuilder.TaskDone(ctx, task, res)
}
var _ api.StorageMiner = &StorageMinerAPI{}

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/mitchellh/go-homedir"
"io"
"io/ioutil"
"os"
@ -17,6 +16,7 @@ import (
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/address"
@ -79,16 +79,16 @@ func MakeGenesis(outFile, presealInfo string) func(bs dtypes.ChainBlockstore, w
fdata, err := ioutil.ReadFile(presealInfo)
if err != nil {
return nil, xerrors.Errorf("reading preseals json: %w", err)
}
var preseals map[string]genesis.GenesisMiner
if err := json.Unmarshal(fdata, &preseals); err != nil {
return nil, err
}
var preseal map[string]genesis.GenesisMiner
if err := json.Unmarshal(fdata, &preseal); err != nil {
return nil, err
}
minerAddresses := make([]address.Address, 0, len(preseal))
for s := range preseal {
minerAddresses := make([]address.Address, 0, len(preseals))
for s := range preseals {
a, err := address.NewFromString(s)
if err != nil {
return nil, err
@ -101,13 +101,13 @@ func MakeGenesis(outFile, presealInfo string) func(bs dtypes.ChainBlockstore, w
gmc := &gen.GenMinerCfg{
PeerIDs: []peer.ID{"peer ID 1"},
PreSeals: preseal,
PreSeals: preseals,
MinerAddrs: minerAddresses,
}
addrs := map[address.Address]types.BigInt{}
for _, miner := range preseal {
for _, miner := range preseals {
if _, err := w.Import(&miner.Key); err != nil {
return nil, xerrors.Errorf("importing miner key: %w", err)
}

5
scripts/dev/drop-local-repos Executable file
View File

@ -0,0 +1,5 @@
#!/usr/bin/env sh
set -o xtrace
rm -rf ~/.lotus ~/.lotusstorage/ ~/.genesis-sectors ~/.lotusworker

8
scripts/dev/gen-daemon Executable file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env sh
set -o xtrace
export TRUST_PARAMS=1
go run -tags=debug ./cmd/lotus-seed pre-seal
go run -tags=debug ./cmd/lotus daemon --lotus-make-random-genesis=devel.gen --genesis-presealed-sectors=~/.genesis-sectors/pre-seal-t0101.json

7
scripts/dev/sminer-init Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env sh
set -o xtrace
export TRUST_PARAMS=1
go run -tags=debug ./cmd/lotus-storage-miner init --actor=t0101 --genesis-miner --pre-sealed-sectors=~/.genesis-sectors

View File

@ -109,7 +109,11 @@ func (m *Miner) Run(ctx context.Context) error {
}
go fps.run(ctx)
go m.sectorStateLoop(ctx)
if err := m.sectorStateLoop(ctx); err != nil {
log.Error(err)
return xerrors.Errorf("failed to startup sector state loop: %w", err)
}
return nil
}