feat(tools/cosmovisor): cosmovisor batch upgrades (#21790)

Co-authored-by: Julien Robert <julien@rbrt.fr>
This commit is contained in:
psiphi5 2024-10-10 18:31:29 +11:00 committed by GitHub
parent 7e517369e3
commit 5b53ccaf0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 354 additions and 28 deletions

View File

@ -38,6 +38,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Features
* [#21790](https://github.com/cosmos/cosmos-sdk/pull/21790) Add `add-batch-upgrade` command.
* [#21972](https://github.com/cosmos/cosmos-sdk/pull/21972) Add `prepare-upgrade` command
### Improvements

View File

@ -111,6 +111,11 @@ func (cfg *Config) UpgradeInfoFilePath() string {
return filepath.Join(cfg.Home, "data", upgradetypes.UpgradeInfoFilename)
}
// UpgradeInfoBatchFilePath is the same as UpgradeInfoFilePath but with a batch suffix.
func (cfg *Config) UpgradeInfoBatchFilePath() string {
return cfg.UpgradeInfoFilePath() + ".batch"
}
// SymLinkToGenesis creates a symbolic link from "./current" to the genesis directory.
func (cfg *Config) SymLinkToGenesis() (string, error) {
// workdir is set to cosmovisor directory so relative

View File

@ -19,7 +19,7 @@ func NewAddUpgradeCmd() *cobra.Command {
Short: "Add APP upgrade binary to cosmovisor",
SilenceUsage: true,
Args: cobra.ExactArgs(2),
RunE: AddUpgrade,
RunE: addUpgradeCmd,
}
addUpgrade.Flags().Bool(cosmovisor.FlagForce, false, "overwrite existing upgrade binary / upgrade-info.json file")
@ -28,26 +28,14 @@ func NewAddUpgradeCmd() *cobra.Command {
return addUpgrade
}
// AddUpgrade adds upgrade info to manifest
func AddUpgrade(cmd *cobra.Command, args []string) error {
configPath, err := cmd.Flags().GetString(cosmovisor.FlagCosmovisorConfig)
if err != nil {
return fmt.Errorf("failed to get config flag: %w", err)
}
cfg, err := cosmovisor.GetConfigFromFile(configPath)
if err != nil {
return err
}
// addUpgrade adds upgrade info to manifest
func addUpgrade(cfg *cosmovisor.Config, force bool, upgradeHeight int64, upgradeName, executablePath, upgradeInfoPath string) error {
logger := cfg.Logger(os.Stdout)
upgradeName := args[0]
if !cfg.DisableRecase {
upgradeName = strings.ToLower(args[0])
upgradeName = strings.ToLower(upgradeName)
}
executablePath := args[1]
if _, err := os.Stat(executablePath); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("invalid executable path: %w", err)
@ -68,11 +56,6 @@ func AddUpgrade(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to read binary: %w", err)
}
force, err := cmd.Flags().GetBool(cosmovisor.FlagForce)
if err != nil {
return fmt.Errorf("failed to get force flag: %w", err)
}
if err := saveOrAbort(cfg.UpgradeBin(upgradeName), executableData, force); err != nil {
return err
}
@ -80,9 +63,7 @@ func AddUpgrade(cmd *cobra.Command, args []string) error {
logger.Info(fmt.Sprintf("Using %s for %s upgrade", executablePath, upgradeName))
logger.Info(fmt.Sprintf("Upgrade binary located at %s", cfg.UpgradeBin(upgradeName)))
if upgradeHeight, err := cmd.Flags().GetInt64(cosmovisor.FlagUpgradeHeight); err != nil {
return fmt.Errorf("failed to get upgrade-height flag: %w", err)
} else if upgradeHeight > 0 {
if upgradeHeight > 0 {
plan := upgradetypes.Plan{Name: upgradeName, Height: upgradeHeight}
if err := plan.ValidateBasic(); err != nil {
panic(fmt.Errorf("something is wrong with cosmovisor: %w", err))
@ -94,16 +75,52 @@ func AddUpgrade(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to marshal upgrade plan: %w", err)
}
if err := saveOrAbort(cfg.UpgradeInfoFilePath(), planData, force); err != nil {
if err := saveOrAbort(upgradeInfoPath, planData, force); err != nil {
return err
}
logger.Info(fmt.Sprintf("%s created, %s upgrade binary will switch at height %d", cfg.UpgradeInfoFilePath(), upgradeName, upgradeHeight))
logger.Info(fmt.Sprintf("%s created, %s upgrade binary will switch at height %d", upgradeInfoPath, upgradeName, upgradeHeight))
}
return nil
}
// GetConfig returns a Config using passed-in flag
func getConfigFromCmd(cmd *cobra.Command) (*cosmovisor.Config, error) {
configPath, err := cmd.Flags().GetString(cosmovisor.FlagCosmovisorConfig)
if err != nil {
return nil, fmt.Errorf("failed to get config flag: %w", err)
}
cfg, err := cosmovisor.GetConfigFromFile(configPath)
if err != nil {
return nil, err
}
return cfg, nil
}
// addUpgradeCmd parses input flags and adds upgrade info to manifest
func addUpgradeCmd(cmd *cobra.Command, args []string) error {
cfg, err := getConfigFromCmd(cmd)
if err != nil {
return err
}
upgradeName, executablePath := args[0], args[1]
force, err := cmd.Flags().GetBool(cosmovisor.FlagForce)
if err != nil {
return fmt.Errorf("failed to get force flag: %w", err)
}
upgradeHeight, err := cmd.Flags().GetInt64(cosmovisor.FlagUpgradeHeight)
if err != nil {
return fmt.Errorf("failed to get upgrade-height flag: %w", err)
}
return addUpgrade(cfg, force, upgradeHeight, upgradeName, executablePath, cfg.UpgradeInfoFilePath())
}
// saveOrAbort saves data to path or aborts if file exists and force is false
func saveOrAbort(path string, data []byte, force bool) error {
if _, err := os.Stat(path); err == nil {

View File

@ -0,0 +1,143 @@
package main
import (
"encoding/csv"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/spf13/cobra"
"cosmossdk.io/tools/cosmovisor"
)
func NewBatchAddUpgradeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "add-batch-upgrade [flags]",
Short: "Add multiple upgrade binaries at specified heights to cosmovisor",
Long: `This command allows you to specify multiple upgrades at once at specific heights, copying or creating a batch upgrade file that's actively watched during 'cosmovisor run'.
You can provide upgrades in two ways:
1. Using --upgrade-file: Specify a path to a headerless CSV batch upgrade file in the format:
upgrade-name,path-to-exec,upgrade-height
2. Using --upgrade-list: Provide a comma-separated list of upgrades.
Each upgrade is defined by three colon-separated values:
a. upgrade-name: A unique identifier for the upgrade
b. path-to-exec: The file path to the upgrade's executable binary
c. upgrade-height: The block height at which the upgrade should occur
This creates a batch upgrade JSON file with the upgrade-info objects in the upgrade directory.
Note: You must provide either --upgrade-file or --upgrade-list.`,
Example: `cosmovisor add-batch-upgrade --upgrade-list upgrade_v2:/path/to/v2/binary:1000000,upgrade_v3:/path/to/v3/binary:2000000
cosmovisor add-batch-upgrade --upgrade-file /path/to/batch_upgrade.csv`,
SilenceUsage: true,
Args: cobra.NoArgs,
RunE: addBatchUpgrade,
}
cmd.Flags().String("upgrade-file", "", "Path to a batch upgrade file which is a JSON array of upgrade-info objects")
cmd.Flags().StringSlice("upgrade-list", []string{}, "List of comma-separated upgrades in the format 'name:path/to/binary:height'")
cmd.MarkFlagsMutuallyExclusive("upgrade-file", "upgrade-list")
return cmd
}
// addBatchUpgrade takes in multiple specified upgrades and creates a single
// batch upgrade file out of them
func addBatchUpgrade(cmd *cobra.Command, args []string) error {
cfg, err := getConfigFromCmd(cmd)
if err != nil {
return err
}
upgradeFile, err := cmd.Flags().GetString("upgrade-file")
if err == nil && upgradeFile != "" {
return processUpgradeFile(cfg, upgradeFile)
}
upgradeList, err := cmd.Flags().GetStringSlice("upgrade-list")
if err != nil || len(upgradeList) == 0 {
return fmt.Errorf("either --upgrade-file or --upgrade-list must be provided")
}
var splitUpgrades [][]string
for _, upgrade := range upgradeList {
splitUpgrades = append(splitUpgrades, strings.Split(upgrade, ":"))
}
return processUpgradeList(cfg, splitUpgrades)
}
// processUpgradeList takes in a list of upgrades and creates a batch upgrade file
func processUpgradeList(cfg *cosmovisor.Config, upgradeList [][]string) error {
upgradeInfoPaths := []string{}
for i, upgrade := range upgradeList {
if len(upgrade) != 3 {
return fmt.Errorf("argument at position %d (%s) is invalid", i, upgrade)
}
upgradeName := filepath.Base(upgrade[0])
upgradePath := upgrade[1]
upgradeHeight, err := strconv.ParseInt(upgrade[2], 10, 64)
if err != nil {
return fmt.Errorf("upgrade height at position %d (%s) is invalid", i, upgrade[2])
}
upgradeInfoPath := cfg.UpgradeInfoFilePath() + "." + upgradeName
upgradeInfoPaths = append(upgradeInfoPaths, upgradeInfoPath)
if err := addUpgrade(cfg, true, upgradeHeight, upgradeName, upgradePath, upgradeInfoPath); err != nil {
return err
}
}
var allData []json.RawMessage
for _, uip := range upgradeInfoPaths {
fileData, err := os.ReadFile(uip)
if err != nil {
return fmt.Errorf("error reading file %s: %w", uip, err)
}
// Verify it's valid JSON
var jsonData json.RawMessage
if err := json.Unmarshal(fileData, &jsonData); err != nil {
return fmt.Errorf("error parsing JSON from file %s: %w", uip, err)
}
// Add to our slice
allData = append(allData, jsonData)
}
// Marshal the combined data
batchData, err := json.MarshalIndent(allData, "", " ")
if err != nil {
return fmt.Errorf("error marshaling combined JSON: %w", err)
}
// Write to output file
err = os.WriteFile(cfg.UpgradeInfoBatchFilePath(), batchData, 0o600)
if err != nil {
return fmt.Errorf("error writing combined JSON to file: %w", err)
}
return nil
}
// processUpgradeFile takes in a CSV batch upgrade file, parses it and calls processUpgradeList
func processUpgradeFile(cfg *cosmovisor.Config, upgradeFile string) error {
file, err := os.Open(upgradeFile)
if err != nil {
return fmt.Errorf("error opening upgrade CSV file %s: %w", upgradeFile, err)
}
defer file.Close()
r := csv.NewReader(file)
r.FieldsPerRecord = 3
r.TrimLeadingSpace = true
records, err := r.ReadAll()
if err != nil {
return fmt.Errorf("error parsing upgrade CSV file %s: %w", upgradeFile, err)
}
if err := processUpgradeList(cfg, records); err != nil {
return err
}
return nil
}

View File

@ -20,6 +20,7 @@ func NewRootCmd() *cobra.Command {
NewVersionCmd(),
NewAddUpgradeCmd(),
NewShowUpgradeInfoCmd(),
NewBatchAddUpgradeCmd(),
NewPrepareUpgradeCmd(),
)

View File

@ -5,6 +5,8 @@ go 1.23
require (
cosmossdk.io/log v1.4.1
cosmossdk.io/x/upgrade v0.1.4
github.com/cosmos/cosmos-sdk v0.50.7
github.com/fsnotify/fsnotify v1.7.0
github.com/otiai10/copy v1.14.0
github.com/pelletier/go-toml/v2 v2.2.3
github.com/spf13/cobra v1.8.1
@ -51,7 +53,6 @@ require (
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/cosmos/cosmos-db v1.0.2 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.5 // indirect
github.com/cosmos/cosmos-sdk v0.50.7 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cosmos/gogogateway v1.2.0 // indirect
github.com/cosmos/gogoproto v1.7.0 // indirect
@ -69,7 +70,6 @@ require (
github.com/emicklei/dot v1.6.2 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.28.0 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/log v0.2.1 // indirect

View File

@ -1,6 +1,7 @@
package cosmovisor
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -9,16 +10,23 @@ import (
"os/exec"
"os/signal"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/fsnotify/fsnotify"
"github.com/otiai10/copy"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"cosmossdk.io/log"
"cosmossdk.io/x/upgrade/plan"
upgradetypes "cosmossdk.io/x/upgrade/types"
cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
)
type Launcher struct {
@ -36,6 +44,144 @@ func NewLauncher(logger log.Logger, cfg *Config) (Launcher, error) {
return Launcher{logger: logger, cfg: cfg, fw: fw}, nil
}
// loadBatchUpgradeFile loads the batch upgrade file into memory, sorted by
// their upgrade heights
func loadBatchUpgradeFile(cfg *Config) ([]upgradetypes.Plan, error) {
var uInfos []upgradetypes.Plan
upgradeInfoFile, err := os.ReadFile(cfg.UpgradeInfoBatchFilePath())
if os.IsNotExist(err) {
return uInfos, nil
} else if err != nil {
return nil, fmt.Errorf("error while reading %s: %w", cfg.UpgradeInfoBatchFilePath(), err)
}
if err = json.Unmarshal(upgradeInfoFile, &uInfos); err != nil {
return nil, err
}
sort.Slice(uInfos, func(i, j int) bool {
return uInfos[i].Height < uInfos[j].Height
})
return uInfos, nil
}
// BatchUpgradeWatcher starts a watcher loop that swaps upgrade manifests at the correct
// height, given the batch upgrade file. It watches the current state of the chain
// via the websocket API.
func BatchUpgradeWatcher(ctx context.Context, cfg *Config, logger log.Logger) {
// load batch file in memory
uInfos, err := loadBatchUpgradeFile(cfg)
if err != nil {
logger.Warn("failed to load batch upgrade file", "error", err)
uInfos = []upgradetypes.Plan{}
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
logger.Warn("failed to init watcher", "error", err)
return
}
defer watcher.Close()
err = watcher.Add(filepath.Dir(cfg.UpgradeInfoBatchFilePath()))
if err != nil {
logger.Warn("watcher failed to add upgrade directory", "error", err)
return
}
var conn *grpc.ClientConn
var grpcErr error
defer func() {
if conn != nil {
if err := conn.Close(); err != nil {
logger.Warn("couldn't stop gRPC client", "error", err)
}
}
}()
// Wait for the chain process to be ready
pollLoop:
for {
select {
case <-ctx.Done():
return
default:
conn, grpcErr = grpc.NewClient(cfg.GRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
if grpcErr == nil {
break pollLoop
}
time.Sleep(time.Second)
}
}
client := cmtservice.NewServiceClient(conn)
var prevUpgradeHeight int64 = -1
logger.Info("starting the batch watcher loop")
for {
select {
case event := <-watcher.Events:
if event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
uInfos, err = loadBatchUpgradeFile(cfg)
if err != nil {
logger.Warn("failed to load batch upgrade file", "error", err)
continue
}
}
case <-ctx.Done():
return
default:
if len(uInfos) == 0 {
// prevent spending extra CPU cycles
time.Sleep(time.Second)
continue
}
resp, err := client.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{})
if err != nil {
logger.Warn("error getting latest block", "error", err)
time.Sleep(time.Second)
continue
}
h := resp.SdkBlock.Header.Height
upcomingUpgrade := uInfos[0].Height
// replace upgrade-info and upgrade-info batch file
if h > prevUpgradeHeight && h < upcomingUpgrade {
jsonBytes, err := json.Marshal(uInfos[0])
if err != nil {
logger.Warn("error marshaling JSON for upgrade-info.json", "error", err, "upgrade", uInfos[0])
continue
}
if err := os.WriteFile(cfg.UpgradeInfoFilePath(), jsonBytes, 0o600); err != nil {
logger.Warn("error writing upgrade-info.json", "error", err)
continue
}
uInfos = uInfos[1:]
jsonBytes, err = json.Marshal(uInfos)
if err != nil {
logger.Warn("error marshaling JSON for upgrade-info.json.batch", "error", err, "upgrades", uInfos)
continue
}
if err := os.WriteFile(cfg.UpgradeInfoBatchFilePath(), jsonBytes, 0o600); err != nil {
logger.Warn("error writing upgrade-info.json.batch", "error", err)
// remove the upgrade-info.json.batch file to avoid non-deterministic behavior
err := os.Remove(cfg.UpgradeInfoBatchFilePath())
if err != nil && !os.IsNotExist(err) {
logger.Warn("error removing upgrade-info.json.batch", "error", err)
return
}
continue
}
prevUpgradeHeight = upcomingUpgrade
}
// Add a small delay to avoid hammering the gRPC endpoint
time.Sleep(time.Second)
}
}
}
// Run launches the app in a subprocess and returns when the subprocess (app)
// exits (either when it dies, or *after* a successful upgrade.) and upgrade finished.
// Returns true if the upgrade request was detected and the upgrade process started.
@ -58,10 +204,20 @@ func (l Launcher) Run(args []string, stdin io.Reader, stdout, stderr io.Writer)
return false, fmt.Errorf("launching process %s %s failed: %w", bin, strings.Join(args, " "), err)
}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
BatchUpgradeWatcher(ctx, l.cfg, l.logger)
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT, syscall.SIGTERM)
go func() {
sig := <-sigs
cancel()
wg.Wait()
if err := cmd.Process.Signal(sig); err != nil {
l.logger.Error("terminated", "error", err, "bin", bin)
os.Exit(1)
@ -94,6 +250,9 @@ func (l Launcher) Run(args []string, stdin io.Reader, stdout, stderr io.Writer)
return true, nil
}
cancel()
wg.Wait()
return false, nil
}