From 5b53ccaf0c1b2e7724fa69898340b6e04024b2f0 Mon Sep 17 00:00:00 2001 From: psiphi5 <158285278+psiphi5@users.noreply.github.com> Date: Thu, 10 Oct 2024 18:31:29 +1100 Subject: [PATCH] feat(tools/cosmovisor): cosmovisor batch upgrades (#21790) Co-authored-by: Julien Robert --- tools/cosmovisor/CHANGELOG.md | 1 + tools/cosmovisor/args.go | 5 + .../cosmovisor/cmd/cosmovisor/add_upgrade.go | 69 +++++--- .../cmd/cosmovisor/batch_upgrade.go | 143 ++++++++++++++++ tools/cosmovisor/cmd/cosmovisor/root.go | 1 + tools/cosmovisor/go.mod | 4 +- tools/cosmovisor/process.go | 159 ++++++++++++++++++ 7 files changed, 354 insertions(+), 28 deletions(-) create mode 100644 tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go diff --git a/tools/cosmovisor/CHANGELOG.md b/tools/cosmovisor/CHANGELOG.md index 1285007e6d..3dd65d8055 100644 --- a/tools/cosmovisor/CHANGELOG.md +++ b/tools/cosmovisor/CHANGELOG.md @@ -38,6 +38,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Features +* [#21790](https://github.com/cosmos/cosmos-sdk/pull/21790) Add `add-batch-upgrade` command. * [#21972](https://github.com/cosmos/cosmos-sdk/pull/21972) Add `prepare-upgrade` command ### Improvements diff --git a/tools/cosmovisor/args.go b/tools/cosmovisor/args.go index 577a16f251..733f561faf 100644 --- a/tools/cosmovisor/args.go +++ b/tools/cosmovisor/args.go @@ -111,6 +111,11 @@ func (cfg *Config) UpgradeInfoFilePath() string { return filepath.Join(cfg.Home, "data", upgradetypes.UpgradeInfoFilename) } +// UpgradeInfoBatchFilePath is the same as UpgradeInfoFilePath but with a batch suffix. +func (cfg *Config) UpgradeInfoBatchFilePath() string { + return cfg.UpgradeInfoFilePath() + ".batch" +} + // SymLinkToGenesis creates a symbolic link from "./current" to the genesis directory. func (cfg *Config) SymLinkToGenesis() (string, error) { // workdir is set to cosmovisor directory so relative diff --git a/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go b/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go index a197f33bc1..3832efa05e 100644 --- a/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go +++ b/tools/cosmovisor/cmd/cosmovisor/add_upgrade.go @@ -19,7 +19,7 @@ func NewAddUpgradeCmd() *cobra.Command { Short: "Add APP upgrade binary to cosmovisor", SilenceUsage: true, Args: cobra.ExactArgs(2), - RunE: AddUpgrade, + RunE: addUpgradeCmd, } addUpgrade.Flags().Bool(cosmovisor.FlagForce, false, "overwrite existing upgrade binary / upgrade-info.json file") @@ -28,26 +28,14 @@ func NewAddUpgradeCmd() *cobra.Command { return addUpgrade } -// AddUpgrade adds upgrade info to manifest -func AddUpgrade(cmd *cobra.Command, args []string) error { - configPath, err := cmd.Flags().GetString(cosmovisor.FlagCosmovisorConfig) - if err != nil { - return fmt.Errorf("failed to get config flag: %w", err) - } - - cfg, err := cosmovisor.GetConfigFromFile(configPath) - if err != nil { - return err - } - +// addUpgrade adds upgrade info to manifest +func addUpgrade(cfg *cosmovisor.Config, force bool, upgradeHeight int64, upgradeName, executablePath, upgradeInfoPath string) error { logger := cfg.Logger(os.Stdout) - upgradeName := args[0] if !cfg.DisableRecase { - upgradeName = strings.ToLower(args[0]) + upgradeName = strings.ToLower(upgradeName) } - executablePath := args[1] if _, err := os.Stat(executablePath); err != nil { if os.IsNotExist(err) { return fmt.Errorf("invalid executable path: %w", err) @@ -68,11 +56,6 @@ func AddUpgrade(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to read binary: %w", err) } - force, err := cmd.Flags().GetBool(cosmovisor.FlagForce) - if err != nil { - return fmt.Errorf("failed to get force flag: %w", err) - } - if err := saveOrAbort(cfg.UpgradeBin(upgradeName), executableData, force); err != nil { return err } @@ -80,9 +63,7 @@ func AddUpgrade(cmd *cobra.Command, args []string) error { logger.Info(fmt.Sprintf("Using %s for %s upgrade", executablePath, upgradeName)) logger.Info(fmt.Sprintf("Upgrade binary located at %s", cfg.UpgradeBin(upgradeName))) - if upgradeHeight, err := cmd.Flags().GetInt64(cosmovisor.FlagUpgradeHeight); err != nil { - return fmt.Errorf("failed to get upgrade-height flag: %w", err) - } else if upgradeHeight > 0 { + if upgradeHeight > 0 { plan := upgradetypes.Plan{Name: upgradeName, Height: upgradeHeight} if err := plan.ValidateBasic(); err != nil { panic(fmt.Errorf("something is wrong with cosmovisor: %w", err)) @@ -94,16 +75,52 @@ func AddUpgrade(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to marshal upgrade plan: %w", err) } - if err := saveOrAbort(cfg.UpgradeInfoFilePath(), planData, force); err != nil { + if err := saveOrAbort(upgradeInfoPath, planData, force); err != nil { return err } - logger.Info(fmt.Sprintf("%s created, %s upgrade binary will switch at height %d", cfg.UpgradeInfoFilePath(), upgradeName, upgradeHeight)) + logger.Info(fmt.Sprintf("%s created, %s upgrade binary will switch at height %d", upgradeInfoPath, upgradeName, upgradeHeight)) } return nil } +// GetConfig returns a Config using passed-in flag +func getConfigFromCmd(cmd *cobra.Command) (*cosmovisor.Config, error) { + configPath, err := cmd.Flags().GetString(cosmovisor.FlagCosmovisorConfig) + if err != nil { + return nil, fmt.Errorf("failed to get config flag: %w", err) + } + + cfg, err := cosmovisor.GetConfigFromFile(configPath) + if err != nil { + return nil, err + } + return cfg, nil +} + +// addUpgradeCmd parses input flags and adds upgrade info to manifest +func addUpgradeCmd(cmd *cobra.Command, args []string) error { + cfg, err := getConfigFromCmd(cmd) + if err != nil { + return err + } + + upgradeName, executablePath := args[0], args[1] + + force, err := cmd.Flags().GetBool(cosmovisor.FlagForce) + if err != nil { + return fmt.Errorf("failed to get force flag: %w", err) + } + + upgradeHeight, err := cmd.Flags().GetInt64(cosmovisor.FlagUpgradeHeight) + if err != nil { + return fmt.Errorf("failed to get upgrade-height flag: %w", err) + } + + return addUpgrade(cfg, force, upgradeHeight, upgradeName, executablePath, cfg.UpgradeInfoFilePath()) +} + // saveOrAbort saves data to path or aborts if file exists and force is false func saveOrAbort(path string, data []byte, force bool) error { if _, err := os.Stat(path); err == nil { diff --git a/tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go b/tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go new file mode 100644 index 0000000000..a66f65b406 --- /dev/null +++ b/tools/cosmovisor/cmd/cosmovisor/batch_upgrade.go @@ -0,0 +1,143 @@ +package main + +import ( + "encoding/csv" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/spf13/cobra" + + "cosmossdk.io/tools/cosmovisor" +) + +func NewBatchAddUpgradeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "add-batch-upgrade [flags]", + Short: "Add multiple upgrade binaries at specified heights to cosmovisor", + Long: `This command allows you to specify multiple upgrades at once at specific heights, copying or creating a batch upgrade file that's actively watched during 'cosmovisor run'. +You can provide upgrades in two ways: + +1. Using --upgrade-file: Specify a path to a headerless CSV batch upgrade file in the format: + upgrade-name,path-to-exec,upgrade-height + +2. Using --upgrade-list: Provide a comma-separated list of upgrades. + Each upgrade is defined by three colon-separated values: + a. upgrade-name: A unique identifier for the upgrade + b. path-to-exec: The file path to the upgrade's executable binary + c. upgrade-height: The block height at which the upgrade should occur + This creates a batch upgrade JSON file with the upgrade-info objects in the upgrade directory. + +Note: You must provide either --upgrade-file or --upgrade-list.`, + Example: `cosmovisor add-batch-upgrade --upgrade-list upgrade_v2:/path/to/v2/binary:1000000,upgrade_v3:/path/to/v3/binary:2000000 + +cosmovisor add-batch-upgrade --upgrade-file /path/to/batch_upgrade.csv`, + SilenceUsage: true, + Args: cobra.NoArgs, + RunE: addBatchUpgrade, + } + + cmd.Flags().String("upgrade-file", "", "Path to a batch upgrade file which is a JSON array of upgrade-info objects") + cmd.Flags().StringSlice("upgrade-list", []string{}, "List of comma-separated upgrades in the format 'name:path/to/binary:height'") + cmd.MarkFlagsMutuallyExclusive("upgrade-file", "upgrade-list") + + return cmd +} + +// addBatchUpgrade takes in multiple specified upgrades and creates a single +// batch upgrade file out of them +func addBatchUpgrade(cmd *cobra.Command, args []string) error { + cfg, err := getConfigFromCmd(cmd) + if err != nil { + return err + } + upgradeFile, err := cmd.Flags().GetString("upgrade-file") + if err == nil && upgradeFile != "" { + return processUpgradeFile(cfg, upgradeFile) + } + upgradeList, err := cmd.Flags().GetStringSlice("upgrade-list") + if err != nil || len(upgradeList) == 0 { + return fmt.Errorf("either --upgrade-file or --upgrade-list must be provided") + } + var splitUpgrades [][]string + for _, upgrade := range upgradeList { + splitUpgrades = append(splitUpgrades, strings.Split(upgrade, ":")) + } + return processUpgradeList(cfg, splitUpgrades) +} + +// processUpgradeList takes in a list of upgrades and creates a batch upgrade file +func processUpgradeList(cfg *cosmovisor.Config, upgradeList [][]string) error { + upgradeInfoPaths := []string{} + for i, upgrade := range upgradeList { + if len(upgrade) != 3 { + return fmt.Errorf("argument at position %d (%s) is invalid", i, upgrade) + } + upgradeName := filepath.Base(upgrade[0]) + upgradePath := upgrade[1] + upgradeHeight, err := strconv.ParseInt(upgrade[2], 10, 64) + if err != nil { + return fmt.Errorf("upgrade height at position %d (%s) is invalid", i, upgrade[2]) + } + upgradeInfoPath := cfg.UpgradeInfoFilePath() + "." + upgradeName + upgradeInfoPaths = append(upgradeInfoPaths, upgradeInfoPath) + if err := addUpgrade(cfg, true, upgradeHeight, upgradeName, upgradePath, upgradeInfoPath); err != nil { + return err + } + } + + var allData []json.RawMessage + for _, uip := range upgradeInfoPaths { + fileData, err := os.ReadFile(uip) + if err != nil { + return fmt.Errorf("error reading file %s: %w", uip, err) + } + + // Verify it's valid JSON + var jsonData json.RawMessage + if err := json.Unmarshal(fileData, &jsonData); err != nil { + return fmt.Errorf("error parsing JSON from file %s: %w", uip, err) + } + + // Add to our slice + allData = append(allData, jsonData) + } + + // Marshal the combined data + batchData, err := json.MarshalIndent(allData, "", " ") + if err != nil { + return fmt.Errorf("error marshaling combined JSON: %w", err) + } + + // Write to output file + err = os.WriteFile(cfg.UpgradeInfoBatchFilePath(), batchData, 0o600) + if err != nil { + return fmt.Errorf("error writing combined JSON to file: %w", err) + } + + return nil +} + +// processUpgradeFile takes in a CSV batch upgrade file, parses it and calls processUpgradeList +func processUpgradeFile(cfg *cosmovisor.Config, upgradeFile string) error { + file, err := os.Open(upgradeFile) + if err != nil { + return fmt.Errorf("error opening upgrade CSV file %s: %w", upgradeFile, err) + } + defer file.Close() + + r := csv.NewReader(file) + r.FieldsPerRecord = 3 + r.TrimLeadingSpace = true + records, err := r.ReadAll() + if err != nil { + return fmt.Errorf("error parsing upgrade CSV file %s: %w", upgradeFile, err) + } + if err := processUpgradeList(cfg, records); err != nil { + return err + } + return nil +} diff --git a/tools/cosmovisor/cmd/cosmovisor/root.go b/tools/cosmovisor/cmd/cosmovisor/root.go index de5e06d83e..92b1af2e11 100644 --- a/tools/cosmovisor/cmd/cosmovisor/root.go +++ b/tools/cosmovisor/cmd/cosmovisor/root.go @@ -20,6 +20,7 @@ func NewRootCmd() *cobra.Command { NewVersionCmd(), NewAddUpgradeCmd(), NewShowUpgradeInfoCmd(), + NewBatchAddUpgradeCmd(), NewPrepareUpgradeCmd(), ) diff --git a/tools/cosmovisor/go.mod b/tools/cosmovisor/go.mod index 8174ce1566..b04d101494 100644 --- a/tools/cosmovisor/go.mod +++ b/tools/cosmovisor/go.mod @@ -5,6 +5,8 @@ go 1.23 require ( cosmossdk.io/log v1.4.1 cosmossdk.io/x/upgrade v0.1.4 + github.com/cosmos/cosmos-sdk v0.50.7 + github.com/fsnotify/fsnotify v1.7.0 github.com/otiai10/copy v1.14.0 github.com/pelletier/go-toml/v2 v2.2.3 github.com/spf13/cobra v1.8.1 @@ -51,7 +53,6 @@ require ( github.com/cosmos/btcutil v1.0.5 // indirect github.com/cosmos/cosmos-db v1.0.2 // indirect github.com/cosmos/cosmos-proto v1.0.0-beta.5 // indirect - github.com/cosmos/cosmos-sdk v0.50.7 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect github.com/cosmos/gogoproto v1.7.0 // indirect @@ -69,7 +70,6 @@ require ( github.com/emicklei/dot v1.6.2 // indirect github.com/fatih/color v1.17.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/getsentry/sentry-go v0.28.0 // indirect github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect diff --git a/tools/cosmovisor/process.go b/tools/cosmovisor/process.go index ac759bde39..4e9fad8be8 100644 --- a/tools/cosmovisor/process.go +++ b/tools/cosmovisor/process.go @@ -1,6 +1,7 @@ package cosmovisor import ( + "context" "encoding/json" "errors" "fmt" @@ -9,16 +10,23 @@ import ( "os/exec" "os/signal" "path/filepath" + "sort" "strconv" "strings" + "sync" "syscall" "time" + "github.com/fsnotify/fsnotify" "github.com/otiai10/copy" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "cosmossdk.io/log" "cosmossdk.io/x/upgrade/plan" upgradetypes "cosmossdk.io/x/upgrade/types" + + cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" ) type Launcher struct { @@ -36,6 +44,144 @@ func NewLauncher(logger log.Logger, cfg *Config) (Launcher, error) { return Launcher{logger: logger, cfg: cfg, fw: fw}, nil } +// loadBatchUpgradeFile loads the batch upgrade file into memory, sorted by +// their upgrade heights +func loadBatchUpgradeFile(cfg *Config) ([]upgradetypes.Plan, error) { + var uInfos []upgradetypes.Plan + upgradeInfoFile, err := os.ReadFile(cfg.UpgradeInfoBatchFilePath()) + if os.IsNotExist(err) { + return uInfos, nil + } else if err != nil { + return nil, fmt.Errorf("error while reading %s: %w", cfg.UpgradeInfoBatchFilePath(), err) + } + + if err = json.Unmarshal(upgradeInfoFile, &uInfos); err != nil { + return nil, err + } + sort.Slice(uInfos, func(i, j int) bool { + return uInfos[i].Height < uInfos[j].Height + }) + return uInfos, nil +} + +// BatchUpgradeWatcher starts a watcher loop that swaps upgrade manifests at the correct +// height, given the batch upgrade file. It watches the current state of the chain +// via the websocket API. +func BatchUpgradeWatcher(ctx context.Context, cfg *Config, logger log.Logger) { + // load batch file in memory + uInfos, err := loadBatchUpgradeFile(cfg) + if err != nil { + logger.Warn("failed to load batch upgrade file", "error", err) + uInfos = []upgradetypes.Plan{} + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + logger.Warn("failed to init watcher", "error", err) + return + } + defer watcher.Close() + err = watcher.Add(filepath.Dir(cfg.UpgradeInfoBatchFilePath())) + if err != nil { + logger.Warn("watcher failed to add upgrade directory", "error", err) + return + } + + var conn *grpc.ClientConn + var grpcErr error + + defer func() { + if conn != nil { + if err := conn.Close(); err != nil { + logger.Warn("couldn't stop gRPC client", "error", err) + } + } + }() + + // Wait for the chain process to be ready +pollLoop: + for { + select { + case <-ctx.Done(): + return + default: + conn, grpcErr = grpc.NewClient(cfg.GRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if grpcErr == nil { + break pollLoop + } + time.Sleep(time.Second) + } + } + + client := cmtservice.NewServiceClient(conn) + + var prevUpgradeHeight int64 = -1 + + logger.Info("starting the batch watcher loop") + for { + select { + case event := <-watcher.Events: + if event.Op&(fsnotify.Write|fsnotify.Create) != 0 { + uInfos, err = loadBatchUpgradeFile(cfg) + if err != nil { + logger.Warn("failed to load batch upgrade file", "error", err) + continue + } + } + case <-ctx.Done(): + return + default: + if len(uInfos) == 0 { + // prevent spending extra CPU cycles + time.Sleep(time.Second) + continue + } + resp, err := client.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{}) + if err != nil { + logger.Warn("error getting latest block", "error", err) + time.Sleep(time.Second) + continue + } + + h := resp.SdkBlock.Header.Height + upcomingUpgrade := uInfos[0].Height + // replace upgrade-info and upgrade-info batch file + if h > prevUpgradeHeight && h < upcomingUpgrade { + jsonBytes, err := json.Marshal(uInfos[0]) + if err != nil { + logger.Warn("error marshaling JSON for upgrade-info.json", "error", err, "upgrade", uInfos[0]) + continue + } + if err := os.WriteFile(cfg.UpgradeInfoFilePath(), jsonBytes, 0o600); err != nil { + logger.Warn("error writing upgrade-info.json", "error", err) + continue + } + uInfos = uInfos[1:] + + jsonBytes, err = json.Marshal(uInfos) + if err != nil { + logger.Warn("error marshaling JSON for upgrade-info.json.batch", "error", err, "upgrades", uInfos) + continue + } + if err := os.WriteFile(cfg.UpgradeInfoBatchFilePath(), jsonBytes, 0o600); err != nil { + logger.Warn("error writing upgrade-info.json.batch", "error", err) + // remove the upgrade-info.json.batch file to avoid non-deterministic behavior + err := os.Remove(cfg.UpgradeInfoBatchFilePath()) + if err != nil && !os.IsNotExist(err) { + logger.Warn("error removing upgrade-info.json.batch", "error", err) + return + } + continue + } + prevUpgradeHeight = upcomingUpgrade + } + + // Add a small delay to avoid hammering the gRPC endpoint + time.Sleep(time.Second) + } + } +} + // Run launches the app in a subprocess and returns when the subprocess (app) // exits (either when it dies, or *after* a successful upgrade.) and upgrade finished. // Returns true if the upgrade request was detected and the upgrade process started. @@ -58,10 +204,20 @@ func (l Launcher) Run(args []string, stdin io.Reader, stdout, stderr io.Writer) return false, fmt.Errorf("launching process %s %s failed: %w", bin, strings.Join(args, " "), err) } + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + BatchUpgradeWatcher(ctx, l.cfg, l.logger) + }() + sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGQUIT, syscall.SIGTERM) go func() { sig := <-sigs + cancel() + wg.Wait() if err := cmd.Process.Signal(sig); err != nil { l.logger.Error("terminated", "error", err, "bin", bin) os.Exit(1) @@ -94,6 +250,9 @@ func (l Launcher) Run(args []string, stdin io.Reader, stdout, stderr io.Writer) return true, nil } + cancel() + wg.Wait() + return false, nil }