Rename advmgr+sealmgr to sectorstorage

Commit e40c5d441d

manager.go (new file, 410 lines)
@@ -0,0 +1,410 @@
package sectorstorage

import (
	"container/list"
	"context"
	"io"
	"net/http"
	"sync"

	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"
	"github.com/mitchellh/go-homedir"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/specs-actors/actors/abi"
	"github.com/filecoin-project/specs-storage/storage"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/node/config"
	"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
	"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
)

var log = logging.Logger("advmgr")

type URLs []string

type Worker interface {
	sectorbuilder.Sealer

	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

	// Returns paths accessible to the worker
	Paths(context.Context) ([]stores.StoragePath, error)

	Info(context.Context) (api.WorkerInfo, error)
}

type SectorManager interface {
	SectorSize() abi.SectorSize

	ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)

	sectorbuilder.Sealer
	storage.Prover
}

type workerID uint64

type Manager struct {
	scfg *sectorbuilder.Config

	ls         stores.LocalStorage
	storage    *stores.Remote
	localStore *stores.Local
	remoteHnd  *stores.FetchHandler
	index      stores.SectorIndex

	storage.Prover

	workersLk  sync.Mutex
	nextWorker workerID
	workers    map[workerID]*workerHandle

	newWorkers chan *workerHandle
	schedule   chan *workerRequest
	workerFree chan workerID
	closing    chan struct{}

	schedQueue *list.List // List[*workerRequest]
}

func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
	ctx := context.TODO()

	lstor, err := stores.NewLocal(ctx, ls, si, urls)
	if err != nil {
		return nil, err
	}

	prover, err := sectorbuilder.New(&readonlyProvider{stor: lstor}, cfg)
	if err != nil {
		return nil, xerrors.Errorf("creating prover instance: %w", err)
	}

	token, err := ca.AuthNew(context.TODO(), []api.Permission{"admin"})
	headers := http.Header{}
	headers.Add("Authorization", "Bearer "+string(token))
	stor := stores.NewRemote(lstor, si, headers)

	m := &Manager{
		scfg: cfg,

		ls:         ls,
		storage:    stor,
		localStore: lstor,
		remoteHnd:  &stores.FetchHandler{Local: lstor},
		index:      si,

		nextWorker: 0,
		workers:    map[workerID]*workerHandle{},

		newWorkers: make(chan *workerHandle),
		schedule:   make(chan *workerRequest),
		workerFree: make(chan workerID),
		closing:    make(chan struct{}),

		schedQueue: list.New(),

		Prover: prover,
	}

	go m.runSched()

	err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
		SealProof: cfg.SealProofType,
		TaskTypes: []sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize},
	}, stor, lstor, si))
	if err != nil {
		return nil, xerrors.Errorf("adding local worker: %w", err)
	}

	return m, nil
}

func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
	path, err := homedir.Expand(path)
	if err != nil {
		return xerrors.Errorf("expanding local path: %w", err)
	}

	if err := m.localStore.OpenPath(ctx, path); err != nil {
		return xerrors.Errorf("opening local path: %w", err)
	}

	if err := m.ls.SetStorage(func(sc *config.StorageConfig) {
		sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
	}); err != nil {
		return xerrors.Errorf("get storage config: %w", err)
	}
	return nil
}

func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
	info, err := w.Info(ctx)
	if err != nil {
		return xerrors.Errorf("getting worker info: %w", err)
	}

	m.newWorkers <- &workerHandle{
		w:    w,
		info: info,
	}
	return nil
}

func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.remoteHnd.ServeHTTP(w, r)
}

func (m *Manager) SectorSize() abi.SectorSize {
	sz, _ := m.scfg.SealProofType.SectorSize()
	return sz
}

func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
	panic("implement me")
}

func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) {
	m.workersLk.Lock()
	defer m.workersLk.Unlock()

	var workers []workerID
	paths := map[workerID]stores.StorageInfo{}

	for i, worker := range m.workers {
		tt, err := worker.w.TaskTypes(context.TODO())
		if err != nil {
			log.Errorf("error getting supported worker task types: %+v", err)
			continue
		}
		if _, ok := tt[task]; !ok {
			log.Debugf("dropping worker %d; task %s not supported (supports %v)", i, task, tt)
			continue
		}

		phs, err := worker.w.Paths(context.TODO())
		if err != nil {
			log.Errorf("error getting worker paths: %+v", err)
			continue
		}

		// check if the worker has access to the path we selected
		var st *stores.StorageInfo
		for _, p := range phs {
			for _, meta := range inPaths {
				if p.ID == meta.ID {
					if st != nil && st.Weight > p.Weight {
						continue
					}

					p := meta // copy
					st = &p
				}
			}
		}
		if st == nil {
			log.Debugf("skipping worker %d; doesn't have any of %v", i, inPaths)
			log.Debugf("skipping worker %d; only has %v", i, phs)
			continue
		}

		paths[i] = *st
		workers = append(workers, i)
	}

	return workers, paths
}

func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []workerID) (Worker, func(), error) {
	ret := make(chan workerResponse)

	select {
	case m.schedule <- &workerRequest{
		taskType: taskType,
		accept:   accept,

		cancel: ctx.Done(),
		ret:    ret,
	}:
	case <-m.closing:
		return nil, nil, xerrors.New("closing")
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}

	select {
	case resp := <-ret:
		return resp.worker, resp.done, resp.err
	case <-m.closing:
		return nil, nil, xerrors.New("closing")
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}
}

func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
	log.Warnf("stub NewSector")
	return nil
}

func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
	// TODO: consider multiple paths vs workers when initially allocating

	var best []stores.StorageInfo
	var err error
	if len(existingPieces) == 0 { // new
		best, err = m.index.StorageBestAlloc(ctx, sectorbuilder.FTUnsealed, true)
	} else { // append to existing
		best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed, false)
	}
	if err != nil {
		return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
	}

	log.Debugf("find workers for %v", best)
	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)

	if len(candidateWorkers) == 0 {
		return abi.PieceInfo{}, xerrors.New("no worker found")
	}

	worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
	if err != nil {
		return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err)
	}
	defer done()

	// TODO: select(candidateWorkers, ...)
	// TODO: remove the sectorbuilder abstraction, pass path directly
	return worker.AddPiece(ctx, sector, existingPieces, sz, r)
}

func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
	// TODO: also consider where the unsealed data sits

	best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
	if err != nil {
		return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
	}

	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
	if len(candidateWorkers) == 0 {
		return nil, xerrors.New("no suitable workers found")
	}

	worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
	if err != nil {
		return nil, xerrors.Errorf("scheduling worker: %w", err)
	}
	defer done()

	// TODO: select(candidateWorkers, ...)
	// TODO: remove the sectorbuilder abstraction, pass path directly
	return worker.SealPreCommit1(ctx, sector, ticket, pieces)
}

func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
	// TODO: allow workers to fetch the sectors

	best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
	if err != nil {
		return storage.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
	}

	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
	if len(candidateWorkers) == 0 {
		return storage.SectorCids{}, xerrors.New("no suitable workers found")
	}

	worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
	if err != nil {
		return storage.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err)
	}
	defer done()

	// TODO: select(candidateWorkers, ...)
	// TODO: remove the sectorbuilder abstraction, pass path directly
	return worker.SealPreCommit2(ctx, sector, phase1Out)
}

func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
	best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
	if err != nil {
		return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
	}

	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
	if len(candidateWorkers) == 0 {
		return nil, xerrors.New("no suitable workers found") // TODO: wait?
	}

	// TODO: Try very hard to execute on worker with access to the sectors
	worker, done, err := m.getWorker(ctx, sealtasks.TTCommit1, candidateWorkers)
	if err != nil {
		return nil, xerrors.Errorf("scheduling worker: %w", err)
	}
	defer done()

	// TODO: select(candidateWorkers, ...)
	// TODO: remove the sectorbuilder abstraction, pass path directly
	return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}

func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
	var candidateWorkers []workerID

	m.workersLk.Lock()
	for id, worker := range m.workers {
		tt, err := worker.w.TaskTypes(ctx)
		if err != nil {
			log.Errorf("error getting supported worker task types: %+v", err)
			continue
		}
		if _, ok := tt[sealtasks.TTCommit2]; !ok {
			continue
		}
		candidateWorkers = append(candidateWorkers, id)
	}
	m.workersLk.Unlock()

	worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
	if err != nil {
		return nil, xerrors.Errorf("scheduling worker: %w", err)
	}
	defer done()

	return worker.SealCommit2(ctx, sector, phase1Out)
}

func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
	best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed, true)
	if err != nil {
		return xerrors.Errorf("finding sealed sector: %w", err)
	}

	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)

	// TODO: Remove sector from sealing stores
	// TODO: Move the sector to long-term storage
	return m.workers[candidateWorkers[0]].w.FinalizeSector(ctx, sector)
}

func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
	l, err := m.localStore.Local(ctx)
	if err != nil {
		return nil, err
	}

	out := map[stores.ID]string{}
	for _, st := range l {
		out[st.ID] = st.LocalPath
	}

	return out, nil
}

var _ SectorManager = &Manager{}
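For orientation, a minimal sketch (not part of the commit) of how a caller would drive the SectorManager interface above through a full seal. It assumes the caller already has a Manager, a sector ID, ticket/seed randomness, and piece data, and it would live in package sectorstorage next to manager.go, reusing its imports; error wrapping is kept minimal.

func sealOne(ctx context.Context, m SectorManager, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieceData io.Reader, pieceSize abi.UnpaddedPieceSize) error {
	// Pack a single piece into a fresh sector (nil existing pieces).
	piece, err := m.AddPiece(ctx, sector, nil, pieceSize, pieceData)
	if err != nil {
		return err
	}
	// PreCommit phase 1 and 2 produce the sealed/unsealed CIDs.
	p1, err := m.SealPreCommit1(ctx, sector, ticket, []abi.PieceInfo{piece})
	if err != nil {
		return err
	}
	cids, err := m.SealPreCommit2(ctx, sector, p1)
	if err != nil {
		return err
	}
	// Commit phase 1 and 2 produce the on-chain proof.
	c1, err := m.SealCommit1(ctx, sector, ticket, seed, []abi.PieceInfo{piece}, cids)
	if err != nil {
		return err
	}
	if _, err := m.SealCommit2(ctx, sector, c1); err != nil {
		return err
	}
	// Finalize trims sealing caches / moves the sector to storage.
	return m.FinalizeSector(ctx, sector)
}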

mock/mock.go (new file, 371 lines)
@@ -0,0 +1,371 @@
package mock

import (
	"bytes"
	"context"
	"fmt"
	"github.com/filecoin-project/lotus/storage/sectorstorage"
	"io"
	"io/ioutil"
	"math/big"
	"math/rand"
	"sync"

	commcid "github.com/filecoin-project/go-fil-commcid"
	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/specs-actors/actors/abi"
	"github.com/filecoin-project/specs-storage/storage"
	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/api"
)

var log = logging.Logger("sbmock")

type SectorMgr struct {
	sectors      map[abi.SectorID]*sectorState
	sectorSize   abi.SectorSize
	nextSectorID abi.SectorNumber
	rateLimit    chan struct{}
	proofType    abi.RegisteredProof

	lk sync.Mutex
}

type mockVerif struct{}

func NewMockSectorMgr(threads int, ssize abi.SectorSize) *SectorMgr {
	rt, _, err := api.ProofTypeFromSectorSize(ssize)
	if err != nil {
		panic(err)
	}

	return &SectorMgr{
		sectors:      make(map[abi.SectorID]*sectorState),
		sectorSize:   ssize,
		nextSectorID: 5,
		rateLimit:    make(chan struct{}, threads),
		proofType:    rt,
	}
}

const (
	statePacking = iota
	statePreCommit
	stateCommit
)

type sectorState struct {
	pieces []cid.Cid
	failed bool

	state int

	lk sync.Mutex
}

func (sb *SectorMgr) RateLimit() func() {
	sb.rateLimit <- struct{}{}

	// TODO: probably want to copy over rate limit code
	return func() {
		<-sb.rateLimit
	}
}

func (sb *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error {
	return nil
}

func (sb *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
	log.Warn("Add piece: ", sectorId, size, sb.proofType)
	sb.lk.Lock()
	ss, ok := sb.sectors[sectorId]
	if !ok {
		ss = &sectorState{
			state: statePacking,
		}
		sb.sectors[sectorId] = ss
	}
	sb.lk.Unlock()
	ss.lk.Lock()
	defer ss.lk.Unlock()

	c, err := sectorbuilder.GeneratePieceCIDFromFile(sb.proofType, r, size)
	if err != nil {
		return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err)
	}

	log.Warn("Generated Piece CID: ", c)

	ss.pieces = append(ss.pieces, c)
	return abi.PieceInfo{
		Size:     size.Padded(),
		PieceCID: c,
	}, nil
}

func (sb *SectorMgr) SectorSize() abi.SectorSize {
	return sb.sectorSize
}

func (sb *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
	sb.lk.Lock()
	defer sb.lk.Unlock()
	id := sb.nextSectorID
	sb.nextSectorID++
	return id, nil
}

func (sb *SectorMgr) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
	sb.lk.Lock()
	ss, ok := sb.sectors[sid]
	sb.lk.Unlock()
	if !ok {
		return nil, xerrors.Errorf("no sector with id %d in sectorbuilder", sid)
	}

	ss.lk.Lock()
	defer ss.lk.Unlock()

	ussize := abi.PaddedPieceSize(sb.sectorSize).Unpadded()

	// TODO: verify pieces in sinfo.pieces match passed in pieces

	var sum abi.UnpaddedPieceSize
	for _, p := range pieces {
		sum += p.Size.Unpadded()
	}

	if sum != ussize {
		return nil, xerrors.Errorf("aggregated piece sizes don't match up: %d != %d", sum, ussize)
	}

	if ss.state != statePacking {
		return nil, xerrors.Errorf("cannot call pre-seal on sector not in 'packing' state")
	}

	opFinishWait(ctx)

	ss.state = statePreCommit

	pis := make([]abi.PieceInfo, len(ss.pieces))
	for i, piece := range ss.pieces {
		pis[i] = abi.PieceInfo{
			Size:     pieces[i].Size,
			PieceCID: piece,
		}
	}

	commd, err := MockVerifier.GenerateDataCommitment(abi.PaddedPieceSize(sb.sectorSize), pis)
	if err != nil {
		return nil, err
	}

	cc, _, err := commcid.CIDToCommitment(commd)
	if err != nil {
		panic(err)
	}

	cc[0] ^= 'd'

	return cc, nil
}

func (sb *SectorMgr) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
	db := []byte(string(phase1Out))
	db[0] ^= 'd'

	d := commcid.DataCommitmentV1ToCID(db)

	commr := make([]byte, 32)
	for i := range db {
		commr[32-(i+1)] = db[i]
	}

	commR := commcid.DataCommitmentV1ToCID(commr)

	return storage.SectorCids{
		Unsealed: d,
		Sealed:   commR,
	}, nil
}

func (sb *SectorMgr) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
	sb.lk.Lock()
	ss, ok := sb.sectors[sid]
	sb.lk.Unlock()
	if !ok {
		return nil, xerrors.Errorf("no such sector %d", sid)
	}
	ss.lk.Lock()
	defer ss.lk.Unlock()

	if ss.failed {
		return nil, xerrors.Errorf("[mock] cannot commit failed sector %d", sid)
	}

	if ss.state != statePreCommit {
		return nil, xerrors.Errorf("cannot commit sector that has not been precommitted")
	}

	opFinishWait(ctx)

	var out [32]byte
	for i := range out {
		out[i] = cids.Unsealed.Bytes()[i] + cids.Sealed.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid.Number&0xff)
	}

	return out[:], nil
}

func (sb *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
	var out [32]byte
	for i := range out {
		out[i] = phase1Out[i] ^ byte(sid.Number&0xff)
	}

	return out[:], nil
}

// Test Instrumentation Methods

func (sb *SectorMgr) FailSector(sid abi.SectorID) error {
	sb.lk.Lock()
	defer sb.lk.Unlock()
	ss, ok := sb.sectors[sid]
	if !ok {
		return fmt.Errorf("no such sector in sectorbuilder")
	}

	ss.failed = true
	return nil
}

func opFinishWait(ctx context.Context) {
	val, ok := ctx.Value("opfinish").(chan struct{})
	if !ok {
		return
	}
	<-val
}

func AddOpFinish(ctx context.Context) (context.Context, func()) {
	done := make(chan struct{})

	return context.WithValue(ctx, "opfinish", done), func() {
		close(done)
	}
}

func (sb *SectorMgr) GenerateFallbackPoSt(context.Context, abi.ActorID, []abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) (storage.FallbackPostOut, error) {
	panic("implement me")
}

func (sb *SectorMgr) ComputeElectionPoSt(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
	panic("implement me")
}

func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
	if len(faults) > 0 {
		panic("todo")
	}

	n := sectorbuilder.ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults)))
	if n > uint64(len(sectorInfo)) {
		n = uint64(len(sectorInfo))
	}

	out := make([]storage.PoStCandidateWithTicket, n)

	seed := big.NewInt(0).SetBytes(challengeSeed[:])
	start := seed.Mod(seed, big.NewInt(int64(len(sectorInfo)))).Int64()

	for i := range out {
		out[i] = storage.PoStCandidateWithTicket{
			Candidate: abi.PoStCandidate{
				SectorID: abi.SectorID{
					Number: abi.SectorNumber((int(start) + i) % len(sectorInfo)),
					Miner:  mid,
				},
				PartialTicket: abi.PartialTicket(challengeSeed),
			},
		}
	}

	return out, nil
}

func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) {
	if len(sb.sectors[sectorID].pieces) > 1 {
		panic("implme")
	}
	return ioutil.NopCloser(io.LimitReader(bytes.NewReader(sb.sectors[sectorID].pieces[0].Bytes()[offset:]), int64(size))), nil
}

func (sb *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) {
	usize := abi.PaddedPieceSize(sb.sectorSize).Unpadded()
	sid, err := sb.AcquireSectorNumber()
	if err != nil {
		return abi.SectorID{}, nil, err
	}

	buf := make([]byte, usize)
	rand.Read(buf)

	id := abi.SectorID{
		Miner:  mid,
		Number: sid,
	}

	pi, err := sb.AddPiece(context.TODO(), id, nil, usize, bytes.NewReader(buf))
	if err != nil {
		return abi.SectorID{}, nil, err
	}

	return id, []abi.PieceInfo{pi}, nil
}

func (sb *SectorMgr) FinalizeSector(context.Context, abi.SectorID) error {
	return nil
}

func (m mockVerif) VerifyElectionPost(ctx context.Context, pvi abi.PoStVerifyInfo) (bool, error) {
	panic("implement me")
}

func (m mockVerif) VerifyFallbackPost(ctx context.Context, pvi abi.PoStVerifyInfo) (bool, error) {
	panic("implement me")
}

func (m mockVerif) VerifySeal(svi abi.SealVerifyInfo) (bool, error) {
	if len(svi.OnChain.Proof) != 32 { // Real ones are longer, but this should be fine
		return false, nil
	}

	for i, b := range svi.OnChain.Proof {
		if b != svi.UnsealedCID.Bytes()[i]+svi.OnChain.SealedCID.Bytes()[31-i]-svi.InteractiveRandomness[i]*svi.Randomness[i] {
			return false, nil
		}
	}

	return true, nil
}

func (m mockVerif) GenerateDataCommitment(ssize abi.PaddedPieceSize, pieces []abi.PieceInfo) (cid.Cid, error) {
	if len(pieces) != 1 {
		panic("todo")
	}
	if pieces[0].Size != ssize {
		fmt.Println("wrong sizes? ", pieces[0].Size, ssize)
		panic("todo")
	}
	return pieces[0].PieceCID, nil
}

var MockVerifier = mockVerif{}

var _ sectorbuilder.Verifier = MockVerifier
var _ sectorstorage.SectorManager = &SectorMgr{}

mock/mock_test.go (new file, 45 lines)
@@ -0,0 +1,45 @@
package mock

import (
	"context"
	"testing"
	"time"

	"github.com/filecoin-project/specs-actors/actors/abi"
)

func TestOpFinish(t *testing.T) {
	sb := NewMockSectorMgr(1, 2048)

	sid, pieces, err := sb.StageFakeData(123)
	if err != nil {
		t.Fatal(err)
	}

	ctx, done := AddOpFinish(context.TODO())

	finished := make(chan struct{})
	go func() {
		_, err := sb.SealPreCommit1(ctx, sid, abi.SealRandomness{}, pieces)
		if err != nil {
			t.Error(err)
			return
		}

		close(finished)
	}()

	select {
	case <-finished:
		t.Fatal("should not finish until we tell it to")
	case <-time.After(time.Second / 2):
	}

	done()

	select {
	case <-finished:
	case <-time.After(time.Second / 2):
		t.Fatal("should finish after we tell it to")
	}
}

mock/preseal.go (new file, 63 lines)
@@ -0,0 +1,63 @@
package mock

import (
	"github.com/filecoin-project/go-address"
	commcid "github.com/filecoin-project/go-fil-commcid"
	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/specs-actors/actors/abi"
	"github.com/filecoin-project/specs-actors/actors/abi/big"
	"github.com/filecoin-project/specs-actors/actors/builtin/market"
	"github.com/filecoin-project/specs-actors/actors/crypto"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/wallet"
	"github.com/filecoin-project/lotus/genesis"
)

func PreSeal(ssize abi.SectorSize, maddr address.Address, sectors int) (*genesis.Miner, *types.KeyInfo, error) {
	k, err := wallet.GenerateKey(crypto.SigTypeBLS)
	if err != nil {
		return nil, nil, err
	}

	genm := &genesis.Miner{
		Owner:         k.Address,
		Worker:        k.Address,
		MarketBalance: big.NewInt(0),
		PowerBalance:  big.NewInt(0),
		SectorSize:    ssize,
		Sectors:       make([]*genesis.PreSeal, sectors),
	}

	_, st, err := api.ProofTypeFromSectorSize(ssize)
	if err != nil {
		return nil, nil, err
	}

	for i := range genm.Sectors {
		preseal := &genesis.PreSeal{}

		preseal.ProofType = st
		preseal.CommD = sectorbuilder.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
		d, _ := commcid.CIDToPieceCommitmentV1(preseal.CommD)
		r := commDR(d)
		preseal.CommR = commcid.ReplicaCommitmentV1ToCID(r[:])
		preseal.SectorID = abi.SectorNumber(i + 1)
		preseal.Deal = market.DealProposal{
			PieceCID:             preseal.CommD,
			PieceSize:            abi.PaddedPieceSize(ssize),
			Client:               maddr,
			Provider:             maddr,
			StartEpoch:           1,
			EndEpoch:             10000,
			StoragePricePerEpoch: big.Zero(),
			ProviderCollateral:   big.Zero(),
			ClientCollateral:     big.Zero(),
		}

		genm.Sectors[i] = preseal
	}

	return genm, &k.KeyInfo, nil
}

mock/util.go (new file, 23 lines)
@@ -0,0 +1,23 @@
package mock

import (
	"crypto/rand"
	"io"
	"io/ioutil"
)

func randB(n uint64) []byte {
	b, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(n)))
	if err != nil {
		panic(err)
	}
	return b
}

func commDR(in []byte) (out [32]byte) {
	for i, b := range in {
		out[i] = ^b
	}

	return out
}

resources.go (new file, 135 lines)
@@ -0,0 +1,135 @@
package sectorstorage

import (
	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
	"github.com/filecoin-project/specs-actors/actors/abi"
)

var FSOverheadSeal = map[sectorbuilder.SectorFileType]int{ // 10x overheads
	sectorbuilder.FTUnsealed: 10,
	sectorbuilder.FTSealed:   10,
	sectorbuilder.FTCache:    70, // TODO: confirm for 32G
}

var FsOverheadFinalized = map[sectorbuilder.SectorFileType]int{
	sectorbuilder.FTUnsealed: 10,
	sectorbuilder.FTSealed:   10,
	sectorbuilder.FTCache:    2,
}

type Resources struct {
	MinMemory uint64 // what must be in RAM for decent perf
	MaxMemory uint64 // memory required (swap + ram)

	MultiThread bool
	CanGPU      bool

	BaseMinMemory uint64 // what must be in RAM for decent perf (shared between threads)
}

const MaxCachingOverhead = 32 << 30

var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
	sealtasks.TTAddPiece: {
		abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ // This is probably a bit conservative
			MaxMemory: 32 << 30,
			MinMemory: 32 << 30,

			MultiThread: false,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MultiThread: false,

			BaseMinMemory: 1 << 30,
		},
	},
	sealtasks.TTPreCommit1: {
		abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
			MaxMemory: 64 << 30,
			MinMemory: 32 << 30,

			MultiThread: false,

			BaseMinMemory: 30 << 30,
		},
		abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
			MaxMemory: 3 << 29, // 1.5G
			MinMemory: 1 << 30,

			MultiThread: false,

			BaseMinMemory: 1 << 30,
		},
	},
	sealtasks.TTPreCommit2: {
		abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
			MaxMemory: 96 << 30,
			MinMemory: 64 << 30,

			MultiThread: true,

			BaseMinMemory: 30 << 30,
		},
		abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
			MaxMemory: 3 << 29, // 1.5G
			MinMemory: 1 << 30,

			MultiThread: true,

			BaseMinMemory: 1 << 30,
		},
	},
	sealtasks.TTCommit1: { // Very short (~100ms), so params are very light
		abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MultiThread: false,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MultiThread: false,

			BaseMinMemory: 1 << 30,
		},
	},
	sealtasks.TTCommit2: { // TODO: Measure more accurately
		abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
			MaxMemory: 110 << 30,
			MinMemory: 60 << 30,

			MultiThread: true,
			CanGPU:      true,

			BaseMinMemory: 64 << 30, // params
		},
		abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
			MaxMemory: 3 << 29, // 1.5G
			MinMemory: 1 << 30,

			MultiThread: false, // This is fine
			CanGPU:      true,

			BaseMinMemory: 10 << 30,
		},
	},
}

func init() {
	// for now we just reuse params for 2kib and 8mib from 512mib

	for taskType := range ResourceTable {
		ResourceTable[taskType][abi.RegisteredProof_StackedDRG8MiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal]
		ResourceTable[taskType][abi.RegisteredProof_StackedDRG2KiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal]
	}
}
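The table is keyed first by task type, then by seal proof. A scheduler looks an entry up before admitting work; a small sketch (illustrative only, mirroring the lookup that canHandleRequest in sched.go below performs):

needRes, ok := ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredProof_StackedDRG32GiBSeal]
if !ok {
	// no resource entry for this task/proof combination; refuse to schedule
}
// For this entry MinMemory (64 GiB) must fit in physical RAM, MaxMemory (96 GiB)
// may spill into swap, and MultiThread marks tasks that want the whole CPU.
_ = needRes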

roprov.go (new file, 25 lines)
@@ -0,0 +1,25 @@
package sectorstorage

import (
	"context"

	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/lotus/storage/sectorstorage/stores"

	"github.com/filecoin-project/specs-actors/actors/abi"
	"golang.org/x/xerrors"
)

type readonlyProvider struct {
	stor *stores.Local
}

func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
	if allocate != 0 {
		return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
	}

	p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing)

	return p, done, err
}

sched.go (new file, 242 lines)
@@ -0,0 +1,242 @@
package sectorstorage

import (
	"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
	"github.com/filecoin-project/specs-actors/actors/abi"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/api"
)

const mib = 1 << 20

type workerRequest struct {
	taskType sealtasks.TaskType
	accept   []workerID // ordered by preference

	ret    chan<- workerResponse
	cancel <-chan struct{}
}

type workerResponse struct {
	err error

	worker Worker
	done   func()
}

func (r *workerRequest) respond(resp workerResponse) {
	select {
	case r.ret <- resp:
	case <-r.cancel:
		log.Warnf("request got cancelled before we could respond")
		if resp.done != nil {
			resp.done()
		}
	}
}

type workerHandle struct {
	w Worker

	info api.WorkerInfo

	memUsedMin uint64
	memUsedMax uint64
	gpuUsed    bool
	cpuUse     int // -1 - multicore thing; 0 - free; 1+ - singlecore things
}

func (m *Manager) runSched() {
	for {
		select {
		case w := <-m.newWorkers:
			m.schedNewWorker(w)
		case req := <-m.schedule:
			resp, err := m.maybeSchedRequest(req)
			if err != nil {
				req.respond(workerResponse{err: err})
				continue
			}

			if resp != nil {
				req.respond(*resp)
				continue
			}

			m.schedQueue.PushBack(req)
		case wid := <-m.workerFree:
			m.onWorkerFreed(wid)
		}
	}
}

func (m *Manager) onWorkerFreed(wid workerID) {
	for e := m.schedQueue.Front(); e != nil; e = e.Next() {
		req := e.Value.(*workerRequest)
		var ok bool
		for _, id := range req.accept {
			if id == wid {
				ok = true
				break
			}
		}
		if !ok {
			continue
		}

		resp, err := m.maybeSchedRequest(req)
		if err != nil {
			req.respond(workerResponse{err: err})
			continue
		}

		if resp != nil {
			req.respond(*resp)

			pe := e.Prev()
			m.schedQueue.Remove(e)
			if pe == nil {
				pe = m.schedQueue.Front()
			}
			if pe == nil {
				break
			}
			e = pe
			continue
		}
	}
}

func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error) {
	m.workersLk.Lock()
	defer m.workersLk.Unlock()

	tried := 0

	for _, id := range req.accept {
		w, ok := m.workers[id]
		if !ok {
			log.Warnf("requested worker %d is not in scheduler", id)
		}
		tried++

		canDo, err := m.canHandleRequest(id, w, req)
		if err != nil {
			return nil, err
		}

		if !canDo {
			continue
		}

		return m.makeResponse(id, w, req), nil
	}

	if tried == 0 {
		return nil, xerrors.New("maybeSchedRequest didn't find any good workers")
	}

	return nil, nil // put in waiting queue
}

func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest) *workerResponse {
	needRes := ResourceTable[req.taskType][m.scfg.SealProofType]

	w.gpuUsed = needRes.CanGPU
	if needRes.MultiThread {
		w.cpuUse = -1
	} else {
		if w.cpuUse != -1 {
			w.cpuUse++
		} else {
			log.Warnf("sched: makeResponse for worker %d: worker cpu is in multicore use, but a single core task was scheduled", wid)
		}
	}

	w.memUsedMin += needRes.MinMemory
	w.memUsedMax += needRes.MaxMemory

	return &workerResponse{
		err:    nil,
		worker: w.w,
		done: func() {
			m.workersLk.Lock()

			if needRes.CanGPU {
				w.gpuUsed = false
			}

			if needRes.MultiThread {
				w.cpuUse = 0
			} else if w.cpuUse != -1 {
				w.cpuUse--
			}

			w.memUsedMin -= needRes.MinMemory
			w.memUsedMax -= needRes.MaxMemory

			m.workersLk.Unlock()

			select {
			case m.workerFree <- wid:
			case <-m.closing:
			}
		},
	}
}

func (m *Manager) canHandleRequest(wid workerID, w *workerHandle, req *workerRequest) (bool, error) {
	needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType]
	if !ok {
		return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType)
	}

	res := w.info.Resources

	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
	minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
	if minNeedMem > res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
		return false, nil
	}

	maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
	if m.scfg.SealProofType == abi.RegisteredProof_StackedDRG32GiBSeal {
		maxNeedMem += MaxCachingOverhead
	}
	if maxNeedMem > res.MemSwap+res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
		return false, nil
	}

	if needRes.MultiThread {
		if w.cpuUse != 0 {
			log.Debugf("sched: not scheduling on worker %d; multicore process needs free CPU", wid)
			return false, nil
		}
	} else {
		if w.cpuUse == -1 {
			log.Debugf("sched: not scheduling on worker %d; CPU in use by a multicore process", wid)
			return false, nil
		}
	}

	if len(res.GPUs) > 0 && needRes.CanGPU {
		if w.gpuUsed {
			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
			return false, nil
		}
	}

	return true, nil
}

func (m *Manager) schedNewWorker(w *workerHandle) {
	m.workersLk.Lock()
	defer m.workersLk.Unlock()

	id := m.nextWorker
	m.workers[id] = w
	m.nextWorker++
}

sealtasks/task.go (new file, 13 lines)
@@ -0,0 +1,13 @@
package sealtasks

type TaskType string

const (
	TTAddPiece   TaskType = "seal/v0/addpiece"
	TTPreCommit1 TaskType = "seal/v0/precommit/1"
	TTPreCommit2 TaskType = "seal/v0/precommit/2"
	TTCommit1    TaskType = "seal/v0/commit/1" // NOTE: We use this to transfer the sector into miner-local storage for now; Don't use on workers!
	TTCommit2    TaskType = "seal/v0/commit/2"

	TTFinalize TaskType = "seal/v0/finalize"
)
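Workers advertise the tasks they support as a set keyed by these constants (compare Worker.TaskTypes and WorkerConfig.TaskTypes in manager.go). A tiny illustrative sketch of building and checking such a set:

supported := map[sealtasks.TaskType]struct{}{}
for _, t := range []sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize} {
	supported[t] = struct{}{}
}
if _, ok := supported[sealtasks.TTPreCommit2]; !ok {
	// a worker advertising only the set above would be skipped for precommit2 work
}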

sectorutil/utils.go (new file, 56 lines)
@@ -0,0 +1,56 @@
package sectorutil

import (
	"fmt"
	"github.com/filecoin-project/go-sectorbuilder"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/specs-actors/actors/abi"
)

func ParseSectorID(baseName string) (abi.SectorID, error) {
	var n abi.SectorNumber
	var mid abi.ActorID
	read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n)
	if err != nil {
		return abi.SectorID{}, xerrors.Errorf(": %w", err)
	}

	if read != 2 {
		return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read)
	}

	return abi.SectorID{
		Miner:  mid,
		Number: n,
	}, nil
}

func SectorName(sid abi.SectorID) string {
	return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
}

func PathByType(sps sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType) string {
	switch fileType {
	case sectorbuilder.FTUnsealed:
		return sps.Unsealed
	case sectorbuilder.FTSealed:
		return sps.Sealed
	case sectorbuilder.FTCache:
		return sps.Cache
	}

	panic("requested unknown path type")
}

func SetPathByType(sps *sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType, p string) {
	switch fileType {
	case sectorbuilder.FTUnsealed:
		sps.Unsealed = p
	case sectorbuilder.FTSealed:
		sps.Sealed = p
	case sectorbuilder.FTCache:
		sps.Cache = p
	}
}
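On-disk sector files are named after the miner actor ID and the sector number. A quick round trip through the helpers above, using hypothetical values:

// SectorName formats "s-t0{miner}-{number}"; ParseSectorID reverses it.
name := sectorutil.SectorName(abi.SectorID{Miner: 1000, Number: 1}) // "s-t01000-1"
id, err := sectorutil.ParseSectorID(name)
// id.Miner == 1000, id.Number == 1, err == nil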

stores/http_handler.go (new file, 125 lines)
@@ -0,0 +1,125 @@
package stores

import (
	"io"
	"net/http"
	"os"

	"github.com/gorilla/mux"
	logging "github.com/ipfs/go-log/v2"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/lotus/lib/tarutil"
	"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)

var log = logging.Logger("stores")

type FetchHandler struct {
	*Local
}

func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
	mux := mux.NewRouter()

	mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
	mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")

	mux.ServeHTTP(w, r)
}

func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
	log.Infof("SERVE GET %s", r.URL)
	vars := mux.Vars(r)

	id, err := sectorutil.ParseSectorID(vars["id"])
	if err != nil {
		log.Error("%+v", err)
		w.WriteHeader(500)
		return
	}

	ft, err := ftFromString(vars["type"])
	if err != nil {
		log.Error("%+v", err)
		return
	}
	paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, 0, false)
	if err != nil {
		log.Error("%+v", err)
		return
	}
	defer done()

	path := sectorutil.PathByType(paths, ft)
	if path == "" {
		log.Error("acquired path was empty")
		w.WriteHeader(500)
		return
	}

	stat, err := os.Stat(path)
	if err != nil {
		log.Error("%+v", err)
		w.WriteHeader(500)
		return
	}

	var rd io.Reader
	if stat.IsDir() {
		rd, err = tarutil.TarDirectory(path)
		w.Header().Set("Content-Type", "application/x-tar")
	} else {
		rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
		w.Header().Set("Content-Type", "application/octet-stream")
	}
	if err != nil {
		log.Error("%+v", err)
		w.WriteHeader(500)
		return
	}

	w.WriteHeader(200)
	if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small
		log.Error("%+v", err)
		return
	}
}

func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) {
	log.Infof("SERVE DELETE %s", r.URL)
	vars := mux.Vars(r)

	id, err := sectorutil.ParseSectorID(vars["id"])
	if err != nil {
		log.Error("%+v", err)
		w.WriteHeader(500)
		return
	}

	ft, err := ftFromString(vars["type"])
	if err != nil {
		log.Error("%+v", err)
		return
	}

	if err := handler.delete(r.Context(), id, ft); err != nil {
		log.Error("%+v", err)
		w.WriteHeader(500)
		return
	}
}

func ftFromString(t string) (sectorbuilder.SectorFileType, error) {
	switch t {
	case sectorbuilder.FTUnsealed.String():
		return sectorbuilder.FTUnsealed, nil
	case sectorbuilder.FTSealed.String():
		return sectorbuilder.FTSealed, nil
	case sectorbuilder.FTCache.String():
		return sectorbuilder.FTCache, nil
	default:
		return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
	}
}
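The routes above serve /remote/{type}/{id}, where {type} is a SectorFileType string and {id} a sector name. The index (stores/index.go below) builds fetch URLs the same way; an illustrative sketch with a hypothetical base URL:

base, _ := url.Parse("http://127.0.0.1:2345/remote")
base.Path = gopath.Join(base.Path, sectorbuilder.FTCache.String(), sectorutil.SectorName(abi.SectorID{Miner: 1000, Number: 1}))
// yields something like http://127.0.0.1:2345/remote/cache/s-t01000-1
// (assuming FTCache.String() == "cache"; the port and host are made up)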

stores/index.go (new file, 312 lines)
@@ -0,0 +1,312 @@
package stores

import (
	"context"
	"net/url"
	gopath "path"
	"sort"
	"sync"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/specs-actors/actors/abi"
	"github.com/filecoin-project/specs-actors/actors/abi/big"

	"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)

// ID identifies sector storage by UUID. One sector storage should map to one
// filesystem, local or networked / shared by multiple machines
type ID string

type StorageInfo struct {
	ID     ID
	URLs   []string // TODO: Support non-http transports
	Weight uint64

	CanSeal  bool
	CanStore bool
}

type SectorIndex interface { // part of storage-miner api
	StorageAttach(context.Context, StorageInfo, FsStat) error
	StorageInfo(context.Context, ID) (StorageInfo, error)
	// TODO: StorageUpdateStats(FsStat)

	StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
	StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
	StorageFindSector(ctx context.Context, sector abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error)

	StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error)
}

type Decl struct {
	abi.SectorID
	sectorbuilder.SectorFileType
}

type storageEntry struct {
	info *StorageInfo
	fsi  FsStat
}

type Index struct {
	lk sync.RWMutex

	sectors map[Decl][]ID
	stores  map[ID]*storageEntry
}

func NewIndex() *Index {
	return &Index{
		sectors: map[Decl][]ID{},
		stores:  map[ID]*storageEntry{},
	}
}

func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
	byID := map[ID]map[abi.SectorID]sectorbuilder.SectorFileType{}

	for id := range i.stores {
		byID[id] = map[abi.SectorID]sectorbuilder.SectorFileType{}
	}
	for decl, ids := range i.sectors {
		for _, id := range ids {
			byID[id][decl.SectorID] |= decl.SectorFileType
		}
	}

	out := map[ID][]Decl{}
	for id, m := range byID {
		out[id] = []Decl{}
		for sectorID, fileType := range m {
			out[id] = append(out[id], Decl{
				SectorID:       sectorID,
				SectorFileType: fileType,
			})
		}
	}

	return out, nil
}

func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) error {
	i.lk.Lock()
	defer i.lk.Unlock()

	log.Infof("New sector storage: %s", si.ID)

	if _, ok := i.stores[si.ID]; ok {
		for _, u := range si.URLs {
			if _, err := url.Parse(u); err != nil {
				return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err)
			}
		}

		i.stores[si.ID].info.URLs = append(i.stores[si.ID].info.URLs, si.URLs...)
		return nil
	}
	i.stores[si.ID] = &storageEntry{
		info: &si,
		fsi:  st,
	}
	return nil
}

func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
	i.lk.Lock()
	defer i.lk.Unlock()

	for _, fileType := range pathTypes {
		if fileType&ft == 0 {
			continue
		}

		d := Decl{s, fileType}

		for _, sid := range i.sectors[d] {
			if sid == storageId {
				log.Warnf("sector %v redeclared in %s", storageId)
				return nil
			}
		}

		i.sectors[d] = append(i.sectors[d], storageId)
	}

	return nil
}

func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
	i.lk.Lock()
	defer i.lk.Unlock()

	for _, fileType := range pathTypes {
		if fileType&ft == 0 {
			continue
		}

		d := Decl{s, fileType}

		if len(i.sectors[d]) == 0 {
			return nil
		}

		rewritten := make([]ID, 0, len(i.sectors[d])-1)
		for _, sid := range i.sectors[d] {
			if sid == storageId {
				continue
			}

			rewritten = append(rewritten, sid)
		}
		if len(rewritten) == 0 {
			delete(i.sectors, d)
			return nil
		}

		i.sectors[d] = rewritten
	}

	return nil
}

func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) {
	i.lk.RLock()
	defer i.lk.RUnlock()

	storageIDs := map[ID]uint64{}

	for _, pathType := range pathTypes {
		if ft&pathType == 0 {
			continue
		}

		for _, id := range i.sectors[Decl{s, pathType}] {
			storageIDs[id]++
		}
	}

	out := make([]StorageInfo, 0, len(storageIDs))

	for id, n := range storageIDs {
		st, ok := i.stores[id]
		if !ok {
			log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s)
			continue
		}

		urls := make([]string, len(st.info.URLs))
		for k, u := range st.info.URLs {
			rl, err := url.Parse(u)
			if err != nil {
				return nil, xerrors.Errorf("failed to parse url: %w", err)
			}

			rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s))
			urls[k] = rl.String()
		}

		out = append(out, StorageInfo{
			ID:       id,
			URLs:     urls,
			Weight:   st.info.Weight * n, // storage with more sector types is better
			CanSeal:  st.info.CanSeal,
			CanStore: st.info.CanStore,
		})
	}

	if allowFetch {
		for id, st := range i.stores {
			if _, ok := storageIDs[id]; ok {
				continue
			}

			urls := make([]string, len(st.info.URLs))
			for k, u := range st.info.URLs {
				rl, err := url.Parse(u)
				if err != nil {
					return nil, xerrors.Errorf("failed to parse url: %w", err)
				}

				rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s))
				urls[k] = rl.String()
			}

			out = append(out, StorageInfo{
				ID:       id,
				URLs:     urls,
				Weight:   st.info.Weight * 0, // TODO: something better than just '0'
				CanSeal:  st.info.CanSeal,
				CanStore: st.info.CanStore,
			})
		}
	}

	return out, nil
}

func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
	i.lk.RLock()
	defer i.lk.RUnlock()

	si, found := i.stores[id]
	if !found {
		return StorageInfo{}, xerrors.Errorf("sector store not found")
	}

	return *si.info, nil
}

func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) {
	i.lk.RLock()
	defer i.lk.RUnlock()

	var candidates []storageEntry

	for _, p := range i.stores {
		if sealing && !p.info.CanSeal {
			log.Debugf("alloc: not considering %s; can't seal", p.info.ID)
			continue
		}
		if !sealing && !p.info.CanStore {
			log.Debugf("alloc: not considering %s; can't store", p.info.ID)
			continue
		}

		// TODO: filter out of space

		candidates = append(candidates, *p)
	}

	if len(candidates) == 0 {
		return nil, xerrors.New("no good path found")
	}

	sort.Slice(candidates, func(i, j int) bool {
		iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Free)), big.NewInt(int64(candidates[i].info.Weight)))
		jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Free)), big.NewInt(int64(candidates[j].info.Weight)))

		return iw.GreaterThan(jw)
	})

	out := make([]StorageInfo, len(candidates))
	for i, candidate := range candidates {
		out[i] = *candidate.info
	}

	return out, nil
}

func (i *Index) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]ID, error) {
	i.lk.RLock()
	defer i.lk.RUnlock()

	return i.sectors[Decl{
		SectorID:       id,
		SectorFileType: typ,
	}], nil
}

var _ SectorIndex = &Index{}

stores/interface.go (new file, 32 lines)
@@ -0,0 +1,32 @@
package stores

import (
	"context"
	"syscall"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-sectorbuilder"
	"github.com/filecoin-project/specs-actors/actors/abi"
)

type Store interface {
	AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
}

type FsStat struct {
	Capacity uint64
	Free     uint64 // Free to use for sector storage
}

func Stat(path string) (FsStat, error) {
	var stat syscall.Statfs_t
	if err := syscall.Statfs(path, &stat); err != nil {
		return FsStat{}, xerrors.Errorf("statfs: %w", err)
	}

	return FsStat{
		Capacity: stat.Blocks * uint64(stat.Bsize),
		Free:     stat.Bavail * uint64(stat.Bsize),
	}, nil
}
314
stores/local.go
Normal file
@ -0,0 +1,314 @@
package stores

import (
"context"
"encoding/json"
"io/ioutil"
"math/bits"
"os"
"path/filepath"
"sync"

"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)

type StoragePath struct {
ID ID
Weight uint64

LocalPath string

CanSeal bool
CanStore bool
}

// [path]/sectorstore.json
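// An illustrative example of the file's contents (the values below are
// hypothetical; the field names match the struct):
//
//   {
//     "ID": "3b0f33ae-5a58-47a7-a504-c4f0d60b47b2",
//     "Weight": 10,
//     "CanSeal": true,
//     "CanStore": false
//   }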
type LocalStorageMeta struct {
ID ID
Weight uint64 // 0 = readonly

CanSeal bool
CanStore bool
}

type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(func(*config.StorageConfig)) error
}

const MetaFile = "sectorstore.json"

var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}

type Local struct {
localStorage LocalStorage
index SectorIndex
urls []string

paths map[ID]*path

localLk sync.RWMutex
}

type path struct {
local string // absolute local path
}

func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
l := &Local{
localStorage: ls,
index: index,
urls: urls,

paths: map[ID]*path{},
}
return l, l.open(ctx)
}

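// OpenPath registers a single storage path: it reads the sectorstore.json
// metadata, attaches the path to the sector index, and declares any sector
// files already present on disk.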
func (st *Local) OpenPath(ctx context.Context, p string) error {
st.localLk.Lock()
defer st.localLk.Unlock()

mb, err := ioutil.ReadFile(filepath.Join(p, MetaFile))
if err != nil {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
}

var meta LocalStorageMeta
if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
}

// TODO: Check existing / dedupe

out := &path{
local: p,
}

fst, err := Stat(p)
if err != nil {
return err
}

err = st.index.StorageAttach(ctx, StorageInfo{
ID: meta.ID,
URLs: st.urls,
Weight: meta.Weight,
CanSeal: meta.CanSeal,
CanStore: meta.CanStore,
}, fst)
if err != nil {
return xerrors.Errorf("declaring storage in index: %w", err)
}

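// Walk the per-type subdirectories (unsealed/sealed/cache), creating them if
// missing, and declare every sector file found there in the index.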
for _, t := range pathTypes {
ents, err := ioutil.ReadDir(filepath.Join(p, t.String()))
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Join(p, t.String()), 0755); err != nil {
return xerrors.Errorf("openPath mkdir '%s': %w", filepath.Join(p, t.String()), err)
}

continue
}
return xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err)
}

for _, ent := range ents {
sid, err := sectorutil.ParseSectorID(ent.Name())
if err != nil {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}

if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err)
}
}
}

st.paths[meta.ID] = out

return nil
}

func (st *Local) open(ctx context.Context) error {
cfg, err := st.localStorage.GetStorage()
if err != nil {
return xerrors.Errorf("getting local storage config: %w", err)
}

for _, path := range cfg.StoragePaths {
err := st.OpenPath(ctx, path.Path)
if err != nil {
return xerrors.Errorf("opening path %s: %w", path.Path, err)
}
}

return nil
}

func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
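// The existing and allocate bitmasks must not overlap: for any given file
// type we either locate it or allocate space for it, never both.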
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}

st.localLk.RLock()

var out sectorbuilder.SectorPaths
var storageIDs sectorbuilder.SectorPaths

for _, fileType := range pathTypes {
if fileType&existing == 0 {
continue
}

si, err := st.index.StorageFindSector(ctx, sid, fileType, false)
if err != nil {
log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err)
continue
}

for _, info := range si {
p, ok := st.paths[info.ID]
if !ok {
continue
}

if p.local == "" { // TODO: can that even be the case?
continue
}

spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
sectorutil.SetPathByType(&out, fileType, spath)
sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID))

existing ^= fileType
break
}
}

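// For file types we still need to allocate, ask the index for candidate
// stores and settle on one that is hosted locally and matches the
// sealing/storage requirement.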
for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
}

sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing)
if err != nil {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating: %w", err)
}

var best string
var bestID ID

for _, si := range sis {
p, ok := st.paths[si.ID]
if !ok {
continue
}

if p.local == "" { // TODO: can that even be the case?
continue
}

if sealing && !si.CanSeal {
continue
}

if !sealing && !si.CanStore {
continue
}

// TODO: Check free space

best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
bestID = si.ID
}

if best == "" {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
}

sectorutil.SetPathByType(&out, fileType, best)
sectorutil.SetPathByType(&storageIDs, fileType, string(bestID))
allocate ^= fileType
}

return out, storageIDs, st.localLk.RUnlock, nil
}

func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()

var out []StoragePath
for id, p := range st.paths {
if p.local == "" {
continue
}

si, err := st.index.StorageInfo(ctx, id)
if err != nil {
return nil, xerrors.Errorf("get storage info for %s: %w", id, err)
}

out = append(out, StoragePath{
ID: id,
Weight: si.Weight,
LocalPath: p.local,
CanSeal: si.CanSeal,
CanStore: si.CanStore,
})
}

return out, nil
}

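// delete removes a single file type of a sector from every local path the
// index reports as holding it.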
func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type")
}

si, err := st.index.StorageFindSector(ctx, sid, typ, false)
if err != nil {
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
}

for _, info := range si {
p, ok := st.paths[info.ID]
if !ok {
continue
}

if p.local == "" { // TODO: can that even be the case?
continue
}

spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid))
log.Infof("remove %s", spath)

if err := os.RemoveAll(spath); err != nil {
log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err)
}
}

return nil
}

func (st *Local) FsStat(id ID) (FsStat, error) {
st.localLk.RLock()
defer st.localLk.RUnlock()

p, ok := st.paths[id]
if !ok {
return FsStat{}, xerrors.Errorf("fsstat: path not found")
}

return Stat(p.local)
}
204
stores/remote.go
Normal file
@ -0,0 +1,204 @@
package stores

import (
"context"
"mime"
"net/http"
"os"
"sort"
"sync"

"github.com/hashicorp/go-multierror"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"

"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)

type Remote struct {
local *Local
index SectorIndex
auth http.Header

fetchLk sync.Mutex // TODO: this can be much smarter
// TODO: allow multiple parallel fetches
// (make sure to not fetch the same sector data twice)
}

func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
return &Remote{
local: local,
index: index,
auth: auth,
}
}

func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}

r.fetchLk.Lock()
defer r.fetchLk.Unlock()

paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
}

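// Any existing file type that the local acquire couldn't resolve is pulled
// from a remote store: fetch it into a freshly allocated local path, declare
// it in the index, then drop and delete the remote copy (effectively a move;
// see the TODO below about allowing duplicates).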
for _, fileType := range pathTypes {
if fileType&existing == 0 {
continue
}

if sectorutil.PathByType(paths, fileType) != "" {
continue
}

ap, storageID, url, foundIn, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
if err != nil {
done()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
}

done = mergeDone(done, rdone)
sectorutil.SetPathByType(&paths, fileType, ap)
sectorutil.SetPathByType(&stores, fileType, string(storageID))

if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
continue
}

// TODO: some way to allow having duplicated sectors in the system for perf
if err := r.index.StorageDropSector(ctx, foundIn, s, fileType); err != nil {
log.Warnf("dropping sector %v from %s from sector index failed: %+v", s, storageID, err)
}

if err := r.deleteFromRemote(url); err != nil {
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
}
}

return paths, stores, done, nil
}

func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, ID, func(), error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil {
return "", "", "", "", nil, err
}

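// Candidates are sorted by weight, lowest first; each store's URLs are tried
// in turn until a fetch succeeds.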
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})

apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
if err != nil {
return "", "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
dest := sectorutil.PathByType(apaths, fileType)
storageID := sectorutil.PathByType(ids, fileType)

var merr error
for _, info := range si {
for _, url := range info.URLs {
err := r.fetch(url, dest)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err))
continue
}

if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
}
return dest, ID(storageID), url, info.ID, done, nil
}
}

done()
return "", "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}

func (r *Remote) fetch(url, outname string) error {
log.Infof("Fetch %s -> %s", url, outname)

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = r.auth

resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}

/*bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES

barreader := bar.NewProxyReader(resp.Body)

bar.Start()
defer bar.Finish()*/

mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}

if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}

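// The serving side presumably sends directories (such as the cache) as tar
// archives and single files as raw octet streams, so dispatch on the
// Content-Type of the response.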
switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(resp.Body), outname)
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}

func (r *Remote) deleteFromRemote(url string) error {
log.Infof("Delete %s", url)

req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = r.auth

resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}

return nil
}

func mergeDone(a func(), b func()) func() {
return func() {
a()
b()
}
}

var _ Store = &Remote{}
198
worker_local.go
Normal file
@ -0,0 +1,198 @@
package sectorstorage

import (
"context"
"io"
"os"

"github.com/elastic/go-sysinfo"
"golang.org/x/xerrors"

ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
)

var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}

type WorkerConfig struct {
SealProof abi.RegisteredProof
TaskTypes []sealtasks.TaskType
}

type LocalWorker struct {
scfg *sectorbuilder.Config
storage stores.Store
localStore *stores.Local
sindex stores.SectorIndex

acceptTasks map[sealtasks.TaskType]struct{}
}

func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
ppt, err := wcfg.SealProof.RegisteredPoStProof()
if err != nil {
panic(err)
}

acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
}

return &LocalWorker{
scfg: &sectorbuilder.Config{
SealProofType: wcfg.SealProof,
PoStProofType: ppt,
},
storage: store,
localStore: local,
sindex: sindex,

acceptTasks: acceptTasks,
}
}

type localWorkerPathProvider struct {
w *LocalWorker
}

func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
}

log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)

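// The returned done callback first releases the underlying store, then
// declares every newly allocated file type for this sector in the index so
// other workers can find it.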
return paths, func() {
done()

for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
}

sid := sectorutil.PathByType(storageIDs, fileType)

if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil {
log.Errorf("declare sector error: %+v", err)
}
}
}, nil
}

func (l *LocalWorker) sb() (sectorbuilder.Basic, error) {
return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg)
}

func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.sb()
if err != nil {
return err
}

return sb.NewSector(ctx, sector)
}

func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
sb, err := l.sb()
if err != nil {
return abi.PieceInfo{}, err
}

return sb.AddPiece(ctx, sector, epcs, sz, r)
}

func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}

return sb.SealPreCommit1(ctx, sector, ticket, pieces)
}

func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
sb, err := l.sb()
if err != nil {
return storage2.SectorCids{}, err
}

return sb.SealPreCommit2(ctx, sector, phase1Out)
}

func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}

return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}

func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}

return sb.SealCommit2(ctx, sector, phase1Out)
}

func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.sb()
if err != nil {
return err
}

return sb.FinalizeSector(ctx, sector)
}

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
return l.acceptTasks, nil
}

func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
}

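// Info reports the worker's hostname and hardware resources. MemReserved is
// a rough estimate of memory already committed elsewhere (swap in use plus
// physical memory in use); the TODO notes it should exclude this process.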
func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) {
hostname, err := os.Hostname() // TODO: allow overriding from config
if err != nil {
panic(err)
}

gpus, err := ffi.GetGPUDevices()
if err != nil {
log.Errorf("getting gpu devices failed: %+v", err)
}

h, err := sysinfo.Host()
if err != nil {
return api.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err)
}

mem, err := h.Memory()
if err != nil {
return api.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
}

return api.WorkerInfo{
Hostname: hostname,
Resources: api.WorkerResources{
MemPhysical: mem.Total,
MemSwap: mem.VirtualTotal,
MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process
GPUs: gpus,
},
}, nil
}

var _ Worker = &LocalWorker{}
45
worker_remote.go
Normal file
@ -0,0 +1,45 @@
package sectorstorage

import (
"context"
"net/http"

"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
)

type remote struct {
api.WorkerApi
}

func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error {
return xerrors.New("unsupported")
}

func (r *remote) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
return abi.PieceInfo{}, xerrors.New("unsupported")
}

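// ConnectRemote dials a remote worker's JSON-RPC endpoint, authenticating
// with a freshly minted admin token; the rest of the Worker interface is
// satisfied by the embedded api.WorkerApi client.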
func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, error) {
token, err := fa.AuthNew(ctx, []api.Permission{"admin"})
if err != nil {
return nil, xerrors.Errorf("creating auth token for remote connection: %w", err)
}

headers := http.Header{}
headers.Add("Authorization", "Bearer "+string(token))

wapi, close, err := client.NewWorkerRPC(url, headers)
if err != nil {
return nil, xerrors.Errorf("creating jsonrpc client: %w", err)
}
_ = close // TODO

return &remote{wapi}, nil
}

var _ Worker = &remote{}