From 9725eb78bfff858c71be9632e9e80b576b4c82e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 01:52:59 +0100 Subject: [PATCH 01/14] wip remote sectorbuilder workers --- api/api_storage.go | 5 ++ api/struct.go | 12 +++ cmd/lotus-worker/main.go | 85 ++++++++++++++++++ cmd/lotus-worker/sub.go | 133 +++++++++++++++++++++++++++++ lib/sectorbuilder/files.go | 14 +-- lib/sectorbuilder/remote.go | 128 +++++++++++++++++++++++++++ lib/sectorbuilder/sectorbuilder.go | 103 ++++++++++------------ lib/sectorbuilder/simple.go | 62 ++++++++++++++ node/impl/storminer.go | 8 ++ 9 files changed, 485 insertions(+), 65 deletions(-) create mode 100644 cmd/lotus-worker/main.go create mode 100644 cmd/lotus-worker/sub.go create mode 100644 lib/sectorbuilder/remote.go create mode 100644 lib/sectorbuilder/simple.go diff --git a/api/api_storage.go b/api/api_storage.go index 299449662..1a45065d7 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -66,6 +66,11 @@ type StorageMiner interface { SectorsRefs(context.Context) (map[string][]SealedRef, error) WorkerStats(context.Context) (WorkerStats, error) + + // WorkerQueue registers a remote worker + WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error) + + WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error } type WorkerStats struct { diff --git a/api/struct.go b/api/struct.go index 1f08f94bd..9403028e3 100644 --- a/api/struct.go +++ b/api/struct.go @@ -2,6 +2,7 @@ package api import ( "context" + "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/network" @@ -141,6 +142,9 @@ type StorageMinerStruct struct { SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"` WorkerStats func(context.Context) (WorkerStats, error) `perm:"read"` + + WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm + WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` } } @@ -522,6 +526,14 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (WorkerStats, erro return c.Internal.WorkerStats(ctx) } +func (c *StorageMinerStruct) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) { + return c.Internal.WorkerQueue(ctx) +} + +func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { + return c.Internal.WorkerDone(ctx, task, res) +} + var _ Common = &CommonStruct{} var _ FullNode = &FullNodeStruct{} var _ StorageMiner = &StorageMinerStruct{} diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go new file mode 100644 index 000000000..db30956e2 --- /dev/null +++ b/cmd/lotus-worker/main.go @@ -0,0 +1,85 @@ +package lotus_worker + +import ( + "net/http" + "os" + + "github.com/filecoin-project/lotus/api" + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/lotus/build" + lcli "github.com/filecoin-project/lotus/cli" +) + +var log = logging.Logger("main") + +func main() { + logging.SetLogLevel("*", "INFO") + + log.Info("Starting lotus worker") + + local := []*cli.Command{ + runCmd, + } + + app := &cli.App{ + Name: "lotus-worker", + Usage: "Remote storage miner worker", + Version: build.Version, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + EnvVars: []string{"WORKER_PATH"}, + Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME + }, + &cli.StringFlag{ + Name: "minerrepo", + EnvVars: []string{"LOTUS_MINER_PATH"}, + Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME + }, + }, + + Commands: local, + } + + if err := app.Run(os.Args); err != nil { + log.Warn(err) + return + } +} + +var runCmd = &cli.Command{ + Name: "run", + Usage: "Start lotus fountain", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "pullEndpoint", + Value: "127.0.0.1:30003", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + v, err := nodeApi.Version(ctx) + if err != nil { + return err + } + if v.APIVersion != build.APIVersion { + return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) + } + + go func() { + <-ctx.Done() + os.Exit(0) + }() + + return http.ListenAndServe(cctx.String("pullEndpoint"), nil) + }, +} diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go new file mode 100644 index 000000000..f14e1c3de --- /dev/null +++ b/cmd/lotus-worker/sub.go @@ -0,0 +1,133 @@ +package lotus_worker + +import ( + "context" + "golang.org/x/xerrors" + "io" + "net/http" + "os" + "path/filepath" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/sectorbuilder" +) + +type worker struct { + api api.StorageMiner + minerEndpoint string + repo string + + sb *sectorbuilder.SectorBuilder +} + +func acceptJobs(ctx context.Context, api api.StorageMiner) error { + w := &worker{ + api: api, + } + + tasks, err := api.WorkerQueue(ctx) + if err != nil { + return err + } + + for task := range tasks { + res := w.processTask(ctx, task) + + api.WorkerDone(ctx) + } +} + +func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes { + switch task.Type { + case sectorbuilder.WorkerPreCommit: + case sectorbuilder.WorkerCommit: + default: + return errRes(xerrors.Errorf("unknown task type %d", task.Type)) + } + + if err := w.fetchSector(task.SectorID, task.Type); err != nil { + return errRes(err) + } + + var res sectorbuilder.SealRes + + switch task.Type { + case sectorbuilder.WorkerPreCommit: + rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.PublicPieceInfo) + if err != nil { + return errRes(err) + } + res.Rspco = rspco + + if err := w.push("sealed", task.SectorID); err != nil { + return errRes(err) + } + case sectorbuilder.WorkerCommit: + + } + + return res +} + +func (w *worker) fetch(typ string, sectorID uint64) error { + outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + + resp, err := http.Get(w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)) + if err != nil { + return err + } + defer resp.Body.Close() + + out, err := os.Create(outname) + if err != nil { + return err + } + defer out.Close() + + // TODO: progress bar + + _, err = io.Copy(out, resp.Body) + return err +} + +func (w *worker) push(typ string, sectorID uint64) error { + outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + + f, err := os.OpenFile(outname, os.O_RDONLY, 0644) + if err != nil { + return err + } + + req, err := http.NewRequest("PUT", w.minerEndpoint+"/remote/"+typ+"/"+w.sb.SectorName(sectorID), f) + if err != nil { + return err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if err := f.Close(); err != nil { + return err + } + + return resp.Body.Close() +} + +func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { + var err error + switch typ { + case sectorbuilder.WorkerPreCommit: + err = w.fetch("staged", sectorID) + case sectorbuilder.WorkerCommit: + panic("todo") + } + if err != nil { + return xerrors.Errorf("fetch failed: %w", err) + } + return nil +} + +func errRes(err error) sectorbuilder.SealRes { + return sectorbuilder.SealRes{Err: err} +} diff --git a/lib/sectorbuilder/files.go b/lib/sectorbuilder/files.go index 51b748f50..4fc3b8246 100644 --- a/lib/sectorbuilder/files.go +++ b/lib/sectorbuilder/files.go @@ -10,20 +10,20 @@ import ( "golang.org/x/xerrors" ) -func (sb *SectorBuilder) sectorName(sectorID uint64) string { +func (sb *SectorBuilder) SectorName(sectorID uint64) string { return fmt.Sprintf("s-%s-%d", sb.Miner, sectorID) } -func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string { - return filepath.Join(sb.stagedDir, sb.sectorName(sectorID)) +func (sb *SectorBuilder) StagedSectorPath(sectorID uint64) string { + return filepath.Join(sb.stagedDir, sb.SectorName(sectorID)) } func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) { - return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644) + return os.OpenFile(sb.StagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644) } -func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) { - path := filepath.Join(sb.sealedDir, sb.sectorName(sectorID)) +func (sb *SectorBuilder) SealedSectorPath(sectorID uint64) (string, error) { + path := filepath.Join(sb.sealedDir, sb.SectorName(sectorID)) e, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) if err != nil { @@ -34,7 +34,7 @@ func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) { } func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) { - dir := filepath.Join(sb.cacheDir, sb.sectorName(sectorID)) + dir := filepath.Join(sb.cacheDir, sb.SectorName(sectorID)) err := os.Mkdir(dir, 0755) if os.IsExist(err) { diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go new file mode 100644 index 000000000..ca331e3b6 --- /dev/null +++ b/lib/sectorbuilder/remote.go @@ -0,0 +1,128 @@ +package sectorbuilder + +import ( + "context" + + "golang.org/x/xerrors" +) + +type WorkerTaskType int + +const ( + WorkerPreCommit WorkerTaskType = iota + WorkerCommit +) + +type WorkerTask struct { + Type WorkerTaskType + TaskID uint64 + + SectorID uint64 + + // preCommit + SealTicket SealTicket + PublicPieceInfo []PublicPieceInfo +} + +type workerCall struct { + task WorkerTask + ret chan<- SealRes +} + +func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) { + sb.remoteLk.Lock() + + taskCh := make(chan WorkerTask) + r := &remote{ + sealTasks: taskCh, + busy: 0, + } + + sb.remotes = append(sb.remotes, r) + go sb.remoteWorker(ctx, r) + + sb.remoteLk.Unlock() + + return taskCh, nil +} + +func (sb *SectorBuilder) returnTask(task workerCall) { + go func() { + select { + case sb.sealTasks <- task: + case <-sb.stopping: + return + } + }() +} + +func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { + defer log.Warn("Remote worker disconnected") + + for { + select { + case task := <-sb.sealTasks: + resCh := make(chan SealRes) + + sb.remoteLk.Lock() + sb.remoteResults[task.task.TaskID] = resCh + sb.remoteLk.Unlock() + + // send the task + select { + case r.sealTasks <- task.task: + case <-ctx.Done(): + sb.returnTask(task) + return + } + + r.lk.Lock() + r.busy = task.task.TaskID + r.lk.Unlock() + + // wait for the result + select { + case res := <-resCh: + + // send the result back to the caller + select { + case task.ret <- res: + case <-ctx.Done(): + return + case <-sb.stopping: + return + } + case <-ctx.Done(): + log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err()) + return + case <-sb.stopping: + return + } + + case <-ctx.Done(): + return + case <-sb.stopping: + return + } + } +} + +func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error { + sb.remoteLk.Lock() + rres, ok := sb.remoteResults[task] + if ok { + delete(sb.remoteResults, task) + } + sb.remoteLk.Unlock() + + if !ok { + return xerrors.Errorf("task %d not found", task) + } + + select { + case rres <- res: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 43ed38d22..3cf8b100f 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -1,7 +1,6 @@ package sectorbuilder import ( - "context" "fmt" "io" "os" @@ -12,7 +11,6 @@ import ( sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - "go.opencensus.io/trace" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/address" @@ -61,7 +59,30 @@ type SectorBuilder struct { sealedDir string cacheDir string + sealLocal bool rateLimit chan struct{} + + sealTasks chan workerCall + + remoteLk sync.Mutex + remotes []*remote + remoteResults map[uint64]chan<- SealRes + + stopping chan struct{} +} + +type SealRes struct { + Err error `json:"omitempty"` + + Proof []byte `json:"omitempty"` + Rspco RawSealPreCommitOutput `json:"omitempty"` +} + +type remote struct { + lk sync.Mutex + + sealTasks chan<- WorkerTask + busy uint64 } type Config struct { @@ -77,8 +98,8 @@ type Config struct { } func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { - if cfg.WorkerThreads <= PoStReservedWorkers { - return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads) + if cfg.WorkerThreads < PoStReservedWorkers { + return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads) } proverId := addressToProverID(cfg.Miner) @@ -111,6 +132,14 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { return nil, err } + rlimit := cfg.WorkerThreads - PoStReservedWorkers + + sealLocal := rlimit > 0 + + if rlimit == 0 { + rlimit = 1 + } + sb := &SectorBuilder{ handle: sbp, ds: ds, @@ -121,8 +150,13 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { sealedDir: cfg.SealedDir, cacheDir: cfg.CacheDir, - Miner: cfg.Miner, - rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers), + Miner: cfg.Miner, + + sealLocal: sealLocal, + rateLimit: make(chan struct{}, rlimit), + + sealTasks: make(chan workerCall), + remoteResults: map[uint64]chan<- SealRes{}, } return sb, nil @@ -218,7 +252,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece return RawSealPreCommitOutput{}, err } - sealedPath, err := sb.sealedSectorPath(sectorID) + sealedPath, err := sb.SealedSectorPath(sectorID) if err != nil { return RawSealPreCommitOutput{}, err } @@ -232,7 +266,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum)) } - stagedPath := sb.stagedSectorPath(sectorID) + stagedPath := sb.StagedSectorPath(sectorID) rspco, err := sectorbuilder.StandaloneSealPreCommit( sb.ssize, @@ -285,7 +319,7 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea } } - sealedPath, err := sb.sealedSectorPath(sectorID) + sealedPath, err := sb.SealedSectorPath(sectorID) if err != nil { return nil, err } @@ -310,53 +344,6 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea return proof, nil } -func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { - // Wait, this is a blocking method with no way of interrupting it? - // does it checkpoint itself? - return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults) -} - -func (sb *SectorBuilder) SectorSize() uint64 { - return sb.ssize -} - -var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector - -func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) { - var commRa, commDa, ticketa, seeda [32]byte - copy(commRa[:], commR) - copy(commDa[:], commD) - copy(ticketa[:], ticket) - copy(seeda[:], seed) - proverIDa := addressToProverID(proverID) - - return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof) -} - -func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo { - return sectorbuilder.NewSortedSectorInfo(sectors...) -} - -func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, proof []byte, faults []uint64) (bool, error) { - _, span := trace.StartSpan(ctx, "VerifyPoSt") - defer span.End() - return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeed, proof, faults) -} - -func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) { - f, werr, err := toReadableFile(piece, int64(pieceSize)) - if err != nil { - return [32]byte{}, err - } - - commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize) - if err != nil { - return [32]byte{}, err - } - - return commP, werr() -} - -func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) { - return sectorbuilder.GenerateDataCommitment(ssize, pieces) +func (sb *SectorBuilder) Stop() { + close(sb.stopping) } diff --git a/lib/sectorbuilder/simple.go b/lib/sectorbuilder/simple.go new file mode 100644 index 000000000..d1ca86e6a --- /dev/null +++ b/lib/sectorbuilder/simple.go @@ -0,0 +1,62 @@ +package sectorbuilder + +import ( + "context" + "io" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "go.opencensus.io/trace" + + "github.com/filecoin-project/lotus/chain/address" +) + +func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { + // Wait, this is a blocking method with no way of interrupting it? + // does it checkpoint itself? + return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults) +} + +func (sb *SectorBuilder) SectorSize() uint64 { + return sb.ssize +} + +var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector + +func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) { + var commRa, commDa, ticketa, seeda [32]byte + copy(commRa[:], commR) + copy(commDa[:], commD) + copy(ticketa[:], ticket) + copy(seeda[:], seed) + proverIDa := addressToProverID(proverID) + + return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof) +} + +func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo { + return sectorbuilder.NewSortedSectorInfo(sectors...) +} + +func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, proof []byte, faults []uint64) (bool, error) { + _, span := trace.StartSpan(ctx, "VerifyPoSt") + defer span.End() + return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeed, proof, faults) +} + +func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) { + f, werr, err := toReadableFile(piece, int64(pieceSize)) + if err != nil { + return [32]byte{}, err + } + + commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize) + if err != nil { + return [32]byte{}, err + } + + return commP, werr() +} + +func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) { + return sectorbuilder.GenerateDataCommitment(ssize, pieces) +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index f339fd66a..f9613a599 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -91,4 +91,12 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed return out, nil } +func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) { + return sm.SectorBuilder.AddWorker(ctx) +} + +func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { + return sm.SectorBuilder.TaskDone(task, res) +} + var _ api.StorageMiner = &StorageMinerAPI{} From ba3ad75670e05bc39e85084b51e82e5526eda01f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 15:10:51 +0100 Subject: [PATCH 02/14] remote-worker: wire up storage miner endpoints --- api/api_storage.go | 3 +- api/struct.go | 7 +++- cmd/lotus-storage-miner/run.go | 14 +++++-- cmd/lotus-worker/main.go | 11 +++--- cmd/lotus-worker/sub.go | 37 +++++++++++++++--- go.mod | 1 + go.sum | 2 + lib/sectorbuilder/files.go | 18 +++++++++ lib/sectorbuilder/remote.go | 9 ++++- lib/sectorbuilder/sectorbuilder.go | 28 +++++++++++++- node/impl/storminer.go | 61 +++++++++++++++++++++++++++++- 11 files changed, 170 insertions(+), 21 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 1a45065d7..dd41fd2c3 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -3,7 +3,6 @@ package api import ( "context" "fmt" - "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) @@ -54,6 +53,8 @@ type StorageMiner interface { ActorAddress(context.Context) (address.Address, error) + ActorSectorSize(context.Context, address.Address) (uint64, error) + // Temp api for testing StoreGarbageData(context.Context) error diff --git a/api/struct.go b/api/struct.go index 9403028e3..d60be3c38 100644 --- a/api/struct.go +++ b/api/struct.go @@ -133,7 +133,8 @@ type StorageMinerStruct struct { CommonStruct Internal struct { - ActorAddress func(context.Context) (address.Address, error) `perm:"read"` + ActorAddress func(context.Context) (address.Address, error) `perm:"read"` + ActorSectorSize func(context.Context, address.Address) (uint64, error) `perm:"read"` StoreGarbageData func(context.Context) error `perm:"write"` @@ -504,6 +505,10 @@ func (c *StorageMinerStruct) ActorAddress(ctx context.Context) (address.Address, return c.Internal.ActorAddress(ctx) } +func (c *StorageMinerStruct) ActorSectorSize(ctx context.Context, addr address.Address) (uint64, error) { + return c.Internal.ActorSectorSize(ctx, addr) +} + func (c *StorageMinerStruct) StoreGarbageData(ctx context.Context) error { return c.Internal.StoreGarbageData(ctx) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 735a0d21d..b083cb0d9 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -8,6 +8,7 @@ import ( "os/signal" "syscall" + mux "github.com/gorilla/mux" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" "golang.org/x/xerrors" @@ -19,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/lib/auth" "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/repo" ) @@ -118,17 +120,21 @@ var runCmd = &cli.Command{ return xerrors.Errorf("could not listen: %w", err) } + mux := mux.NewRouter() + rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) + mux.Handle("/rpc/v0", rpcServer) + mux.HandleFunc("/remote", minerapi.(*impl.StorageMinerAPI).ServeRemote) + mux.Handle("/", http.DefaultServeMux) // pprof + ah := &auth.Handler{ Verify: minerapi.AuthVerify, - Next: rpcServer.ServeHTTP, + Next: mux.ServeHTTP, } - http.Handle("/rpc/v0", ah) - - srv := &http.Server{Handler: http.DefaultServeMux} + srv := &http.Server{Handler: ah} sigChan := make(chan os.Signal, 2) go func() { diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index db30956e2..9634a4d16 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -1,14 +1,13 @@ -package lotus_worker +package main import ( - "net/http" "os" - "github.com/filecoin-project/lotus/api" logging "github.com/ipfs/go-log" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" ) @@ -52,7 +51,7 @@ func main() { var runCmd = &cli.Command{ Name: "run", - Usage: "Start lotus fountain", + Usage: "Start lotus worker", Flags: []cli.Flag{ &cli.StringFlag{ Name: "pullEndpoint", @@ -72,7 +71,7 @@ var runCmd = &cli.Command{ return err } if v.APIVersion != build.APIVersion { - return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) + return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) } go func() { @@ -80,6 +79,6 @@ var runCmd = &cli.Command{ os.Exit(0) }() - return http.ListenAndServe(cctx.String("pullEndpoint"), nil) + return acceptJobs(ctx, nodeApi) }, } diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index f14e1c3de..5009f2a69 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -1,13 +1,14 @@ -package lotus_worker +package main import ( "context" - "golang.org/x/xerrors" "io" "net/http" "os" "path/filepath" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) @@ -20,9 +21,30 @@ type worker struct { sb *sectorbuilder.SectorBuilder } -func acceptJobs(ctx context.Context, api api.StorageMiner) error { +func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo string) error { + act, err := api.ActorAddress(ctx) + if err != nil { + return err + } + ssize, err := api.ActorSectorSize(ctx, act) + if err != nil { + return err + } + + sb, err := sectorbuilder.NewStandalone(§orbuilder.Config{ + SectorSize: ssize, + Miner: act, + WorkerThreads: 1, + CacheDir: filepath.Join(repo, "cache"), + SealedDir: filepath.Join(repo, "sealed"), + StagedDir: filepath.Join(repo, "staged"), + }) + w := &worker{ - api: api, + api: api, + minerEndpoint: endpoint, + repo: repo, + sb: sb, } tasks, err := api.WorkerQueue(ctx) @@ -33,8 +55,13 @@ func acceptJobs(ctx context.Context, api api.StorageMiner) error { for task := range tasks { res := w.processTask(ctx, task) - api.WorkerDone(ctx) + if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { + log.Error(err) + } } + + log.Warn("acceptJobs exit") + return nil } func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes { diff --git a/go.mod b/go.mod index 556a630a4..1ab3389a3 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/google/go-cmp v0.3.1 // indirect github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect + github.com/gorilla/mux v1.7.3 github.com/gorilla/websocket v1.4.0 github.com/hashicorp/go-multierror v1.0.0 github.com/hashicorp/golang-lru v0.5.3 diff --git a/go.sum b/go.sum index 71ff465c8..6fe298cc5 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= +github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= diff --git a/lib/sectorbuilder/files.go b/lib/sectorbuilder/files.go index 4fc3b8246..b94b643f9 100644 --- a/lib/sectorbuilder/files.go +++ b/lib/sectorbuilder/files.go @@ -44,6 +44,24 @@ func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) { return dir, err } +func (sb *SectorBuilder) OpenRemoteRead(typ string, sectorName string) (*os.File, error) { + switch typ { + case "staged": + return os.OpenFile(filepath.Join(sb.stagedDir, sectorName), os.O_RDONLY, 0644) + default: + return nil, xerrors.Errorf("unknown sector type for read: %s", typ) + } +} + +func (sb *SectorBuilder) OpenRemoteWrite(typ string, sectorName string) (*os.File, error) { + switch typ { + case "sealed": + return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_WRONLY|os.O_CREATE, 0644) + default: + return nil, xerrors.Errorf("unknown sector type for write: %s", typ) + } +} + func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { f, ok := r.(*os.File) if ok { diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index ca331e3b6..2731e6649 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -39,10 +39,10 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro } sb.remotes = append(sb.remotes, r) - go sb.remoteWorker(ctx, r) - sb.remoteLk.Unlock() + go sb.remoteWorker(ctx, r) + return taskCh, nil } @@ -92,6 +92,7 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { case <-sb.stopping: return } + case <-ctx.Done(): log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err()) return @@ -104,6 +105,10 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { case <-sb.stopping: return } + + r.lk.Lock() + r.busy = 0 + r.lk.Unlock() } } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 3cf8b100f..4ea6d5f67 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -82,7 +82,7 @@ type remote struct { lk sync.Mutex sealTasks chan<- WorkerTask - busy uint64 + busy uint64 // only for metrics } type Config struct { @@ -157,11 +157,37 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { sealTasks: make(chan workerCall), remoteResults: map[uint64]chan<- SealRes{}, + + stopping: make(chan struct{}), } return sb, nil } +func NewStandalone(cfg *Config) (*SectorBuilder, error) { + for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} { + if err := os.Mkdir(dir, 0755); err != nil { + if os.IsExist(err) { + continue + } + return nil, err + } + } + + return &SectorBuilder{ + handle: nil, + ds: nil, + ssize: cfg.SectorSize, + Miner: cfg.Miner, + stagedDir: cfg.StagedDir, + sealedDir: cfg.SealedDir, + cacheDir: cfg.CacheDir, + sealLocal: true, + rateLimit: make(chan struct{}, cfg.WorkerThreads), + stopping: make(chan struct{}), + }, nil +} + func (sb *SectorBuilder) RateLimit() func() { if cap(sb.rateLimit) == len(sb.rateLimit) { log.Warn("rate-limiting sectorbuilder call") diff --git a/node/impl/storminer.go b/node/impl/storminer.go index f9613a599..c0f61090d 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -2,6 +2,10 @@ package impl import ( "context" + "encoding/json" + "github.com/gorilla/mux" + "io" + "net/http" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" @@ -21,6 +25,57 @@ type StorageMinerAPI struct { Full api.FullNode } +func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { + if !api.HasPerm(r.Context(), api.PermAdmin) { + w.WriteHeader(401) + json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"}) + return + } + + mux := mux.NewRouter() + + mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") + mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") + + mux.ServeHTTP(w, r) +} + +func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + + fr, err := sm.SectorBuilder.OpenRemoteRead(vars["type"], vars["sname"]) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + defer fr.Close() + + w.WriteHeader(200) + if _, err := io.Copy(w, fr); err != nil { + log.Error(err) + return + } +} + +func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + + fr, err := sm.SectorBuilder.OpenRemoteWrite(vars["type"], vars["sname"]) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + defer fr.Close() + + w.WriteHeader(200) + if _, err := io.Copy(w, fr); err != nil { + log.Error(err) + return + } +} + func (sm *StorageMinerAPI) WorkerStats(context.Context) (api.WorkerStats, error) { free, reserved, total := sm.SectorBuilder.WorkerStats() return api.WorkerStats{ @@ -34,6 +89,10 @@ func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error return sm.SectorBuilderConfig.Miner, nil } +func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Address) (uint64, error) { + return sm.Full.StateMinerSectorSize(ctx, addr, nil) +} + func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) error { return sm.Miner.StoreGarbageData() } @@ -96,7 +155,7 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde } func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { - return sm.SectorBuilder.TaskDone(task, res) + return sm.SectorBuilder.TaskDone(ctx, task, res) } var _ api.StorageMiner = &StorageMinerAPI{} From 88bbcd80ea4efbf2a426a895c5888c0b8da82fdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 17:10:04 +0100 Subject: [PATCH 03/14] support remote SealPreCommit --- Makefile | 5 +++ cli/cmd.go | 29 +++++++++++++--- cmd/lotus-worker/main.go | 16 +++++++-- cmd/lotus-worker/sub.go | 2 +- lib/sectorbuilder/remote.go | 6 ++-- lib/sectorbuilder/sectorbuilder.go | 55 +++++++++++++++++++++++++++--- 6 files changed, 98 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 6faafaffe..084a50900 100644 --- a/Makefile +++ b/Makefile @@ -73,6 +73,11 @@ lotus-storage-miner: $(BUILD_DEPS) go build -o lotus-storage-miner ./cmd/lotus-storage-miner go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build +lotus-worker: $(BUILD_DEPS) + rm -f lotus-worker + go build -o lotus-worker ./cmd/lotus-worker + go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build + .PHONY: lotus-storage-miner CLEAN+=lotus-storage-miner diff --git a/cli/cmd.go b/cli/cmd.go index 30e9ba01a..11a1e09e0 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -8,6 +8,7 @@ import ( "syscall" logging "github.com/ipfs/go-log" + "github.com/mitchellh/go-homedir" manet "github.com/multiformats/go-multiaddr-net" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" @@ -27,20 +28,40 @@ const ( // ApiConnector returns API instance type ApiConnector func() api.FullNode -func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { - r, err := repo.NewFS(ctx.String(repoFlag)) +func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) { + p, err := homedir.Expand(ctx.String(repoFlag)) if err != nil { - return "", nil, err + return "", "", err + } + + r, err := repo.NewFS(p) + if err != nil { + return "", "", err } ma, err := r.APIEndpoint() if err != nil { - return "", nil, xerrors.Errorf("failed to get api endpoint: %w", err) + return "", "", xerrors.Errorf("failed to get api endpoint: %w", err) } _, addr, err := manet.DialArgs(ma) + if err != nil { + return "", "", err + } + + return p, addr, nil +} + +func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { + rdir, addr, err := RepoInfo(ctx, repoFlag) if err != nil { return "", nil, err } + + r, err := repo.NewFS(rdir) + if err != nil { + return "", nil, err + } + var headers http.Header token, err := r.APIToken() if err != nil { diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index 9634a4d16..a8c4ae315 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -34,8 +34,8 @@ func main() { Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME }, &cli.StringFlag{ - Name: "minerrepo", - EnvVars: []string{"LOTUS_MINER_PATH"}, + Name: "storagerepo", + EnvVars: []string{"LOTUS_STORAGE_PATH"}, Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME }, }, @@ -66,6 +66,16 @@ var runCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) + _, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo") + if err != nil { + return err + } + + r, _, err := lcli.RepoInfo(cctx, "repo") + if err != nil { + return err + } + v, err := nodeApi.Version(ctx) if err != nil { return err @@ -79,6 +89,6 @@ var runCmd = &cli.Command{ os.Exit(0) }() - return acceptJobs(ctx, nodeApi) + return acceptJobs(ctx, nodeApi, storageAddr, r) }, } diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index 5009f2a69..2a4cae317 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -80,7 +80,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) switch task.Type { case sectorbuilder.WorkerPreCommit: - rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.PublicPieceInfo) + rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces) if err != nil { return errRes(err) } diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index 2731e6649..d984baea5 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -20,13 +20,13 @@ type WorkerTask struct { SectorID uint64 // preCommit - SealTicket SealTicket - PublicPieceInfo []PublicPieceInfo + SealTicket SealTicket + Pieces []PublicPieceInfo } type workerCall struct { task WorkerTask - ret chan<- SealRes + ret chan SealRes } func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) { diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 4ea6d5f67..67353d069 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "unsafe" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" @@ -64,6 +65,7 @@ type SectorBuilder struct { sealTasks chan workerCall + taskCtr uint64 remoteLk sync.Mutex remotes []*remote remoteResults map[uint64]chan<- SealRes @@ -155,6 +157,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { sealLocal: sealLocal, rateLimit: make(chan struct{}, rlimit), + taskCtr: 1, sealTasks: make(chan workerCall), remoteResults: map[uint64]chan<- SealRes{}, @@ -182,16 +185,23 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) { stagedDir: cfg.StagedDir, sealedDir: cfg.SealedDir, cacheDir: cfg.CacheDir, + sealLocal: true, + taskCtr: 1, rateLimit: make(chan struct{}, cfg.WorkerThreads), stopping: make(chan struct{}), }, nil } -func (sb *SectorBuilder) RateLimit() func() { +func (sb *SectorBuilder) checkRateLimit() { if cap(sb.rateLimit) == len(sb.rateLimit) { - log.Warn("rate-limiting sectorbuilder call") + log.Warn("rate-limiting local sectorbuilder call") } +} + +func (sb *SectorBuilder) RateLimit() func() { + sb.checkRateLimit() + sb.rateLimit <- struct{}{} return func() { @@ -269,9 +279,46 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) } +func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { + select { + case ret := <-call.ret: + return ret.Rspco, ret.Err + case <-sb.stopping: + return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped") + } +} + func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { - ret := sb.RateLimit() - defer ret() + call := workerCall{ + task: WorkerTask{ + Type: WorkerPreCommit, + TaskID: atomic.AddUint64(&sb.taskCtr, 1), + SectorID: sectorID, + SealTicket: ticket, + Pieces: pieces, + }, + ret: make(chan SealRes), + } + + select { // prefer remote + case sb.sealTasks <- call: + return sb.sealPreCommitRemote(call) + default: + } + + sb.checkRateLimit() + + select { // use whichever is available + case sb.sealTasks <- call: + return sb.sealPreCommitRemote(call) + case sb.rateLimit <- struct{}{}: + } + + // local + + defer func() { + <-sb.rateLimit + }() cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { From 98b1de33b6ca9b6fef49da2b7c7b67085550856c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 17:23:42 +0100 Subject: [PATCH 04/14] Stats for remote workers --- api/api_storage.go | 8 +------- api/struct.go | 4 ++-- cmd/lotus-storage-miner/info.go | 5 ++++- lib/sectorbuilder/sectorbuilder.go | 29 +++++++++++++++++++++++++++-- node/impl/storminer.go | 10 +++------- 5 files changed, 37 insertions(+), 19 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index dd41fd2c3..e0778a534 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -66,7 +66,7 @@ type StorageMiner interface { SectorsRefs(context.Context) (map[string][]SealedRef, error) - WorkerStats(context.Context) (WorkerStats, error) + WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) // WorkerQueue registers a remote worker WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error) @@ -74,12 +74,6 @@ type StorageMiner interface { WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error } -type WorkerStats struct { - Free int - Reserved int // for PoSt - Total int -} - type SectorInfo struct { SectorID uint64 State SectorState diff --git a/api/struct.go b/api/struct.go index d60be3c38..d36fd1698 100644 --- a/api/struct.go +++ b/api/struct.go @@ -142,7 +142,7 @@ type StorageMinerStruct struct { SectorsList func(context.Context) ([]uint64, error) `perm:"read"` SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"` - WorkerStats func(context.Context) (WorkerStats, error) `perm:"read"` + WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"` WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` @@ -527,7 +527,7 @@ func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]Seal return c.Internal.SectorsRefs(ctx) } -func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (WorkerStats, error) { +func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.WorkerStats, error) { return c.Internal.WorkerStats(ctx) } diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index b1df59cb3..6727e3e1e 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -58,7 +58,10 @@ var infoCmd = &cli.Command{ if err != nil { return err } - fmt.Printf("Worker use: %d / %d (+%d)\n", wstat.Total-wstat.Reserved-wstat.Free, wstat.Total, wstat.Reserved) + + fmt.Printf("Worker use:\n") + fmt.Printf("\tLocal: %d / %d (+%d reserved)\n",wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved ) + fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal - wstat.RemotesFree, wstat.RemotesTotal) ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil) if err != nil { diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 67353d069..04f521a27 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -209,8 +209,33 @@ func (sb *SectorBuilder) RateLimit() func() { } } -func (sb *SectorBuilder) WorkerStats() (free, reserved, total int) { - return cap(sb.rateLimit) - len(sb.rateLimit), PoStReservedWorkers, cap(sb.rateLimit) + PoStReservedWorkers +type WorkerStats struct { + LocalFree int + LocalReserved int + LocalTotal int + // todo: post in progress + RemotesTotal int + RemotesFree int +} + +func (sb *SectorBuilder) WorkerStats() WorkerStats { + sb.remoteLk.Lock() + defer sb.remoteLk.Unlock() + + remoteFree := len(sb.remotes) + for _, r := range sb.remotes { + if r.busy > 0 { + remoteFree-- + } + } + + return WorkerStats{ + LocalFree: cap(sb.rateLimit) - len(sb.rateLimit), + LocalReserved: PoStReservedWorkers, + LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers, + RemotesTotal: len(sb.remotes), + RemotesFree: remoteFree, + } } func addressToProverID(a address.Address) [32]byte { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index c0f61090d..254aab31c 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -76,13 +76,9 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques } } -func (sm *StorageMinerAPI) WorkerStats(context.Context) (api.WorkerStats, error) { - free, reserved, total := sm.SectorBuilder.WorkerStats() - return api.WorkerStats{ - Free: free, - Reserved: reserved, - Total: total, - }, nil +func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) { + stat := sm.SectorBuilder.WorkerStats() + return stat, nil } func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) { From d4197bbadc028094a711199560c84742b8878680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 19:38:43 +0100 Subject: [PATCH 05/14] Working remote PreCommit --- Makefile | 4 +- build/params.go | 4 +- cli/cmd.go | 10 ++--- cmd/lotus-storage-miner/info.go | 4 +- cmd/lotus-storage-miner/run.go | 4 +- cmd/lotus-worker/main.go | 19 ++++------ cmd/lotus-worker/sub.go | 60 +++++++++++++++++++++++++++--- lib/sectorbuilder/remote.go | 15 +++++++- lib/sectorbuilder/sectorbuilder.go | 13 ++++--- lotuspond/main.go | 2 + node/impl/storminer.go | 9 +++++ 11 files changed, 108 insertions(+), 36 deletions(-) diff --git a/Makefile b/Makefile index 084a50900..58853783e 100644 --- a/Makefile +++ b/Makefile @@ -72,13 +72,13 @@ lotus-storage-miner: $(BUILD_DEPS) rm -f lotus-storage-miner go build -o lotus-storage-miner ./cmd/lotus-storage-miner go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build +.PHONY: lotus-storage-miner lotus-worker: $(BUILD_DEPS) rm -f lotus-worker go build -o lotus-worker ./cmd/lotus-worker go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build - -.PHONY: lotus-storage-miner +.PHONY: lotus-worker CLEAN+=lotus-storage-miner diff --git a/build/params.go b/build/params.go index 35ccd69f0..eb2d2a167 100644 --- a/build/params.go +++ b/build/params.go @@ -37,7 +37,7 @@ const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours // Consensus / Network // Seconds -const BlockDelay = 12 +const BlockDelay = 2 // Seconds const AllowableClockDrift = BlockDelay * 2 @@ -60,7 +60,7 @@ const WRatioDen = 2 // Proofs // Blocks -const ProvingPeriodDuration uint64 = 300 +const ProvingPeriodDuration uint64 = 30 // PoStChallangeTime sets the window in which post computation should happen // Blocks diff --git a/cli/cmd.go b/cli/cmd.go index 11a1e09e0..472eb9732 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -41,7 +41,7 @@ func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) { ma, err := r.APIEndpoint() if err != nil { - return "", "", xerrors.Errorf("failed to get api endpoint: %w", err) + return "", "", xerrors.Errorf("failed to get api endpoint: (%s) %w", p, err) } _, addr, err := manet.DialArgs(ma) if err != nil { @@ -51,7 +51,7 @@ func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) { return p, addr, nil } -func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { +func GetRawAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { rdir, addr, err := RepoInfo(ctx, repoFlag) if err != nil { return "", nil, err @@ -80,7 +80,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) { f = "storagerepo" } - addr, headers, err := getAPI(ctx, f) + addr, headers, err := GetRawAPI(ctx, f) if err != nil { return nil, nil, err } @@ -89,7 +89,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) { } func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) { - addr, headers, err := getAPI(ctx, "repo") + addr, headers, err := GetRawAPI(ctx, "repo") if err != nil { return nil, nil, err } @@ -98,7 +98,7 @@ func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error } func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) { - addr, headers, err := getAPI(ctx, "storagerepo") + addr, headers, err := GetRawAPI(ctx, "storagerepo") if err != nil { return nil, nil, err } diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 6727e3e1e..68757f455 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -60,8 +60,8 @@ var infoCmd = &cli.Command{ } fmt.Printf("Worker use:\n") - fmt.Printf("\tLocal: %d / %d (+%d reserved)\n",wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved ) - fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal - wstat.RemotesFree, wstat.RemotesTotal) + fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved) + fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal) ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil) if err != nil { diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index b083cb0d9..2145a90bc 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -126,8 +126,8 @@ var runCmd = &cli.Command{ rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) mux.Handle("/rpc/v0", rpcServer) - mux.HandleFunc("/remote", minerapi.(*impl.StorageMinerAPI).ServeRemote) - mux.Handle("/", http.DefaultServeMux) // pprof + mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote) + mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof ah := &auth.Handler{ Verify: minerapi.AuthVerify, diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index a8c4ae315..f2f2f0ffb 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/mitchellh/go-homedir" "os" logging "github.com/ipfs/go-log" @@ -44,7 +45,7 @@ func main() { } if err := app.Run(os.Args); err != nil { - log.Warn(err) + log.Warnf("%+v", err) return } } @@ -52,26 +53,22 @@ func main() { var runCmd = &cli.Command{ Name: "run", Usage: "Start lotus worker", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "pullEndpoint", - Value: "127.0.0.1:30003", - }, - }, Action: func(cctx *cli.Context) error { nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { - return err + return xerrors.Errorf("getting miner api: %w", err) } defer closer() ctx := lcli.ReqContext(cctx) + _, auth, err := lcli.GetRawAPI(cctx, "storagerepo") + _, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo") if err != nil { - return err + return xerrors.Errorf("getting miner repo: %w", err) } - r, _, err := lcli.RepoInfo(cctx, "repo") + r, err := homedir.Expand(cctx.String("repo")) if err != nil { return err } @@ -89,6 +86,6 @@ var runCmd = &cli.Command{ os.Exit(0) }() - return acceptJobs(ctx, nodeApi, storageAddr, r) + return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r) }, } diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index 2a4cae317..803aa6026 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -2,6 +2,7 @@ package main import ( "context" + "gopkg.in/cheggaaa/pb.v1" "io" "net/http" "os" @@ -17,11 +18,12 @@ type worker struct { api api.StorageMiner minerEndpoint string repo string + auth http.Header sb *sectorbuilder.SectorBuilder } -func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo string) error { +func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error { act, err := api.ActorAddress(ctx) if err != nil { return err @@ -38,11 +40,16 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo CacheDir: filepath.Join(repo, "cache"), SealedDir: filepath.Join(repo, "sealed"), StagedDir: filepath.Join(repo, "staged"), + MetadataDir: filepath.Join(repo, "meta"), }) + if err != nil { + return err + } w := &worker{ api: api, minerEndpoint: endpoint, + auth: auth, repo: repo, sb: sb, } @@ -53,8 +60,12 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo } for task := range tasks { + log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) + res := w.processTask(ctx, task) + log.Infof("Task %d done, err: %s", task.TaskID, res.Err) + if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { log.Error(err) } @@ -90,7 +101,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return errRes(err) } case sectorbuilder.WorkerCommit: - + panic("todo") } return res @@ -99,10 +110,20 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - resp, err := http.Get(w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)) + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Fetch %s", url) + + req, err := http.NewRequest("GET", url, nil) if err != nil { return err } + req.Header = w.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() out, err := os.Create(outname) @@ -111,9 +132,15 @@ func (w *worker) fetch(typ string, sectorID uint64) error { } defer out.Close() - // TODO: progress bar + bar := pb.New64(resp.ContentLength) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES - _, err = io.Copy(out, resp.Body) + bar.Start() + defer bar.Finish() + + _, err = io.Copy(out, bar.NewProxyReader(resp.Body)) return err } @@ -125,14 +152,35 @@ func (w *worker) push(typ string, sectorID uint64) error { return err } - req, err := http.NewRequest("PUT", w.minerEndpoint+"/remote/"+typ+"/"+w.sb.SectorName(sectorID), f) + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Push %s", url) + + fi, err := f.Stat() if err != nil { return err } + + bar := pb.New64(fi.Size()) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES + + bar.Start() + defer bar.Finish() + + req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f)) + if err != nil { + return err + } + req.Header = w.auth + resp, err := http.DefaultClient.Do(req) if err != nil { return err } + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 response: %d", resp.StatusCode) + } if err := f.Close(); err != nil { return err diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index d984baea5..00adb79f0 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -38,7 +38,8 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro busy: 0, } - sb.remotes = append(sb.remotes, r) + sb.remoteCtr++ + sb.remotes[sb.remoteCtr] = r sb.remoteLk.Unlock() go sb.remoteWorker(ctx, r) @@ -59,6 +60,18 @@ func (sb *SectorBuilder) returnTask(task workerCall) { func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { defer log.Warn("Remote worker disconnected") + defer func() { + sb.remoteLk.Lock() + defer sb.remoteLk.Unlock() + + for i, vr := range sb.remotes { + if vr == r { + delete(sb.remotes, i) + return + } + } + }() + for { select { case task := <-sb.sealTasks: diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 04f521a27..b3703dcb2 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -67,7 +67,8 @@ type SectorBuilder struct { taskCtr uint64 remoteLk sync.Mutex - remotes []*remote + remoteCtr int + remotes map[int]*remote remoteResults map[uint64]chan<- SealRes stopping chan struct{} @@ -160,6 +161,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { taskCtr: 1, sealTasks: make(chan workerCall), remoteResults: map[uint64]chan<- SealRes{}, + remotes: map[int]*remote{}, stopping: make(chan struct{}), } @@ -169,7 +171,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { func NewStandalone(cfg *Config) (*SectorBuilder, error) { for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} { - if err := os.Mkdir(dir, 0755); err != nil { + if err := os.MkdirAll(dir, 0755); err != nil { if os.IsExist(err) { continue } @@ -188,6 +190,7 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) { sealLocal: true, taskCtr: 1, + remotes: map[int]*remote{}, rateLimit: make(chan struct{}, cfg.WorkerThreads), stopping: make(chan struct{}), }, nil @@ -210,12 +213,12 @@ func (sb *SectorBuilder) RateLimit() func() { } type WorkerStats struct { - LocalFree int + LocalFree int LocalReserved int - LocalTotal int + LocalTotal int // todo: post in progress RemotesTotal int - RemotesFree int + RemotesFree int } func (sb *SectorBuilder) WorkerStats() WorkerStats { diff --git a/lotuspond/main.go b/lotuspond/main.go index d3238dad7..17ad7c970 100644 --- a/lotuspond/main.go +++ b/lotuspond/main.go @@ -88,6 +88,8 @@ var shCmd = &cli.Command{ } } + shcmd.Env = append(os.Environ(), shcmd.Env...) + shcmd.Stdin = os.Stdin shcmd.Stdout = os.Stdout shcmd.Stderr = os.Stderr diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 254aab31c..bef961067 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,6 +3,7 @@ package impl import ( "context" "encoding/json" + "fmt" "github.com/gorilla/mux" "io" "net/http" @@ -37,6 +38,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") + log.Infof("SERVEGETREMOTE %s", r.URL) + mux.ServeHTTP(w, r) } @@ -51,6 +54,12 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques } defer fr.Close() + fi, err := fr.Stat() + if err != nil { + return + } + + w.Header().Set("Content-Length", fmt.Sprint(fi.Size())) w.WriteHeader(200) if _, err := io.Copy(w, fr); err != nil { log.Error(err) From b726b952984345051e6369592a4f55554d3b6811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 20:51:48 +0100 Subject: [PATCH 06/14] WIP remote sector CommitSseal --- cmd/lotus-worker/sub.go | 16 +++++++-- lib/sectorbuilder/files.go | 2 ++ lib/sectorbuilder/remote.go | 4 +++ lib/sectorbuilder/sectorbuilder.go | 56 ++++++++++++++++++++++++++++-- node/impl/storminer.go | 4 ++- 5 files changed, 75 insertions(+), 7 deletions(-) diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index 803aa6026..db7c70091 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -97,11 +97,20 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) } res.Rspco = rspco + // TODO: push cache + if err := w.push("sealed", task.SectorID); err != nil { return errRes(err) } case sectorbuilder.WorkerCommit: - panic("todo") + proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, nil, task.Rspco) + if err != nil { + return errRes(err) + } + + res.Proof = proof + + // TODO: Push cache } return res @@ -167,7 +176,7 @@ func (w *worker) push(typ string, sectorID uint64) error { bar.Start() defer bar.Finish() - + //todo set content size req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f)) if err != nil { return err @@ -195,7 +204,8 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) case sectorbuilder.WorkerPreCommit: err = w.fetch("staged", sectorID) case sectorbuilder.WorkerCommit: - panic("todo") + err = w.fetch("sealed", sectorID) + // todo: cache } if err != nil { return xerrors.Errorf("fetch failed: %w", err) diff --git a/lib/sectorbuilder/files.go b/lib/sectorbuilder/files.go index b94b643f9..0869d4030 100644 --- a/lib/sectorbuilder/files.go +++ b/lib/sectorbuilder/files.go @@ -48,6 +48,8 @@ func (sb *SectorBuilder) OpenRemoteRead(typ string, sectorName string) (*os.File switch typ { case "staged": return os.OpenFile(filepath.Join(sb.stagedDir, sectorName), os.O_RDONLY, 0644) + case "sealed": + return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_RDONLY, 0644) default: return nil, xerrors.Errorf("unknown sector type for read: %s", typ) } diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index 00adb79f0..78477f364 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -22,6 +22,10 @@ type WorkerTask struct { // preCommit SealTicket SealTicket Pieces []PublicPieceInfo + + // commit + SealSeed SealSeed + Rspco RawSealPreCommitOutput } type workerCall struct { diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index b3703dcb2..f017635ad 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -387,9 +387,19 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece return rspco, nil } -func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) { - ret := sb.RateLimit() - defer ret() +func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { + select { + case ret := <-call.ret: + return ret.Proof, ret.Err + case <-sb.stopping: + return nil, xerrors.New("sectorbuilder stopped") + } +} + +func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { + defer func() { + <-sb.rateLimit + }() cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { @@ -411,6 +421,41 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) } + return proof, nil +} + +func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) { + call := workerCall{ + task: WorkerTask{ + Type: WorkerCommit, + TaskID: atomic.AddUint64(&sb.taskCtr, 1), + SectorID: sectorID, + SealTicket: ticket, + Pieces: pieces, + + SealSeed: seed, + Rspco: rspco, + }, + ret: make(chan SealRes), + } + + select { // prefer remote + case sb.sealTasks <- call: + proof, err = sb.sealCommitRemote(call) + default: + sb.checkRateLimit() + + select { // use whichever is available + case sb.sealTasks <- call: + proof, err = sb.sealCommitRemote(call) + case sb.rateLimit <- struct{}{}: + proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco) + } + } + if err != nil { + return nil, xerrors.Errorf("commit: %w", err) + } + pmeta := make([]sectorbuilder.PieceMetadata, len(pieces)) for i, piece := range pieces { pmeta[i] = sectorbuilder.PieceMetadata{ @@ -425,6 +470,11 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea return nil, err } + cacheDir, err := sb.sectorCacheDir(sectorID) + if err != nil { + return nil, err + } + err = sectorbuilder.ImportSealedSector( sb.handle, sectorID, diff --git a/node/impl/storminer.go b/node/impl/storminer.go index bef961067..b1b4f9f82 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -79,10 +79,12 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques defer fr.Close() w.WriteHeader(200) - if _, err := io.Copy(w, fr); err != nil { + n, err := io.Copy(w, fr) + if err != nil { log.Error(err) return } + log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], n) } func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) { From 3281e9448a554146c6a81738364345157427a3a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 22 Nov 2019 16:48:02 +0100 Subject: [PATCH 07/14] fix rspco serialization --- cmd/lotus-worker/sub.go | 14 +++----- lib/sectorbuilder/sectorbuilder.go | 52 ++++++++++++++++++++++++------ node/impl/storminer.go | 4 ++- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index db7c70091..e8be9085c 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -84,7 +84,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) } if err := w.fetchSector(task.SectorID, task.Type); err != nil { - return errRes(err) + return errRes(xerrors.Errorf("fetching sector: %w", err)) } var res sectorbuilder.SealRes @@ -93,19 +93,19 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) case sectorbuilder.WorkerPreCommit: rspco, err := w.sb.SealPreCommit(task.SectorID, task.SealTicket, task.Pieces) if err != nil { - return errRes(err) + return errRes(xerrors.Errorf("precomitting: %w", err)) } - res.Rspco = rspco + res.Rspco = rspco.ToJson() // TODO: push cache if err := w.push("sealed", task.SectorID); err != nil { - return errRes(err) + return errRes(xerrors.Errorf("pushing precommited data: %w", err)) } case sectorbuilder.WorkerCommit: proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, nil, task.Rspco) if err != nil { - return errRes(err) + return errRes(xerrors.Errorf("comitting: %w", err)) } res.Proof = proof @@ -191,10 +191,6 @@ func (w *worker) push(typ string, sectorID uint64) error { return xerrors.Errorf("non-200 response: %d", resp.StatusCode) } - if err := f.Close(); err != nil { - return err - } - return resp.Body.Close() } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index f017635ad..941728e0a 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -43,7 +43,7 @@ type SealCommitOutput = sectorbuilder.SealCommitOutput type PublicPieceInfo = sectorbuilder.PublicPieceInfo -type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput +type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput const CommLen = sectorbuilder.CommitmentBytesLen @@ -74,11 +74,36 @@ type SectorBuilder struct { stopping chan struct{} } -type SealRes struct { - Err error `json:"omitempty"` +type JsonRSPCO struct { + CommC []byte + CommD []byte + CommR []byte + CommRLast []byte +} - Proof []byte `json:"omitempty"` - Rspco RawSealPreCommitOutput `json:"omitempty"` +func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO { + return JsonRSPCO{ + CommC: rspco.CommC[:], + CommD: rspco.CommD[:], + CommR: rspco.CommR[:], + CommRLast: rspco.CommRLast[:], + } +} + +func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { + var out RawSealPreCommitOutput + copy(out.CommC[:], rspco.CommC) + copy(out.CommD[:], rspco.CommD) + copy(out.CommR[:], rspco.CommR) + copy(out.CommRLast[:], rspco.CommRLast) + return out +} + +type SealRes struct { + Err error + + Proof []byte + Rspco JsonRSPCO } type remote struct { @@ -310,7 +335,7 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { select { case ret := <-call.ret: - return ret.Rspco, ret.Err + return ret.Rspco.rspco(), ret.Err case <-sb.stopping: return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped") } @@ -350,12 +375,12 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { - return RawSealPreCommitOutput{}, err + return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err) } sealedPath, err := sb.SealedSectorPath(sectorID) if err != nil { - return RawSealPreCommitOutput{}, err + return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err) } var sum uint64 @@ -384,7 +409,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err) } - return rspco, nil + return RawSealPreCommitOutput(rspco), nil } func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { @@ -415,9 +440,12 @@ func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, see ticket.TicketBytes, seed.TicketBytes, pieces, - rspco, + sectorbuilder.RawSealPreCommitOutput(rspco), ) if err != nil { + log.Warn("StandaloneSealCommit error: ", err) + log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco) + return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) } @@ -456,6 +484,10 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea return nil, xerrors.Errorf("commit: %w", err) } + if pieceKeys == nil { + return + } + pmeta := make([]sectorbuilder.PieceMetadata, len(pieces)) for i, piece := range pieces { pmeta[i] = sectorbuilder.PieceMetadata{ diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b1b4f9f82..083097827 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -79,7 +79,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques defer fr.Close() w.WriteHeader(200) - n, err := io.Copy(w, fr) + n, err := io.Copy(fr, r.Body) if err != nil { log.Error(err) return @@ -162,6 +162,8 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde } func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { + log.Infof("WDUN RSPKO %v", res.Rspco) + return sm.SectorBuilder.TaskDone(ctx, task, res) } From 05e631235c39d3b351a8ee845f086ad0d646ca0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 22 Nov 2019 17:25:56 +0100 Subject: [PATCH 08/14] Rename to lotus-seal-worker --- .gitignore | 1 + Makefile | 10 +++++----- cmd/{lotus-worker => lotus-seal-worker}/main.go | 2 +- cmd/{lotus-worker => lotus-seal-worker}/sub.go | 0 4 files changed, 7 insertions(+), 6 deletions(-) rename cmd/{lotus-worker => lotus-seal-worker}/main.go (98%) rename cmd/{lotus-worker => lotus-seal-worker}/sub.go (100%) diff --git a/.gitignore b/.gitignore index 6fef7d758..b4d957e5c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /lotus /lotus-storage-miner +/lotus-seal-worker /pond /townhall /fountain diff --git a/Makefile b/Makefile index 58853783e..b2884b0ac 100644 --- a/Makefile +++ b/Makefile @@ -74,11 +74,11 @@ lotus-storage-miner: $(BUILD_DEPS) go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build .PHONY: lotus-storage-miner -lotus-worker: $(BUILD_DEPS) - rm -f lotus-worker - go build -o lotus-worker ./cmd/lotus-worker - go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build -.PHONY: lotus-worker +lotus-seal-worker: $(BUILD_DEPS) + rm -f lotus-seal-worker + go build -o lotus-seal-worker ./cmd/lotus-seal-worker + go run github.com/GeertJohan/go.rice/rice append --exec lotus-seal-worker -i ./build +.PHONY: lotus-seal-worker CLEAN+=lotus-storage-miner diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-seal-worker/main.go similarity index 98% rename from cmd/lotus-worker/main.go rename to cmd/lotus-seal-worker/main.go index f2f2f0ffb..8fe1f8e9d 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -25,7 +25,7 @@ func main() { } app := &cli.App{ - Name: "lotus-worker", + Name: "lotus-seal-worker", Usage: "Remote storage miner worker", Version: build.Version, Flags: []cli.Flag{ diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-seal-worker/sub.go similarity index 100% rename from cmd/lotus-worker/sub.go rename to cmd/lotus-seal-worker/sub.go From ed9279cf0c34d252bc220091f348064d03623ef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 30 Nov 2019 10:25:31 +0100 Subject: [PATCH 09/14] Some fixes and dev utils --- cmd/lotus-seal-worker/sub.go | 4 ++-- cmd/lotus-storage-miner/init.go | 5 +++++ lib/sectorbuilder/post.go | 1 - lib/sectorbuilder/sectorbuilder.go | 13 ++++++++++--- node/modules/testing/genesis.go | 24 +++++++++++++++--------- scripts/dev/drop-local-repos | 5 +++++ scripts/dev/gen-daemon | 8 ++++++++ scripts/dev/sminer-init | 7 +++++++ 8 files changed, 52 insertions(+), 15 deletions(-) delete mode 100644 lib/sectorbuilder/post.go create mode 100755 scripts/dev/drop-local-repos create mode 100755 scripts/dev/gen-daemon create mode 100755 scripts/dev/sminer-init diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index e8be9085c..436b92dd4 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -103,7 +103,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return errRes(xerrors.Errorf("pushing precommited data: %w", err)) } case sectorbuilder.WorkerCommit: - proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, nil, task.Rspco) + proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco) if err != nil { return errRes(xerrors.Errorf("comitting: %w", err)) } @@ -210,5 +210,5 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) } func errRes(err error) sectorbuilder.SealRes { - return sectorbuilder.SealRes{Err: err} + return sectorbuilder.SealRes{Err: err.Error()} } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index de1164cb5..3759d976d 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -207,6 +207,11 @@ func migratePreSealedSectors(presealsb string, repoPath string, mds dtypes.Metad } func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir string, maddr address.Address, mds dtypes.MetadataDS) error { + presealDir, err := homedir.Expand(presealDir) + if err != nil { + return xerrors.Errorf("expanding preseal dir: %w", err) + } + b, err := ioutil.ReadFile(filepath.Join(presealDir, "pre-seal-"+maddr.String()+".json")) if err != nil { return xerrors.Errorf("reading preseal metadata: %w", err) diff --git a/lib/sectorbuilder/post.go b/lib/sectorbuilder/post.go deleted file mode 100644 index 6d99fae27..000000000 --- a/lib/sectorbuilder/post.go +++ /dev/null @@ -1 +0,0 @@ -package sectorbuilder diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 2cbcf8dd4..eaebde51f 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -97,7 +97,7 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { } type SealRes struct { - Err error + Err string Proof []byte Rspco JsonRSPCO @@ -322,7 +322,11 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { select { case ret := <-call.ret: - return ret.Rspco.rspco(), ret.Err + var err error + if ret.Err != "" { + err = xerrors.New(ret.Err) + } + return ret.Rspco.rspco(), err case <-sb.stopping: return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped") } @@ -402,7 +406,10 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { select { case ret := <-call.ret: - return ret.Proof, ret.Err + if ret.Err != "" { + err = xerrors.New(ret.Err) + } + return ret.Proof, err case <-sb.stopping: return nil, xerrors.New("sectorbuilder stopped") } diff --git a/node/modules/testing/genesis.go b/node/modules/testing/genesis.go index 6fbc0ccfc..1e9215972 100644 --- a/node/modules/testing/genesis.go +++ b/node/modules/testing/genesis.go @@ -18,6 +18,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/ipfs/go-merkledag" peer "github.com/libp2p/go-libp2p-peer" + "github.com/mitchellh/go-homedir" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/gen" @@ -72,18 +73,23 @@ func MakeGenesis(outFile, preseal string) func(bs dtypes.ChainBlockstore, w *wal return func(bs dtypes.ChainBlockstore, w *wallet.Wallet) modules.Genesis { return func() (*types.BlockHeader, error) { glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network") + preseal, err := homedir.Expand(preseal) + if err != nil { + return nil, xerrors.Errorf("expanding preseals json path: %w", err) + } + fdata, err := ioutil.ReadFile(preseal) if err != nil { + return nil, xerrors.Errorf("reading preseals json: %w", err) + } + + var preseals map[string]genesis.GenesisMiner + if err := json.Unmarshal(fdata, &preseals); err != nil { return nil, err } - var preseal map[string]genesis.GenesisMiner - if err := json.Unmarshal(fdata, &preseal); err != nil { - return nil, err - } - - minerAddresses := make([]address.Address, 0, len(preseal)) - for s := range preseal { + minerAddresses := make([]address.Address, 0, len(preseals)) + for s := range preseals { a, err := address.NewFromString(s) if err != nil { return nil, err @@ -96,13 +102,13 @@ func MakeGenesis(outFile, preseal string) func(bs dtypes.ChainBlockstore, w *wal gmc := &gen.GenMinerCfg{ PeerIDs: []peer.ID{"peer ID 1"}, - PreSeals: preseal, + PreSeals: preseals, MinerAddrs: minerAddresses, } addrs := map[address.Address]types.BigInt{} - for _, miner := range preseal { + for _, miner := range preseals { if _, err := w.Import(&miner.Key); err != nil { return nil, xerrors.Errorf("importing miner key: %w", err) } diff --git a/scripts/dev/drop-local-repos b/scripts/dev/drop-local-repos new file mode 100755 index 000000000..df43d3e6c --- /dev/null +++ b/scripts/dev/drop-local-repos @@ -0,0 +1,5 @@ +#!/usr/bin/env sh + +set -o xtrace + +rm -rf ~/.lotus ~/.lotusstorage/ ~/.genesis-sectors \ No newline at end of file diff --git a/scripts/dev/gen-daemon b/scripts/dev/gen-daemon new file mode 100755 index 000000000..fdadc180f --- /dev/null +++ b/scripts/dev/gen-daemon @@ -0,0 +1,8 @@ +#!/usr/bin/env sh + +set -o xtrace + +export TRUST_PARAMS=1 + +go run -tags=debug ./cmd/lotus-seed pre-seal +go run -tags=debug ./cmd/lotus daemon --lotus-make-random-genesis=devel.gen --genesis-presealed-sectors=~/.genesis-sectors/pre-seal-t0101.json diff --git a/scripts/dev/sminer-init b/scripts/dev/sminer-init new file mode 100755 index 000000000..b14e6e14b --- /dev/null +++ b/scripts/dev/sminer-init @@ -0,0 +1,7 @@ +#!/usr/bin/env sh + +set -o xtrace + +export TRUST_PARAMS=1 + +go run -tags=debug ./cmd/lotus-storage-miner init --actor=t0101 --genesis-miner --pre-sealed-sectors=~/.genesis-sectors From 89556819aeb1248b409b448bab3be20f7af11fb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 30 Nov 2019 14:22:50 +0100 Subject: [PATCH 10/14] seal-worker: Handle cache --- cmd/lotus-seal-worker/sub.go | 98 ++++++++++++++++++++++-------- lib/sectorbuilder/files.go | 19 ++---- lib/sectorbuilder/sectorbuilder.go | 1 + node/impl/storminer.go | 86 +++++++++++++++++++++----- scripts/dev/drop-local-repos | 2 +- 5 files changed, 153 insertions(+), 53 deletions(-) diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index 436b92dd4..04480e931 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -2,8 +2,11 @@ package main import ( "context" + files "github.com/ipfs/go-ipfs-files" "gopkg.in/cheggaaa/pb.v1" "io" + "mime" + "mime/multipart" "net/http" "os" "path/filepath" @@ -64,7 +67,7 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth res := w.processTask(ctx, task) - log.Infof("Task %d done, err: %s", task.TaskID, res.Err) + log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { log.Error(err) @@ -97,11 +100,13 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) } res.Rspco = rspco.ToJson() - // TODO: push cache - if err := w.push("sealed", task.SectorID); err != nil { return errRes(xerrors.Errorf("pushing precommited data: %w", err)) } + + if err := w.push("cache", task.SectorID); err != nil { + return errRes(xerrors.Errorf("pushing precommited data: %w", err)) + } case sectorbuilder.WorkerCommit: proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco) if err != nil { @@ -110,7 +115,9 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) res.Proof = proof - // TODO: Push cache + if err := w.push("cache", task.SectorID); err != nil { + return errRes(xerrors.Errorf("pushing precommited data: %w", err)) + } } return res @@ -120,56 +127,82 @@ func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Fetch %s", url) + log.Infof("Fetch %s %s", typ, url) req, err := http.NewRequest("GET", url, nil) if err != nil { - return err + return xerrors.Errorf("request: %w", err) } req.Header = w.auth resp, err := http.DefaultClient.Do(req) if err != nil { - return err + return xerrors.Errorf("do request: %w", err) } defer resp.Body.Close() - out, err := os.Create(outname) - if err != nil { - return err - } - defer out.Close() - bar := pb.New64(resp.ContentLength) bar.ShowPercent = true bar.ShowSpeed = true bar.Units = pb.U_BYTES + barreader := bar.NewProxyReader(resp.Body) + bar.Start() defer bar.Finish() - _, err = io.Copy(out, bar.NewProxyReader(resp.Body)) - return err + mediatype, p, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return xerrors.Errorf("parse media type: %w", err) + } + + var file files.Node + switch mediatype { + case "multipart/form-data": + mpr := multipart.NewReader(barreader, p["boundary"]) + + file, err = files.NewFileFromPartReader(mpr, mediatype) + if err != nil { + return xerrors.Errorf("call to NewFileFromPartReader failed: %w", err) + } + + case "application/octet-stream": + file = files.NewReaderFile(barreader) + default: + return xerrors.Errorf("unknown content type: '%s'", mediatype) + } + + // WriteTo is unhappy when things exist + if err := os.RemoveAll(outname); err != nil { + return xerrors.Errorf("removing dest: %w", err) + } + + return files.WriteTo(file, outname) } func (w *worker) push(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - f, err := os.OpenFile(outname, os.O_RDONLY, 0644) + stat, err := os.Stat(outname) + if err != nil { + return err + } + + f, err := files.NewSerialFile(outname, false, stat) if err != nil { return err } url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Push %s", url) + log.Infof("Push %s %s", typ, url) - fi, err := f.Stat() + sz, err := f.Size() if err != nil { - return err + return xerrors.Errorf("getting size: %w", err) } - bar := pb.New64(fi.Size()) + bar := pb.New64(sz) bar.ShowPercent = true bar.ShowSpeed = true bar.Units = pb.U_BYTES @@ -177,11 +210,25 @@ func (w *worker) push(typ string, sectorID uint64) error { bar.Start() defer bar.Finish() //todo set content size - req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f)) + + header := w.auth + + var r io.Reader + r, file := f.(files.File) + if !file { + mfr := files.NewMultiFileReader(f.(files.Directory), true) + + header.Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) + r = mfr + } else { + header.Set("Content-Type", "application/octet-stream") + } + + req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r)) if err != nil { return err } - req.Header = w.auth + req.Header = header resp, err := http.DefaultClient.Do(req) if err != nil { @@ -201,7 +248,10 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) err = w.fetch("staged", sectorID) case sectorbuilder.WorkerCommit: err = w.fetch("sealed", sectorID) - // todo: cache + if err != nil { + return xerrors.Errorf("fetch sealed: %w", err) + } + err = w.fetch("cache", sectorID) } if err != nil { return xerrors.Errorf("fetch failed: %w", err) @@ -210,5 +260,5 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) } func errRes(err error) sectorbuilder.SealRes { - return sectorbuilder.SealRes{Err: err.Error()} + return sectorbuilder.SealRes{Err: err.Error(), GoErr: err} } diff --git a/lib/sectorbuilder/files.go b/lib/sectorbuilder/files.go index 0869d4030..876bc71b7 100644 --- a/lib/sectorbuilder/files.go +++ b/lib/sectorbuilder/files.go @@ -44,23 +44,16 @@ func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) { return dir, err } -func (sb *SectorBuilder) OpenRemoteRead(typ string, sectorName string) (*os.File, error) { +func (sb *SectorBuilder) GetPath(typ string, sectorName string) (string, error) { switch typ { case "staged": - return os.OpenFile(filepath.Join(sb.stagedDir, sectorName), os.O_RDONLY, 0644) + return filepath.Join(sb.stagedDir, sectorName), nil case "sealed": - return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_RDONLY, 0644) + return filepath.Join(sb.sealedDir, sectorName), nil + case "cache": + return filepath.Join(sb.cacheDir, sectorName), nil default: - return nil, xerrors.Errorf("unknown sector type for read: %s", typ) - } -} - -func (sb *SectorBuilder) OpenRemoteWrite(typ string, sectorName string) (*os.File, error) { - switch typ { - case "sealed": - return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_WRONLY|os.O_CREATE, 0644) - default: - return nil, xerrors.Errorf("unknown sector type for write: %s", typ) + return "", xerrors.Errorf("unknown sector type for write: %s", typ) } } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index eaebde51f..b8e08c3c4 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -98,6 +98,7 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { type SealRes struct { Err string + GoErr error `json:"-"` Proof []byte Rspco JsonRSPCO diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a4f04cd62..96884170f 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,17 +3,18 @@ package impl import ( "context" "encoding/json" - "fmt" - "github.com/gorilla/mux" - "io" - "net/http" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/gorilla/mux" + files "github.com/ipfs/go-ipfs-files" + "io" + "mime" + "net/http" + "os" ) type StorageMinerAPI struct { @@ -48,22 +49,40 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - fr, err := sm.SectorBuilder.OpenRemoteRead(vars["type"], vars["sname"]) + path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) if err != nil { log.Error(err) w.WriteHeader(500) return } - defer fr.Close() - fi, err := fr.Stat() + stat, err := os.Stat(path) if err != nil { + log.Error(err) + w.WriteHeader(500) return } - w.Header().Set("Content-Length", fmt.Sprint(fi.Size())) + f, err := files.NewSerialFile(path, false, stat) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + var rd io.Reader + rd, file := f.(files.File) + if !file { + mfr := files.NewMultiFileReader(f.(files.Directory), true) + + w.Header().Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) + rd = mfr + } else { + w.Header().Set("Content-Type", "application/octet-stream") + } + w.WriteHeader(200) - if _, err := io.Copy(w, fr); err != nil { + if _, err := io.Copy(w, rd); err != nil { log.Error(err) return } @@ -72,21 +91,58 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - fr, err := sm.SectorBuilder.OpenRemoteWrite(vars["type"], vars["sname"]) + path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) if err != nil { log.Error(err) w.WriteHeader(500) return } - defer fr.Close() - w.WriteHeader(200) - n, err := io.Copy(fr, r.Body) + var file files.Node + + mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { log.Error(err) + w.WriteHeader(500) return } - log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], n) + + switch mediatype { + case "multipart/form-data": + mpr, err := r.MultipartReader() + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + file, err = files.NewFileFromPartReader(mpr, mediatype) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + default: + file = files.NewReaderFile(r.Body) + } + + // WriteTo is unhappy when things exist (also cleans up cache after Commit) + if err := os.RemoveAll(path); err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + if err := files.WriteTo(file, path); err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + w.WriteHeader(200) + + log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength) } func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) { diff --git a/scripts/dev/drop-local-repos b/scripts/dev/drop-local-repos index df43d3e6c..939030bad 100755 --- a/scripts/dev/drop-local-repos +++ b/scripts/dev/drop-local-repos @@ -2,4 +2,4 @@ set -o xtrace -rm -rf ~/.lotus ~/.lotusstorage/ ~/.genesis-sectors \ No newline at end of file +rm -rf ~/.lotus ~/.lotusstorage/ ~/.genesis-sectors ~/.lotusworker From a7239d7f0165c501e088a11dcf21680e71bb625b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Dec 2019 11:05:35 +0100 Subject: [PATCH 11/14] allow setting api host in storageminer --- cmd/lotus-storage-miner/run.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index b51d14fb2..27fa95816 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -30,7 +30,11 @@ var runCmd = &cli.Command{ Flags: []cli.Flag{ &cli.StringFlag{ Name: "api", - Value: "", + Value: "2345", + }, + &cli.StringFlag{ + Name: "apihost", + Value: "127.0.0.1", }, &cli.BoolFlag{ Name: "enable-gpu-proving", @@ -83,9 +87,9 @@ var runCmd = &cli.Command{ node.Online(), node.Repo(r), - node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") }, + node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") || cctx.IsSet("apihost") }, node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { - apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + + apima, err := multiaddr.NewMultiaddr("/ip4/"+cctx.String("apihost")+"/tcp/" + cctx.String("api")) if err != nil { return err From 28dde1a2d382c9ba822b4a3337050d5b52cd39e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 15:58:28 +0100 Subject: [PATCH 12/14] worker: Use system tar for moving cache around --- cmd/lotus-seal-worker/sub.go | 136 ---------------------------- cmd/lotus-seal-worker/transfer.go | 141 ++++++++++++++++++++++++++++++ extern/filecoin-ffi | 2 +- lib/systar/systar.go | 49 +++++++++++ node/impl/storminer.go | 59 +++++-------- 5 files changed, 211 insertions(+), 176 deletions(-) create mode 100644 cmd/lotus-seal-worker/transfer.go create mode 100644 lib/systar/systar.go diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index 0dc62b4cb..416f8e373 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -123,142 +123,6 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return res } -func (w *worker) fetch(typ string, sectorID uint64) error { - outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Fetch %s %s", typ, url) - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return xerrors.Errorf("request: %w", err) - } - req.Header = w.auth - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return xerrors.Errorf("do request: %w", err) - } - - defer resp.Body.Close() - - bar := pb.New64(resp.ContentLength) - bar.ShowPercent = true - bar.ShowSpeed = true - bar.Units = pb.U_BYTES - - barreader := bar.NewProxyReader(resp.Body) - - bar.Start() - defer bar.Finish() - - mediatype, p, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) - if err != nil { - return xerrors.Errorf("parse media type: %w", err) - } - - var file files.Node - switch mediatype { - case "multipart/form-data": - mpr := multipart.NewReader(barreader, p["boundary"]) - - file, err = files.NewFileFromPartReader(mpr, mediatype) - if err != nil { - return xerrors.Errorf("call to NewFileFromPartReader failed: %w", err) - } - - case "application/octet-stream": - file = files.NewReaderFile(barreader) - default: - return xerrors.Errorf("unknown content type: '%s'", mediatype) - } - - // WriteTo is unhappy when things exist - if err := os.RemoveAll(outname); err != nil { - return xerrors.Errorf("removing dest: %w", err) - } - - return files.WriteTo(file, outname) -} - -func (w *worker) push(typ string, sectorID uint64) error { - outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - - stat, err := os.Stat(outname) - if err != nil { - return err - } - - f, err := files.NewSerialFile(outname, false, stat) - if err != nil { - return err - } - - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Push %s %s", typ, url) - - sz, err := f.Size() - if err != nil { - return xerrors.Errorf("getting size: %w", err) - } - - bar := pb.New64(sz) - bar.ShowPercent = true - bar.ShowSpeed = true - bar.Units = pb.U_BYTES - - bar.Start() - defer bar.Finish() - //todo set content size - - header := w.auth - - var r io.Reader - r, file := f.(files.File) - if !file { - mfr := files.NewMultiFileReader(f.(files.Directory), true) - - header.Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) - r = mfr - } else { - header.Set("Content-Type", "application/octet-stream") - } - - req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r)) - if err != nil { - return err - } - req.Header = header - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - if resp.StatusCode != 200 { - return xerrors.Errorf("non-200 response: %d", resp.StatusCode) - } - - return resp.Body.Close() -} - -func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { - var err error - switch typ { - case sectorbuilder.WorkerPreCommit: - err = w.fetch("staged", sectorID) - case sectorbuilder.WorkerCommit: - err = w.fetch("sealed", sectorID) - if err != nil { - return xerrors.Errorf("fetch sealed: %w", err) - } - err = w.fetch("cache", sectorID) - } - if err != nil { - return xerrors.Errorf("fetch failed: %w", err) - } - return nil -} - func errRes(err error) sectorbuilder.SealRes { return sectorbuilder.SealRes{Err: err.Error(), GoErr: err} } diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go new file mode 100644 index 000000000..b363e2324 --- /dev/null +++ b/cmd/lotus-seal-worker/transfer.go @@ -0,0 +1,141 @@ +package main + +import ( + "io" + "mime" + "net/http" + "os" + + files "github.com/ipfs/go-ipfs-files" + "golang.org/x/xerrors" + "gopkg.in/cheggaaa/pb.v1" + "path/filepath" + + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/lib/systar" +) + +func (w *worker) fetch(typ string, sectorID uint64) error { + outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Fetch %s %s", typ, url) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return xerrors.Errorf("request: %w", err) + } + req.Header = w.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return xerrors.Errorf("do request: %w", err) + } + + defer resp.Body.Close() + + bar := pb.New64(resp.ContentLength) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES + + barreader := bar.NewProxyReader(resp.Body) + + bar.Start() + defer bar.Finish() + + mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return xerrors.Errorf("parse media type: %w", err) + } + + // WriteTo is unhappy when things exist + if err := os.RemoveAll(outname); err != nil { + return xerrors.Errorf("removing dest: %w", err) + } + + switch mediatype { + case "application/x-tar": + return systar.ExtractTar(barreader, outname) + case "application/octet-stream": + return files.WriteTo(files.NewReaderFile(barreader), outname) + default: + return xerrors.Errorf("unknown content type: '%s'", mediatype) + } + +} + +func (w *worker) push(typ string, sectorID uint64) error { + filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Push %s %s", typ, url) + + stat, err := os.Stat(filename) + if err != nil { + return err + } + + var r io.Reader + if stat.IsDir() { + r, err = systar.TarDirectory(filename) + } else { + r, err = os.OpenFile(filename, os.O_RDONLY, 0644) + } + if err != nil { + return xerrors.Errorf("opening push reader: %w", err) + } + + bar := pb.New64(0) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.ShowCounters = true + bar.Units = pb.U_BYTES + + bar.Start() + defer bar.Finish() + //todo set content size + + header := w.auth + + + if stat.IsDir() { + header.Set("Content-Type", "application/x-tar") + } else { + header.Set("Content-Type", "application/octet-stream") + } + + req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r)) + if err != nil { + return err + } + req.Header = header + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 response: %d", resp.StatusCode) + } + + return resp.Body.Close() +} + +func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { + var err error + switch typ { + case sectorbuilder.WorkerPreCommit: + err = w.fetch("staged", sectorID) + case sectorbuilder.WorkerCommit: + err = w.fetch("sealed", sectorID) + if err != nil { + return xerrors.Errorf("fetch sealed: %w", err) + } + err = w.fetch("cache", sectorID) + } + if err != nil { + return xerrors.Errorf("fetch failed: %w", err) + } + return nil +} diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 9faf00cb5..ebb3e13ad 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 9faf00cb536fd86559440a09de9131520ae1ca0e +Subproject commit ebb3e13addf13059658ba92e84c9ce4300fbdf25 diff --git a/lib/systar/systar.go b/lib/systar/systar.go new file mode 100644 index 000000000..1fb42d108 --- /dev/null +++ b/lib/systar/systar.go @@ -0,0 +1,49 @@ +package systar + +import ( + "io" + "os/exec" + "path/filepath" +) + +func ExtractTar(body io.Reader, dest string) error { + cmd := exec.Command("tar", "-xS", "-C", dest) + cmd.Stdin = body + return cmd.Run() +} + +func TarDirectory(file string) (io.ReadCloser, error) { + // use system builtin tar, golang one doesn't support sparse files + + dir := filepath.Dir(file) + base := filepath.Base(file) + + i, o := io.Pipe() + + // don't bother with compression, it's mostly random data + cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base) + cmd.Stdout = o + if err := cmd.Start(); err != nil { + return nil, err + } + + return &struct { + io.Reader + io.Closer + }{ + Reader: i, + Closer: closer(func() error { + e1 := i.Close() + if err := cmd.Wait(); err != nil { + return err + } + + return e1 + }), + }, nil +} + +type closer func() error +func (cl closer) Close() error { + return cl() +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index dd4c6886b..5e413c805 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/lib/systar" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -63,24 +64,20 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques return } - f, err := files.NewSerialFile(path, false, stat) + var rd io.Reader + if stat.IsDir() { + rd, err = systar.TarDirectory(path) + w.Header().Set("Content-Type", "application/x-tar") + } else { + rd, err = os.OpenFile(path, os.O_RDONLY, 0644) + w.Header().Set("Content-Type", "application/octet-stream") + } if err != nil { log.Error(err) w.WriteHeader(500) return } - var rd io.Reader - rd, file := f.(files.File) - if !file { - mfr := files.NewMultiFileReader(f.(files.Directory), true) - - w.Header().Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) - rd = mfr - } else { - w.Header().Set("Content-Type", "application/octet-stream") - } - w.WriteHeader(200) if _, err := io.Copy(w, rd); err != nil { log.Error(err) @@ -98,8 +95,6 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - var file files.Node - mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { log.Error(err) @@ -107,37 +102,23 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - switch mediatype { - case "multipart/form-data": - mpr, err := r.MultipartReader() - if err != nil { - log.Error(err) - w.WriteHeader(500) - return - } - - file, err = files.NewFileFromPartReader(mpr, mediatype) - if err != nil { - log.Error(err) - w.WriteHeader(500) - return - } - - default: - file = files.NewReaderFile(r.Body) - } - - // WriteTo is unhappy when things exist (also cleans up cache after Commit) if err := os.RemoveAll(path); err != nil { log.Error(err) w.WriteHeader(500) return } - if err := files.WriteTo(file, path); err != nil { - log.Error(err) - w.WriteHeader(500) - return + switch mediatype { + case "application/x-tar": + if err := systar.ExtractTar(r.Body, path); err != nil { + return + } + default: + if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil { + log.Error(err) + w.WriteHeader(500) + return + } } w.WriteHeader(200) From b2001db9d530298c718d2f1f25b212b7730bf076 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 17:53:32 +0100 Subject: [PATCH 13/14] worker: Fix rebaining bugs --- Makefile | 6 ++--- build/params_devnet.go | 6 +++++ build/params_shared.go | 6 ----- cmd/lotus-seal-worker/main.go | 2 +- cmd/lotus-seal-worker/sub.go | 24 +++++++++---------- cmd/lotus-seal-worker/transfer.go | 16 +++++++++---- cmd/lotus-storage-miner/init.go | 6 ++++- cmd/lotus-storage-miner/run.go | 38 ++++++++++++++++++------------ lib/sectorbuilder/remote.go | 2 +- lib/sectorbuilder/sectorbuilder.go | 16 ++++++------- lib/sectorbuilder/simple.go | 2 +- lib/systar/systar.go | 34 +++++++++++++------------- lotuspond/front/src/StorageNode.js | 5 ++++ lotuspond/spawn.go | 6 ++--- node/impl/storminer.go | 7 +++--- 15 files changed, 99 insertions(+), 77 deletions(-) diff --git a/Makefile b/Makefile index 0a3235bc0..90dc91d5d 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ deps: $(BUILD_DEPS) .PHONY: deps debug: GOFLAGS=-tags=debug -debug: lotus lotus-storage-miner +debug: lotus lotus-storage-miner lotus-seal-worker lotus-seed lotus: $(BUILD_DEPS) rm -f lotus @@ -65,7 +65,7 @@ BINS+=lotus-storage-miner lotus-seal-worker: $(BUILD_DEPS) rm -f lotus-seal-worker - go build -o lotus-seal-worker ./cmd/lotus-seal-worker + go build $(GOFLAGS) -o lotus-seal-worker ./cmd/lotus-seal-worker go run github.com/GeertJohan/go.rice/rice append --exec lotus-seal-worker -i ./build .PHONY: lotus-seal-worker BINS+=lotus-seal-worker @@ -81,7 +81,7 @@ install: lotus-seed: $(BUILD_DEPS) rm -f lotus-seed - go build -o lotus-seed ./cmd/lotus-seed + go build $(GOFLAGS) -o lotus-seed ./cmd/lotus-seed go run github.com/GeertJohan/go.rice/rice append --exec lotus-seed -i ./build .PHONY: lotus-seed diff --git a/build/params_devnet.go b/build/params_devnet.go index 6f53d2704..6c4dab128 100644 --- a/build/params_devnet.go +++ b/build/params_devnet.go @@ -2,6 +2,12 @@ package build +var SectorSizes = []uint64{ + 16 << 20, + 256 << 20, + 1 << 30, +} + // Seconds const BlockDelay = 30 diff --git a/build/params_shared.go b/build/params_shared.go index 40eb643f0..3d3bc9aac 100644 --- a/build/params_shared.go +++ b/build/params_shared.go @@ -12,12 +12,6 @@ import ( const UnixfsChunkSize uint64 = 1 << 20 const UnixfsLinksPerLevel = 1024 -var SectorSizes = []uint64{ - 16 << 20, - 256 << 20, - 1 << 30, -} - func SupportedSectorSize(ssize uint64) bool { for _, ss := range SectorSizes { if ssize == ss { diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 8fe1f8e9d..3fa05d037 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -83,7 +83,7 @@ var runCmd = &cli.Command{ go func() { <-ctx.Done() - os.Exit(0) + log.Warn("Shutting down..") }() return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r) diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index 416f8e373..b3cb828e0 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -2,13 +2,7 @@ package main import ( "context" - files "github.com/ipfs/go-ipfs-files" - "gopkg.in/cheggaaa/pb.v1" - "io" - "mime" - "mime/multipart" "net/http" - "os" "path/filepath" "golang.org/x/xerrors" @@ -62,15 +56,21 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth return err } - for task := range tasks { - log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) +loop: + for { + select { + case task := <-tasks: + log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) - res := w.processTask(ctx, task) + res := w.processTask(ctx, task) - log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) + log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) - if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { - log.Error(err) + if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { + log.Error(err) + } + case <-ctx.Done(): + break loop } } diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index b363e2324..68727c056 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -31,9 +31,12 @@ func (w *worker) fetch(typ string, sectorID uint64) error { if err != nil { return xerrors.Errorf("do request: %w", err) } - defer resp.Body.Close() + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 code: %d", resp.StatusCode) + } + bar := pb.New64(resp.ContentLength) bar.ShowPercent = true bar.ShowSpeed = true @@ -49,14 +52,13 @@ func (w *worker) fetch(typ string, sectorID uint64) error { return xerrors.Errorf("parse media type: %w", err) } - // WriteTo is unhappy when things exist if err := os.RemoveAll(outname); err != nil { return xerrors.Errorf("removing dest: %w", err) } switch mediatype { case "application/x-tar": - return systar.ExtractTar(barreader, outname) + return systar.ExtractTar(barreader, filepath.Dir(outname)) case "application/octet-stream": return files.WriteTo(files.NewReaderFile(barreader), outname) default: @@ -98,7 +100,6 @@ func (w *worker) push(typ string, sectorID uint64) error { header := w.auth - if stat.IsDir() { header.Set("Content-Type", "application/x-tar") } else { @@ -119,7 +120,12 @@ func (w *worker) push(typ string, sectorID uint64) error { return xerrors.Errorf("non-200 response: %d", resp.StatusCode) } - return resp.Body.Close() + if err := resp.Body.Close(); err != nil { + return err + } + + // TODO: keep files around for later stages of sealing + return os.RemoveAll(filename) } func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 359277cce..65b6c9597 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -71,6 +71,10 @@ var initCmd = &cli.Command{ Name: "pre-sealed-sectors", Usage: "specify set of presealed sectors for starting as a genesis miner", }, + &cli.BoolFlag{ + Name: "nosync", + Usage: "don't check full-node sync status", + }, }, Action: func(cctx *cli.Context) error { log.Info("Initializing lotus storage miner") @@ -91,7 +95,7 @@ var initCmd = &cli.Command{ log.Info("Checking full node sync status") - if !cctx.Bool("genesis-miner") { + if !cctx.Bool("genesis-miner") && !cctx.Bool("nosync") { if err := lcli.SyncWait(ctx, api); err != nil { return xerrors.Errorf("sync wait: %w", err) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 0dbba9159..b96c20f6e 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -24,6 +24,8 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) +const defaultListen = "/ip4/127.0.0.1/tcp/" + var runCmd = &cli.Command{ Name: "run", Usage: "Start a lotus storage miner process", @@ -32,18 +34,14 @@ var runCmd = &cli.Command{ Name: "api", Value: "2345", }, - &cli.StringFlag{ - Name: "apihost", - Value: "127.0.0.1", - }, &cli.BoolFlag{ Name: "enable-gpu-proving", - Usage: "Enable use of GPU for mining operations", + Usage: "enable use of GPU for mining operations", Value: true, }, &cli.BoolFlag{ Name: "nosync", - Usage: "Don't check full-node sync status", + Usage: "don't check full-node sync status", }, }, Action: func(cctx *cli.Context) error { @@ -99,15 +97,13 @@ var runCmd = &cli.Command{ node.Online(), node.Repo(r), - node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") || cctx.IsSet("apihost") }, - node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { - apima, err := multiaddr.NewMultiaddr("/ip4/"+cctx.String("apihost")+"/tcp/" + - cctx.String("api")) - if err != nil { - return err - } - return lr.SetAPIEndpoint(apima) - })), + node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { + apima, err := parseApi(cctx.String("api")) + if err != nil { + return err + } + return lr.SetAPIEndpoint(apima) + }), node.Override(new(api.FullNode), nodeApi), ) @@ -170,3 +166,15 @@ var runCmd = &cli.Command{ return srv.Serve(manet.NetListener(lst)) }, } + +func parseApi(api string) (multiaddr.Multiaddr, error) { + if api == "" { + return nil, xerrors.New("empty --api") + } + + if api[0] != '/' { + api = defaultListen + api + } + + return multiaddr.NewMultiaddr(api) +} diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index 78477f364..5add710ff 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -35,6 +35,7 @@ type workerCall struct { func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) { sb.remoteLk.Lock() + defer sb.remoteLk.Unlock() taskCh := make(chan WorkerTask) r := &remote{ @@ -44,7 +45,6 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro sb.remoteCtr++ sb.remotes[sb.remoteCtr] = r - sb.remoteLk.Unlock() go sb.remoteWorker(ctx, r) diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index cc11df891..400bdb058 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -101,7 +101,7 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { } type SealRes struct { - Err string + Err string GoErr error `json:"-"` Proof []byte @@ -201,15 +201,15 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) { } return &SectorBuilder{ - ds: nil, + ds: nil, - ssize: cfg.SectorSize, + ssize: cfg.SectorSize, - Miner: cfg.Miner, - stagedDir: cfg.StagedDir, - sealedDir: cfg.SealedDir, - cacheDir: cfg.CacheDir, - unsealedDir:cfg.UnsealedDir, + Miner: cfg.Miner, + stagedDir: cfg.StagedDir, + sealedDir: cfg.SealedDir, + cacheDir: cfg.CacheDir, + unsealedDir: cfg.UnsealedDir, sealLocal: true, taskCtr: 1, diff --git a/lib/sectorbuilder/simple.go b/lib/sectorbuilder/simple.go index a5360e102..f20395ddd 100644 --- a/lib/sectorbuilder/simple.go +++ b/lib/sectorbuilder/simple.go @@ -76,4 +76,4 @@ func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen] func GenerateDataCommitment(ssize uint64, pieces []sectorbuilder.PublicPieceInfo) ([CommLen]byte, error) { return sectorbuilder.GenerateDataCommitment(ssize, pieces) -} \ No newline at end of file +} diff --git a/lib/systar/systar.go b/lib/systar/systar.go index 1fb42d108..c83999376 100644 --- a/lib/systar/systar.go +++ b/lib/systar/systar.go @@ -1,12 +1,22 @@ package systar import ( + "golang.org/x/xerrors" "io" + "os" "os/exec" "path/filepath" + + logging "github.com/ipfs/go-log" ) +var log = logging.Logger("systar") + func ExtractTar(body io.Reader, dest string) error { + if err := os.MkdirAll(dest, 0755); err != nil { + return xerrors.Errorf("creating dest directory: %w", err) + } + cmd := exec.Command("tar", "-xS", "-C", dest) cmd.Stdin = body return cmd.Run() @@ -27,23 +37,11 @@ func TarDirectory(file string) (io.ReadCloser, error) { return nil, err } - return &struct { - io.Reader - io.Closer - }{ - Reader: i, - Closer: closer(func() error { - e1 := i.Close() - if err := cmd.Wait(); err != nil { - return err - } + go func() { + if err := o.CloseWithError(cmd.Wait()); err != nil { + log.Error(err) + } + }() - return e1 - }), - }, nil -} - -type closer func() error -func (cl closer) Close() error { - return cl() + return i, nil } diff --git a/lotuspond/front/src/StorageNode.js b/lotuspond/front/src/StorageNode.js index 903729d9f..7851fbb4a 100644 --- a/lotuspond/front/src/StorageNode.js +++ b/lotuspond/front/src/StorageNode.js @@ -16,6 +16,11 @@ let sealCodes = [ "PreCommitted", "Committing", "Proving", + + "SealFailed", + "PreCommitFailed", + "SealCommitFailed", + "CommitFailed", ] class StorageNode extends React.Component { diff --git a/lotuspond/spawn.go b/lotuspond/spawn.go index 5eb8ca52d..bcf2dd001 100644 --- a/lotuspond/spawn.go +++ b/lotuspond/spawn.go @@ -136,7 +136,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { return nodeInfo{}, err } - initArgs := []string{"init"} + initArgs := []string{"init", "--nosync"} if fullNodeRepo == api.running[1].meta.Repo { initArgs = []string{"init", "--actor=t0101", "--genesis-miner", "--pre-sealed-sectors=" + filepath.Join(fullNodeRepo, "preseal")} } @@ -154,7 +154,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { mux := newWsMux() - cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) + cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id), "--nosync") cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo) @@ -214,7 +214,7 @@ func (api *api) RestartNode(id int32) (nodeInfo, error) { var cmd *exec.Cmd if nd.meta.Storage { - cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) + cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id), "--nosync") } else { cmd = exec.Command("./lotus", "daemon", "--api", fmt.Sprintf("%d", 2500+id)) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 5e413c805..25cdf16b9 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -16,6 +16,7 @@ import ( "mime" "net/http" "os" + "path/filepath" ) type StorageMinerAPI struct { @@ -110,7 +111,9 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques switch mediatype { case "application/x-tar": - if err := systar.ExtractTar(r.Body, path); err != nil { + if err := systar.ExtractTar(r.Body, filepath.Dir(path)); err != nil { + log.Error(err) + w.WriteHeader(500) return } default: @@ -203,8 +206,6 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde } func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { - log.Infof("WDUN RSPKO %v", res.Rspco) - return sm.SectorBuilder.TaskDone(ctx, task, res) } From 5133c357dc5cb468c1e1f3c24628e617a4920fd7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 4 Dec 2019 14:14:50 -0800 Subject: [PATCH 14/14] Better error checks --- storage/miner.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/storage/miner.go b/storage/miner.go index 524319033..dcb6b1155 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -109,7 +109,11 @@ func (m *Miner) Run(ctx context.Context) error { } go fps.run(ctx) - go m.sectorStateLoop(ctx) + if err := m.sectorStateLoop(ctx); err != nil { + log.Error(err) + return xerrors.Errorf("failed to startup sector state loop: %w", err) + } + return nil }