package modules import ( "context" "strings" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/chain/types" cliutil "github.com/filecoin-project/lotus/cli/util" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/provider/lpmarket" "github.com/filecoin-project/lotus/provider/lpmarket/fakelm" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sectorblocks" ) type MinerSealingService api.StorageMiner type MinerStorageService api.StorageMiner var _ sectorblocks.SectorBuilder = *new(MinerSealingService) func connectHarmony(apiInfo string, fapi v1api.FullNode, mctx helpers.MetricsCtx, lc fx.Lifecycle) (api.StorageMiner, error) { hc := config.HarmonyDB{} // apiInfo - harmony:maddr:user:pass:dbname:host:port parts := strings.Split(apiInfo, ":") if len(parts) != 7 { return nil, xerrors.Errorf("invalid harmonydb info") } maddr, err := address.NewFromString(parts[1]) if err != nil { return nil, xerrors.Errorf("parsing miner address: %w", err) } hc.Username = parts[2] hc.Password = parts[3] hc.Database = parts[4] hc.Hosts = []string{parts[5]} hc.Port = parts[6] db, err := harmonydb.NewFromConfig(hc) if err != nil { return nil, xerrors.Errorf("connecting to harmonydb: %w", err) } pin := lpmarket.NewPieceIngester(db, fapi) si := paths.NewDBIndex(nil, db) mid, err := address.IDFromAddress(maddr) if err != nil { return nil, xerrors.Errorf("getting miner id: %w", err) } mi, err := fapi.StateMinerInfo(mctx, maddr, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("getting miner info: %w", err) } lp := fakelm.NewLMRPCProvider(si, maddr, abi.ActorID(mid), mi.SectorSize, pin) ast := api.StorageMinerStruct{} ast.Internal.ActorAddress = lp.ActorAddress ast.Internal.WorkerJobs = lp.WorkerJobs ast.Internal.SectorsStatus = lp.SectorsStatus ast.Internal.SectorsList = lp.SectorsList ast.Internal.SectorsSummary = lp.SectorsSummary ast.Internal.SectorsListInStates = lp.SectorsListInStates ast.Internal.StorageRedeclareLocal = lp.StorageRedeclareLocal ast.Internal.ComputeDataCid = lp.ComputeDataCid ast.Internal.SectorAddPieceToAny = lp.SectorAddPieceToAny ast.Internal.StorageList = si.StorageList ast.Internal.StorageDetach = si.StorageDetach ast.Internal.StorageReportHealth = si.StorageReportHealth ast.Internal.StorageDeclareSector = si.StorageDeclareSector ast.Internal.StorageDropSector = si.StorageDropSector ast.Internal.StorageFindSector = si.StorageFindSector ast.Internal.StorageInfo = si.StorageInfo ast.Internal.StorageBestAlloc = si.StorageBestAlloc ast.Internal.StorageLock = si.StorageLock ast.Internal.StorageTryLock = si.StorageTryLock ast.Internal.StorageGetLocks = si.StorageGetLocks return &ast, nil } func connectMinerService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (api.StorageMiner, error) { if strings.HasPrefix("harmony:", apiInfo) { return connectHarmony(apiInfo, fapi, mctx, lc) } ctx := helpers.LifecycleCtx(mctx, lc) info := cliutil.ParseApiInfo(apiInfo) addr, err := info.DialArgs("v0") if err != nil { return nil, xerrors.Errorf("could not get DialArgs: %w", err) } log.Infof("Checking (svc) api version of %s", addr) mapi, closer, err := client.NewStorageMinerRPCV0(ctx, addr, info.AuthHeader()) if err != nil { return nil, err } lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { v, err := mapi.Version(ctx) if err != nil { return xerrors.Errorf("checking version: %w", err) } if !v.APIVersion.EqMajorMinor(api.MinerAPIVersion0) { return xerrors.Errorf("remote service API version didn't match (expected %s, remote %s)", api.MinerAPIVersion0, v.APIVersion) } return nil }, OnStop: func(context.Context) error { closer() return nil }}) return mapi, nil } } func ConnectSealingService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerSealingService, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerSealingService, error) { log.Info("Connecting sealing service to miner") return connectMinerService(apiInfo)(mctx, lc, fapi) } } func ConnectStorageService(apiInfo string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerStorageService, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fapi v1api.FullNode) (MinerStorageService, error) { log.Info("Connecting storage service to miner") return connectMinerService(apiInfo)(mctx, lc, fapi) } }