commit e40c5d441ddcc13892607e7aeab4e94eb2ea7fb9 Author: Ɓukasz Magiera Date: Mon Mar 23 12:40:02 2020 +0100 Rename agvmgr+sealmgr to sectorstorage diff --git a/manager.go b/manager.go new file mode 100644 index 000000000..08597bd31 --- /dev/null +++ b/manager.go @@ -0,0 +1,410 @@ +package sectorstorage + +import ( + "container/list" + "context" + "io" + "net/http" + "sync" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/mitchellh/go-homedir" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" +) + +var log = logging.Logger("advmgr") + +type URLs []string + +type Worker interface { + sectorbuilder.Sealer + + TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) + + // Returns paths accessible to the worker + Paths(context.Context) ([]stores.StoragePath, error) + + Info(context.Context) (api.WorkerInfo, error) +} + +type SectorManager interface { + SectorSize() abi.SectorSize + + ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) + + sectorbuilder.Sealer + storage.Prover +} + +type workerID uint64 + +type Manager struct { + scfg *sectorbuilder.Config + + ls stores.LocalStorage + storage *stores.Remote + localStore *stores.Local + remoteHnd *stores.FetchHandler + index stores.SectorIndex + + storage.Prover + + workersLk sync.Mutex + nextWorker workerID + workers map[workerID]*workerHandle + + newWorkers chan *workerHandle + schedule chan *workerRequest + workerFree chan workerID + closing chan struct{} + + schedQueue *list.List // List[*workerRequest] +} + +func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) { + ctx := context.TODO() + + lstor, err := stores.NewLocal(ctx, ls, si, urls) + if err != nil { + return nil, err + } + + prover, err := sectorbuilder.New(&readonlyProvider{stor: lstor}, cfg) + if err != nil { + return nil, xerrors.Errorf("creating prover instance: %w", err) + } + + token, err := ca.AuthNew(context.TODO(), []api.Permission{"admin"}) + headers := http.Header{} + headers.Add("Authorization", "Bearer "+string(token)) + stor := stores.NewRemote(lstor, si, headers) + + m := &Manager{ + scfg: cfg, + + ls: ls, + storage: stor, + localStore: lstor, + remoteHnd: &stores.FetchHandler{Local: lstor}, + index: si, + + nextWorker: 0, + workers: map[workerID]*workerHandle{}, + + newWorkers: make(chan *workerHandle), + schedule: make(chan *workerRequest), + workerFree: make(chan workerID), + closing: make(chan struct{}), + + schedQueue: list.New(), + + Prover: prover, + } + + go m.runSched() + + err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ + SealProof: cfg.SealProofType, + TaskTypes: []sealtasks.TaskType{sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize}, + }, stor, lstor, si)) + if err != nil { + return nil, xerrors.Errorf("adding local worker: %w", err) + } + + return m, nil +} + +func (m *Manager) AddLocalStorage(ctx context.Context, path string) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + if err := m.localStore.OpenPath(ctx, path); err != nil { + return xerrors.Errorf("opening local path: %w", err) + } + + if err := m.ls.SetStorage(func(sc *config.StorageConfig) { + sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path}) + }); err != nil { + return xerrors.Errorf("get storage config: %w", err) + } + return nil +} + +func (m *Manager) AddWorker(ctx context.Context, w Worker) error { + info, err := w.Info(ctx) + if err != nil { + return xerrors.Errorf("getting worker info: %w", err) + } + + m.newWorkers <- &workerHandle{ + w: w, + info: info, + } + return nil +} + +func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.remoteHnd.ServeHTTP(w, r) +} + +func (m *Manager) SectorSize() abi.SectorSize { + sz, _ := m.scfg.SealProofType.SectorSize() + return sz +} + +func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) { + panic("implement me") +} + +func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]workerID, map[workerID]stores.StorageInfo) { + m.workersLk.Lock() + defer m.workersLk.Unlock() + + var workers []workerID + paths := map[workerID]stores.StorageInfo{} + + for i, worker := range m.workers { + tt, err := worker.w.TaskTypes(context.TODO()) + if err != nil { + log.Errorf("error getting supported worker task types: %+v", err) + continue + } + if _, ok := tt[task]; !ok { + log.Debugf("dropping worker %d; task %s not supported (supports %v)", i, task, tt) + continue + } + + phs, err := worker.w.Paths(context.TODO()) + if err != nil { + log.Errorf("error getting worker paths: %+v", err) + continue + } + + // check if the worker has access to the path we selected + var st *stores.StorageInfo + for _, p := range phs { + for _, meta := range inPaths { + if p.ID == meta.ID { + if st != nil && st.Weight > p.Weight { + continue + } + + p := meta // copy + st = &p + } + } + } + if st == nil { + log.Debugf("skipping worker %d; doesn't have any of %v", i, inPaths) + log.Debugf("skipping worker %d; only has %v", i, phs) + continue + } + + paths[i] = *st + workers = append(workers, i) + } + + return workers, paths +} + +func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []workerID) (Worker, func(), error) { + ret := make(chan workerResponse) + + select { + case m.schedule <- &workerRequest{ + taskType: taskType, + accept: accept, + + cancel: ctx.Done(), + ret: ret, + }: + case <-m.closing: + return nil, nil, xerrors.New("closing") + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + + select { + case resp := <-ret: + return resp.worker, resp.done, resp.err + case <-m.closing: + return nil, nil, xerrors.New("closing") + case <-ctx.Done(): + return nil, nil, ctx.Err() + } +} + +func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { + log.Warnf("stub NewSector") + return nil +} + +func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { + // TODO: consider multiple paths vs workers when initially allocating + + var best []stores.StorageInfo + var err error + if len(existingPieces) == 0 { // new + best, err = m.index.StorageBestAlloc(ctx, sectorbuilder.FTUnsealed, true) + } else { // append to existing + best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed, false) + } + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err) + } + + log.Debugf("find workers for %v", best) + candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best) + + if len(candidateWorkers) == 0 { + return abi.PieceInfo{}, xerrors.New("no worker found") + } + + worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + + // TODO: select(candidateWorkers, ...) + // TODO: remove the sectorbuilder abstraction, pass path directly + return worker.AddPiece(ctx, sector, existingPieces, sz, r) +} + +func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { + // TODO: also consider where the unsealed data sits + + best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) + if err != nil { + return nil, xerrors.Errorf("finding path for sector sealing: %w", err) + } + + candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best) + if len(candidateWorkers) == 0 { + return nil, xerrors.New("no suitable workers found") + } + + worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers) + if err != nil { + return nil, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + + // TODO: select(candidateWorkers, ...) + // TODO: remove the sectorbuilder abstraction, pass path directly + return worker.SealPreCommit1(ctx, sector, ticket, pieces) +} + +func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) { + // TODO: allow workers to fetch the sectors + + best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) + if err != nil { + return storage.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err) + } + + candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best) + if len(candidateWorkers) == 0 { + return storage.SectorCids{}, xerrors.New("no suitable workers found") + } + + worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers) + if err != nil { + return storage.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + + // TODO: select(candidateWorkers, ...) + // TODO: remove the sectorbuilder abstraction, pass path directly + return worker.SealPreCommit2(ctx, sector, phase1Out) +} + +func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) { + best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) + if err != nil { + return nil, xerrors.Errorf("finding path for sector sealing: %w", err) + } + + candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best) + if len(candidateWorkers) == 0 { + return nil, xerrors.New("no suitable workers found") // TODO: wait? + } + + // TODO: Try very hard to execute on worker with access to the sectors + worker, done, err := m.getWorker(ctx, sealtasks.TTCommit1, candidateWorkers) + if err != nil { + return nil, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + + // TODO: select(candidateWorkers, ...) + // TODO: remove the sectorbuilder abstraction, pass path directly + return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) +} + +func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) { + var candidateWorkers []workerID + + m.workersLk.Lock() + for id, worker := range m.workers { + tt, err := worker.w.TaskTypes(ctx) + if err != nil { + log.Errorf("error getting supported worker task types: %+v", err) + continue + } + if _, ok := tt[sealtasks.TTCommit2]; !ok { + continue + } + candidateWorkers = append(candidateWorkers, id) + } + m.workersLk.Unlock() + + worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers) + if err != nil { + return nil, xerrors.Errorf("scheduling worker: %w", err) + } + defer done() + + return worker.SealCommit2(ctx, sector, phase1Out) +} + +func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error { + best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed, true) + if err != nil { + return xerrors.Errorf("finding sealed sector: %w", err) + } + + candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best) + + // TODO: Remove sector from sealing stores + // TODO: Move the sector to long-term storage + return m.workers[candidateWorkers[0]].w.FinalizeSector(ctx, sector) +} + +func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) { + l, err := m.localStore.Local(ctx) + if err != nil { + return nil, err + } + + out := map[stores.ID]string{} + for _, st := range l { + out[st.ID] = st.LocalPath + } + + return out, nil +} + +var _ SectorManager = &Manager{} diff --git a/mock/mock.go b/mock/mock.go new file mode 100644 index 000000000..fdaae7f80 --- /dev/null +++ b/mock/mock.go @@ -0,0 +1,371 @@ +package mock + +import ( + "bytes" + "context" + "fmt" + "github.com/filecoin-project/lotus/storage/sectorstorage" + "io" + "io/ioutil" + "math/big" + "math/rand" + "sync" + + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" +) + +var log = logging.Logger("sbmock") + +type SectorMgr struct { + sectors map[abi.SectorID]*sectorState + sectorSize abi.SectorSize + nextSectorID abi.SectorNumber + rateLimit chan struct{} + proofType abi.RegisteredProof + + lk sync.Mutex +} + +type mockVerif struct{} + +func NewMockSectorMgr(threads int, ssize abi.SectorSize) *SectorMgr { + rt, _, err := api.ProofTypeFromSectorSize(ssize) + if err != nil { + panic(err) + } + + return &SectorMgr{ + sectors: make(map[abi.SectorID]*sectorState), + sectorSize: ssize, + nextSectorID: 5, + rateLimit: make(chan struct{}, threads), + proofType: rt, + } +} + +const ( + statePacking = iota + statePreCommit + stateCommit +) + +type sectorState struct { + pieces []cid.Cid + failed bool + + state int + + lk sync.Mutex +} + +func (sb *SectorMgr) RateLimit() func() { + sb.rateLimit <- struct{}{} + + // TODO: probably want to copy over rate limit code + return func() { + <-sb.rateLimit + } +} + +func (sb *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error { + return nil +} + +func (sb *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { + log.Warn("Add piece: ", sectorId, size, sb.proofType) + sb.lk.Lock() + ss, ok := sb.sectors[sectorId] + if !ok { + ss = §orState{ + state: statePacking, + } + sb.sectors[sectorId] = ss + } + sb.lk.Unlock() + ss.lk.Lock() + defer ss.lk.Unlock() + + c, err := sectorbuilder.GeneratePieceCIDFromFile(sb.proofType, r, size) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err) + } + + log.Warn("Generated Piece CID: ", c) + + ss.pieces = append(ss.pieces, c) + return abi.PieceInfo{ + Size: size.Padded(), + PieceCID: c, + }, nil +} + +func (sb *SectorMgr) SectorSize() abi.SectorSize { + return sb.sectorSize +} + +func (sb *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) { + sb.lk.Lock() + defer sb.lk.Unlock() + id := sb.nextSectorID + sb.nextSectorID++ + return id, nil +} + +func (sb *SectorMgr) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { + sb.lk.Lock() + ss, ok := sb.sectors[sid] + sb.lk.Unlock() + if !ok { + return nil, xerrors.Errorf("no sector with id %d in sectorbuilder", sid) + } + + ss.lk.Lock() + defer ss.lk.Unlock() + + ussize := abi.PaddedPieceSize(sb.sectorSize).Unpadded() + + // TODO: verify pieces in sinfo.pieces match passed in pieces + + var sum abi.UnpaddedPieceSize + for _, p := range pieces { + sum += p.Size.Unpadded() + } + + if sum != ussize { + return nil, xerrors.Errorf("aggregated piece sizes don't match up: %d != %d", sum, ussize) + } + + if ss.state != statePacking { + return nil, xerrors.Errorf("cannot call pre-seal on sector not in 'packing' state") + } + + opFinishWait(ctx) + + ss.state = statePreCommit + + pis := make([]abi.PieceInfo, len(ss.pieces)) + for i, piece := range ss.pieces { + pis[i] = abi.PieceInfo{ + Size: pieces[i].Size, + PieceCID: piece, + } + } + + commd, err := MockVerifier.GenerateDataCommitment(abi.PaddedPieceSize(sb.sectorSize), pis) + if err != nil { + return nil, err + } + + cc, _, err := commcid.CIDToCommitment(commd) + if err != nil { + panic(err) + } + + cc[0] ^= 'd' + + return cc, nil +} + +func (sb *SectorMgr) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) { + db := []byte(string(phase1Out)) + db[0] ^= 'd' + + d := commcid.DataCommitmentV1ToCID(db) + + commr := make([]byte, 32) + for i := range db { + commr[32-(i+1)] = db[i] + } + + commR := commcid.DataCommitmentV1ToCID(commr) + + return storage.SectorCids{ + Unsealed: d, + Sealed: commR, + }, nil +} + +func (sb *SectorMgr) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) { + sb.lk.Lock() + ss, ok := sb.sectors[sid] + sb.lk.Unlock() + if !ok { + return nil, xerrors.Errorf("no such sector %d", sid) + } + ss.lk.Lock() + defer ss.lk.Unlock() + + if ss.failed { + return nil, xerrors.Errorf("[mock] cannot commit failed sector %d", sid) + } + + if ss.state != statePreCommit { + return nil, xerrors.Errorf("cannot commit sector that has not been precommitted") + } + + opFinishWait(ctx) + + var out [32]byte + for i := range out { + out[i] = cids.Unsealed.Bytes()[i] + cids.Sealed.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid.Number&0xff) + } + + return out[:], nil +} + +func (sb *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) { + var out [32]byte + for i := range out { + out[i] = phase1Out[i] ^ byte(sid.Number&0xff) + } + + return out[:], nil +} + +// Test Instrumentation Methods + +func (sb *SectorMgr) FailSector(sid abi.SectorID) error { + sb.lk.Lock() + defer sb.lk.Unlock() + ss, ok := sb.sectors[sid] + if !ok { + return fmt.Errorf("no such sector in sectorbuilder") + } + + ss.failed = true + return nil +} + +func opFinishWait(ctx context.Context) { + val, ok := ctx.Value("opfinish").(chan struct{}) + if !ok { + return + } + <-val +} + +func AddOpFinish(ctx context.Context) (context.Context, func()) { + done := make(chan struct{}) + + return context.WithValue(ctx, "opfinish", done), func() { + close(done) + } +} + +func (sb *SectorMgr) GenerateFallbackPoSt(context.Context, abi.ActorID, []abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) (storage.FallbackPostOut, error) { + panic("implement me") +} + +func (sb *SectorMgr) ComputeElectionPoSt(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { + panic("implement me") +} + +func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) { + if len(faults) > 0 { + panic("todo") + } + + n := sectorbuilder.ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults))) + if n > uint64(len(sectorInfo)) { + n = uint64(len(sectorInfo)) + } + + out := make([]storage.PoStCandidateWithTicket, n) + + seed := big.NewInt(0).SetBytes(challengeSeed[:]) + start := seed.Mod(seed, big.NewInt(int64(len(sectorInfo)))).Int64() + + for i := range out { + out[i] = storage.PoStCandidateWithTicket{ + Candidate: abi.PoStCandidate{ + SectorID: abi.SectorID{ + Number: abi.SectorNumber((int(start) + i) % len(sectorInfo)), + Miner: mid, + }, + PartialTicket: abi.PartialTicket(challengeSeed), + }, + } + } + + return out, nil +} + +func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) { + if len(sb.sectors[sectorID].pieces) > 1 { + panic("implme") + } + return ioutil.NopCloser(io.LimitReader(bytes.NewReader(sb.sectors[sectorID].pieces[0].Bytes()[offset:]), int64(size))), nil +} + +func (sb *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) { + usize := abi.PaddedPieceSize(sb.sectorSize).Unpadded() + sid, err := sb.AcquireSectorNumber() + if err != nil { + return abi.SectorID{}, nil, err + } + + buf := make([]byte, usize) + rand.Read(buf) + + id := abi.SectorID{ + Miner: mid, + Number: sid, + } + + pi, err := sb.AddPiece(context.TODO(), id, nil, usize, bytes.NewReader(buf)) + if err != nil { + return abi.SectorID{}, nil, err + } + + return id, []abi.PieceInfo{pi}, nil +} + +func (sb *SectorMgr) FinalizeSector(context.Context, abi.SectorID) error { + return nil +} + +func (m mockVerif) VerifyElectionPost(ctx context.Context, pvi abi.PoStVerifyInfo) (bool, error) { + panic("implement me") +} + +func (m mockVerif) VerifyFallbackPost(ctx context.Context, pvi abi.PoStVerifyInfo) (bool, error) { + panic("implement me") +} + +func (m mockVerif) VerifySeal(svi abi.SealVerifyInfo) (bool, error) { + if len(svi.OnChain.Proof) != 32 { // Real ones are longer, but this should be fine + return false, nil + } + + for i, b := range svi.OnChain.Proof { + if b != svi.UnsealedCID.Bytes()[i]+svi.OnChain.SealedCID.Bytes()[31-i]-svi.InteractiveRandomness[i]*svi.Randomness[i] { + return false, nil + } + } + + return true, nil +} + +func (m mockVerif) GenerateDataCommitment(ssize abi.PaddedPieceSize, pieces []abi.PieceInfo) (cid.Cid, error) { + if len(pieces) != 1 { + panic("todo") + } + if pieces[0].Size != ssize { + fmt.Println("wrong sizes? ", pieces[0].Size, ssize) + panic("todo") + } + return pieces[0].PieceCID, nil +} + +var MockVerifier = mockVerif{} + +var _ sectorbuilder.Verifier = MockVerifier +var _ sectorstorage.SectorManager = &SectorMgr{} diff --git a/mock/mock_test.go b/mock/mock_test.go new file mode 100644 index 000000000..524e8d615 --- /dev/null +++ b/mock/mock_test.go @@ -0,0 +1,45 @@ +package mock + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +func TestOpFinish(t *testing.T) { + sb := NewMockSectorMgr(1, 2048) + + sid, pieces, err := sb.StageFakeData(123) + if err != nil { + t.Fatal(err) + } + + ctx, done := AddOpFinish(context.TODO()) + + finished := make(chan struct{}) + go func() { + _, err := sb.SealPreCommit1(ctx, sid, abi.SealRandomness{}, pieces) + if err != nil { + t.Error(err) + return + } + + close(finished) + }() + + select { + case <-finished: + t.Fatal("should not finish until we tell it to") + case <-time.After(time.Second / 2): + } + + done() + + select { + case <-finished: + case <-time.After(time.Second / 2): + t.Fatal("should finish after we tell it to") + } +} diff --git a/mock/preseal.go b/mock/preseal.go new file mode 100644 index 000000000..6bac0aaea --- /dev/null +++ b/mock/preseal.go @@ -0,0 +1,63 @@ +package mock + +import ( + "github.com/filecoin-project/go-address" + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/crypto" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/genesis" +) + +func PreSeal(ssize abi.SectorSize, maddr address.Address, sectors int) (*genesis.Miner, *types.KeyInfo, error) { + k, err := wallet.GenerateKey(crypto.SigTypeBLS) + if err != nil { + return nil, nil, err + } + + genm := &genesis.Miner{ + Owner: k.Address, + Worker: k.Address, + MarketBalance: big.NewInt(0), + PowerBalance: big.NewInt(0), + SectorSize: ssize, + Sectors: make([]*genesis.PreSeal, sectors), + } + + _, st, err := api.ProofTypeFromSectorSize(ssize) + if err != nil { + return nil, nil, err + } + + for i := range genm.Sectors { + preseal := &genesis.PreSeal{} + + preseal.ProofType = st + preseal.CommD = sectorbuilder.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) + d, _ := commcid.CIDToPieceCommitmentV1(preseal.CommD) + r := commDR(d) + preseal.CommR = commcid.ReplicaCommitmentV1ToCID(r[:]) + preseal.SectorID = abi.SectorNumber(i + 1) + preseal.Deal = market.DealProposal{ + PieceCID: preseal.CommD, + PieceSize: abi.PaddedPieceSize(ssize), + Client: maddr, + Provider: maddr, + StartEpoch: 1, + EndEpoch: 10000, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + + genm.Sectors[i] = preseal + } + + return genm, &k.KeyInfo, nil +} diff --git a/mock/util.go b/mock/util.go new file mode 100644 index 000000000..e37cf3552 --- /dev/null +++ b/mock/util.go @@ -0,0 +1,23 @@ +package mock + +import ( + "crypto/rand" + "io" + "io/ioutil" +) + +func randB(n uint64) []byte { + b, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(n))) + if err != nil { + panic(err) + } + return b +} + +func commDR(in []byte) (out [32]byte) { + for i, b := range in { + out[i] = ^b + } + + return out +} diff --git a/resources.go b/resources.go new file mode 100644 index 000000000..3587b41ea --- /dev/null +++ b/resources.go @@ -0,0 +1,135 @@ +package sectorstorage + +import ( + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +var FSOverheadSeal = map[sectorbuilder.SectorFileType]int{ // 10x overheads + sectorbuilder.FTUnsealed: 10, + sectorbuilder.FTSealed: 10, + sectorbuilder.FTCache: 70, // TODO: confirm for 32G +} + +var FsOverheadFinalized = map[sectorbuilder.SectorFileType]int{ + sectorbuilder.FTUnsealed: 10, + sectorbuilder.FTSealed: 10, + sectorbuilder.FTCache: 2, +} + +type Resources struct { + MinMemory uint64 // What Must be in RAM for decent perf + MaxMemory uint64 // Mamory required (swap + ram) + + MultiThread bool + CanGPU bool + + BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads) +} + +const MaxCachingOverhead = 32 << 30 + +var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{ + sealtasks.TTAddPiece: { + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ // This is probably a bit conservative + MaxMemory: 32 << 30, + MinMemory: 32 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 1 << 30, + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + }, + sealtasks.TTPreCommit1: { + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 64 << 30, + MinMemory: 32 << 30, + + MultiThread: false, + + BaseMinMemory: 30 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 3 << 29, // 1.5G + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + }, + sealtasks.TTPreCommit2: { + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 96 << 30, + MinMemory: 64 << 30, + + MultiThread: true, + + BaseMinMemory: 30 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 3 << 29, // 1.5G + MinMemory: 1 << 30, + + MultiThread: true, + + BaseMinMemory: 1 << 30, + }, + }, + sealtasks.TTCommit1: { // Very short (~100ms), so params are very light + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 1 << 30, + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 1 << 30, + MinMemory: 1 << 30, + + MultiThread: false, + + BaseMinMemory: 1 << 30, + }, + }, + sealtasks.TTCommit2: { // TODO: Measure more accurately + abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ + MaxMemory: 110 << 30, + MinMemory: 60 << 30, + + MultiThread: true, + CanGPU: true, + + BaseMinMemory: 64 << 30, // params + }, + abi.RegisteredProof_StackedDRG512MiBSeal: Resources{ + MaxMemory: 3 << 29, // 1.5G + MinMemory: 1 << 30, + + MultiThread: false, // This is fine + CanGPU: true, + + BaseMinMemory: 10 << 30, + }, + }, +} + +func init() { + // for now we just reuse params for 2kib and 8mib from 512mib + + for taskType := range ResourceTable { + ResourceTable[taskType][abi.RegisteredProof_StackedDRG8MiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal] + ResourceTable[taskType][abi.RegisteredProof_StackedDRG2KiBSeal] = ResourceTable[taskType][abi.RegisteredProof_StackedDRG512MiBSeal] + } +} diff --git a/roprov.go b/roprov.go new file mode 100644 index 000000000..99723e181 --- /dev/null +++ b/roprov.go @@ -0,0 +1,25 @@ +package sectorstorage + +import ( + "context" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" + + "github.com/filecoin-project/specs-actors/actors/abi" + "golang.org/x/xerrors" +) + +type readonlyProvider struct { + stor *stores.Local +} + +func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { + if allocate != 0 { + return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage") + } + + p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing) + + return p, done, err +} diff --git a/sched.go b/sched.go new file mode 100644 index 000000000..adf2e6cd3 --- /dev/null +++ b/sched.go @@ -0,0 +1,242 @@ +package sectorstorage + +import ( + "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" + "github.com/filecoin-project/specs-actors/actors/abi" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" +) + +const mib = 1 << 20 + +type workerRequest struct { + taskType sealtasks.TaskType + accept []workerID // ordered by preference + + ret chan<- workerResponse + cancel <-chan struct{} +} + +type workerResponse struct { + err error + + worker Worker + done func() +} + +func (r *workerRequest) respond(resp workerResponse) { + select { + case r.ret <- resp: + case <-r.cancel: + log.Warnf("request got cancelled before we could respond") + if resp.done != nil { + resp.done() + } + } +} + +type workerHandle struct { + w Worker + + info api.WorkerInfo + + memUsedMin uint64 + memUsedMax uint64 + gpuUsed bool + cpuUse int // -1 - multicore thing; 0 - free; 1+ - singlecore things +} + +func (m *Manager) runSched() { + for { + select { + case w := <-m.newWorkers: + m.schedNewWorker(w) + case req := <-m.schedule: + resp, err := m.maybeSchedRequest(req) + if err != nil { + req.respond(workerResponse{err: err}) + continue + } + + if resp != nil { + req.respond(*resp) + continue + } + + m.schedQueue.PushBack(req) + case wid := <-m.workerFree: + m.onWorkerFreed(wid) + } + } +} + +func (m *Manager) onWorkerFreed(wid workerID) { + for e := m.schedQueue.Front(); e != nil; e = e.Next() { + req := e.Value.(*workerRequest) + var ok bool + for _, id := range req.accept { + if id == wid { + ok = true + break + } + } + if !ok { + continue + } + + resp, err := m.maybeSchedRequest(req) + if err != nil { + req.respond(workerResponse{err: err}) + continue + } + + if resp != nil { + req.respond(*resp) + + pe := e.Prev() + m.schedQueue.Remove(e) + if pe == nil { + pe = m.schedQueue.Front() + } + if pe == nil { + break + } + e = pe + continue + } + } +} + +func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error) { + m.workersLk.Lock() + defer m.workersLk.Unlock() + + tried := 0 + + for _, id := range req.accept { + w, ok := m.workers[id] + if !ok { + log.Warnf("requested worker %d is not in scheduler", id) + } + tried++ + + canDo, err := m.canHandleRequest(id, w, req) + if err != nil { + return nil, err + } + + if !canDo { + continue + } + + return m.makeResponse(id, w, req), nil + } + + if tried == 0 { + return nil, xerrors.New("maybeSchedRequest didn't find any good workers") + } + + return nil, nil // put in waiting queue +} + +func (m *Manager) makeResponse(wid workerID, w *workerHandle, req *workerRequest) *workerResponse { + needRes := ResourceTable[req.taskType][m.scfg.SealProofType] + + w.gpuUsed = needRes.CanGPU + if needRes.MultiThread { + w.cpuUse = -1 + } else { + if w.cpuUse != -1 { + w.cpuUse++ + } else { + log.Warnf("sched: makeResponse for worker %d: worker cpu is in multicore use, but a single core task was scheduled", wid) + } + } + + w.memUsedMin += needRes.MinMemory + w.memUsedMax += needRes.MaxMemory + + return &workerResponse{ + err: nil, + worker: w.w, + done: func() { + m.workersLk.Lock() + + if needRes.CanGPU { + w.gpuUsed = false + } + + if needRes.MultiThread { + w.cpuUse = 0 + } else if w.cpuUse != -1 { + w.cpuUse-- + } + + w.memUsedMin -= needRes.MinMemory + w.memUsedMax -= needRes.MaxMemory + + m.workersLk.Unlock() + + select { + case m.workerFree <- wid: + case <-m.closing: + } + }, + } +} + +func (m *Manager) canHandleRequest(wid workerID, w *workerHandle, req *workerRequest) (bool, error) { + needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType] + if !ok { + return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType) + } + + res := w.info.Resources + + // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) + minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory + if minNeedMem > res.MemPhysical { + log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) + return false, nil + } + + maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory + if m.scfg.SealProofType == abi.RegisteredProof_StackedDRG32GiBSeal { + maxNeedMem += MaxCachingOverhead + } + if maxNeedMem > res.MemSwap+res.MemPhysical { + log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) + return false, nil + } + + if needRes.MultiThread { + if w.cpuUse != 0 { + log.Debugf("sched: not scheduling on worker %d; multicore process needs free CPU", wid) + return false, nil + } + } else { + if w.cpuUse == -1 { + log.Debugf("sched: not scheduling on worker %d; CPU in use by a multicore process", wid) + return false, nil + } + } + + if len(res.GPUs) > 0 && needRes.CanGPU { + if w.gpuUsed { + log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) + return false, nil + } + } + + return true, nil +} + +func (m *Manager) schedNewWorker(w *workerHandle) { + m.workersLk.Lock() + defer m.workersLk.Unlock() + + id := m.nextWorker + m.workers[id] = w + m.nextWorker++ +} diff --git a/sealtasks/task.go b/sealtasks/task.go new file mode 100644 index 000000000..8eefa14fa --- /dev/null +++ b/sealtasks/task.go @@ -0,0 +1,13 @@ +package sealtasks + +type TaskType string + +const ( + TTAddPiece TaskType = "seal/v0/addpiece" + TTPreCommit1 TaskType = "seal/v0/precommit/1" + TTPreCommit2 TaskType = "seal/v0/precommit/2" + TTCommit1 TaskType = "seal/v0/commit/1" // NOTE: We use this to transfer the sector into miner-local storage for now; Don't use on workers! + TTCommit2 TaskType = "seal/v0/commit/2" + + TTFinalize TaskType = "seal/v0/finalize" +) diff --git a/sectorutil/utils.go b/sectorutil/utils.go new file mode 100644 index 000000000..01862b7b4 --- /dev/null +++ b/sectorutil/utils.go @@ -0,0 +1,56 @@ +package sectorutil + +import ( + "fmt" + "github.com/filecoin-project/go-sectorbuilder" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +func ParseSectorID(baseName string) (abi.SectorID, error) { + var n abi.SectorNumber + var mid abi.ActorID + read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n) + if err != nil { + return abi.SectorID{}, xerrors.Errorf(": %w", err) + } + + if read != 2 { + return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read) + } + + return abi.SectorID{ + Miner: mid, + Number: n, + }, nil +} + +func SectorName(sid abi.SectorID) string { + return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number) +} + +func PathByType(sps sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType) string { + switch fileType { + case sectorbuilder.FTUnsealed: + return sps.Unsealed + case sectorbuilder.FTSealed: + return sps.Sealed + case sectorbuilder.FTCache: + return sps.Cache + } + + panic("requested unknown path type") +} + +func SetPathByType(sps *sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType, p string) { + switch fileType { + case sectorbuilder.FTUnsealed: + sps.Unsealed = p + case sectorbuilder.FTSealed: + sps.Sealed = p + case sectorbuilder.FTCache: + sps.Cache = p + } +} diff --git a/stores/http_handler.go b/stores/http_handler.go new file mode 100644 index 000000000..daa81061e --- /dev/null +++ b/stores/http_handler.go @@ -0,0 +1,125 @@ +package stores + +import ( + "io" + "net/http" + "os" + + "github.com/gorilla/mux" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/lib/tarutil" + "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" +) + +var log = logging.Logger("stores") + +type FetchHandler struct { + *Local +} + +func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/ + mux := mux.NewRouter() + + mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET") + mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE") + + mux.ServeHTTP(w, r) +} + +func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) { + log.Infof("SERVE GET %s", r.URL) + vars := mux.Vars(r) + + id, err := sectorutil.ParseSectorID(vars["id"]) + if err != nil { + log.Error("%+v", err) + w.WriteHeader(500) + return + } + + ft, err := ftFromString(vars["type"]) + if err != nil { + log.Error("%+v", err) + return + } + paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, 0, false) + if err != nil { + log.Error("%+v", err) + return + } + defer done() + + path := sectorutil.PathByType(paths, ft) + if path == "" { + log.Error("acquired path was empty") + w.WriteHeader(500) + return + } + + stat, err := os.Stat(path) + if err != nil { + log.Error("%+v", err) + w.WriteHeader(500) + return + } + + var rd io.Reader + if stat.IsDir() { + rd, err = tarutil.TarDirectory(path) + w.Header().Set("Content-Type", "application/x-tar") + } else { + rd, err = os.OpenFile(path, os.O_RDONLY, 0644) + w.Header().Set("Content-Type", "application/octet-stream") + } + if err != nil { + log.Error("%+v", err) + w.WriteHeader(500) + return + } + + w.WriteHeader(200) + if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small + log.Error("%+v", err) + return + } +} + +func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) { + log.Infof("SERVE DELETE %s", r.URL) + vars := mux.Vars(r) + + id, err := sectorutil.ParseSectorID(vars["id"]) + if err != nil { + log.Error("%+v", err) + w.WriteHeader(500) + return + } + + ft, err := ftFromString(vars["type"]) + if err != nil { + log.Error("%+v", err) + return + } + + if err := handler.delete(r.Context(), id, ft); err != nil { + log.Error("%+v", err) + w.WriteHeader(500) + return + } +} + +func ftFromString(t string) (sectorbuilder.SectorFileType, error) { + switch t { + case sectorbuilder.FTUnsealed.String(): + return sectorbuilder.FTUnsealed, nil + case sectorbuilder.FTSealed.String(): + return sectorbuilder.FTSealed, nil + case sectorbuilder.FTCache.String(): + return sectorbuilder.FTCache, nil + default: + return 0, xerrors.Errorf("unknown sector file type: '%s'", t) + } +} diff --git a/stores/index.go b/stores/index.go new file mode 100644 index 000000000..e508171b7 --- /dev/null +++ b/stores/index.go @@ -0,0 +1,312 @@ +package stores + +import ( + "context" + "net/url" + gopath "path" + "sort" + "sync" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" +) + +// ID identifies sector storage by UUID. One sector storage should map to one +// filesystem, local or networked / shared by multiple machines +type ID string + +type StorageInfo struct { + ID ID + URLs []string // TODO: Support non-http transports + Weight uint64 + + CanSeal bool + CanStore bool +} + +type SectorIndex interface { // part of storage-miner api + StorageAttach(context.Context, StorageInfo, FsStat) error + StorageInfo(context.Context, ID) (StorageInfo, error) + // TODO: StorageUpdateStats(FsStat) + + StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error + StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error + StorageFindSector(ctx context.Context, sector abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) + + StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) +} + +type Decl struct { + abi.SectorID + sectorbuilder.SectorFileType +} + +type storageEntry struct { + info *StorageInfo + fsi FsStat +} + +type Index struct { + lk sync.RWMutex + + sectors map[Decl][]ID + stores map[ID]*storageEntry +} + +func NewIndex() *Index { + return &Index{ + sectors: map[Decl][]ID{}, + stores: map[ID]*storageEntry{}, + } +} + +func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) { + byID := map[ID]map[abi.SectorID]sectorbuilder.SectorFileType{} + + for id := range i.stores { + byID[id] = map[abi.SectorID]sectorbuilder.SectorFileType{} + } + for decl, ids := range i.sectors { + for _, id := range ids { + byID[id][decl.SectorID] |= decl.SectorFileType + } + } + + out := map[ID][]Decl{} + for id, m := range byID { + out[id] = []Decl{} + for sectorID, fileType := range m { + out[id] = append(out[id], Decl{ + SectorID: sectorID, + SectorFileType: fileType, + }) + } + } + + return out, nil +} + +func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) error { + i.lk.Lock() + defer i.lk.Unlock() + + log.Infof("New sector storage: %s", si.ID) + + if _, ok := i.stores[si.ID]; ok { + for _, u := range si.URLs { + if _, err := url.Parse(u); err != nil { + return xerrors.Errorf("failed to parse url %s: %w", si.URLs, err) + } + } + + i.stores[si.ID].info.URLs = append(i.stores[si.ID].info.URLs, si.URLs...) + return nil + } + i.stores[si.ID] = &storageEntry{ + info: &si, + fsi: st, + } + return nil +} + +func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { + i.lk.Lock() + defer i.lk.Unlock() + + for _, fileType := range pathTypes { + if fileType&ft == 0 { + continue + } + + d := Decl{s, fileType} + + for _, sid := range i.sectors[d] { + if sid == storageId { + log.Warnf("sector %v redeclared in %s", storageId) + return nil + } + } + + i.sectors[d] = append(i.sectors[d], storageId) + } + + return nil +} + +func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { + i.lk.Lock() + defer i.lk.Unlock() + + for _, fileType := range pathTypes { + if fileType&ft == 0 { + continue + } + + d := Decl{s, fileType} + + if len(i.sectors[d]) == 0 { + return nil + } + + rewritten := make([]ID, 0, len(i.sectors[d])-1) + for _, sid := range i.sectors[d] { + if sid == storageId { + continue + } + + rewritten = append(rewritten, sid) + } + if len(rewritten) == 0 { + delete(i.sectors, d) + return nil + } + + i.sectors[d] = rewritten + } + + return nil +} + +func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) { + i.lk.RLock() + defer i.lk.RUnlock() + + storageIDs := map[ID]uint64{} + + for _, pathType := range pathTypes { + if ft&pathType == 0 { + continue + } + + for _, id := range i.sectors[Decl{s, pathType}] { + storageIDs[id]++ + } + } + + out := make([]StorageInfo, 0, len(storageIDs)) + + for id, n := range storageIDs { + st, ok := i.stores[id] + if !ok { + log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s) + continue + } + + urls := make([]string, len(st.info.URLs)) + for k, u := range st.info.URLs { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + + rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s)) + urls[k] = rl.String() + } + + out = append(out, StorageInfo{ + ID: id, + URLs: urls, + Weight: st.info.Weight * n, // storage with more sector types is better + CanSeal: st.info.CanSeal, + CanStore: st.info.CanStore, + }) + } + + if allowFetch { + for id, st := range i.stores { + if _, ok := storageIDs[id]; ok { + continue + } + + urls := make([]string, len(st.info.URLs)) + for k, u := range st.info.URLs { + rl, err := url.Parse(u) + if err != nil { + return nil, xerrors.Errorf("failed to parse url: %w", err) + } + + rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s)) + urls[k] = rl.String() + } + + out = append(out, StorageInfo{ + ID: id, + URLs: urls, + Weight: st.info.Weight * 0, // TODO: something better than just '0' + CanSeal: st.info.CanSeal, + CanStore: st.info.CanStore, + }) + } + } + + return out, nil +} + +func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) { + i.lk.RLock() + defer i.lk.RUnlock() + + si, found := i.stores[id] + if !found { + return StorageInfo{}, xerrors.Errorf("sector store not found") + } + + return *si.info, nil +} + +func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) { + i.lk.RLock() + defer i.lk.RUnlock() + + var candidates []storageEntry + + for _, p := range i.stores { + if sealing && !p.info.CanSeal { + log.Debugf("alloc: not considering %s; can't seal", p.info.ID) + continue + } + if !sealing && !p.info.CanStore { + log.Debugf("alloc: not considering %s; can't store", p.info.ID) + continue + } + + // TODO: filter out of space + + candidates = append(candidates, *p) + } + + if len(candidates) == 0 { + return nil, xerrors.New("no good path found") + } + + sort.Slice(candidates, func(i, j int) bool { + iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Free)), big.NewInt(int64(candidates[i].info.Weight))) + jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Free)), big.NewInt(int64(candidates[j].info.Weight))) + + return iw.GreaterThan(jw) + }) + + out := make([]StorageInfo, len(candidates)) + for i, candidate := range candidates { + out[i] = *candidate.info + } + + return out, nil +} + +func (i *Index) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]ID, error) { + i.lk.RLock() + defer i.lk.RUnlock() + + return i.sectors[Decl{ + SectorID: id, + SectorFileType: typ, + }], nil +} + +var _ SectorIndex = &Index{} diff --git a/stores/interface.go b/stores/interface.go new file mode 100644 index 000000000..67c18b16e --- /dev/null +++ b/stores/interface.go @@ -0,0 +1,32 @@ +package stores + +import ( + "context" + "syscall" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +type Store interface { + AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error) +} + +type FsStat struct { + Capacity uint64 + Free uint64 // Free to use for sector storage +} + +func Stat(path string) (FsStat, error) { + var stat syscall.Statfs_t + if err := syscall.Statfs(path, &stat); err != nil { + return FsStat{}, xerrors.Errorf("statfs: %w", err) + } + + return FsStat{ + Capacity: stat.Blocks * uint64(stat.Bsize), + Free: stat.Bavail * uint64(stat.Bsize), + }, nil +} diff --git a/stores/local.go b/stores/local.go new file mode 100644 index 000000000..a8eb53ee8 --- /dev/null +++ b/stores/local.go @@ -0,0 +1,314 @@ +package stores + +import ( + "context" + "encoding/json" + "io/ioutil" + "math/bits" + "os" + "path/filepath" + "sync" + + "github.com/filecoin-project/specs-actors/actors/abi" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" +) + +type StoragePath struct { + ID ID + Weight uint64 + + LocalPath string + + CanSeal bool + CanStore bool +} + +// [path]/sectorstore.json +type LocalStorageMeta struct { + ID ID + Weight uint64 // 0 = readonly + + CanSeal bool + CanStore bool +} + +type LocalStorage interface { + GetStorage() (config.StorageConfig, error) + SetStorage(func(*config.StorageConfig)) error +} + +const MetaFile = "sectorstore.json" + +var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache} + +type Local struct { + localStorage LocalStorage + index SectorIndex + urls []string + + paths map[ID]*path + + localLk sync.RWMutex +} + +type path struct { + local string // absolute local path +} + +func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) { + l := &Local{ + localStorage: ls, + index: index, + urls: urls, + + paths: map[ID]*path{}, + } + return l, l.open(ctx) +} + +func (st *Local) OpenPath(ctx context.Context, p string) error { + st.localLk.Lock() + defer st.localLk.Unlock() + + mb, err := ioutil.ReadFile(filepath.Join(p, MetaFile)) + if err != nil { + return xerrors.Errorf("reading storage metadata for %s: %w", p, err) + } + + var meta LocalStorageMeta + if err := json.Unmarshal(mb, &meta); err != nil { + return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err) + } + + // TODO: Check existing / dedupe + + out := &path{ + local: p, + } + + fst, err := Stat(p) + if err != nil { + return err + } + + err = st.index.StorageAttach(ctx, StorageInfo{ + ID: meta.ID, + URLs: st.urls, + Weight: meta.Weight, + CanSeal: meta.CanSeal, + CanStore: meta.CanStore, + }, fst) + if err != nil { + return xerrors.Errorf("declaring storage in index: %w", err) + } + + for _, t := range pathTypes { + ents, err := ioutil.ReadDir(filepath.Join(p, t.String())) + if err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(filepath.Join(p, t.String()), 0755); err != nil { + return xerrors.Errorf("openPath mkdir '%s': %w", filepath.Join(p, t.String()), err) + } + + continue + } + return xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err) + } + + for _, ent := range ents { + sid, err := sectorutil.ParseSectorID(ent.Name()) + if err != nil { + return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) + } + + if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t); err != nil { + return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err) + } + } + } + + st.paths[meta.ID] = out + + return nil +} + +func (st *Local) open(ctx context.Context) error { + cfg, err := st.localStorage.GetStorage() + if err != nil { + return xerrors.Errorf("getting local storage config: %w", err) + } + + for _, path := range cfg.StoragePaths { + err := st.OpenPath(ctx, path.Path) + if err != nil { + return xerrors.Errorf("opening path %s: %w", path.Path, err) + } + } + + return nil +} + +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { + if existing|allocate != existing^allocate { + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + } + + st.localLk.RLock() + + var out sectorbuilder.SectorPaths + var storageIDs sectorbuilder.SectorPaths + + for _, fileType := range pathTypes { + if fileType&existing == 0 { + continue + } + + si, err := st.index.StorageFindSector(ctx, sid, fileType, false) + if err != nil { + log.Warnf("finding existing sector %d(t:%d) failed: %+v", sid, fileType, err) + continue + } + + for _, info := range si { + p, ok := st.paths[info.ID] + if !ok { + continue + } + + if p.local == "" { // TODO: can that even be the case? + continue + } + + spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) + sectorutil.SetPathByType(&out, fileType, spath) + sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID)) + + existing ^= fileType + break + } + } + + for _, fileType := range pathTypes { + if fileType&allocate == 0 { + continue + } + + sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing) + if err != nil { + st.localLk.RUnlock() + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) + } + + var best string + var bestID ID + + for _, si := range sis { + p, ok := st.paths[si.ID] + if !ok { + continue + } + + if p.local == "" { // TODO: can that even be the case? + continue + } + + if sealing && !si.CanSeal { + continue + } + + if !sealing && !si.CanStore { + continue + } + + // TODO: Check free space + + best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) + bestID = si.ID + } + + if best == "" { + st.localLk.RUnlock() + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") + } + + sectorutil.SetPathByType(&out, fileType, best) + sectorutil.SetPathByType(&storageIDs, fileType, string(bestID)) + allocate ^= fileType + } + + return out, storageIDs, st.localLk.RUnlock, nil +} + +func (st *Local) Local(ctx context.Context) ([]StoragePath, error) { + st.localLk.RLock() + defer st.localLk.RUnlock() + + var out []StoragePath + for id, p := range st.paths { + if p.local == "" { + continue + } + + si, err := st.index.StorageInfo(ctx, id) + if err != nil { + return nil, xerrors.Errorf("get storage info for %s: %w", id, err) + } + + out = append(out, StoragePath{ + ID: id, + Weight: si.Weight, + LocalPath: p.local, + CanSeal: si.CanSeal, + CanStore: si.CanStore, + }) + } + + return out, nil +} + +func (st *Local) delete(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error { + if bits.OnesCount(uint(typ)) != 1 { + return xerrors.New("delete expects one file type") + } + + si, err := st.index.StorageFindSector(ctx, sid, typ, false) + if err != nil { + return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err) + } + + for _, info := range si { + p, ok := st.paths[info.ID] + if !ok { + continue + } + + if p.local == "" { // TODO: can that even be the case? + continue + } + + spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid)) + log.Infof("remove %s", spath) + + if err := os.RemoveAll(spath); err != nil { + log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err) + } + } + + return nil +} + +func (st *Local) FsStat(id ID) (FsStat, error) { + st.localLk.RLock() + defer st.localLk.RUnlock() + + p, ok := st.paths[id] + if !ok { + return FsStat{}, xerrors.Errorf("fsstat: path not found") + } + + return Stat(p.local) +} diff --git a/stores/remote.go b/stores/remote.go new file mode 100644 index 000000000..7bbd6d225 --- /dev/null +++ b/stores/remote.go @@ -0,0 +1,204 @@ +package stores + +import ( + "context" + "mime" + "net/http" + "os" + "sort" + "sync" + + "github.com/hashicorp/go-multierror" + files "github.com/ipfs/go-ipfs-files" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/lib/tarutil" + "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" +) + +type Remote struct { + local *Local + index SectorIndex + auth http.Header + + fetchLk sync.Mutex // TODO: this can be much smarter + // TODO: allow multiple parallel fetches + // (make sure to not fetch the same sector data twice) +} + +func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { + return &Remote{ + local: local, + index: index, + auth: auth, + } +} + +func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { + if existing|allocate != existing^allocate { + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + } + + r.fetchLk.Lock() + defer r.fetchLk.Unlock() + + paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing) + if err != nil { + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) + } + + for _, fileType := range pathTypes { + if fileType&existing == 0 { + continue + } + + if sectorutil.PathByType(paths, fileType) != "" { + continue + } + + ap, storageID, url, foundIn, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) + if err != nil { + done() + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err + } + + done = mergeDone(done, rdone) + sectorutil.SetPathByType(&paths, fileType, ap) + sectorutil.SetPathByType(&stores, fileType, string(storageID)) + + if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { + log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) + continue + } + + // TODO: some way to allow having duplicated sectors in the system for perf + if err := r.index.StorageDropSector(ctx, foundIn, s, fileType); err != nil { + log.Warnf("dropping sector %v from %s from sector index failed: %+v", s, storageID, err) + } + + if err := r.deleteFromRemote(url); err != nil { + log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) + } + } + + return paths, stores, done, nil +} + +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, ID, func(), error) { + si, err := r.index.StorageFindSector(ctx, s, fileType, false) + if err != nil { + return "", "", "", "", nil, err + } + + sort.Slice(si, func(i, j int) bool { + return si[i].Weight < si[j].Weight + }) + + apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing) + if err != nil { + return "", "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) + } + dest := sectorutil.PathByType(apaths, fileType) + storageID := sectorutil.PathByType(ids, fileType) + + var merr error + for _, info := range si { + for _, url := range info.URLs { + err := r.fetch(url, dest) + if err != nil { + merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err)) + continue + } + + if merr != nil { + log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr) + } + return dest, ID(storageID), url, info.ID, done, nil + } + } + + done() + return "", "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) +} + +func (r *Remote) fetch(url, outname string) error { + log.Infof("Fetch %s -> %s", url, outname) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return xerrors.Errorf("request: %w", err) + } + req.Header = r.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return xerrors.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 code: %d", resp.StatusCode) + } + + /*bar := pb.New64(w.sizeForType(typ)) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES + + barreader := bar.NewProxyReader(resp.Body) + + bar.Start() + defer bar.Finish()*/ + + mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return xerrors.Errorf("parse media type: %w", err) + } + + if err := os.RemoveAll(outname); err != nil { + return xerrors.Errorf("removing dest: %w", err) + } + + switch mediatype { + case "application/x-tar": + return tarutil.ExtractTar(resp.Body, outname) + case "application/octet-stream": + return files.WriteTo(files.NewReaderFile(resp.Body), outname) + default: + return xerrors.Errorf("unknown content type: '%s'", mediatype) + } +} + +func (r *Remote) deleteFromRemote(url string) error { + log.Infof("Delete %s", url) + + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + return xerrors.Errorf("request: %w", err) + } + req.Header = r.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return xerrors.Errorf("do request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 code: %d", resp.StatusCode) + } + + return nil +} + +func mergeDone(a func(), b func()) func() { + return func() { + a() + b() + } +} + +var _ Store = &Remote{} diff --git a/worker_local.go b/worker_local.go new file mode 100644 index 000000000..d691f150e --- /dev/null +++ b/worker_local.go @@ -0,0 +1,198 @@ +package sectorstorage + +import ( + "context" + "io" + "os" + + "github.com/elastic/go-sysinfo" + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + storage2 "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" + "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" +) + +var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache} + +type WorkerConfig struct { + SealProof abi.RegisteredProof + TaskTypes []sealtasks.TaskType +} + +type LocalWorker struct { + scfg *sectorbuilder.Config + storage stores.Store + localStore *stores.Local + sindex stores.SectorIndex + + acceptTasks map[sealtasks.TaskType]struct{} +} + +func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker { + ppt, err := wcfg.SealProof.RegisteredPoStProof() + if err != nil { + panic(err) + } + + acceptTasks := map[sealtasks.TaskType]struct{}{} + for _, taskType := range wcfg.TaskTypes { + acceptTasks[taskType] = struct{}{} + } + + return &LocalWorker{ + scfg: §orbuilder.Config{ + SealProofType: wcfg.SealProof, + PoStProofType: ppt, + }, + storage: store, + localStore: local, + sindex: sindex, + + acceptTasks: acceptTasks, + } +} + +type localWorkerPathProvider struct { + w *LocalWorker +} + +func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { + paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing) + if err != nil { + return sectorbuilder.SectorPaths{}, nil, err + } + + log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) + + return paths, func() { + done() + + for _, fileType := range pathTypes { + if fileType&allocate == 0 { + continue + } + + sid := sectorutil.PathByType(storageIDs, fileType) + + if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil { + log.Errorf("declare sector error: %+v", err) + } + } + }, nil +} + +func (l *LocalWorker) sb() (sectorbuilder.Basic, error) { + return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg) +} + +func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error { + sb, err := l.sb() + if err != nil { + return err + } + + return sb.NewSector(ctx, sector) +} + +func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { + sb, err := l.sb() + if err != nil { + return abi.PieceInfo{}, err + } + + return sb.AddPiece(ctx, sector, epcs, sz, r) +} + +func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) { + sb, err := l.sb() + if err != nil { + return nil, err + } + + return sb.SealPreCommit1(ctx, sector, ticket, pieces) +} + +func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) { + sb, err := l.sb() + if err != nil { + return storage2.SectorCids{}, err + } + + return sb.SealPreCommit2(ctx, sector, phase1Out) +} + +func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) { + sb, err := l.sb() + if err != nil { + return nil, err + } + + return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids) +} + +func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) { + sb, err := l.sb() + if err != nil { + return nil, err + } + + return sb.SealCommit2(ctx, sector, phase1Out) +} + +func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error { + sb, err := l.sb() + if err != nil { + return err + } + + return sb.FinalizeSector(ctx, sector) +} + +func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) { + return l.acceptTasks, nil +} + +func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { + return l.localStore.Local(ctx) +} + +func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) { + hostname, err := os.Hostname() // TODO: allow overriding from config + if err != nil { + panic(err) + } + + gpus, err := ffi.GetGPUDevices() + if err != nil { + log.Errorf("getting gpu devices failed: %+v", err) + } + + h, err := sysinfo.Host() + if err != nil { + return api.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err) + } + + mem, err := h.Memory() + if err != nil { + return api.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err) + } + + return api.WorkerInfo{ + Hostname: hostname, + Resources: api.WorkerResources{ + MemPhysical: mem.Total, + MemSwap: mem.VirtualTotal, + MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process + GPUs: gpus, + }, + }, nil +} + +var _ Worker = &LocalWorker{} diff --git a/worker_remote.go b/worker_remote.go new file mode 100644 index 000000000..f49ea4dc6 --- /dev/null +++ b/worker_remote.go @@ -0,0 +1,45 @@ +package sectorstorage + +import ( + "context" + "net/http" + + "github.com/filecoin-project/specs-actors/actors/abi" + storage2 "github.com/filecoin-project/specs-storage/storage" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/client" +) + +type remote struct { + api.WorkerApi +} + +func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error { + return xerrors.New("unsupported") +} + +func (r *remote) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) { + return abi.PieceInfo{}, xerrors.New("unsupported") +} + +func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, error) { + token, err := fa.AuthNew(ctx, []api.Permission{"admin"}) + if err != nil { + return nil, xerrors.Errorf("creating auth token for remote connection: %w", err) + } + + headers := http.Header{} + headers.Add("Authorization", "Bearer "+string(token)) + + wapi, close, err := client.NewWorkerRPC(url, headers) + if err != nil { + return nil, xerrors.Errorf("creating jsonrpc client: %w", err) + } + _ = close // TODO + + return &remote{wapi}, nil +} + +var _ Worker = &remote{}