Implement liveTx

This commit is contained in:
Simon Warta 2020-02-18 12:48:09 +01:00
parent 3f7567bdb9
commit 1f0b6845c8
3 changed files with 276 additions and 4 deletions

View File

@ -5,6 +5,7 @@ import {
Algorithm,
Amount,
ChainId,
ConfirmedTransaction,
isBlockInfoPending,
isBlockInfoSucceeded,
isConfirmedTransaction,
@ -14,6 +15,7 @@ import {
TokenTicker,
TransactionId,
TransactionState,
UnsignedTransaction,
} from "@iov/bcp";
import { Random, Secp256k1, Secp256k1Signature, Sha256 } from "@iov/crypto";
import { Bech32, Encoding } from "@iov/encoding";
@ -795,6 +797,190 @@ describe("CosmWasmConnection", () => {
});
});
describe("liveTx", () => {
it("can listen to transactions by recipient address (transactions in history and updates)", done => {
pendingWithoutWasmd();
(async () => {
const connection = await CosmWasmConnection.establish(httpUrl, defaultPrefix, defaultConfig);
const profile = new UserProfile();
const wallet = profile.addWallet(Secp256k1HdWallet.fromMnemonic(faucet.mnemonic));
const sender = await profile.createIdentity(wallet.id, defaultChainId, faucet.path);
// send transactions
const recipientAddress = makeRandomAddress();
const sendA = await connection.withDefaultFee<SendTransaction>({
kind: "bcp/send",
chainId: defaultChainId,
senderPubkey: sender.pubkey,
sender: connection.codec.identityToAddress(sender),
recipient: recipientAddress,
amount: defaultAmount,
memo: `liveTx() test A ${Math.random()}`,
});
const sendB = await connection.withDefaultFee<SendTransaction>({
kind: "bcp/send",
chainId: defaultChainId,
senderPubkey: sender.pubkey,
sender: connection.codec.identityToAddress(sender),
recipient: recipientAddress,
amount: defaultAmount,
memo: `liveTx() test B ${Math.random()}`,
});
const sendC = await connection.withDefaultFee<SendTransaction>({
kind: "bcp/send",
chainId: defaultChainId,
senderPubkey: sender.pubkey,
sender: connection.codec.identityToAddress(sender),
recipient: recipientAddress,
amount: defaultAmount,
memo: `liveTx() test C ${Math.random()}`,
});
const [nonceA, nonceB, nonceC] = await connection.getNonces({ pubkey: sender.pubkey }, 3);
const signedA = await profile.signTransaction(sender, sendA, connection.codec, nonceA);
const signedB = await profile.signTransaction(sender, sendB, connection.codec, nonceB);
const signedC = await profile.signTransaction(sender, sendC, connection.codec, nonceC);
const bytesToPostA = connection.codec.bytesToPost(signedA);
const bytesToPostB = connection.codec.bytesToPost(signedB);
const bytesToPostC = connection.codec.bytesToPost(signedC);
// Post A and B. Unfortunately the REST server API does not support sending them in parallel because the sequence check fails.
const postResultA = await connection.postTx(bytesToPostA);
await postResultA.blockInfo.waitFor(info => !isBlockInfoPending(info));
const postResultB = await connection.postTx(bytesToPostB);
await postResultB.blockInfo.waitFor(info => !isBlockInfoPending(info));
// setup listener after A and B are in block
const events = new Array<ConfirmedTransaction<UnsignedTransaction>>();
const subscription = connection.liveTx({ sentFromOrTo: recipientAddress }).subscribe({
next: event => {
assert(isConfirmedTransaction(event), "Confirmed transaction expected");
events.push(event);
assert(isSendTransaction(event.transaction), "Unexpected transaction type");
expect(event.transaction.recipient).toEqual(recipientAddress);
if (events.length === 3) {
expect(events[1].height).toEqual(events[0].height + 1);
expect(events[2].height).toBeGreaterThan(events[1].height);
subscription.unsubscribe();
connection.disconnect();
done();
}
},
});
// Post C
await connection.postTx(bytesToPostC);
})().catch(done.fail);
});
it("can listen to transactions by ID (transaction in history)", done => {
pendingWithoutWasmd();
(async () => {
const connection = await CosmWasmConnection.establish(httpUrl, defaultPrefix, defaultConfig);
const profile = new UserProfile();
const wallet = profile.addWallet(Secp256k1HdWallet.fromMnemonic(faucet.mnemonic));
const sender = await profile.createIdentity(wallet.id, defaultChainId, faucet.path);
const recipientAddress = makeRandomAddress();
const send = await connection.withDefaultFee<SendTransaction>({
kind: "bcp/send",
chainId: defaultChainId,
senderPubkey: sender.pubkey,
sender: connection.codec.identityToAddress(sender),
recipient: recipientAddress,
amount: defaultAmount,
memo: `liveTx() test ${Math.random()}`,
});
const nonce = await connection.getNonce({ pubkey: sender.pubkey });
const signed = await profile.signTransaction(sender, send, connection.codec, nonce);
const bytesToPost = connection.codec.bytesToPost(signed);
const postResult = await connection.postTx(bytesToPost);
const transactionId = postResult.transactionId;
// Wait for a block
await postResult.blockInfo.waitFor(info => !isBlockInfoPending(info));
// setup listener after transaction is in block
const events = new Array<ConfirmedTransaction<UnsignedTransaction>>();
const subscription = connection.liveTx({ id: transactionId }).subscribe({
next: event => {
assert(isConfirmedTransaction(event), "Confirmed transaction expected");
events.push(event);
assert(isSendTransaction(event.transaction), "Unexpected transaction type");
expect(event.transaction.recipient).toEqual(recipientAddress);
expect(event.transactionId).toEqual(transactionId);
subscription.unsubscribe();
connection.disconnect();
done();
},
});
})().catch(done.fail);
});
it("can listen to transactions by ID (transaction in updates)", done => {
pendingWithoutWasmd();
(async () => {
const connection = await CosmWasmConnection.establish(httpUrl, defaultPrefix, defaultConfig);
const profile = new UserProfile();
const wallet = profile.addWallet(Secp256k1HdWallet.fromMnemonic(faucet.mnemonic));
const sender = await profile.createIdentity(wallet.id, defaultChainId, faucet.path);
// send transactions
const recipientAddress = makeRandomAddress();
const send = await connection.withDefaultFee<SendTransaction>({
kind: "bcp/send",
chainId: defaultChainId,
senderPubkey: sender.pubkey,
sender: connection.codec.identityToAddress(sender),
recipient: recipientAddress,
amount: defaultAmount,
memo: `liveTx() test ${Math.random()}`,
});
const nonce = await connection.getNonce({ pubkey: sender.pubkey });
const signed = await profile.signTransaction(sender, send, connection.codec, nonce);
const bytesToPost = connection.codec.bytesToPost(signed);
const postResult = await connection.postTx(bytesToPost);
const transactionId = postResult.transactionId;
// setup listener before transaction is in block
const events = new Array<ConfirmedTransaction<UnsignedTransaction>>();
const subscription = connection.liveTx({ id: transactionId }).subscribe({
next: event => {
assert(isConfirmedTransaction(event), "Confirmed transaction expected");
events.push(event);
assert(isSendTransaction(event.transaction), "Unexpected transaction type");
expect(event.transaction.recipient).toEqual(recipientAddress);
expect(event.transactionId).toEqual(transactionId);
subscription.unsubscribe();
connection.disconnect();
done();
},
});
})().catch(done.fail);
});
});
describe("integration tests", () => {
it("can send ERC20 tokens", async () => {
pendingWithoutWasmd();

View File

@ -37,7 +37,7 @@ import {
UnsignedTransaction,
} from "@iov/bcp";
import { Encoding, Uint53 } from "@iov/encoding";
import { DefaultValueProducer, ValueAndUpdates } from "@iov/stream";
import { concat, DefaultValueProducer, ValueAndUpdates } from "@iov/stream";
import BN from "bn.js";
import equal from "fast-deep-equal";
import { ReadonlyDate } from "readonly-date";
@ -355,9 +355,55 @@ export class CosmWasmConnection implements BlockchainConnection {
}
public liveTx(
_query: TransactionQuery,
query: TransactionQuery,
): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction> {
throw new Error("not implemented");
if ([query.height, query.signedBy, query.tags].some(isDefined)) {
throw new Error("Transaction query by height, signedBy or tags not yet supported");
}
if (query.id) {
if (query.minHeight || query.maxHeight) {
throw new Error("Query by minHeight/maxHeight not supported together with ID");
}
// concat never() because we want non-completing streams consistently
return concat(this.waitForTransaction(query.id), Stream.never());
} else if (query.sentFromOrTo) {
let pollInternal: NodeJS.Timeout | undefined;
const producer: Producer<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction> = {
start: async listener => {
let minHeight = query.minHeight || 0;
const maxHeight = query.maxHeight || Number.MAX_SAFE_INTEGER;
const poll = async (): Promise<void> => {
const result = await this.searchTx({
sentFromOrTo: query.sentFromOrTo,
minHeight: minHeight,
maxHeight: maxHeight,
});
for (const item of result) {
listener.next(item);
if (item.height >= minHeight) {
// we assume we got all matching transactions from block `item.height` now
minHeight = item.height + 1;
}
}
};
await poll();
pollInternal = setInterval(poll, defaultPollInterval);
},
stop: () => {
if (pollInternal) {
clearInterval(pollInternal);
pollInternal = undefined;
}
},
};
return Stream.create(producer);
} else {
throw new Error("Unsupported query.");
}
}
public async getFeeQuote(tx: UnsignedTransaction): Promise<Fee> {
@ -433,4 +479,43 @@ export class CosmWasmConnection implements BlockchainConnection {
this.erc20Tokens,
);
}
private waitForTransaction(
id: TransactionId,
): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction> {
let pollInternal: NodeJS.Timeout | undefined;
const producer: Producer<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction> = {
start: listener => {
setInterval(async () => {
try {
const results = await this.searchTx({ id: id });
switch (results.length) {
case 0:
// okay, we'll try again
break;
case 1:
listener.next(results[0]);
listener.complete();
break;
default:
throw new Error(`Got unexpected number of search results: ${results.length}`);
}
} catch (error) {
if (pollInternal) {
clearTimeout(pollInternal);
pollInternal = undefined;
}
listener.error(error);
}
}, defaultPollInterval);
},
stop: () => {
if (pollInternal) {
clearTimeout(pollInternal);
pollInternal = undefined;
}
},
};
return Stream.create(producer);
}
}

View File

@ -81,9 +81,10 @@ export declare class CosmWasmConnection implements BlockchainConnection {
tags,
}: TransactionQuery): Promise<readonly (ConfirmedTransaction<UnsignedTransaction> | FailedTransaction)[]>;
listenTx(_query: TransactionQuery): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction>;
liveTx(_query: TransactionQuery): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction>;
liveTx(query: TransactionQuery): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction>;
getFeeQuote(tx: UnsignedTransaction): Promise<Fee>;
withDefaultFee<T extends UnsignedTransaction>(tx: T): Promise<T>;
private parseAndPopulateTxResponseUnsigned;
private parseAndPopulateTxResponseSigned;
private waitForTransaction;
}