diff --git a/node/modules/storageminer_svc.go b/node/modules/storageminer_svc.go index dec045309..c1e086d11 100644 --- a/node/modules/storageminer_svc.go +++ b/node/modules/storageminer_svc.go @@ -2,14 +2,25 @@ package modules import ( "context" + "strings" "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" + "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/chain/types" cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/provider/lpmarket" + "github.com/filecoin-project/lotus/provider/lpmarket/fakelm" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -18,8 +29,82 @@ type MinerStorageService api.StorageMiner var _ sectorblocks.SectorBuilder = *new(MinerSealingService) -func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) { +func connectHarmony(apiInfo string, fapi v1api.FullNode, mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) { + hc := config.HarmonyDB{} + + // apiInfo - harmony:maddr:user:pass:dbname:host:port + + parts := strings.Split(apiInfo, ":") + + if len(parts) != 7 { + return nil, xerrors.Errorf("invalid harmonydb info") + } + + maddr, err := address.NewFromString(parts[1]) + if err != nil { + return nil, xerrors.Errorf("parsing miner address: %w", err) + } + + hc.Username = parts[2] + hc.Password = parts[3] + hc.Database = parts[4] + hc.Hosts = []string{parts[5]} + hc.Port = parts[6] + + db, err := harmonydb.NewFromConfig(hc) + if err != nil { + return nil, xerrors.Errorf("connecting to harmonydb: %w", err) + } + + pin := lpmarket.NewPieceIngester(db, fapi) + + si := paths.NewDBIndex(nil, db) + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return nil, xerrors.Errorf("getting miner id: %w", err) + } + + mi, err := fapi.StateMinerInfo(mctx, maddr, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting miner info: %w", err) + } + + lp := fakelm.NewLMRPCProvider(si, maddr, abi.ActorID(mid), mi.SectorSize, pin) + + ast := api.StorageMinerStruct{} + + ast.Internal.ActorAddress = lp.ActorAddress + ast.Internal.WorkerJobs = lp.WorkerJobs + ast.Internal.SectorsStatus = lp.SectorsStatus + ast.Internal.SectorsList = lp.SectorsList + ast.Internal.SectorsSummary = lp.SectorsSummary + ast.Internal.SectorsListInStates = lp.SectorsListInStates + ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal + ast.Internal.ComputeDataCid = lp.ComputeDataCid + ast.Internal.SectorAddPieceToAny = lp.SectorAddPieceToAny + + ast.Internal.StorageList = si.StorageList + ast.Internal.StorageDetach = si.StorageDetach + ast.Internal.StorageReportHealth = si.StorageReportHealth + ast.Internal.StorageDeclareSector = si.StorageDeclareSector + ast.Internal.StorageDropSector = si.StorageDropSector + ast.Internal.StorageFindSector = si.StorageFindSector + ast.Internal.StorageInfo = si.StorageInfo + ast.Internal.StorageBestAlloc = si.StorageBestAlloc + ast.Internal.StorageLock = si.StorageLock + ast.Internal.StorageTryLock = si.StorageTryLock + ast.Internal.StorageGetLocks = si.StorageGetLocks + + return &ast, nil +} + +func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { + if strings.HasPrefix("harmony:", apiInfo) { + return connectHarmony(apiInfo, fapi, mctx, lc) + } + ctx := helpers.LifecycleCtx(mctx, lc) info := cliutil.ParseApiInfo(apiInfo) addr, err := info.DialArgs("v0") @@ -55,16 +140,16 @@ func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lif } } -func ConnectSealingService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle) (MinerSealingService, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle) (MinerSealingService, error) { +func ConnectSealingService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerSealingService, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerSealingService, error) { log.Info("Connecting sealing service to miner") - return connectMinerService(apiInfo)(mctx, lc) + return connectMinerService(apiInfo)(mctx, lc, fapi) } } -func ConnectStorageService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle) (MinerStorageService, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle) (MinerStorageService, error) { +func ConnectStorageService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerStorageService, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerStorageService, error) { log.Info("Connecting storage service to miner") - return connectMinerService(apiInfo)(mctx, lc) + return connectMinerService(apiInfo)(mctx, lc, fapi) } } diff --git a/provider/lpmarket/fakelm/iface.go b/provider/lpmarket/fakelm/iface.go index 7bf20fefb..a3f647c81 100644 --- a/provider/lpmarket/fakelm/iface.go +++ b/provider/lpmarket/fakelm/iface.go @@ -2,8 +2,6 @@ package fakelm import ( "context" - "io" - "github.com/google/uuid" "github.com/filecoin-project/go-address" @@ -28,10 +26,6 @@ type MinimalLMApi interface { SectorsListInStates(context.Context, []api.SectorState) ([]abi.SectorNumber, error) StorageRedeclareLocal(context.Context, *storiface.ID, bool) error - StorageList(context.Context) (map[storiface.ID][]storiface.Decl, error) - - UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) - IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) diff --git a/provider/lpmarket/fakelm/lmimpl.go b/provider/lpmarket/fakelm/lmimpl.go index 59383c89f..b5c9370eb 100644 --- a/provider/lpmarket/fakelm/lmimpl.go +++ b/provider/lpmarket/fakelm/lmimpl.go @@ -2,7 +2,6 @@ package fakelm import ( "context" - "io" "net/http" "net/url" @@ -30,6 +29,16 @@ type LMRPCProvider struct { pi lpmarket.Ingester } +func NewLMRPCProvider(si paths.SectorIndex, maddr address.Address, minerID abi.ActorID, ssize abi.SectorSize, pi lpmarket.Ingester) *LMRPCProvider { + return &LMRPCProvider{ + si: si, + maddr: maddr, + minerID: minerID, + ssize: ssize, + pi: pi, + } +} + func (l *LMRPCProvider) ActorAddress(ctx context.Context) (address.Address, error) { return l.maddr, nil } @@ -176,7 +185,7 @@ func (l *LMRPCProvider) SectorsSummary(ctx context.Context) (map[api.SectorState } func (l *LMRPCProvider) SectorsListInStates(ctx context.Context, want []api.SectorState) ([]abi.SectorNumber, error) { - decls, err := l.StorageList(ctx) + decls, err := l.si.StorageList(ctx) if err != nil { return nil, err } @@ -237,14 +246,6 @@ func (l *LMRPCProvider) StorageRedeclareLocal(ctx context.Context, id *storiface return nil } -func (l *LMRPCProvider) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { - return l.si.StorageList(ctx) -} - -func (l *LMRPCProvider) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { - return nil, xerrors.Errorf("not supported") -} - func (l *LMRPCProvider) IsUnsealed(ctx context.Context, sectorNum abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { sectorID := abi.SectorID{Miner: l.minerID, Number: sectorNum}