Merge pull request #755 from filecoin-project/feat/worker-restrict-type

sectorbuilder: Allow to restrict task types
This commit is contained in:
Whyrusleeping 2019-12-07 21:19:35 +01:00 committed by GitHub
commit 8127676f73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 326 additions and 181 deletions

View File

@ -73,7 +73,7 @@ type StorageMiner interface {
WorkerStats(context.Context) (sectorbuilder.WorkerStats, error)
// WorkerQueue registers a remote worker
WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error)
WorkerQueue(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
}

View File

@ -143,8 +143,8 @@ type StorageMinerStruct struct {
WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"`
WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
}
}
@ -522,8 +522,8 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.Wor
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx)
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx, cfg)
}
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {

View File

@ -8,10 +8,10 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-hamt-ipld"
"github.com/minio/blake2b-simd"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
"github.com/minio/blake2b-simd"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
vstate "github.com/filecoin-project/chain-validation/pkg/state"
vactors "github.com/filecoin-project/chain-validation/pkg/state/actors"
@ -327,7 +327,7 @@ func fromActorCode(code vactors.ActorCodeID) cid.Cid {
}
}
func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address{
func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address {
switch addr {
case vactors.InitAddress:
out, err := vaddress.NewFromBytes(actors.InitAddress.Bytes())
@ -336,25 +336,25 @@ func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address{
}
return out
case vactors.NetworkAddress:
out, err := vaddress.NewFromBytes(actors.NetworkAddress.Bytes())
out, err := vaddress.NewFromBytes(actors.NetworkAddress.Bytes())
if err != nil {
panic(err)
}
return out
case vactors.StorageMarketAddress:
out, err := vaddress.NewFromBytes(actors.StorageMarketAddress.Bytes())
out, err := vaddress.NewFromBytes(actors.StorageMarketAddress.Bytes())
if err != nil {
panic(err)
}
return out
case vactors.BurntFundsAddress:
out, err := vaddress.NewFromBytes(actors.BurntFundsAddress.Bytes())
out, err := vaddress.NewFromBytes(actors.BurntFundsAddress.Bytes())
if err != nil {
panic(err)
}
return out
case vactors.StoragePowerAddress:
out, err := vaddress.NewFromBytes(actors.StoragePowerAddress.Bytes())
out, err := vaddress.NewFromBytes(actors.StoragePowerAddress.Bytes())
if err != nil {
panic(err)
}

View File

@ -10,8 +10,8 @@ import (
)
var dotCmd = &cli.Command{
Name: "dot",
Usage: "generate dot graphs",
Name: "dot",
Usage: "generate dot graphs",
ArgsUsage: "<minHeight> <toseeHeight>",
Action: func(cctx *cli.Context) error {
st, err := openStorage(cctx.String("db"))

View File

@ -1,9 +1,10 @@
package main
import (
"github.com/mitchellh/go-homedir"
"os"
"github.com/mitchellh/go-homedir"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
@ -44,6 +45,12 @@ func main() {
Usage: "enable use of GPU for mining operations",
Value: true,
},
&cli.BoolFlag{
Name: "no-precommit",
},
&cli.BoolFlag{
Name: "no-commit",
},
},
Commands: local,
@ -95,6 +102,6 @@ var runCmd = &cli.Command{
log.Warn("Shutting down..")
}()
return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r)
return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r, cctx.Bool("no-precommit"), cctx.Bool("no-commit"))
},
}

View File

@ -7,12 +7,13 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type worker struct {
api api.StorageMiner
api lapi.StorageMiner
minerEndpoint string
repo string
auth http.Header
@ -20,7 +21,7 @@ type worker struct {
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error {
func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
act, err := api.ActorAddress(ctx)
if err != nil {
return err
@ -43,6 +44,10 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth
return err
}
if err := build.GetParams(ssize); err != nil {
return xerrors.Errorf("get params: %w", err)
}
w := &worker{
api: api,
minerEndpoint: endpoint,
@ -51,13 +56,18 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth
sb: sb,
}
tasks, err := api.WorkerQueue(ctx)
tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{
NoPreCommit: noprecommit,
NoCommit: nocommit,
})
if err != nil {
return err
}
loop:
for {
log.Infof("Waiting for new task")
select {
case task := <-tasks:
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type)
@ -90,6 +100,8 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
return errRes(xerrors.Errorf("fetching sector: %w", err))
}
log.Infof("Data fetched, starting computation")
var res sectorbuilder.SealRes
switch task.Type {

View File

@ -12,7 +12,7 @@ import (
"path/filepath"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/lib/systar"
"github.com/filecoin-project/lotus/lib/tarutil"
)
func (w *worker) fetch(typ string, sectorID uint64) error {
@ -58,7 +58,7 @@ func (w *worker) fetch(typ string, sectorID uint64) error {
switch mediatype {
case "application/x-tar":
return systar.ExtractTar(barreader, filepath.Dir(outname))
return tarutil.ExtractTar(barreader, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(barreader), outname)
default:
@ -80,7 +80,7 @@ func (w *worker) push(typ string, sectorID uint64) error {
var r io.Reader
if stat.IsDir() {
r, err = systar.TarDirectory(filename)
r, err = tarutil.TarDirectory(filename)
} else {
r, err = os.OpenFile(filename, os.O_RDONLY, 0644)
}

View File

@ -63,6 +63,12 @@ var infoCmd = &cli.Command{
fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved)
fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal)
fmt.Printf("Queues:\n")
fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait)
fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait)
fmt.Printf("\tCommit: %d\n", wstat.CommitWait)
fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait)
eps, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil)
if err != nil {
return err

View File

@ -249,10 +249,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
CommP: sector.CommD[:],
},
},
CommC: nil,
CommD: sector.CommD[:],
CommR: sector.CommR[:],
CommRLast: nil,
Proof: nil,
Ticket: storage.SealTicket{},
PreCommitMessage: nil,
@ -358,7 +356,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err
}
sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2)(mds, api)
sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2, false, false)(mds, api)
if err != nil {
return xerrors.Errorf("getting genesis miner sector builder config: %w", err)
}

View File

@ -2,6 +2,7 @@ package main
import (
"fmt"
"sort"
"strconv"
"golang.org/x/xerrors"
@ -125,6 +126,10 @@ var sectorsListCmd = &cli.Command{
commitedIDs[info.SectorID] = struct{}{}
}
sort.Slice(list, func(i, j int) bool {
return list[i] < list[j]
})
for _, s := range list {
st, err := nodeApi.SectorsStatus(ctx, s)
if err != nil {

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 6d9e80001bfa2d80eec4e157da46d783038d9b42
Subproject commit fe188cfa1e082e4c41cadeb17a3a7c9e43ae6f03

View File

@ -33,7 +33,7 @@ type workerCall struct {
ret chan SealRes
}
func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) {
func (sb *SectorBuilder) AddWorker(ctx context.Context, cfg WorkerCfg) (<-chan WorkerTask, error) {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
@ -46,22 +46,32 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro
sb.remoteCtr++
sb.remotes[sb.remoteCtr] = r
go sb.remoteWorker(ctx, r)
go sb.remoteWorker(ctx, r, cfg)
return taskCh, nil
}
func (sb *SectorBuilder) returnTask(task workerCall) {
var ret chan workerCall
switch task.task.Type {
case WorkerPreCommit:
ret = sb.precommitTasks
case WorkerCommit:
ret = sb.commitTasks
default:
log.Error("unknown task type", task.task.Type)
}
go func() {
select {
case sb.sealTasks <- task:
case ret <- task:
case <-sb.stopping:
return
}
}()
}
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote, cfg WorkerCfg) {
defer log.Warn("Remote worker disconnected")
defer func() {
@ -76,47 +86,21 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
}
}()
precommits := sb.precommitTasks
if cfg.NoPreCommit {
precommits = nil
}
commits := sb.commitTasks
if cfg.NoCommit {
commits = nil
}
for {
select {
case task := <-sb.sealTasks:
resCh := make(chan SealRes)
sb.remoteLk.Lock()
sb.remoteResults[task.task.TaskID] = resCh
sb.remoteLk.Unlock()
// send the task
select {
case r.sealTasks <- task.task:
case <-ctx.Done():
sb.returnTask(task)
return
}
r.lk.Lock()
r.busy = task.task.TaskID
r.lk.Unlock()
// wait for the result
select {
case res := <-resCh:
// send the result back to the caller
select {
case task.ret <- res:
case <-ctx.Done():
return
case <-sb.stopping:
return
}
case <-ctx.Done():
log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
return
case <-sb.stopping:
return
}
case task := <-commits:
sb.doTask(ctx, r, task)
case task := <-precommits:
sb.doTask(ctx, r, task)
case <-ctx.Done():
return
case <-sb.stopping:
@ -129,6 +113,46 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
}
}
func (sb *SectorBuilder) doTask(ctx context.Context, r *remote, task workerCall) {
resCh := make(chan SealRes)
sb.remoteLk.Lock()
sb.remoteResults[task.task.TaskID] = resCh
sb.remoteLk.Unlock()
// send the task
select {
case r.sealTasks <- task.task:
case <-ctx.Done():
sb.returnTask(task)
return
}
r.lk.Lock()
r.busy = task.task.TaskID
r.lk.Unlock()
// wait for the result
select {
case res := <-resCh:
// send the result back to the caller
select {
case task.ret <- res:
case <-ctx.Done():
return
case <-sb.stopping:
return
}
case <-ctx.Done():
log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
return
case <-sb.stopping:
return
}
}
func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error {
sb.remoteLk.Lock()
rres, ok := sb.remoteResults[task]

View File

@ -45,6 +45,13 @@ type EPostCandidate = sectorbuilder.Candidate
const CommLen = sectorbuilder.CommitmentBytesLen
type WorkerCfg struct {
NoPreCommit bool
NoCommit bool
// TODO: 'cost' info, probably in terms of sealing + transfer speed
}
type SectorBuilder struct {
ds dtypes.MetadataDS
idLk sync.Mutex
@ -61,10 +68,12 @@ type SectorBuilder struct {
unsealLk sync.Mutex
sealLocal bool
rateLimit chan struct{}
noCommit bool
noPreCommit bool
rateLimit chan struct{}
sealTasks chan workerCall
precommitTasks chan workerCall
commitTasks chan workerCall
taskCtr uint64
remoteLk sync.Mutex
@ -72,31 +81,30 @@ type SectorBuilder struct {
remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes
addPieceWait int32
preCommitWait int32
commitWait int32
unsealWait int32
stopping chan struct{}
}
type JsonRSPCO struct {
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
CommD []byte
CommR []byte
}
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{
CommC: rspco.CommC[:],
CommD: rspco.CommD[:],
CommR: rspco.CommR[:],
CommRLast: rspco.CommRLast[:],
CommD: rspco.CommD[:],
CommR: rspco.CommR[:],
}
}
func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
var out RawSealPreCommitOutput
copy(out.CommC[:], rspco.CommC)
copy(out.CommD[:], rspco.CommD)
copy(out.CommR[:], rspco.CommR)
copy(out.CommRLast[:], rspco.CommRLast)
return out
}
@ -121,6 +129,8 @@ type Config struct {
WorkerThreads uint8
FallbackLastID uint64
NoCommit bool
NoPreCommit bool
CacheDir string
SealedDir string
@ -179,13 +189,15 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
Miner: cfg.Miner,
sealLocal: sealLocal,
rateLimit: make(chan struct{}, rlimit),
noPreCommit: cfg.NoPreCommit || !sealLocal,
noCommit: cfg.NoCommit || !sealLocal,
rateLimit: make(chan struct{}, rlimit),
taskCtr: 1,
sealTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{},
taskCtr: 1,
precommitTasks: make(chan workerCall),
commitTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{},
stopping: make(chan struct{}),
}
@ -214,7 +226,6 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) {
cacheDir: cfg.CacheDir,
unsealedDir: cfg.UnsealedDir,
sealLocal: true,
taskCtr: 1,
remotes: map[int]*remote{},
rateLimit: make(chan struct{}, cfg.WorkerThreads),
@ -245,6 +256,11 @@ type WorkerStats struct {
// todo: post in progress
RemotesTotal int
RemotesFree int
AddPieceWait int
PreCommitWait int
CommitWait int
UnsealWait int
}
func (sb *SectorBuilder) WorkerStats() WorkerStats {
@ -264,6 +280,11 @@ func (sb *SectorBuilder) WorkerStats() WorkerStats {
LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers,
RemotesTotal: len(sb.remotes),
RemotesFree: remoteFree,
AddPieceWait: int(atomic.LoadInt32(&sb.addPieceWait)),
PreCommitWait: int(atomic.LoadInt32(&sb.preCommitWait)),
CommitWait: int(atomic.LoadInt32(&sb.commitWait)),
UnsealWait: int(atomic.LoadInt32(&sb.unsealWait)),
}
}
@ -288,7 +309,9 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
defer ret()
f, werr, err := toReadableFile(file, int64(pieceSize))
@ -321,8 +344,11 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
}
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
defer ret()
atomic.AddInt32(&sb.unsealWait, -1)
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock()
@ -389,6 +415,8 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
}
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
atomic.AddInt32(&sb.preCommitWait, -1)
select {
case ret := <-call.ret:
var err error
@ -413,8 +441,10 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.preCommitWait, 1)
select { // prefer remote
case sb.sealTasks <- call:
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
default:
}
@ -422,16 +452,18 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
sb.checkRateLimit()
rl := sb.rateLimit
if !sb.sealLocal {
if sb.noPreCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.sealTasks <- call:
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
case rl <- struct{}{}:
}
atomic.AddInt32(&sb.preCommitWait, -1)
// local
defer func() {
@ -474,10 +506,14 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
log.Infof("PRECOMMIT FFI RSPCO %v", rspco)
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
select {
case ret := <-call.ret:
if ret.Err != "" {
@ -490,6 +526,8 @@ func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err er
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
defer func() {
<-sb.rateLimit
}()
@ -535,19 +573,21 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.commitWait, 1)
select { // prefer remote
case sb.sealTasks <- call:
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
rl := sb.rateLimit
if !sb.sealLocal {
if sb.noCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.sealTasks <- call:
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -21,6 +22,10 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
func init() {
logging.SetLogLevel("*", "INFO")
}
const sectorSize = 1024
type seal struct {

View File

@ -1,47 +0,0 @@
package systar
import (
"golang.org/x/xerrors"
"io"
"os"
"os/exec"
"path/filepath"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("systar")
func ExtractTar(body io.Reader, dest string) error {
if err := os.MkdirAll(dest, 0755); err != nil {
return xerrors.Errorf("creating dest directory: %w", err)
}
cmd := exec.Command("tar", "-xS", "-C", dest)
cmd.Stdin = body
return cmd.Run()
}
func TarDirectory(file string) (io.ReadCloser, error) {
// use system builtin tar, golang one doesn't support sparse files
dir := filepath.Dir(file)
base := filepath.Base(file)
i, o := io.Pipe()
// don't bother with compression, it's mostly random data
cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base)
cmd.Stdout = o
if err := cmd.Start(); err != nil {
return nil, err
}
go func() {
if err := o.CloseWithError(cmd.Wait()); err != nil {
log.Error(err)
}
}()
return i, nil
}

88
lib/tarutil/systar.go Normal file
View File

@ -0,0 +1,88 @@
package tarutil
import (
"archive/tar"
"golang.org/x/xerrors"
"io"
"io/ioutil"
"os"
"path/filepath"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("tarutil")
func ExtractTar(body io.Reader, dir string) error {
if err := os.MkdirAll(dir, 0755); err != nil {
return xerrors.Errorf("mkdir: %w", err)
}
tr := tar.NewReader(body)
for {
header, err := tr.Next()
switch err {
default:
return err
case io.EOF:
return nil
case nil:
}
f, err := os.Create(filepath.Join(dir, header.Name))
if err != nil {
return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
}
if _, err := io.Copy(f, tr); err != nil {
return err
}
}
}
func TarDirectory(dir string) (io.ReadCloser, error) {
r, w := io.Pipe()
go func() {
_ = w.CloseWithError(writeTarDirectory(dir, w))
}()
return r, nil
}
func writeTarDirectory(dir string, w io.Writer) error {
tw := tar.NewWriter(w)
files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, file := range files {
h, err := tar.FileInfoHeader(file, "")
if err != nil {
return xerrors.Errorf("getting header for file %s: %w", file.Name(), err)
}
if err := tw.WriteHeader(h); err != nil {
return xerrors.Errorf("wiritng header for file %s: %w", file.Name(), err)
}
f, err := os.OpenFile(filepath.Join(dir, file.Name()), os.O_RDONLY, 644)
if err != nil {
return xerrors.Errorf("opening %s for reading: %w", file.Name(), err)
}
if _, err := io.Copy(tw, f); err != nil {
return xerrors.Errorf("copy data for file %s: %w", file.Name(), err)
}
if err := f.Close(); err != nil {
return err
}
}
return nil
}

View File

@ -334,7 +334,10 @@ func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option {
return Options(
ConfigCommon(&cfg.Common),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, cfg.SectorBuilder.WorkerCount)),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path,
cfg.SectorBuilder.WorkerCount,
cfg.SectorBuilder.DisableLocalPreCommit,
cfg.SectorBuilder.DisableLocalCommit)),
)
}

View File

@ -50,6 +50,9 @@ type Metrics struct {
type SectorBuilder struct {
Path string
WorkerCount uint
DisableLocalPreCommit bool
DisableLocalCommit bool
}
func defCommon() Common {

View File

@ -7,17 +7,17 @@ import (
"mime"
"net/http"
"os"
"path/filepath"
"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/lib/systar"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"
)
type StorageMinerAPI struct {
@ -68,7 +68,7 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
var rd io.Reader
if stat.IsDir() {
rd, err = systar.TarDirectory(path)
rd, err = tarutil.TarDirectory(path)
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
@ -112,7 +112,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
switch mediatype {
case "application/x-tar":
if err := systar.ExtractTar(r.Body, filepath.Dir(path)); err != nil {
if err := tarutil.ExtractTar(r.Body, path); err != nil {
log.Error(err)
w.WriteHeader(500)
return
@ -206,8 +206,8 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state a
return sm.Miner.UpdateSectorState(ctx, id, state)
}
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx)
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx, cfg)
}
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {

View File

@ -52,7 +52,7 @@ func GetParams(sbc *sectorbuilder.Config) error {
return nil
}
func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) {
func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) {
return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) {
minerAddr, err := minerAddrFromDS(ds)
if err != nil {
@ -79,9 +79,12 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD
staging := filepath.Join(sp, "staging")
sb := &sectorbuilder.Config{
Miner: minerAddr,
SectorSize: ssize,
Miner: minerAddr,
SectorSize: ssize,
WorkerThreads: uint8(threads),
NoPreCommit: noprecommit,
NoCommit: nocommit,
CacheDir: cache,
UnsealedDir: unsealed,

View File

@ -263,11 +263,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.t.CommC ([]uint8) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommC)))); err != nil {
// t.t.Pad0 ([]uint8) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Pad0)))); err != nil {
return err
}
if _, err := w.Write(t.CommC); err != nil {
if _, err := w.Write(t.Pad0); err != nil {
return err
}
@ -287,11 +287,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.t.CommRLast ([]uint8) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommRLast)))); err != nil {
// t.t.Pad1 ([]uint8) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Pad1)))); err != nil {
return err
}
if _, err := w.Write(t.CommRLast); err != nil {
if _, err := w.Write(t.Pad1); err != nil {
return err
}
@ -408,21 +408,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
t.Pieces[i] = v
}
// t.t.CommC ([]uint8) (slice)
// t.t.Pad0 ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.CommC: array too large (%d)", extra)
return fmt.Errorf("t.Pad0: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommC = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommC); err != nil {
t.Pad0 = make([]byte, extra)
if _, err := io.ReadFull(br, t.Pad0); err != nil {
return err
}
// t.t.CommD ([]uint8) (slice)
@ -459,21 +459,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
if _, err := io.ReadFull(br, t.CommR); err != nil {
return err
}
// t.t.CommRLast ([]uint8) (slice)
// t.t.Pad1 ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.CommRLast: array too large (%d)", extra)
return fmt.Errorf("t.Pad1: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommRLast = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommRLast); err != nil {
t.Pad1 = make([]byte, extra)
if _, err := io.ReadFull(br, t.Pad1); err != nil {
return err
}
// t.t.Proof ([]uint8) (slice)

View File

@ -75,10 +75,8 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp
}
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
info.CommC = rspco.CommC[:]
info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:]
info.CommRLast = rspco.CommRLast[:]
info.Ticket = SealTicket{
BlockHeight: ticket.BlockHeight,
TicketBytes: ticket.TicketBytes[:],

View File

@ -55,12 +55,12 @@ type SectorInfo struct {
Pieces []Piece
// PreCommit
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
Proof []byte
Ticket SealTicket
Pad0 []byte // TODO: legacy placeholder, remove
CommD []byte
CommR []byte
Pad1 []byte // TODO: legacy placeholder, remove
Proof []byte
Ticket SealTicket
PreCommitMessage *cid.Cid
@ -105,10 +105,8 @@ func (t *SectorInfo) existingPieces() []uint64 {
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput
copy(out.CommC[:], t.CommC)
copy(out.CommD[:], t.CommD)
copy(out.CommR[:], t.CommR)
copy(out.CommRLast[:], t.CommRLast)
return out
}

View File

@ -167,9 +167,11 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState
s.LastErr = ""
if update.err != nil {
s.LastErr = fmt.Sprintf("%+v", update.err)
if s.LastErr != "" {
s.LastErr += "---------\n\n"
}
s.LastErr += fmt.Sprintf("entering state %s: %+v", api.SectorStates[update.newState], update.err)
}
if update.mut != nil {