laconicd-deprecated/tests/integration_tests/cosmoscli.py
yihuang 35850e620d
fix(rpc): fix gRPC query failure on legacy blocks (#1354)
* Problem: grpc query fail on legacy blocks

`BaseFee` and `EthCall`.

Solution:
- since grpc query handlers are used for all versions of the blocks, it need to be compatible with legacy formats.

debug

fix basefee fetch

Revert "debug"

This reverts commit 50ebaf697fc06b0d6e26abd8de8f89717e8a219d.

update gomod2nix

Update CHANGELOG.md

debug

fix panic

Revert "debug"

This reverts commit e08af04b0776bd390c42706cc9ec978e00bcb3bb.

* add upgrade integration test

* Update tests/integration_tests/configs/upgrade-test-package.nix

Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>
2022-09-23 18:06:25 +02:00

835 lines
23 KiB
Python

import json
import tempfile
import requests
from dateutil.parser import isoparse
from pystarport.utils import build_cli_args_safe, interact
DEFAULT_GAS_PRICE = "5000000000000aphoton"
class ChainCommand:
def __init__(self, cmd):
self.cmd = cmd
def __call__(self, cmd, *args, stdin=None, **kwargs):
"execute chain-maind"
args = " ".join(build_cli_args_safe(cmd, *args, **kwargs))
return interact(f"{self.cmd} {args}", input=stdin)
class CosmosCLI:
"the apis to interact with wallet and blockchain"
def __init__(
self,
data_dir,
node_rpc,
cmd,
):
self.data_dir = data_dir
self._genesis = json.loads(
(self.data_dir / "config" / "genesis.json").read_text()
)
self.chain_id = self._genesis["chain_id"]
self.node_rpc = node_rpc
self.raw = ChainCommand(cmd)
self.output = None
self.error = None
@property
def node_rpc_http(self):
return "http" + self.node_rpc.removeprefix("tcp")
def node_id(self):
"get tendermint node id"
output = self.raw("tendermint", "show-node-id", home=self.data_dir)
return output.decode().strip()
def delete_account(self, name):
"delete wallet account in node's keyring"
return self.raw(
"keys",
"delete",
name,
"-y",
"--force",
home=self.data_dir,
output="json",
keyring_backend="test",
)
def create_account(self, name, mnemonic=None):
"create new keypair in node's keyring"
if mnemonic is None:
output = self.raw(
"keys",
"add",
name,
home=self.data_dir,
output="json",
keyring_backend="test",
)
else:
output = self.raw(
"keys",
"add",
name,
"--recover",
home=self.data_dir,
output="json",
keyring_backend="test",
stdin=mnemonic.encode() + b"\n",
)
return json.loads(output)
def init(self, moniker):
"the node's config is already added"
return self.raw(
"init",
moniker,
chain_id=self.chain_id,
home=self.data_dir,
)
def validate_genesis(self):
return self.raw("validate-genesis", home=self.data_dir)
def add_genesis_account(self, addr, coins, **kwargs):
return self.raw(
"add-genesis-account",
addr,
coins,
home=self.data_dir,
output="json",
**kwargs,
)
def gentx(self, name, coins, min_self_delegation=1, pubkey=None):
return self.raw(
"gentx",
name,
coins,
min_self_delegation=str(min_self_delegation),
home=self.data_dir,
chain_id=self.chain_id,
keyring_backend="test",
pubkey=pubkey,
)
def collect_gentxs(self, gentx_dir):
return self.raw("collect-gentxs", gentx_dir, home=self.data_dir)
def status(self):
return json.loads(self.raw("status", node=self.node_rpc))
def block_height(self):
return int(self.status()["SyncInfo"]["latest_block_height"])
def block_time(self):
return isoparse(self.status()["SyncInfo"]["latest_block_time"])
def balances(self, addr):
return json.loads(
self.raw("query", "bank", "balances", addr, home=self.data_dir)
)["balances"]
def balance(self, addr, denom="aphoton"):
denoms = {coin["denom"]: int(coin["amount"]) for coin in self.balances(addr)}
return denoms.get(denom, 0)
def query_tx(self, tx_type, tx_value):
tx = self.raw(
"query",
"tx",
"--type",
tx_type,
tx_value,
home=self.data_dir,
chain_id=self.chain_id,
node=self.node_rpc,
)
return json.loads(tx)
def query_all_txs(self, addr):
txs = self.raw(
"query",
"txs-all",
addr,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
return json.loads(txs)
def distribution_commission(self, addr):
coin = json.loads(
self.raw(
"query",
"distribution",
"commission",
addr,
output="json",
node=self.node_rpc,
)
)["commission"][0]
return float(coin["amount"])
def distribution_community(self):
coin = json.loads(
self.raw(
"query",
"distribution",
"community-pool",
output="json",
node=self.node_rpc,
)
)["pool"][0]
return float(coin["amount"])
def distribution_reward(self, delegator_addr):
coin = json.loads(
self.raw(
"query",
"distribution",
"rewards",
delegator_addr,
output="json",
node=self.node_rpc,
)
)["total"][0]
return float(coin["amount"])
def address(self, name, bech="acc"):
output = self.raw(
"keys",
"show",
name,
"-a",
home=self.data_dir,
keyring_backend="test",
bech=bech,
)
return output.strip().decode()
def account(self, addr):
return json.loads(
self.raw(
"query", "auth", "account", addr, output="json", node=self.node_rpc
)
)
def tx_search(self, events: str):
"/tx_search"
return json.loads(
self.raw("query", "txs", events=events, output="json", node=self.node_rpc)
)
def tx_search_rpc(self, events: str):
rsp = requests.get(
f"{self.node_rpc_http}/tx_search",
params={
"query": f'"{events}"',
},
).json()
assert "error" not in rsp, rsp["error"]
return rsp["result"]["txs"]
def tx(self, value, **kwargs):
"/tx"
default_kwargs = {
"home": self.data_dir,
}
return json.loads(self.raw("query", "tx", value, **(default_kwargs | kwargs)))
def total_supply(self):
return json.loads(
self.raw("query", "bank", "total", output="json", node=self.node_rpc)
)
def validator(self, addr):
return json.loads(
self.raw(
"query",
"staking",
"validator",
addr,
output="json",
node=self.node_rpc,
)
)
def validators(self):
return json.loads(
self.raw(
"query", "staking", "validators", output="json", node=self.node_rpc
)
)["validators"]
def staking_params(self):
return json.loads(
self.raw("query", "staking", "params", output="json", node=self.node_rpc)
)
def staking_pool(self, bonded=True):
return int(
json.loads(
self.raw("query", "staking", "pool", output="json", node=self.node_rpc)
)["bonded_tokens" if bonded else "not_bonded_tokens"]
)
def transfer(self, from_, to, coins, generate_only=False, **kwargs):
kwargs.setdefault("gas_prices", DEFAULT_GAS_PRICE)
return json.loads(
self.raw(
"tx",
"bank",
"send",
from_,
to,
coins,
"-y",
"--generate-only" if generate_only else None,
home=self.data_dir,
**kwargs,
)
)
def get_delegated_amount(self, which_addr):
return json.loads(
self.raw(
"query",
"staking",
"delegations",
which_addr,
home=self.data_dir,
chain_id=self.chain_id,
node=self.node_rpc,
output="json",
)
)
def delegate_amount(self, to_addr, amount, from_addr, gas_price=None):
if gas_price is None:
return json.loads(
self.raw(
"tx",
"staking",
"delegate",
to_addr,
amount,
"-y",
home=self.data_dir,
from_=from_addr,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
else:
return json.loads(
self.raw(
"tx",
"staking",
"delegate",
to_addr,
amount,
"-y",
home=self.data_dir,
from_=from_addr,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
gas_prices=gas_price,
)
)
# to_addr: croclcl1... , from_addr: cro1...
def unbond_amount(self, to_addr, amount, from_addr):
return json.loads(
self.raw(
"tx",
"staking",
"unbond",
to_addr,
amount,
"-y",
home=self.data_dir,
from_=from_addr,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
# to_validator_addr: crocncl1... , from_from_validator_addraddr: crocl1...
def redelegate_amount(
self, to_validator_addr, from_validator_addr, amount, from_addr
):
return json.loads(
self.raw(
"tx",
"staking",
"redelegate",
from_validator_addr,
to_validator_addr,
amount,
"-y",
home=self.data_dir,
from_=from_addr,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
# from_delegator can be account name or address
def withdraw_all_rewards(self, from_delegator):
return json.loads(
self.raw(
"tx",
"distribution",
"withdraw-all-rewards",
"-y",
from_=from_delegator,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
def make_multisig(self, name, signer1, signer2):
self.raw(
"keys",
"add",
name,
multisig=f"{signer1},{signer2}",
multisig_threshold="2",
home=self.data_dir,
keyring_backend="test",
)
def sign_multisig_tx(self, tx_file, multi_addr, signer_name):
return json.loads(
self.raw(
"tx",
"sign",
tx_file,
from_=signer_name,
multisig=multi_addr,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
def sign_batch_multisig_tx(
self, tx_file, multi_addr, signer_name, account_number, sequence_number
):
r = self.raw(
"tx",
"sign-batch",
"--offline",
tx_file,
account_number=account_number,
sequence=sequence_number,
from_=signer_name,
multisig=multi_addr,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
return r.decode("utf-8")
def encode_signed_tx(self, signed_tx):
return self.raw(
"tx",
"encode",
signed_tx,
)
def sign_tx(self, tx_file, signer):
return json.loads(
self.raw(
"tx",
"sign",
tx_file,
from_=signer,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
def sign_tx_json(self, tx, signer, max_priority_price=None):
if max_priority_price is not None:
tx["body"]["extension_options"].append(
{
"@type": "/ethermint.types.v1.ExtensionOptionDynamicFeeTx",
"max_priority_price": str(max_priority_price),
}
)
with tempfile.NamedTemporaryFile("w") as fp:
json.dump(tx, fp)
fp.flush()
return self.sign_tx(fp.name, signer)
def combine_multisig_tx(self, tx_file, multi_name, signer1_file, signer2_file):
return json.loads(
self.raw(
"tx",
"multisign",
tx_file,
multi_name,
signer1_file,
signer2_file,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
)
def combine_batch_multisig_tx(
self, tx_file, multi_name, signer1_file, signer2_file
):
r = self.raw(
"tx",
"multisign-batch",
tx_file,
multi_name,
signer1_file,
signer2_file,
home=self.data_dir,
keyring_backend="test",
chain_id=self.chain_id,
node=self.node_rpc,
)
return r.decode("utf-8")
def broadcast_tx(self, tx_file, **kwargs):
kwargs.setdefault("broadcast_mode", "block")
kwargs.setdefault("output", "json")
return json.loads(
self.raw("tx", "broadcast", tx_file, node=self.node_rpc, **kwargs)
)
def broadcast_tx_json(self, tx, **kwargs):
with tempfile.NamedTemporaryFile("w") as fp:
json.dump(tx, fp)
fp.flush()
return self.broadcast_tx(fp.name, **kwargs)
def unjail(self, addr):
return json.loads(
self.raw(
"tx",
"slashing",
"unjail",
"-y",
from_=addr,
home=self.data_dir,
node=self.node_rpc,
keyring_backend="test",
chain_id=self.chain_id,
)
)
def create_validator(
self,
amount,
moniker=None,
commission_max_change_rate="0.01",
commission_rate="0.1",
commission_max_rate="0.2",
min_self_delegation="1",
identity="",
website="",
security_contact="",
details="",
):
"""MsgCreateValidator
create the node with create_node before call this"""
pubkey = (
"'"
+ (
self.raw(
"tendermint",
"show-validator",
home=self.data_dir,
)
.strip()
.decode()
)
+ "'"
)
return json.loads(
self.raw(
"tx",
"staking",
"create-validator",
"-y",
from_=self.address("validator"),
amount=amount,
pubkey=pubkey,
min_self_delegation=min_self_delegation,
# commision
commission_rate=commission_rate,
commission_max_rate=commission_max_rate,
commission_max_change_rate=commission_max_change_rate,
# description
moniker=moniker,
identity=identity,
website=website,
security_contact=security_contact,
details=details,
# basic
home=self.data_dir,
node=self.node_rpc,
keyring_backend="test",
chain_id=self.chain_id,
)
)
def edit_validator(
self,
commission_rate=None,
moniker=None,
identity=None,
website=None,
security_contact=None,
details=None,
):
"""MsgEditValidator"""
options = dict(
commission_rate=commission_rate,
# description
moniker=moniker,
identity=identity,
website=website,
security_contact=security_contact,
details=details,
)
return json.loads(
self.raw(
"tx",
"staking",
"edit-validator",
"-y",
from_=self.address("validator"),
home=self.data_dir,
node=self.node_rpc,
keyring_backend="test",
chain_id=self.chain_id,
**{k: v for k, v in options.items() if v is not None},
)
)
def gov_propose(self, proposer, kind, proposal, **kwargs):
kwargs.setdefault("gas_prices", DEFAULT_GAS_PRICE)
if kind == "software-upgrade":
return json.loads(
self.raw(
"tx",
"gov",
"submit-proposal",
kind,
proposal["name"],
"-y",
from_=proposer,
# content
title=proposal.get("title"),
description=proposal.get("description"),
upgrade_height=proposal.get("upgrade-height"),
upgrade_time=proposal.get("upgrade-time"),
upgrade_info=proposal.get("upgrade-info"),
deposit=proposal.get("deposit"),
# basic
home=self.data_dir,
**kwargs,
)
)
elif kind == "cancel-software-upgrade":
return json.loads(
self.raw(
"tx",
"gov",
"submit-proposal",
kind,
"-y",
from_=proposer,
# content
title=proposal.get("title"),
description=proposal.get("description"),
deposit=proposal.get("deposit"),
# basic
home=self.data_dir,
**kwargs,
)
)
else:
with tempfile.NamedTemporaryFile("w") as fp:
json.dump(proposal, fp)
fp.flush()
return json.loads(
self.raw(
"tx",
"gov",
"submit-proposal",
kind,
fp.name,
"-y",
from_=proposer,
# basic
home=self.data_dir,
**kwargs,
)
)
def gov_vote(self, voter, proposal_id, option, **kwargs):
kwargs.setdefault("gas_prices", DEFAULT_GAS_PRICE)
return json.loads(
self.raw(
"tx",
"gov",
"vote",
proposal_id,
option,
"-y",
from_=voter,
home=self.data_dir,
**kwargs,
)
)
def gov_deposit(self, depositor, proposal_id, amount):
return json.loads(
self.raw(
"tx",
"gov",
"deposit",
proposal_id,
amount,
"-y",
from_=depositor,
home=self.data_dir,
node=self.node_rpc,
keyring_backend="test",
chain_id=self.chain_id,
)
)
def query_proposals(self, depositor=None, limit=None, status=None, voter=None):
return json.loads(
self.raw(
"query",
"gov",
"proposals",
depositor=depositor,
count_total=limit,
status=status,
voter=voter,
output="json",
node=self.node_rpc,
)
)
def query_proposal(self, proposal_id):
return json.loads(
self.raw(
"query",
"gov",
"proposal",
proposal_id,
output="json",
node=self.node_rpc,
)
)
def query_tally(self, proposal_id):
return json.loads(
self.raw(
"query",
"gov",
"tally",
proposal_id,
output="json",
node=self.node_rpc,
)
)
def ibc_transfer(
self,
from_,
to,
amount,
channel, # src channel
target_version, # chain version number of target chain
i=0,
):
return json.loads(
self.raw(
"tx",
"ibc-transfer",
"transfer",
"transfer", # src port
channel,
to,
amount,
"-y",
# FIXME https://github.com/cosmos/cosmos-sdk/issues/8059
"--absolute-timeouts",
from_=from_,
home=self.data_dir,
node=self.node_rpc,
keyring_backend="test",
chain_id=self.chain_id,
packet_timeout_height=f"{target_version}-10000000000",
packet_timeout_timestamp=0,
)
)
def export(self):
return self.raw("export", home=self.data_dir)
def unsaferesetall(self):
return self.raw("unsafe-reset-all")
def build_evm_tx(self, raw_tx: str, **kwargs):
return json.loads(
self.raw(
"tx",
"evm",
"raw",
raw_tx,
"-y",
"--generate-only",
home=self.data_dir,
**kwargs,
)
)
def query_base_fee(self, **kwargs):
default_kwargs = {"home": self.data_dir}
return int(
json.loads(
self.raw(
"q",
"feemarket",
"base-fee",
**(default_kwargs | kwargs),
)
)["base_fee"]
)
def rollback(self):
self.raw("rollback", home=self.data_dir)
def migrate_keystore(self):
return self.raw("keys", "migrate", home=self.data_dir)