From 1f0b6845c83d8f16e831cb9c185043e1f2b11113 Mon Sep 17 00:00:00 2001 From: Simon Warta Date: Tue, 18 Feb 2020 12:48:09 +0100 Subject: [PATCH] Implement liveTx --- packages/bcp/src/cosmwasmconnection.spec.ts | 186 ++++++++++++++++++++ packages/bcp/src/cosmwasmconnection.ts | 91 +++++++++- packages/bcp/types/cosmwasmconnection.d.ts | 3 +- 3 files changed, 276 insertions(+), 4 deletions(-) diff --git a/packages/bcp/src/cosmwasmconnection.spec.ts b/packages/bcp/src/cosmwasmconnection.spec.ts index 269b08af..c02fc489 100644 --- a/packages/bcp/src/cosmwasmconnection.spec.ts +++ b/packages/bcp/src/cosmwasmconnection.spec.ts @@ -5,6 +5,7 @@ import { Algorithm, Amount, ChainId, + ConfirmedTransaction, isBlockInfoPending, isBlockInfoSucceeded, isConfirmedTransaction, @@ -14,6 +15,7 @@ import { TokenTicker, TransactionId, TransactionState, + UnsignedTransaction, } from "@iov/bcp"; import { Random, Secp256k1, Secp256k1Signature, Sha256 } from "@iov/crypto"; import { Bech32, Encoding } from "@iov/encoding"; @@ -795,6 +797,190 @@ describe("CosmWasmConnection", () => { }); }); + describe("liveTx", () => { + it("can listen to transactions by recipient address (transactions in history and updates)", done => { + pendingWithoutWasmd(); + + (async () => { + const connection = await CosmWasmConnection.establish(httpUrl, defaultPrefix, defaultConfig); + + const profile = new UserProfile(); + const wallet = profile.addWallet(Secp256k1HdWallet.fromMnemonic(faucet.mnemonic)); + const sender = await profile.createIdentity(wallet.id, defaultChainId, faucet.path); + + // send transactions + + const recipientAddress = makeRandomAddress(); + const sendA = await connection.withDefaultFee({ + kind: "bcp/send", + chainId: defaultChainId, + senderPubkey: sender.pubkey, + sender: connection.codec.identityToAddress(sender), + recipient: recipientAddress, + amount: defaultAmount, + memo: `liveTx() test A ${Math.random()}`, + }); + + const sendB = await connection.withDefaultFee({ + kind: "bcp/send", + chainId: defaultChainId, + senderPubkey: sender.pubkey, + sender: connection.codec.identityToAddress(sender), + recipient: recipientAddress, + amount: defaultAmount, + memo: `liveTx() test B ${Math.random()}`, + }); + + const sendC = await connection.withDefaultFee({ + kind: "bcp/send", + chainId: defaultChainId, + senderPubkey: sender.pubkey, + sender: connection.codec.identityToAddress(sender), + recipient: recipientAddress, + amount: defaultAmount, + memo: `liveTx() test C ${Math.random()}`, + }); + + const [nonceA, nonceB, nonceC] = await connection.getNonces({ pubkey: sender.pubkey }, 3); + const signedA = await profile.signTransaction(sender, sendA, connection.codec, nonceA); + const signedB = await profile.signTransaction(sender, sendB, connection.codec, nonceB); + const signedC = await profile.signTransaction(sender, sendC, connection.codec, nonceC); + const bytesToPostA = connection.codec.bytesToPost(signedA); + const bytesToPostB = connection.codec.bytesToPost(signedB); + const bytesToPostC = connection.codec.bytesToPost(signedC); + + // Post A and B. Unfortunately the REST server API does not support sending them in parallel because the sequence check fails. + const postResultA = await connection.postTx(bytesToPostA); + await postResultA.blockInfo.waitFor(info => !isBlockInfoPending(info)); + const postResultB = await connection.postTx(bytesToPostB); + await postResultB.blockInfo.waitFor(info => !isBlockInfoPending(info)); + + // setup listener after A and B are in block + const events = new Array>(); + const subscription = connection.liveTx({ sentFromOrTo: recipientAddress }).subscribe({ + next: event => { + assert(isConfirmedTransaction(event), "Confirmed transaction expected"); + events.push(event); + + assert(isSendTransaction(event.transaction), "Unexpected transaction type"); + expect(event.transaction.recipient).toEqual(recipientAddress); + + if (events.length === 3) { + expect(events[1].height).toEqual(events[0].height + 1); + expect(events[2].height).toBeGreaterThan(events[1].height); + + subscription.unsubscribe(); + connection.disconnect(); + done(); + } + }, + }); + + // Post C + await connection.postTx(bytesToPostC); + })().catch(done.fail); + }); + + it("can listen to transactions by ID (transaction in history)", done => { + pendingWithoutWasmd(); + + (async () => { + const connection = await CosmWasmConnection.establish(httpUrl, defaultPrefix, defaultConfig); + + const profile = new UserProfile(); + const wallet = profile.addWallet(Secp256k1HdWallet.fromMnemonic(faucet.mnemonic)); + const sender = await profile.createIdentity(wallet.id, defaultChainId, faucet.path); + + const recipientAddress = makeRandomAddress(); + const send = await connection.withDefaultFee({ + kind: "bcp/send", + chainId: defaultChainId, + senderPubkey: sender.pubkey, + sender: connection.codec.identityToAddress(sender), + recipient: recipientAddress, + amount: defaultAmount, + memo: `liveTx() test ${Math.random()}`, + }); + + const nonce = await connection.getNonce({ pubkey: sender.pubkey }); + const signed = await profile.signTransaction(sender, send, connection.codec, nonce); + const bytesToPost = connection.codec.bytesToPost(signed); + + const postResult = await connection.postTx(bytesToPost); + const transactionId = postResult.transactionId; + + // Wait for a block + await postResult.blockInfo.waitFor(info => !isBlockInfoPending(info)); + + // setup listener after transaction is in block + const events = new Array>(); + const subscription = connection.liveTx({ id: transactionId }).subscribe({ + next: event => { + assert(isConfirmedTransaction(event), "Confirmed transaction expected"); + events.push(event); + + assert(isSendTransaction(event.transaction), "Unexpected transaction type"); + expect(event.transaction.recipient).toEqual(recipientAddress); + expect(event.transactionId).toEqual(transactionId); + + subscription.unsubscribe(); + connection.disconnect(); + done(); + }, + }); + })().catch(done.fail); + }); + + it("can listen to transactions by ID (transaction in updates)", done => { + pendingWithoutWasmd(); + + (async () => { + const connection = await CosmWasmConnection.establish(httpUrl, defaultPrefix, defaultConfig); + + const profile = new UserProfile(); + const wallet = profile.addWallet(Secp256k1HdWallet.fromMnemonic(faucet.mnemonic)); + const sender = await profile.createIdentity(wallet.id, defaultChainId, faucet.path); + + // send transactions + + const recipientAddress = makeRandomAddress(); + const send = await connection.withDefaultFee({ + kind: "bcp/send", + chainId: defaultChainId, + senderPubkey: sender.pubkey, + sender: connection.codec.identityToAddress(sender), + recipient: recipientAddress, + amount: defaultAmount, + memo: `liveTx() test ${Math.random()}`, + }); + + const nonce = await connection.getNonce({ pubkey: sender.pubkey }); + const signed = await profile.signTransaction(sender, send, connection.codec, nonce); + const bytesToPost = connection.codec.bytesToPost(signed); + + const postResult = await connection.postTx(bytesToPost); + const transactionId = postResult.transactionId; + + // setup listener before transaction is in block + const events = new Array>(); + const subscription = connection.liveTx({ id: transactionId }).subscribe({ + next: event => { + assert(isConfirmedTransaction(event), "Confirmed transaction expected"); + events.push(event); + + assert(isSendTransaction(event.transaction), "Unexpected transaction type"); + expect(event.transaction.recipient).toEqual(recipientAddress); + expect(event.transactionId).toEqual(transactionId); + + subscription.unsubscribe(); + connection.disconnect(); + done(); + }, + }); + })().catch(done.fail); + }); + }); + describe("integration tests", () => { it("can send ERC20 tokens", async () => { pendingWithoutWasmd(); diff --git a/packages/bcp/src/cosmwasmconnection.ts b/packages/bcp/src/cosmwasmconnection.ts index d27d102c..c0b57339 100644 --- a/packages/bcp/src/cosmwasmconnection.ts +++ b/packages/bcp/src/cosmwasmconnection.ts @@ -37,7 +37,7 @@ import { UnsignedTransaction, } from "@iov/bcp"; import { Encoding, Uint53 } from "@iov/encoding"; -import { DefaultValueProducer, ValueAndUpdates } from "@iov/stream"; +import { concat, DefaultValueProducer, ValueAndUpdates } from "@iov/stream"; import BN from "bn.js"; import equal from "fast-deep-equal"; import { ReadonlyDate } from "readonly-date"; @@ -355,9 +355,55 @@ export class CosmWasmConnection implements BlockchainConnection { } public liveTx( - _query: TransactionQuery, + query: TransactionQuery, ): Stream | FailedTransaction> { - throw new Error("not implemented"); + if ([query.height, query.signedBy, query.tags].some(isDefined)) { + throw new Error("Transaction query by height, signedBy or tags not yet supported"); + } + + if (query.id) { + if (query.minHeight || query.maxHeight) { + throw new Error("Query by minHeight/maxHeight not supported together with ID"); + } + + // concat never() because we want non-completing streams consistently + return concat(this.waitForTransaction(query.id), Stream.never()); + } else if (query.sentFromOrTo) { + let pollInternal: NodeJS.Timeout | undefined; + const producer: Producer | FailedTransaction> = { + start: async listener => { + let minHeight = query.minHeight || 0; + const maxHeight = query.maxHeight || Number.MAX_SAFE_INTEGER; + + const poll = async (): Promise => { + const result = await this.searchTx({ + sentFromOrTo: query.sentFromOrTo, + minHeight: minHeight, + maxHeight: maxHeight, + }); + for (const item of result) { + listener.next(item); + if (item.height >= minHeight) { + // we assume we got all matching transactions from block `item.height` now + minHeight = item.height + 1; + } + } + }; + + await poll(); + pollInternal = setInterval(poll, defaultPollInterval); + }, + stop: () => { + if (pollInternal) { + clearInterval(pollInternal); + pollInternal = undefined; + } + }, + }; + return Stream.create(producer); + } else { + throw new Error("Unsupported query."); + } } public async getFeeQuote(tx: UnsignedTransaction): Promise { @@ -433,4 +479,43 @@ export class CosmWasmConnection implements BlockchainConnection { this.erc20Tokens, ); } + + private waitForTransaction( + id: TransactionId, + ): Stream | FailedTransaction> { + let pollInternal: NodeJS.Timeout | undefined; + const producer: Producer | FailedTransaction> = { + start: listener => { + setInterval(async () => { + try { + const results = await this.searchTx({ id: id }); + switch (results.length) { + case 0: + // okay, we'll try again + break; + case 1: + listener.next(results[0]); + listener.complete(); + break; + default: + throw new Error(`Got unexpected number of search results: ${results.length}`); + } + } catch (error) { + if (pollInternal) { + clearTimeout(pollInternal); + pollInternal = undefined; + } + listener.error(error); + } + }, defaultPollInterval); + }, + stop: () => { + if (pollInternal) { + clearTimeout(pollInternal); + pollInternal = undefined; + } + }, + }; + return Stream.create(producer); + } } diff --git a/packages/bcp/types/cosmwasmconnection.d.ts b/packages/bcp/types/cosmwasmconnection.d.ts index a141324d..2cf15164 100644 --- a/packages/bcp/types/cosmwasmconnection.d.ts +++ b/packages/bcp/types/cosmwasmconnection.d.ts @@ -81,9 +81,10 @@ export declare class CosmWasmConnection implements BlockchainConnection { tags, }: TransactionQuery): Promise | FailedTransaction)[]>; listenTx(_query: TransactionQuery): Stream | FailedTransaction>; - liveTx(_query: TransactionQuery): Stream | FailedTransaction>; + liveTx(query: TransactionQuery): Stream | FailedTransaction>; getFeeQuote(tx: UnsignedTransaction): Promise; withDefaultFee(tx: T): Promise; private parseAndPopulateTxResponseUnsigned; private parseAndPopulateTxResponseSigned; + private waitForTransaction; }