diff --git a/ffiwrapper/config.go b/ffiwrapper/config.go index a2d79c410..9b1fc5f9a 100644 --- a/ffiwrapper/config.go +++ b/ffiwrapper/config.go @@ -55,3 +55,18 @@ func SectorSizeForRegisteredProof(p abi.RegisteredProof) (abi.SectorSize, error) return 0, fmt.Errorf("unsupported registered proof %d", p) } } + +func ProofTypeFromSectorSize(ssize abi.SectorSize) (abi.RegisteredProof, abi.RegisteredProof, error) { + switch ssize { + case 2 << 10: + return abi.RegisteredProof_StackedDRG2KiBPoSt, abi.RegisteredProof_StackedDRG2KiBSeal, nil + case 8 << 20: + return abi.RegisteredProof_StackedDRG8MiBPoSt, abi.RegisteredProof_StackedDRG8MiBSeal, nil + case 512 << 20: + return abi.RegisteredProof_StackedDRG512MiBPoSt, abi.RegisteredProof_StackedDRG512MiBSeal, nil + case 32 << 30: + return abi.RegisteredProof_StackedDRG32GiBPoSt, abi.RegisteredProof_StackedDRG32GiBSeal, nil + default: + return 0, 0, xerrors.Errorf("unsupported sector size for miner: %v", ssize) + } +} diff --git a/ffiwrapper/verifier_cgo.go b/ffiwrapper/verifier_cgo.go index 402e85fab..2de6137da 100644 --- a/ffiwrapper/verifier_cgo.go +++ b/ffiwrapper/verifier_cgo.go @@ -9,9 +9,10 @@ import ( "go.opencensus.io/trace" ffi "github.com/filecoin-project/filecoin-ffi" - "github.com/filecoin-project/lotus/storage/sectorstorage/stores" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" ) func (sb *Sealer) ComputeElectionPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { diff --git a/worker_local.go b/localworker.go similarity index 94% rename from worker_local.go rename to localworker.go index 24d1e14df..e71a619f3 100644 --- a/worker_local.go +++ b/localworker.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" storage2 "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" "github.com/filecoin-project/lotus/storage/sectorstorage/stores" @@ -174,7 +173,7 @@ func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { return l.localStore.Local(ctx) } -func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) { +func (l *LocalWorker) Info(context.Context) (WorkerInfo, error) { hostname, err := os.Hostname() // TODO: allow overriding from config if err != nil { panic(err) @@ -187,17 +186,17 @@ func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) { h, err := sysinfo.Host() if err != nil { - return api.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err) + return WorkerInfo{}, xerrors.Errorf("getting host info: %w", err) } mem, err := h.Memory() if err != nil { - return api.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err) + return WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err) } - return api.WorkerInfo{ + return WorkerInfo{ Hostname: hostname, - Resources: api.WorkerResources{ + Resources: WorkerResources{ MemPhysical: mem.Total, MemSwap: mem.VirtualTotal, MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process diff --git a/manager.go b/manager.go index 97d645484..6f9d5877e 100644 --- a/manager.go +++ b/manager.go @@ -16,8 +16,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" "github.com/filecoin-project/lotus/storage/sectorstorage/stores" @@ -37,11 +35,26 @@ type Worker interface { // Returns paths accessible to the worker Paths(context.Context) ([]stores.StoragePath, error) - Info(context.Context) (api.WorkerInfo, error) + Info(context.Context) (WorkerInfo, error) Close() error } +type WorkerInfo struct { + Hostname string + + Resources WorkerResources +} + +type WorkerResources struct { + MemPhysical uint64 + MemSwap uint64 + + MemReserved uint64 // Used by system / other processes + + GPUs []string +} + type SectorManager interface { SectorSize() abi.SectorSize @@ -76,7 +89,16 @@ type Manager struct { schedQueue *list.List // List[*workerRequest] } -func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc config.Storage, urls URLs, ca api.Common) (*Manager, error) { +type SealerConfig struct { + // Local worker config + AllowPreCommit1 bool + AllowPreCommit2 bool + AllowCommit bool +} + +type StorageAuth http.Header + +func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth) (*Manager, error) { lstor, err := stores.NewLocal(ctx, ls, si, urls) if err != nil { return nil, err @@ -87,10 +109,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg return nil, xerrors.Errorf("creating prover instance: %w", err) } - token, err := ca.AuthNew(ctx, []api.Permission{"admin"}) - headers := http.Header{} - headers.Add("Authorization", "Bearer "+string(token)) - stor := stores.NewRemote(lstor, si, headers) + stor := stores.NewRemote(lstor, si, http.Header(sa)) m := &Manager{ scfg: cfg, @@ -150,8 +169,8 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error { return xerrors.Errorf("opening local path: %w", err) } - if err := m.ls.SetStorage(func(sc *config.StorageConfig) { - sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path}) + if err := m.ls.SetStorage(func(sc *stores.StorageConfig) { + sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path}) }); err != nil { return xerrors.Errorf("get storage config: %w", err) } diff --git a/mock/mock.go b/mock/mock.go index 0591958c1..dc4ca54ef 100644 --- a/mock/mock.go +++ b/mock/mock.go @@ -17,7 +17,6 @@ import ( logging "github.com/ipfs/go-log" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/sectorstorage" "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" ) @@ -37,7 +36,7 @@ type SectorMgr struct { type mockVerif struct{} func NewMockSectorMgr(threads int, ssize abi.SectorSize) *SectorMgr { - rt, _, err := api.ProofTypeFromSectorSize(ssize) + rt, _, err := ffiwrapper.ProofTypeFromSectorSize(ssize) if err != nil { panic(err) } diff --git a/mock/preseal.go b/mock/preseal.go deleted file mode 100644 index 20a4377cd..000000000 --- a/mock/preseal.go +++ /dev/null @@ -1,63 +0,0 @@ -package mock - -import ( - "github.com/filecoin-project/go-address" - commcid "github.com/filecoin-project/go-fil-commcid" - "github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/abi/big" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/filecoin-project/specs-actors/actors/crypto" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/genesis" -) - -func PreSeal(ssize abi.SectorSize, maddr address.Address, sectors int) (*genesis.Miner, *types.KeyInfo, error) { - k, err := wallet.GenerateKey(crypto.SigTypeBLS) - if err != nil { - return nil, nil, err - } - - genm := &genesis.Miner{ - Owner: k.Address, - Worker: k.Address, - MarketBalance: big.NewInt(0), - PowerBalance: big.NewInt(0), - SectorSize: ssize, - Sectors: make([]*genesis.PreSeal, sectors), - } - - _, st, err := api.ProofTypeFromSectorSize(ssize) - if err != nil { - return nil, nil, err - } - - for i := range genm.Sectors { - preseal := &genesis.PreSeal{} - - preseal.ProofType = st - preseal.CommD = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) - d, _ := commcid.CIDToPieceCommitmentV1(preseal.CommD) - r := commDR(d) - preseal.CommR = commcid.ReplicaCommitmentV1ToCID(r[:]) - preseal.SectorID = abi.SectorNumber(i + 1) - preseal.Deal = market.DealProposal{ - PieceCID: preseal.CommD, - PieceSize: abi.PaddedPieceSize(ssize), - Client: maddr, - Provider: maddr, - StartEpoch: 1, - EndEpoch: 10000, - StoragePricePerEpoch: big.Zero(), - ProviderCollateral: big.Zero(), - ClientCollateral: big.Zero(), - } - - genm.Sectors[i] = preseal - } - - return genm, &k.KeyInfo, nil -} diff --git a/mock/util.go b/mock/util.go index e37cf3552..2d2ebbfe2 100644 --- a/mock/util.go +++ b/mock/util.go @@ -1,20 +1,6 @@ package mock -import ( - "crypto/rand" - "io" - "io/ioutil" -) - -func randB(n uint64) []byte { - b, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(n))) - if err != nil { - panic(err) - } - return b -} - -func commDR(in []byte) (out [32]byte) { +func CommDR(in []byte) (out [32]byte) { for i, b := range in { out[i] = ^b } diff --git a/resources.go b/resources.go index 4aafb5962..ab2e5170d 100644 --- a/resources.go +++ b/resources.go @@ -1,9 +1,10 @@ package sectorstorage import ( + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" "github.com/filecoin-project/lotus/storage/sectorstorage/stores" - "github.com/filecoin-project/specs-actors/actors/abi" ) var FSOverheadSeal = map[stores.SectorFileType]int{ // 10x overheads diff --git a/roprov.go b/roprov.go index dfab863ff..694bcd2b2 100644 --- a/roprov.go +++ b/roprov.go @@ -3,10 +3,11 @@ package sectorstorage import ( "context" - "github.com/filecoin-project/lotus/storage/sectorstorage/stores" + "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" - "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" ) type readonlyProvider struct { diff --git a/sched.go b/sched.go index d8e3d35a0..79f9c8971 100644 --- a/sched.go +++ b/sched.go @@ -1,11 +1,11 @@ package sectorstorage import ( - "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" - "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" ) const mib = 1 << 20 @@ -39,7 +39,7 @@ func (r *workerRequest) respond(resp workerResponse) { type workerHandle struct { w Worker - info api.WorkerInfo + info WorkerInfo memUsedMin uint64 memUsedMax uint64 diff --git a/stats.go b/stats.go index 2cae1decb..70a5f341a 100644 --- a/stats.go +++ b/stats.go @@ -1,15 +1,22 @@ package sectorstorage -import "github.com/filecoin-project/lotus/api" +type WorkerStats struct { + Info WorkerInfo -func (m *Manager) WorkerStats() map[uint64]api.WorkerStats { + MemUsedMin uint64 + MemUsedMax uint64 + GpuUsed bool + CpuUse int +} + +func (m *Manager) WorkerStats() map[uint64]WorkerStats { m.workersLk.Lock() defer m.workersLk.Unlock() - out := map[uint64]api.WorkerStats{} + out := map[uint64]WorkerStats{} for id, handle := range m.workers { - out[uint64(id)] = api.WorkerStats{ + out[uint64(id)] = WorkerStats{ Info: handle.info, MemUsedMin: handle.memUsedMin, MemUsedMax: handle.memUsedMax, diff --git a/stores/http_handler.go b/stores/http_handler.go index 21903494b..14fbe04c8 100644 --- a/stores/http_handler.go +++ b/stores/http_handler.go @@ -10,7 +10,7 @@ import ( logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/lib/tarutil" + "github.com/filecoin-project/lotus/storage/sectorstorage/tarutil" ) var log = logging.Logger("stores") diff --git a/stores/local.go b/stores/local.go index a971b61b3..281475b1c 100644 --- a/stores/local.go +++ b/stores/local.go @@ -9,10 +9,9 @@ import ( "path/filepath" "sync" - "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/specs-actors/actors/abi" ) type StoragePath struct { @@ -34,9 +33,18 @@ type LocalStorageMeta struct { CanStore bool } +// .lotusstorage/storage.json +type StorageConfig struct { + StoragePaths []LocalPath +} + +type LocalPath struct { + Path string +} + type LocalStorage interface { - GetStorage() (config.StorageConfig, error) - SetStorage(func(*config.StorageConfig)) error + GetStorage() (StorageConfig, error) + SetStorage(func(*StorageConfig)) error } const MetaFile = "sectorstore.json" diff --git a/stores/remote.go b/stores/remote.go index 14550174f..349b73722 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -19,7 +19,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/lotus/lib/tarutil" + "github.com/filecoin-project/lotus/storage/sectorstorage/tarutil" ) type Remote struct { diff --git a/tarutil/systar.go b/tarutil/systar.go new file mode 100644 index 000000000..a94354731 --- /dev/null +++ b/tarutil/systar.go @@ -0,0 +1,92 @@ +package tarutil + +import ( + "archive/tar" + "golang.org/x/xerrors" + "io" + "io/ioutil" + "os" + "path/filepath" + + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("tarutil") + +func ExtractTar(body io.Reader, dir string) error { + if err := os.MkdirAll(dir, 0755); err != nil { + return xerrors.Errorf("mkdir: %w", err) + } + + tr := tar.NewReader(body) + for { + header, err := tr.Next() + switch err { + default: + return err + case io.EOF: + return nil + + case nil: + } + + f, err := os.Create(filepath.Join(dir, header.Name)) + if err != nil { + return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err) + } + + if _, err := io.Copy(f, tr); err != nil { + return err + } + + if err := f.Close(); err != nil { + return err + } + } +} + +func TarDirectory(dir string) (io.ReadCloser, error) { + r, w := io.Pipe() + + go func() { + _ = w.CloseWithError(writeTarDirectory(dir, w)) + }() + + return r, nil +} + +func writeTarDirectory(dir string, w io.Writer) error { + tw := tar.NewWriter(w) + + files, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + + for _, file := range files { + h, err := tar.FileInfoHeader(file, "") + if err != nil { + return xerrors.Errorf("getting header for file %s: %w", file.Name(), err) + } + + if err := tw.WriteHeader(h); err != nil { + return xerrors.Errorf("wiritng header for file %s: %w", file.Name(), err) + } + + f, err := os.OpenFile(filepath.Join(dir, file.Name()), os.O_RDONLY, 644) + if err != nil { + return xerrors.Errorf("opening %s for reading: %w", file.Name(), err) + } + + if _, err := io.Copy(tw, f); err != nil { + return xerrors.Errorf("copy data for file %s: %w", file.Name(), err) + } + + if err := f.Close(); err != nil { + return err + } + + } + + return nil +} diff --git a/worker_remote.go b/worker_remote.go deleted file mode 100644 index ffd96f188..000000000 --- a/worker_remote.go +++ /dev/null @@ -1,51 +0,0 @@ -package sectorstorage - -import ( - "context" - "net/http" - - "github.com/filecoin-project/specs-actors/actors/abi" - storage2 "github.com/filecoin-project/specs-storage/storage" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/lib/jsonrpc" -) - -type remote struct { - api.WorkerApi - closer jsonrpc.ClientCloser -} - -func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error { - return xerrors.New("unsupported") -} - -func (r *remote) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) { - return abi.PieceInfo{}, xerrors.New("unsupported") -} - -func ConnectRemote(ctx context.Context, fa api.Common, url string) (*remote, error) { - token, err := fa.AuthNew(ctx, []api.Permission{"admin"}) - if err != nil { - return nil, xerrors.Errorf("creating auth token for remote connection: %w", err) - } - - headers := http.Header{} - headers.Add("Authorization", "Bearer "+string(token)) - - wapi, closer, err := client.NewWorkerRPC(url, headers) - if err != nil { - return nil, xerrors.Errorf("creating jsonrpc client: %w", err) - } - - return &remote{wapi, closer}, nil -} - -func (r *remote) Close() error { - r.closer() - return nil -} - -var _ Worker = &remote{}