From 06892b9bc4be66f28c6ed769b65c2ee10eaaee8b Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Thu, 6 Apr 2023 17:06:33 -0400 Subject: [PATCH] feat(testnet): allow a single active Comet RPC service (#15711) Co-authored-by: Aleksandr Bezobchuk --- simapp/internal/testnet/cometrpc_test.go | 165 +++++++++++++++++++ simapp/internal/testnet/cometstarter_test.go | 22 +-- testutil/testnet/cometrpclock.go | 100 +++++++++++ testutil/testnet/cometstarter.go | 46 +++++- testutil/testnet/nodehelpers.go | 32 ++++ 5 files changed, 347 insertions(+), 18 deletions(-) create mode 100644 simapp/internal/testnet/cometrpc_test.go create mode 100644 testutil/testnet/cometrpclock.go create mode 100644 testutil/testnet/nodehelpers.go diff --git a/simapp/internal/testnet/cometrpc_test.go b/simapp/internal/testnet/cometrpc_test.go new file mode 100644 index 0000000000..18fea62d99 --- /dev/null +++ b/simapp/internal/testnet/cometrpc_test.go @@ -0,0 +1,165 @@ +package testnet_test + +import ( + "context" + "testing" + "time" + + "cosmossdk.io/log" + "cosmossdk.io/simapp" + cmtcfg "github.com/cometbft/cometbft/config" + "github.com/cometbft/cometbft/rpc/client/http" + dbm "github.com/cosmos/cosmos-db" + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims" + "github.com/cosmos/cosmos-sdk/testutil/testnet" + sdk "github.com/cosmos/cosmos-sdk/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/stretchr/testify/require" +) + +// A single comet server in a network runs an RPC server successfully. 
+func TestCometRPC_SingleRPCServer(t *testing.T) { + const nVals = 2 + + valPKs := testnet.NewValidatorPrivKeys(nVals) + cmtVals := valPKs.CometGenesisValidators() + stakingVals := cmtVals.StakingValidators() + + const chainID = "comet-rpc-singleton" + + b := testnet.DefaultGenesisBuilderOnlyValidators( + chainID, + stakingVals, + sdk.NewCoin(sdk.DefaultBondDenom, sdk.DefaultPowerReduction), + ) + + jGenesis := b.Encode() + + // Logs shouldn't be necessary here because we are exercising CometStarter, + // and only doing a very basic check that the RPC talks to the app. + logger := log.NewNopLogger() + + nodes, err := testnet.NewNetwork(nVals, func(idx int) *testnet.CometStarter { + rootDir := t.TempDir() + + app := simapp.NewSimApp( + logger, + dbm.NewMemDB(), + nil, + true, + simtestutil.NewAppOptionsWithFlagHome(rootDir), + baseapp.SetChainID(chainID), + ) + + cfg := cmtcfg.DefaultConfig() + cfg.BaseConfig.DBBackend = "memdb" + + cs := testnet.NewCometStarter( + app, + cfg, + valPKs[idx].Val, + jGenesis, + rootDir, + ) + + // Only enable the RPC on the first service. + if idx == 0 { + cs = cs.RPCListen() + } + + return cs + }) + defer nodes.StopAndWait() + require.NoError(t, err) + + // One HTTP client to be shared across the following subtests. + c, err := http.New(nodes[0].Config().RPC.ListenAddress, "/websocket") + require.NoError(t, err) + + t.Run("status query", func(t *testing.T) { + ctx := context.Background() + st, err := c.Status(ctx) + require.NoError(t, err) + + // Simple assertion to ensure we have a functioning RPC. + require.Equal(t, chainID, st.NodeInfo.Network) + }) + + // Block until reported height is at least 1, + // otherwise we can't make transactions. 
+ require.NoError(t, testnet.WaitForNodeHeight(nodes[0], 1, 10*time.Second)) + + t.Run("simple abci query", func(t *testing.T) { + res, err := c.ABCIQuery( + context.Background(), + "/cosmos.bank.v1beta1.Query/TotalSupply", + nil, + ) + require.NoError(t, err) + + registry := codectypes.NewInterfaceRegistry() + cdc := codec.NewProtoCodec(registry) + + var tsResp banktypes.QueryTotalSupplyResponse + require.NoError(t, cdc.Unmarshal(res.Response.Value, &tsResp)) + + // Just check that something is reported in the supply. + require.NotEmpty(t, tsResp.Supply) + }) +} + +// Starting two comet instances with an RPC server, +// fails with a predictable error. +func TestCometRPC_MultipleRPCError(t *testing.T) { + const nVals = 2 + + valPKs := testnet.NewValidatorPrivKeys(nVals) + cmtVals := valPKs.CometGenesisValidators() + stakingVals := cmtVals.StakingValidators() + + const chainID = "comet-rpc-multiple" + + b := testnet.DefaultGenesisBuilderOnlyValidators( + chainID, + stakingVals, + sdk.NewCoin(sdk.DefaultBondDenom, sdk.DefaultPowerReduction), + ) + + jGenesis := b.Encode() + + // Logs shouldn't be necessary here because we are exercising CometStarter. + logger := log.NewNopLogger() + + nodes, err := testnet.NewNetwork(nVals, func(idx int) *testnet.CometStarter { + rootDir := t.TempDir() + + app := simapp.NewSimApp( + logger, + dbm.NewMemDB(), + nil, + true, + simtestutil.NewAppOptionsWithFlagHome(rootDir), + baseapp.SetChainID(chainID), + ) + + cfg := cmtcfg.DefaultConfig() + cfg.BaseConfig.DBBackend = "memdb" + + return testnet.NewCometStarter( + app, + cfg, + valPKs[idx].Val, + jGenesis, + rootDir, + ).RPCListen() // Every node has RPCListen enabled, which will cause a failure. + }) + defer nodes.StopAndWait() + + // Returned error is convertible to CometRPCInUseError. + // We can't test the exact value because it includes a stack trace. 
+ require.Error(t, err) + require.ErrorAs(t, err, new(testnet.CometRPCInUseError)) +} diff --git a/simapp/internal/testnet/cometstarter_test.go b/simapp/internal/testnet/cometstarter_test.go index efd008bed0..9e9d1d53c4 100644 --- a/simapp/internal/testnet/cometstarter_test.go +++ b/simapp/internal/testnet/cometstarter_test.go @@ -120,25 +120,15 @@ func TestCometStarter_PortContention(t *testing.T) { return reuseAddrs[rand.Intn(len(reuseAddrs))] }) }) + + // Ensure nodes are stopped completely, + // so that we don't get t.Cleanup errors around directories not being empty. defer nodes.StopAndWait() require.NoError(t, err) - heightAdvanced := false - for j := 0; j < 40; j++ { - cs := nodes[0].ConsensusState() - if cs.GetLastHeight() < 2 { - time.Sleep(250 * time.Millisecond) - continue - } - - // Saw height advance. - heightAdvanced = true - break - } - - if !heightAdvanced { - t.Fatalf("consensus height did not advance in approximately 10 seconds") - } + // Ensure that the height advances. + // Looking for height 2 seems more meaningful than 1. + require.NoError(t, testnet.WaitForNodeHeight(nodes[0], 2, 10*time.Second)) }) } } diff --git a/testutil/testnet/cometrpclock.go b/testutil/testnet/cometrpclock.go new file mode 100644 index 0000000000..17c05440d3 --- /dev/null +++ b/testutil/testnet/cometrpclock.go @@ -0,0 +1,100 @@ +package testnet + +import ( + "fmt" + "runtime/debug" + "sync" + + "github.com/cometbft/cometbft/node" +) + +// CometBFT v0.37 uses a singleton to manage the RPC "environment". +// v0.38 will not have that restriction, which was removed in commit: +// https://github.com/cometbft/cometbft/commit/3324f49fb7e7b40189726746493e83b82a61b558 +// +// We manage a corresponding global lock to ensure +// we don't attempt to use multiple active RPC servers in one process, +// which would result in unpredictable or incorrect behavior. +// Once the SDK adopts Comet v0.38+, we can remove this global lock mechanism. 
+ +// Our singleton complementing Comet's global RPC state. +var globalCometMu = new(cometRPCMutex) + +type cometRPCMutex struct { + mu sync.Mutex + + prevLockStack []byte +} + +// CometRPCInUseError is returned on a failure to acquire +// the global comet RPC lock. +// +// This type will be removed once the Cosmos SDK adopts CometBFT v0.38 or newer. +type CometRPCInUseError struct { + prevStack []byte +} + +func (e CometRPCInUseError) Error() string { + return fmt.Sprintf(`Failed to acquire global lock for Comet RPC servers. + +If this in a test using t.Parallel(), remove the call to t.Parallel(). + +If this is in a test NOT using t.Parallel, +ensure that other callers call both Stop() and Wait() on the nodes. + +If there are multiple comet instances in one test using RPC servers, +ensure that only one instance has the RPC listener enabled. + +These restrictions will be loosened once cosmos-sdk adopts comet-bft v0.38 or newer. + +Stack where lock was previously acquired: +%s +`, e.prevStack) +} + +// Acquire attempts to acquire the underlying mutex. +// If it cannot be acquired on the first attempt, +// Acquire returns a [CometRPCInUseError] value. +func (m *cometRPCMutex) Acquire() error { + if !m.mu.TryLock() { + // If we can't acquire the lock, + // there is another active comet node using RPC. + // + // This was initially going to be a panic, + // but we can't easily write tests against that since + // the panic occurs in a separate goroutine + // when called through NewNetwork. + // + // Note, reading m.prevLockStack without holding m.mu + // is technically a data race, + // since it is possible that the previous caller was about to unlock. + // Nonetheless, the programmer is responsible for avoiding that situation, + // and a data race during a failure isn't particularly concerning. + return CometRPCInUseError{prevStack: m.prevLockStack} + } + + // Now we hold the lock, so first record the stack when the lock was taken. 
+ m.prevLockStack = debug.Stack() + + return nil +} + +// Release unlocks m depending on n. +// If n is nil, m is unlocked immediately. +// If n is not nil, a new goroutine is created +// and m is released after the node has finished running. +func (m *cometRPCMutex) Release(n *node.Node) { + if n == nil { + m.prevLockStack = nil + m.mu.Unlock() + return + } + + go m.releaseAfterWait(n) +} + +func (m *cometRPCMutex) releaseAfterWait(n *node.Node) { + n.Wait() + m.prevLockStack = nil + m.mu.Unlock() +} diff --git a/testutil/testnet/cometstarter.go b/testutil/testnet/cometstarter.go index 428cc0231c..1461610c5c 100644 --- a/testutil/testnet/cometstarter.go +++ b/testutil/testnet/cometstarter.go @@ -37,6 +37,8 @@ type CometStarter struct { rootDir string + rpcListen bool + tcpAddrChooser func() string startTries int @@ -71,7 +73,19 @@ func NewCometStarter( // // At that point, we should keep the default as RPC off, // but we should add a RPCListen method to opt in to enabling it. - cfg.RPC.ListenAddress = "" + + // If RPC.ListenAddress is the default value, clear it. + const defaultRPCListenAddr = "tcp://127.0.0.1:26657" + if cfg.RPC.ListenAddress == defaultRPCListenAddr { + cfg.RPC.ListenAddress = "" + } + + // Then if it was set to anything other than empty or the default value, + // fail with a clear explanation on how to enable RPC. + // The RPCListen method must be used in order to correctly pick an available listen address. + if cfg.RPC.ListenAddress != "" { + panic(fmt.Errorf("NewCometStarter: cfg.RPC.ListenAddress must be empty (but was %q); use (*CometStarter).RPCListen() instead", cfg.RPC.ListenAddress)) + } // defaultStartTries is somewhat arbitrary. // Occasionally TestCometStarter_PortContention would fail with 10 tries, @@ -98,8 +112,33 @@ func (s *CometStarter) Logger(logger log.Logger) *CometStarter { return s } +// RPCListen enables the RPC listener service on the underlying Comet node. 
+// The RPC service must be enabled this way so that s can choose a dynamic port, +// retrying if necessary. +// +// Note that there is a limitation in CometBFT v0.37 that +// prevents more than one RPC server running at a time. +// Once the Cosmos SDK has adopted CometBFT v0.38 or newer, +// that limitation will be removed. +func (s *CometStarter) RPCListen() *CometStarter { + s.rpcListen = true + return s +} + // Start returns a started Comet node. -func (s *CometStarter) Start() (*node.Node, error) { +func (s *CometStarter) Start() (n *node.Node, err error) { + if s.rpcListen { + if err := globalCometMu.Acquire(); err != nil { + return nil, err + } + + // Wrap this defer in an anonymous function so we don't immediately evaluate n, + // which would always be nil at this point. + defer func() { + globalCometMu.Release(n) + }() + } + fpv, nodeKey, err := s.initDisk() if err != nil { return nil, err @@ -116,6 +155,9 @@ func (s *CometStarter) Start() (*node.Node, error) { for i := 0; i < s.startTries; i++ { s.cfg.P2P.ListenAddress = s.likelyAvailableAddress() + if s.rpcListen { + s.cfg.RPC.ListenAddress = s.likelyAvailableAddress() + } n, err := node.NewNode( s.cfg, diff --git a/testutil/testnet/nodehelpers.go b/testutil/testnet/nodehelpers.go new file mode 100644 index 0000000000..686e58fec1 --- /dev/null +++ b/testutil/testnet/nodehelpers.go @@ -0,0 +1,32 @@ +package testnet + +import ( + "fmt" + "time" + + "github.com/cometbft/cometbft/node" +) + +// WaitForNodeHeight blocks until the node's consensus state reports +// a last height equal to or greater than desiredHeight. +// If totalWait has elapsed and the desired height has not been reached, +// an error is returned. 
+func WaitForNodeHeight(n *node.Node, desiredHeight int64, totalWait time.Duration) error {
+	const backoff = 100 * time.Millisecond
+	// Always poll at least once: when totalWait is shorter than backoff,
+	// the integer division below would otherwise yield zero attempts.
+	attempts := int64(totalWait / backoff)
+	if attempts < 1 {
+		attempts = 1
+	}
+
+	curHeight := int64(-1)
+	for i := int64(0); i < attempts; i++ {
+		if curHeight = n.ConsensusState().GetLastHeight(); curHeight >= desiredHeight {
+			return nil // Met or exceeded target height.
+		}
+		time.Sleep(backoff)
+	}
+
+	return fmt.Errorf("node did not reach desired height %d in %s; only reached height %d", desiredHeight, totalWait, curHeight)
+}