sectorbuilder: Allow to restrict task types

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Łukasz Magiera 2019-12-06 01:27:32 +01:00 committed by Jakub Sztandera
parent 4e36d9198b
commit 83924e6b97
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
24 changed files with 326 additions and 181 deletions

View File

@ -73,7 +73,7 @@ type StorageMiner interface {
WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error)
// WorkerQueue registers a remote worker // WorkerQueue registers a remote worker
WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error) WorkerQueue(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
} }

View File

@ -143,7 +143,7 @@ type StorageMinerStruct struct {
WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"` WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"`
WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
} }
} }
@ -522,8 +522,8 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.Wor
return c.Internal.WorkerStats(ctx) return c.Internal.WorkerStats(ctx)
} }
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) { func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx) return c.Internal.WorkerQueue(ctx, cfg)
} }
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {

View File

@ -8,10 +8,10 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-hamt-ipld" "github.com/ipfs/go-hamt-ipld"
"github.com/minio/blake2b-simd"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors" "github.com/minio/blake2b-simd"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
vstate "github.com/filecoin-project/chain-validation/pkg/state" vstate "github.com/filecoin-project/chain-validation/pkg/state"
vactors "github.com/filecoin-project/chain-validation/pkg/state/actors" vactors "github.com/filecoin-project/chain-validation/pkg/state/actors"
@ -327,7 +327,7 @@ func fromActorCode(code vactors.ActorCodeID) cid.Cid {
} }
} }
func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address{ func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address {
switch addr { switch addr {
case vactors.InitAddress: case vactors.InitAddress:
out, err := vaddress.NewFromBytes(actors.InitAddress.Bytes()) out, err := vaddress.NewFromBytes(actors.InitAddress.Bytes())

View File

@ -1,9 +1,10 @@
package main package main
import ( import (
"github.com/mitchellh/go-homedir"
"os" "os"
"github.com/mitchellh/go-homedir"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
@ -44,6 +45,12 @@ func main() {
Usage: "enable use of GPU for mining operations", Usage: "enable use of GPU for mining operations",
Value: true, Value: true,
}, },
&cli.BoolFlag{
Name: "no-precommit",
},
&cli.BoolFlag{
Name: "no-commit",
},
}, },
Commands: local, Commands: local,
@ -95,6 +102,6 @@ var runCmd = &cli.Command{
log.Warn("Shutting down..") log.Warn("Shutting down..")
}() }()
return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r) return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r, cctx.Bool("no-precommit"), cctx.Bool("no-commit"))
}, },
} }

View File

@ -7,12 +7,13 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
type worker struct { type worker struct {
api api.StorageMiner api lapi.StorageMiner
minerEndpoint string minerEndpoint string
repo string repo string
auth http.Header auth http.Header
@ -20,7 +21,7 @@ type worker struct {
sb *sectorbuilder.SectorBuilder sb *sectorbuilder.SectorBuilder
} }
func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error { func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error {
act, err := api.ActorAddress(ctx) act, err := api.ActorAddress(ctx)
if err != nil { if err != nil {
return err return err
@ -43,6 +44,10 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth
return err return err
} }
if err := build.GetParams(ssize); err != nil {
return xerrors.Errorf("get params: %w", err)
}
w := &worker{ w := &worker{
api: api, api: api,
minerEndpoint: endpoint, minerEndpoint: endpoint,
@ -51,13 +56,18 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth
sb: sb, sb: sb,
} }
tasks, err := api.WorkerQueue(ctx) tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{
NoPreCommit: noprecommit,
NoCommit: nocommit,
})
if err != nil { if err != nil {
return err return err
} }
loop: loop:
for { for {
log.Infof("Waiting for new task")
select { select {
case task := <-tasks: case task := <-tasks:
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type)
@ -90,6 +100,8 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
return errRes(xerrors.Errorf("fetching sector: %w", err)) return errRes(xerrors.Errorf("fetching sector: %w", err))
} }
log.Infof("Data fetched, starting computation")
var res sectorbuilder.SealRes var res sectorbuilder.SealRes
switch task.Type { switch task.Type {

View File

@ -12,7 +12,7 @@ import (
"path/filepath" "path/filepath"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/lib/systar" "github.com/filecoin-project/lotus/lib/tarutil"
) )
func (w *worker) fetch(typ string, sectorID uint64) error { func (w *worker) fetch(typ string, sectorID uint64) error {
@ -58,7 +58,7 @@ func (w *worker) fetch(typ string, sectorID uint64) error {
switch mediatype { switch mediatype {
case "application/x-tar": case "application/x-tar":
return systar.ExtractTar(barreader, filepath.Dir(outname)) return tarutil.ExtractTar(barreader, outname)
case "application/octet-stream": case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(barreader), outname) return files.WriteTo(files.NewReaderFile(barreader), outname)
default: default:
@ -80,7 +80,7 @@ func (w *worker) push(typ string, sectorID uint64) error {
var r io.Reader var r io.Reader
if stat.IsDir() { if stat.IsDir() {
r, err = systar.TarDirectory(filename) r, err = tarutil.TarDirectory(filename)
} else { } else {
r, err = os.OpenFile(filename, os.O_RDONLY, 0644) r, err = os.OpenFile(filename, os.O_RDONLY, 0644)
} }

View File

@ -63,6 +63,12 @@ var infoCmd = &cli.Command{
fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved) fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved)
fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal) fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal)
fmt.Printf("Queues:\n")
fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait)
fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait)
fmt.Printf("\tCommit: %d\n", wstat.CommitWait)
fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait)
eps, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil) eps, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil)
if err != nil { if err != nil {
return err return err

View File

@ -249,10 +249,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
CommP: sector.CommD[:], CommP: sector.CommD[:],
}, },
}, },
CommC: nil,
CommD: sector.CommD[:], CommD: sector.CommD[:],
CommR: sector.CommR[:], CommR: sector.CommR[:],
CommRLast: nil,
Proof: nil, Proof: nil,
Ticket: storage.SealTicket{}, Ticket: storage.SealTicket{},
PreCommitMessage: nil, PreCommitMessage: nil,
@ -358,7 +356,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err return err
} }
sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2)(mds, api) sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2, false, false)(mds, api)
if err != nil { if err != nil {
return xerrors.Errorf("getting genesis miner sector builder config: %w", err) return xerrors.Errorf("getting genesis miner sector builder config: %w", err)
} }

View File

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"sort"
"strconv" "strconv"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -125,6 +126,10 @@ var sectorsListCmd = &cli.Command{
commitedIDs[info.SectorID] = struct{}{} commitedIDs[info.SectorID] = struct{}{}
} }
sort.Slice(list, func(i, j int) bool {
return list[i] < list[j]
})
for _, s := range list { for _, s := range list {
st, err := nodeApi.SectorsStatus(ctx, s) st, err := nodeApi.SectorsStatus(ctx, s)
if err != nil { if err != nil {

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 6d9e80001bfa2d80eec4e157da46d783038d9b42 Subproject commit 6ac840062c094b35c87a87638cd7a262f43edd49

View File

@ -33,7 +33,7 @@ type workerCall struct {
ret chan SealRes ret chan SealRes
} }
func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) { func (sb *SectorBuilder) AddWorker(ctx context.Context, cfg WorkerCfg) (<-chan WorkerTask, error) {
sb.remoteLk.Lock() sb.remoteLk.Lock()
defer sb.remoteLk.Unlock() defer sb.remoteLk.Unlock()
@ -46,22 +46,32 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro
sb.remoteCtr++ sb.remoteCtr++
sb.remotes[sb.remoteCtr] = r sb.remotes[sb.remoteCtr] = r
go sb.remoteWorker(ctx, r) go sb.remoteWorker(ctx, r, cfg)
return taskCh, nil return taskCh, nil
} }
func (sb *SectorBuilder) returnTask(task workerCall) { func (sb *SectorBuilder) returnTask(task workerCall) {
var ret chan workerCall
switch task.task.Type {
case WorkerPreCommit:
ret = sb.precommitTasks
case WorkerCommit:
ret = sb.commitTasks
default:
log.Error("unknown task type", task.task.Type)
}
go func() { go func() {
select { select {
case sb.sealTasks <- task: case ret <- task:
case <-sb.stopping: case <-sb.stopping:
return return
} }
}() }()
} }
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote, cfg WorkerCfg) {
defer log.Warn("Remote worker disconnected") defer log.Warn("Remote worker disconnected")
defer func() { defer func() {
@ -76,9 +86,34 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
} }
}() }()
precommits := sb.precommitTasks
if cfg.NoPreCommit {
precommits = nil
}
commits := sb.commitTasks
if cfg.NoCommit {
commits = nil
}
for { for {
select { select {
case task := <-sb.sealTasks: case task := <-commits:
sb.doTask(ctx, r, task)
case task := <-precommits:
sb.doTask(ctx, r, task)
case <-ctx.Done():
return
case <-sb.stopping:
return
}
r.lk.Lock()
r.busy = 0
r.lk.Unlock()
}
}
func (sb *SectorBuilder) doTask(ctx context.Context, r *remote, task workerCall) {
resCh := make(chan SealRes) resCh := make(chan SealRes)
sb.remoteLk.Lock() sb.remoteLk.Lock()
@ -116,17 +151,6 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
case <-sb.stopping: case <-sb.stopping:
return return
} }
case <-ctx.Done():
return
case <-sb.stopping:
return
}
r.lk.Lock()
r.busy = 0
r.lk.Unlock()
}
} }
func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error { func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error {

View File

@ -45,6 +45,13 @@ type EPostCandidate = sectorbuilder.Candidate
const CommLen = sectorbuilder.CommitmentBytesLen const CommLen = sectorbuilder.CommitmentBytesLen
type WorkerCfg struct {
NoPreCommit bool
NoCommit bool
// TODO: 'cost' info, probably in terms of sealing + transfer speed
}
type SectorBuilder struct { type SectorBuilder struct {
ds dtypes.MetadataDS ds dtypes.MetadataDS
idLk sync.Mutex idLk sync.Mutex
@ -61,10 +68,12 @@ type SectorBuilder struct {
unsealLk sync.Mutex unsealLk sync.Mutex
sealLocal bool noCommit bool
noPreCommit bool
rateLimit chan struct{} rateLimit chan struct{}
sealTasks chan workerCall precommitTasks chan workerCall
commitTasks chan workerCall
taskCtr uint64 taskCtr uint64
remoteLk sync.Mutex remoteLk sync.Mutex
@ -72,31 +81,30 @@ type SectorBuilder struct {
remotes map[int]*remote remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes remoteResults map[uint64]chan<- SealRes
addPieceWait int32
preCommitWait int32
commitWait int32
unsealWait int32
stopping chan struct{} stopping chan struct{}
} }
type JsonRSPCO struct { type JsonRSPCO struct {
CommC []byte
CommD []byte CommD []byte
CommR []byte CommR []byte
CommRLast []byte
} }
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO { func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{ return JsonRSPCO{
CommC: rspco.CommC[:],
CommD: rspco.CommD[:], CommD: rspco.CommD[:],
CommR: rspco.CommR[:], CommR: rspco.CommR[:],
CommRLast: rspco.CommRLast[:],
} }
} }
func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
var out RawSealPreCommitOutput var out RawSealPreCommitOutput
copy(out.CommC[:], rspco.CommC)
copy(out.CommD[:], rspco.CommD) copy(out.CommD[:], rspco.CommD)
copy(out.CommR[:], rspco.CommR) copy(out.CommR[:], rspco.CommR)
copy(out.CommRLast[:], rspco.CommRLast)
return out return out
} }
@ -121,6 +129,8 @@ type Config struct {
WorkerThreads uint8 WorkerThreads uint8
FallbackLastID uint64 FallbackLastID uint64
NoCommit bool
NoPreCommit bool
CacheDir string CacheDir string
SealedDir string SealedDir string
@ -179,11 +189,13 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
Miner: cfg.Miner, Miner: cfg.Miner,
sealLocal: sealLocal, noPreCommit: cfg.NoPreCommit || !sealLocal,
noCommit: cfg.NoCommit || !sealLocal,
rateLimit: make(chan struct{}, rlimit), rateLimit: make(chan struct{}, rlimit),
taskCtr: 1, taskCtr: 1,
sealTasks: make(chan workerCall), precommitTasks: make(chan workerCall),
commitTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{}, remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{}, remotes: map[int]*remote{},
@ -214,7 +226,6 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) {
cacheDir: cfg.CacheDir, cacheDir: cfg.CacheDir,
unsealedDir: cfg.UnsealedDir, unsealedDir: cfg.UnsealedDir,
sealLocal: true,
taskCtr: 1, taskCtr: 1,
remotes: map[int]*remote{}, remotes: map[int]*remote{},
rateLimit: make(chan struct{}, cfg.WorkerThreads), rateLimit: make(chan struct{}, cfg.WorkerThreads),
@ -245,6 +256,11 @@ type WorkerStats struct {
// todo: post in progress // todo: post in progress
RemotesTotal int RemotesTotal int
RemotesFree int RemotesFree int
AddPieceWait int
PreCommitWait int
CommitWait int
UnsealWait int
} }
func (sb *SectorBuilder) WorkerStats() WorkerStats { func (sb *SectorBuilder) WorkerStats() WorkerStats {
@ -264,6 +280,11 @@ func (sb *SectorBuilder) WorkerStats() WorkerStats {
LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers, LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers,
RemotesTotal: len(sb.remotes), RemotesTotal: len(sb.remotes),
RemotesFree: remoteFree, RemotesFree: remoteFree,
AddPieceWait: int(atomic.LoadInt32(&sb.addPieceWait)),
PreCommitWait: int(atomic.LoadInt32(&sb.preCommitWait)),
CommitWait: int(atomic.LoadInt32(&sb.commitWait)),
UnsealWait: int(atomic.LoadInt32(&sb.unsealWait)),
} }
} }
@ -288,7 +309,9 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
} }
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) { func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit() ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
defer ret() defer ret()
f, werr, err := toReadableFile(file, int64(pieceSize)) f, werr, err := toReadableFile(file, int64(pieceSize))
@ -321,8 +344,11 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
} }
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
defer ret() defer ret()
atomic.AddInt32(&sb.unsealWait, -1)
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock() defer sb.unsealLk.Unlock()
@ -389,6 +415,8 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
} }
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
atomic.AddInt32(&sb.preCommitWait, -1)
select { select {
case ret := <-call.ret: case ret := <-call.ret:
var err error var err error
@ -413,8 +441,10 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
ret: make(chan SealRes), ret: make(chan SealRes),
} }
atomic.AddInt32(&sb.preCommitWait, 1)
select { // prefer remote select { // prefer remote
case sb.sealTasks <- call: case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call) return sb.sealPreCommitRemote(call)
default: default:
} }
@ -422,16 +452,18 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
sb.checkRateLimit() sb.checkRateLimit()
rl := sb.rateLimit rl := sb.rateLimit
if !sb.sealLocal { if sb.noPreCommit {
rl = make(chan struct{}) rl = make(chan struct{})
} }
select { // use whichever is available select { // use whichever is available
case sb.sealTasks <- call: case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call) return sb.sealPreCommitRemote(call)
case rl <- struct{}{}: case rl <- struct{}{}:
} }
atomic.AddInt32(&sb.preCommitWait, -1)
// local // local
defer func() { defer func() {
@ -474,10 +506,14 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err) return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
} }
log.Infof("PRECOMMIT FFI RSPCO %v", rspco)
return RawSealPreCommitOutput(rspco), nil return RawSealPreCommitOutput(rspco), nil
} }
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
select { select {
case ret := <-call.ret: case ret := <-call.ret:
if ret.Err != "" { if ret.Err != "" {
@ -490,6 +526,8 @@ func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err er
} }
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
defer func() { defer func() {
<-sb.rateLimit <-sb.rateLimit
}() }()
@ -535,19 +573,21 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
ret: make(chan SealRes), ret: make(chan SealRes),
} }
atomic.AddInt32(&sb.commitWait, 1)
select { // prefer remote select { // prefer remote
case sb.sealTasks <- call: case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call) proof, err = sb.sealCommitRemote(call)
default: default:
sb.checkRateLimit() sb.checkRateLimit()
rl := sb.rateLimit rl := sb.rateLimit
if !sb.sealLocal { if sb.noCommit {
rl = make(chan struct{}) rl = make(chan struct{})
} }
select { // use whichever is available select { // use whichever is available
case sb.sealTasks <- call: case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call) proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}: case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco) proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -21,6 +22,10 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
func init() {
logging.SetLogLevel("*", "INFO")
}
const sectorSize = 1024 const sectorSize = 1024
type seal struct { type seal struct {

View File

@ -1,47 +0,0 @@
package systar
import (
"golang.org/x/xerrors"
"io"
"os"
"os/exec"
"path/filepath"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("systar")
func ExtractTar(body io.Reader, dest string) error {
if err := os.MkdirAll(dest, 0755); err != nil {
return xerrors.Errorf("creating dest directory: %w", err)
}
cmd := exec.Command("tar", "-xS", "-C", dest)
cmd.Stdin = body
return cmd.Run()
}
func TarDirectory(file string) (io.ReadCloser, error) {
// use system builtin tar, golang one doesn't support sparse files
dir := filepath.Dir(file)
base := filepath.Base(file)
i, o := io.Pipe()
// don't bother with compression, it's mostly random data
cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base)
cmd.Stdout = o
if err := cmd.Start(); err != nil {
return nil, err
}
go func() {
if err := o.CloseWithError(cmd.Wait()); err != nil {
log.Error(err)
}
}()
return i, nil
}

88
lib/tarutil/systar.go Normal file
View File

@ -0,0 +1,88 @@
package tarutil
import (
"archive/tar"
"golang.org/x/xerrors"
"io"
"io/ioutil"
"os"
"path/filepath"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("tarutil")
func ExtractTar(body io.Reader, dir string) error {
if err := os.MkdirAll(dir, 0755); err != nil {
return xerrors.Errorf("mkdir: %w", err)
}
tr := tar.NewReader(body)
for {
header, err := tr.Next()
switch err {
default:
return err
case io.EOF:
return nil
case nil:
}
f, err := os.Create(filepath.Join(dir, header.Name))
if err != nil {
return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err)
}
if _, err := io.Copy(f, tr); err != nil {
return err
}
}
}
func TarDirectory(dir string) (io.ReadCloser, error) {
r, w := io.Pipe()
go func() {
_ = w.CloseWithError(writeTarDirectory(dir, w))
}()
return r, nil
}
func writeTarDirectory(dir string, w io.Writer) error {
tw := tar.NewWriter(w)
files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, file := range files {
h, err := tar.FileInfoHeader(file, "")
if err != nil {
return xerrors.Errorf("getting header for file %s: %w", file.Name(), err)
}
if err := tw.WriteHeader(h); err != nil {
return xerrors.Errorf("wiritng header for file %s: %w", file.Name(), err)
}
f, err := os.OpenFile(filepath.Join(dir, file.Name()), os.O_RDONLY, 644)
if err != nil {
return xerrors.Errorf("opening %s for reading: %w", file.Name(), err)
}
if _, err := io.Copy(tw, f); err != nil {
return xerrors.Errorf("copy data for file %s: %w", file.Name(), err)
}
if err := f.Close(); err != nil {
return err
}
}
return nil
}

View File

@ -334,7 +334,10 @@ func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option {
return Options( return Options(
ConfigCommon(&cfg.Common), ConfigCommon(&cfg.Common),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, cfg.SectorBuilder.WorkerCount)), Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path,
cfg.SectorBuilder.WorkerCount,
cfg.SectorBuilder.DisableLocalPreCommit,
cfg.SectorBuilder.DisableLocalCommit)),
) )
} }

View File

@ -50,6 +50,9 @@ type Metrics struct {
type SectorBuilder struct { type SectorBuilder struct {
Path string Path string
WorkerCount uint WorkerCount uint
DisableLocalPreCommit bool
DisableLocalCommit bool
} }
func defCommon() Common { func defCommon() Common {

View File

@ -7,17 +7,17 @@ import (
"mime" "mime"
"net/http" "net/http"
"os" "os"
"path/filepath"
"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/lib/systar" "github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"
) )
type StorageMinerAPI struct { type StorageMinerAPI struct {
@ -68,7 +68,7 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
var rd io.Reader var rd io.Reader
if stat.IsDir() { if stat.IsDir() {
rd, err = systar.TarDirectory(path) rd, err = tarutil.TarDirectory(path)
w.Header().Set("Content-Type", "application/x-tar") w.Header().Set("Content-Type", "application/x-tar")
} else { } else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644) rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
@ -112,7 +112,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
switch mediatype { switch mediatype {
case "application/x-tar": case "application/x-tar":
if err := systar.ExtractTar(r.Body, filepath.Dir(path)); err != nil { if err := tarutil.ExtractTar(r.Body, path); err != nil {
log.Error(err) log.Error(err)
w.WriteHeader(500) w.WriteHeader(500)
return return
@ -206,8 +206,8 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state a
return sm.Miner.UpdateSectorState(ctx, id, state) return sm.Miner.UpdateSectorState(ctx, id, state)
} }
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) { func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx) return sm.SectorBuilder.AddWorker(ctx, cfg)
} }
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {

View File

@ -52,7 +52,7 @@ func GetParams(sbc *sectorbuilder.Config) error {
return nil return nil
} }
func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) {
return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) { return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) {
minerAddr, err := minerAddrFromDS(ds) minerAddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
@ -81,7 +81,10 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD
sb := &sectorbuilder.Config{ sb := &sectorbuilder.Config{
Miner: minerAddr, Miner: minerAddr,
SectorSize: ssize, SectorSize: ssize,
WorkerThreads: uint8(threads), WorkerThreads: uint8(threads),
NoPreCommit: noprecommit,
NoCommit: nocommit,
CacheDir: cache, CacheDir: cache,
UnsealedDir: unsealed, UnsealedDir: unsealed,

View File

@ -263,11 +263,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
} }
} }
// t.t.CommC ([]uint8) (slice) // t.t.Pad0 ([]uint8) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommC)))); err != nil { if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Pad0)))); err != nil {
return err return err
} }
if _, err := w.Write(t.CommC); err != nil { if _, err := w.Write(t.Pad0); err != nil {
return err return err
} }
@ -287,11 +287,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err return err
} }
// t.t.CommRLast ([]uint8) (slice) // t.t.Pad1 ([]uint8) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommRLast)))); err != nil { if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Pad1)))); err != nil {
return err return err
} }
if _, err := w.Write(t.CommRLast); err != nil { if _, err := w.Write(t.Pad1); err != nil {
return err return err
} }
@ -408,21 +408,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
t.Pieces[i] = v t.Pieces[i] = v
} }
// t.t.CommC ([]uint8) (slice) // t.t.Pad0 ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeader(br) maj, extra, err = cbg.CborReadHeader(br)
if err != nil { if err != nil {
return err return err
} }
if extra > 8192 { if extra > 8192 {
return fmt.Errorf("t.CommC: array too large (%d)", extra) return fmt.Errorf("t.Pad0: array too large (%d)", extra)
} }
if maj != cbg.MajByteString { if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array") return fmt.Errorf("expected byte array")
} }
t.CommC = make([]byte, extra) t.Pad0 = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommC); err != nil { if _, err := io.ReadFull(br, t.Pad0); err != nil {
return err return err
} }
// t.t.CommD ([]uint8) (slice) // t.t.CommD ([]uint8) (slice)
@ -459,21 +459,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
if _, err := io.ReadFull(br, t.CommR); err != nil { if _, err := io.ReadFull(br, t.CommR); err != nil {
return err return err
} }
// t.t.CommRLast ([]uint8) (slice) // t.t.Pad1 ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeader(br) maj, extra, err = cbg.CborReadHeader(br)
if err != nil { if err != nil {
return err return err
} }
if extra > 8192 { if extra > 8192 {
return fmt.Errorf("t.CommRLast: array too large (%d)", extra) return fmt.Errorf("t.Pad1: array too large (%d)", extra)
} }
if maj != cbg.MajByteString { if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array") return fmt.Errorf("expected byte array")
} }
t.CommRLast = make([]byte, extra) t.Pad1 = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommRLast); err != nil { if _, err := io.ReadFull(br, t.Pad1); err != nil {
return err return err
} }
// t.t.Proof ([]uint8) (slice) // t.t.Proof ([]uint8) (slice)

View File

@ -75,10 +75,8 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp
} }
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
info.CommC = rspco.CommC[:]
info.CommD = rspco.CommD[:] info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:] info.CommR = rspco.CommR[:]
info.CommRLast = rspco.CommRLast[:]
info.Ticket = SealTicket{ info.Ticket = SealTicket{
BlockHeight: ticket.BlockHeight, BlockHeight: ticket.BlockHeight,
TicketBytes: ticket.TicketBytes[:], TicketBytes: ticket.TicketBytes[:],

View File

@ -55,10 +55,10 @@ type SectorInfo struct {
Pieces []Piece Pieces []Piece
// PreCommit // PreCommit
CommC []byte Pad0 []byte // TODO: legacy placeholder, remove
CommD []byte CommD []byte
CommR []byte CommR []byte
CommRLast []byte Pad1 []byte // TODO: legacy placeholder, remove
Proof []byte Proof []byte
Ticket SealTicket Ticket SealTicket
@ -105,10 +105,8 @@ func (t *SectorInfo) existingPieces() []uint64 {
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput var out sectorbuilder.RawSealPreCommitOutput
copy(out.CommC[:], t.CommC)
copy(out.CommD[:], t.CommD) copy(out.CommD[:], t.CommD)
copy(out.CommR[:], t.CommR) copy(out.CommR[:], t.CommR)
copy(out.CommRLast[:], t.CommRLast)
return out return out
} }

View File

@ -167,9 +167,11 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
var sector SectorInfo var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState s.State = update.newState
s.LastErr = ""
if update.err != nil { if update.err != nil {
s.LastErr = fmt.Sprintf("%+v", update.err) if s.LastErr != "" {
s.LastErr += "---------\n\n"
}
s.LastErr += fmt.Sprintf("entering state %s: %+v", api.SectorStates[update.newState], update.err)
} }
if update.mut != nil { if update.mut != nil {