feat(testnet): allow a single active Comet RPC service (#15711)

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Mark Rushakoff 2023-04-06 17:06:33 -04:00 committed by GitHub
parent 401d0d72c9
commit 06892b9bc4
5 changed files with 347 additions and 18 deletions


@@ -0,0 +1,165 @@
package testnet_test

import (
	"context"
	"testing"
	"time"

	"cosmossdk.io/log"
	"cosmossdk.io/simapp"
	cmtcfg "github.com/cometbft/cometbft/config"
	"github.com/cometbft/cometbft/rpc/client/http"
	dbm "github.com/cosmos/cosmos-db"

	"github.com/cosmos/cosmos-sdk/baseapp"
	"github.com/cosmos/cosmos-sdk/codec"
	codectypes "github.com/cosmos/cosmos-sdk/codec/types"
	simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims"
	"github.com/cosmos/cosmos-sdk/testutil/testnet"
	sdk "github.com/cosmos/cosmos-sdk/types"
	banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
	"github.com/stretchr/testify/require"
)
// A single comet server in a network runs an RPC server successfully.
func TestCometRPC_SingleRPCServer(t *testing.T) {
	const nVals = 2

	valPKs := testnet.NewValidatorPrivKeys(nVals)
	cmtVals := valPKs.CometGenesisValidators()
	stakingVals := cmtVals.StakingValidators()

	const chainID = "comet-rpc-singleton"
	b := testnet.DefaultGenesisBuilderOnlyValidators(
		chainID,
		stakingVals,
		sdk.NewCoin(sdk.DefaultBondDenom, sdk.DefaultPowerReduction),
	)
	jGenesis := b.Encode()

	// Logs shouldn't be necessary here because we are exercising CometStarter,
	// and only doing a very basic check that the RPC talks to the app.
	logger := log.NewNopLogger()

	nodes, err := testnet.NewNetwork(nVals, func(idx int) *testnet.CometStarter {
		rootDir := t.TempDir()

		app := simapp.NewSimApp(
			logger,
			dbm.NewMemDB(),
			nil,
			true,
			simtestutil.NewAppOptionsWithFlagHome(rootDir),
			baseapp.SetChainID(chainID),
		)

		cfg := cmtcfg.DefaultConfig()
		cfg.BaseConfig.DBBackend = "memdb"

		cs := testnet.NewCometStarter(
			app,
			cfg,
			valPKs[idx].Val,
			jGenesis,
			rootDir,
		)

		// Only enable the RPC on the first service.
		if idx == 0 {
			cs = cs.RPCListen()
		}

		return cs
	})
	defer nodes.StopAndWait()
	require.NoError(t, err)

	// One HTTP client to be shared across the following subtests.
	c, err := http.New(nodes[0].Config().RPC.ListenAddress, "/websocket")
	require.NoError(t, err)

	t.Run("status query", func(t *testing.T) {
		ctx := context.Background()
		st, err := c.Status(ctx)
		require.NoError(t, err)

		// Simple assertion to ensure we have a functioning RPC.
		require.Equal(t, chainID, st.NodeInfo.Network)
	})

	// Block until the reported height is at least 1;
	// otherwise we can't make transactions.
	require.NoError(t, testnet.WaitForNodeHeight(nodes[0], 1, 10*time.Second))

	t.Run("simple abci query", func(t *testing.T) {
		res, err := c.ABCIQuery(
			context.Background(),
			"/cosmos.bank.v1beta1.Query/TotalSupply",
			nil,
		)
		require.NoError(t, err)

		registry := codectypes.NewInterfaceRegistry()
		cdc := codec.NewProtoCodec(registry)

		var tsResp banktypes.QueryTotalSupplyResponse
		require.NoError(t, cdc.Unmarshal(res.Response.Value, &tsResp))

		// Just check that something is reported in the supply.
		require.NotEmpty(t, tsResp.Supply)
	})
}
// Starting two comet instances that both enable the RPC server
// fails with a predictable error.
func TestCometRPC_MultipleRPCError(t *testing.T) {
	const nVals = 2

	valPKs := testnet.NewValidatorPrivKeys(nVals)
	cmtVals := valPKs.CometGenesisValidators()
	stakingVals := cmtVals.StakingValidators()

	const chainID = "comet-rpc-multiple"
	b := testnet.DefaultGenesisBuilderOnlyValidators(
		chainID,
		stakingVals,
		sdk.NewCoin(sdk.DefaultBondDenom, sdk.DefaultPowerReduction),
	)
	jGenesis := b.Encode()

	// Logs shouldn't be necessary here because we are exercising CometStarter.
	logger := log.NewNopLogger()

	nodes, err := testnet.NewNetwork(nVals, func(idx int) *testnet.CometStarter {
		rootDir := t.TempDir()

		app := simapp.NewSimApp(
			logger,
			dbm.NewMemDB(),
			nil,
			true,
			simtestutil.NewAppOptionsWithFlagHome(rootDir),
			baseapp.SetChainID(chainID),
		)

		cfg := cmtcfg.DefaultConfig()
		cfg.BaseConfig.DBBackend = "memdb"

		return testnet.NewCometStarter(
			app,
			cfg,
			valPKs[idx].Val,
			jGenesis,
			rootDir,
		).RPCListen() // Every node has RPCListen enabled, which will cause a failure.
	})
	defer nodes.StopAndWait()

	// The returned error is convertible to CometRPCInUseError.
	// We can't test the exact value because it includes a stack trace.
	require.Error(t, err)
	require.ErrorAs(t, err, new(testnet.CometRPCInUseError))
}
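
If one process genuinely needs two RPC-enabled networks, the CometRPCInUseError message introduced later in this commit points at the workaround: fully stop and wait on the first network before starting the second, so that the global Comet RPC lock is released. A minimal sketch of that pattern, where newRPCStarter is a hypothetical helper returning a *testnet.CometStarter with RPCListen() enabled, as in the tests above:

	first, err := testnet.NewNetwork(1, newRPCStarter)
	require.NoError(t, err)

	// Stopping and waiting on every node allows the global Comet RPC lock to be released.
	first.StopAndWait()

	// Only after the first network has fully stopped is it safe
	// to start another network with an RPC listener.
	second, err := testnet.NewNetwork(1, newRPCStarter)
	require.NoError(t, err)
	defer second.StopAndWait()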


@@ -120,25 +120,15 @@ func TestCometStarter_PortContention(t *testing.T) {
					return reuseAddrs[rand.Intn(len(reuseAddrs))]
				})
			})

			// Ensure nodes are stopped completely,
			// so that we don't get t.Cleanup errors around directories not being empty.
			defer nodes.StopAndWait()
			require.NoError(t, err)

			heightAdvanced := false
			for j := 0; j < 40; j++ {
				cs := nodes[0].ConsensusState()
				if cs.GetLastHeight() < 2 {
					time.Sleep(250 * time.Millisecond)
					continue
				}

				// Saw height advance.
				heightAdvanced = true
				break
			}
			if !heightAdvanced {
				t.Fatalf("consensus height did not advance in approximately 10 seconds")
			}

			// Ensure that the height advances.
			// Looking for height 2 seems more meaningful than 1.
			require.NoError(t, testnet.WaitForNodeHeight(nodes[0], 2, 10*time.Second))
		})
	}
}


@@ -0,0 +1,100 @@
package testnet

import (
	"fmt"
	"runtime/debug"
	"sync"

	"github.com/cometbft/cometbft/node"
)

// CometBFT v0.37 uses a singleton to manage the RPC "environment".
// v0.38 will not have that restriction, which was removed in commit:
// https://github.com/cometbft/cometbft/commit/3324f49fb7e7b40189726746493e83b82a61b558
//
// We manage a corresponding global lock to ensure
// we don't attempt to use multiple active RPC servers in one process,
// which would result in unpredictable or incorrect behavior.
// Once the SDK adopts Comet v0.38+, we can remove this global lock mechanism.

// Our singleton complementing Comet's global RPC state.
var globalCometMu = new(cometRPCMutex)

type cometRPCMutex struct {
	mu sync.Mutex

	prevLockStack []byte
}
// CometRPCInUseError is returned on a failure to acquire
// the global comet RPC lock.
//
// This type will be removed once the Cosmos SDK adopts CometBFT v0.38 or newer.
type CometRPCInUseError struct {
	prevStack []byte
}

func (e CometRPCInUseError) Error() string {
	return fmt.Sprintf(`Failed to acquire global lock for Comet RPC servers.
If this is in a test using t.Parallel(), remove the call to t.Parallel().
If this is in a test NOT using t.Parallel,
ensure that other callers call both Stop() and Wait() on the nodes.
If there are multiple comet instances in one test using RPC servers,
ensure that only one instance has the RPC listener enabled.
These restrictions will be loosened once cosmos-sdk adopts comet-bft v0.38 or newer.
Stack where lock was previously acquired:
%s
`, e.prevStack)
}
// Acquire attempts to acquire the underlying mutex.
// If it cannot be acquired on the first attempt,
// Acquire returns a [CometRPCInUseError] value.
func (m *cometRPCMutex) Acquire() error {
	if !m.mu.TryLock() {
		// If we can't acquire the lock,
		// there is another active comet node using RPC.
		//
		// This was initially going to be a panic,
		// but we can't easily write tests against that since
		// the panic occurs in a separate goroutine
		// when called through NewNetwork.
		//
		// Note, reading m.prevLockStack without holding m.mu
		// is technically a data race,
		// since it is possible that the previous caller was about to unlock.
		// Nonetheless, the programmer is responsible for avoiding that situation,
		// and a data race during a failure isn't particularly concerning.
		return CometRPCInUseError{prevStack: m.prevLockStack}
	}

	// Now we hold the lock, so first record the stack when the lock was taken.
	m.prevLockStack = debug.Stack()
	return nil
}
// Release unlocks m depending on n.
// If n is nil, m is unlocked immediately.
// If n is not nil, a new goroutine is created
// and m is unlocked only after the node has finished running.
func (m *cometRPCMutex) Release(n *node.Node) {
	if n == nil {
		m.prevLockStack = nil
		m.mu.Unlock()
		return
	}

	go m.releaseAfterWait(n)
}

func (m *cometRPCMutex) releaseAfterWait(n *node.Node) {
	n.Wait()

	m.prevLockStack = nil
	m.mu.Unlock()
}
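
Outside of testify, callers can detect this failure with the standard library's errors package. A minimal sketch, assuming err was returned from an attempt to start an RPC-enabled network and that errors and fmt are imported:

	var inUse testnet.CometRPCInUseError
	if errors.As(err, &inUse) {
		// The message includes the stack where the lock was previously acquired.
		fmt.Println(inUse.Error())
	}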


@@ -37,6 +37,8 @@ type CometStarter struct {
	rootDir string

	rpcListen bool

	tcpAddrChooser func() string

	startTries int
@@ -71,7 +73,19 @@ func NewCometStarter(
	//
	// At that point, we should keep the default as RPC off,
	// but we should add a RPCListen method to opt in to enabling it.
	cfg.RPC.ListenAddress = ""

	// If RPC.ListenAddress is the default value, clear it.
	const defaultRPCListenAddr = "tcp://127.0.0.1:26657"
	if cfg.RPC.ListenAddress == defaultRPCListenAddr {
		cfg.RPC.ListenAddress = ""
	}

	// Then, if it was set to anything other than empty or the default value,
	// fail with a clear explanation of how to enable RPC.
	// The RPCListen method must be used in order to correctly pick an available listen address.
	if cfg.RPC.ListenAddress != "" {
		panic(fmt.Errorf("NewCometStarter: cfg.RPC.ListenAddress must be empty (but was %q); use (*CometStarter).RPCListen() instead", cfg.RPC.ListenAddress))
	}
// defaultStartTries is somewhat arbitrary.
// Occasionally TestCometStarter_PortContention would fail with 10 tries,
@@ -98,8 +112,33 @@ func (s *CometStarter) Logger(logger log.Logger) *CometStarter {
	return s
}

// RPCListen enables the RPC listener service on the underlying Comet node.
// The RPC service must be enabled this way so that s can choose a dynamic port,
// retrying if necessary.
//
// Note that there is a limitation in CometBFT v0.37 that
// prevents more than one RPC server running at a time.
// Once the Cosmos SDK has adopted CometBFT v0.38 or newer,
// that limitation will be removed.
func (s *CometStarter) RPCListen() *CometStarter {
	s.rpcListen = true
	return s
}
// Start returns a started Comet node.
func (s *CometStarter) Start() (*node.Node, error) {
func (s *CometStarter) Start() (n *node.Node, err error) {
	if s.rpcListen {
		if err := globalCometMu.Acquire(); err != nil {
			return nil, err
		}

		// Wrap this defer in an anonymous function so we don't immediately evaluate n,
		// which would always be nil at this point.
		defer func() {
			globalCometMu.Release(n)
		}()
	}
	fpv, nodeKey, err := s.initDisk()
	if err != nil {
		return nil, err
@@ -116,6 +155,9 @@ func (s *CometStarter) Start() (*node.Node, error) {
	for i := 0; i < s.startTries; i++ {
		s.cfg.P2P.ListenAddress = s.likelyAvailableAddress()

		if s.rpcListen {
			s.cfg.RPC.ListenAddress = s.likelyAvailableAddress()
		}

		n, err := node.NewNode(
			s.cfg,

@@ -0,0 +1,32 @@
package testnet

import (
	"fmt"
	"time"

	"github.com/cometbft/cometbft/node"
)

// WaitForNodeHeight blocks until the node's consensus state reports
// a last height equal to or greater than desiredHeight.
// If totalWait has elapsed and the desired height has not been reached,
// an error is returned.
func WaitForNodeHeight(n *node.Node, desiredHeight int64, totalWait time.Duration) error {
	const backoff = 100 * time.Millisecond
	attempts := int64(totalWait / backoff)

	curHeight := int64(-1)
	for i := int64(0); i < attempts; i++ {
		curHeight = n.ConsensusState().GetLastHeight()
		if curHeight < desiredHeight {
			time.Sleep(backoff)
			continue
		}

		// Met or exceeded target height.
		return nil
	}

	return fmt.Errorf("node did not reach desired height %d in %s; only reached height %d", desiredHeight, totalWait, curHeight)
}
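
With the 100 ms backoff above, a 10 s totalWait permits up to 100 polls. A minimal call site outside of testify, assuming n is an already-started *node.Node:

	// Give the node up to 30 seconds (300 polls) to reach height 5.
	if err := testnet.WaitForNodeHeight(n, 5, 30*time.Second); err != nil {
		return fmt.Errorf("chain did not advance: %w", err)
	}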