diff --git a/CHANGELOG.md b/CHANGELOG.md index d8d7fabb..0125474b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,20 @@ and this project adheres to ## [Unreleased] +### Added + +- @cosmjs/tendermint-rpc: The options in the `HttpBatchClient` constructor are + now of type `Partial`, allowing you to set only the + fields you want to change ([#1309]). +- @cosmjs/tendermint-rpc: Add missing exports `HttpBatchClient`, + `HttpBatchClientOptions`, `RpcClient` ([#1311]). +- @cosmjs/tendermint-rpc: Send batch immediately when full in `HttpBatchClient` + ([#1310]). + +[#1309]: https://github.com/cosmos/cosmjs/issues/1309 +[#1310]: https://github.com/cosmos/cosmjs/issues/1310 +[#1311]: https://github.com/cosmos/cosmjs/issues/1311 + ### Fixed - @cosmjs/cosmwasm-stargate: Fix `ContractCodeHistory` decoding when msg diff --git a/packages/tendermint-rpc/src/index.ts b/packages/tendermint-rpc/src/index.ts index 8e307862..a1556d15 100644 --- a/packages/tendermint-rpc/src/index.ts +++ b/packages/tendermint-rpc/src/index.ts @@ -12,11 +12,17 @@ export { toRfc3339WithNanoseconds, toSeconds, } from "./dates"; +// The public Tendermint34Client.create constructor allows manually choosing an RpcClient. +// This is currently the only way to switch to the HttpBatchClient (which may become default at some point). +// Due to this API, we make RPC client implementations public. export { - // This type is part of the Tendermint34Client.connect API - HttpEndpoint, + HttpBatchClient, + HttpBatchClientOptions, + HttpClient, + HttpEndpoint, // This type is part of the Tendermint34Client.connect API + RpcClient, // Interface type in Tendermint34Client.create + WebsocketClient, } from "./rpcclients"; -export { HttpClient, WebsocketClient } from "./rpcclients"; // TODO: Why do we export those outside of this package? export { AbciInfoRequest, AbciInfoResponse, diff --git a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts index d242f84f..b05c3e56 100644 --- a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts +++ b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts @@ -30,4 +30,23 @@ describe("HttpBatchClient", () => { client.disconnect(); }); + + it("dispatches requests as soon as batch size limit is reached", async () => { + pendingWithoutTendermint(); + const client = new HttpBatchClient(tendermintUrl, { + dispatchInterval: 3600_000 /* 1h to make test time out if this is not working */, + batchSizeLimit: 3, + }); + + const healthResponse = await Promise.all([ + client.execute(createJsonRpcRequest("health")), + client.execute(createJsonRpcRequest("health")), + client.execute(createJsonRpcRequest("health")), + ]); + expect(healthResponse[0].result).toEqual({}); + expect(healthResponse[1].result).toEqual({}); + expect(healthResponse[2].result).toEqual({}); + + client.disconnect(); + }); }); diff --git a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts index f0bafc96..4b8e4fe6 100644 --- a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts +++ b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts @@ -10,11 +10,16 @@ import { HttpEndpoint } from "./httpclient"; import { hasProtocol, RpcClient } from "./rpcclient"; export interface HttpBatchClientOptions { + /** Interval for dispatching batches (in milliseconds) */ dispatchInterval: number; + /** Max number of items sent in one request */ batchSizeLimit: number; } -export const defaultHttpBatchClientOptions: HttpBatchClientOptions = { +// Those values are private and can change any time. +// Does a user need to know them? I don't think so. You either set +// a custom value or leave the option field unset. +const defaultHttpBatchClientOptions: HttpBatchClientOptions = { dispatchInterval: 20, batchSizeLimit: 20, }; @@ -31,11 +36,11 @@ export class HttpBatchClient implements RpcClient { reject: (a: Error) => void; }> = []; - public constructor( - endpoint: string | HttpEndpoint, - options: HttpBatchClientOptions = defaultHttpBatchClientOptions, - ) { - this.options = options; + public constructor(endpoint: string | HttpEndpoint, options: Partial = {}) { + this.options = { + batchSizeLimit: options.batchSizeLimit ?? defaultHttpBatchClientOptions.batchSizeLimit, + dispatchInterval: options.dispatchInterval ?? defaultHttpBatchClientOptions.dispatchInterval, + }; if (typeof endpoint === "string") { // accept host.name:port and assume http protocol this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint; @@ -55,6 +60,11 @@ export class HttpBatchClient implements RpcClient { public async execute(request: JsonRpcRequest): Promise { return new Promise((resolve, reject) => { this.queue.push({ request, resolve, reject }); + + if (this.queue.length >= this.options.batchSizeLimit) { + // this train is full, let's go + this.tick(); + } }); } @@ -68,27 +78,43 @@ export class HttpBatchClient implements RpcClient { } } - private async tick(): Promise { + /** + * This is called in an interval where promise rejections cannot be handled. + * So this is not async and HTTP errors need to be handled by the queued promises. + */ + private tick(): void { // Avoid race conditions - const queue = this.queue.splice(0, this.options.batchSizeLimit); + const batch = this.queue.splice(0, this.options.batchSizeLimit); - if (!queue.length) return; + if (!batch.length) return; - const request = queue.map((s) => s.request); - const raw = await http("POST", this.url, this.headers, request); - // Requests with a single entry return as an object - const arr = Array.isArray(raw) ? raw : [raw]; + const requests = batch.map((s) => s.request); + const requestIds = requests.map((request) => request.id); - arr.forEach((el) => { - const req = queue.find((s) => s.request.id === el.id); - if (!req) return; - const { reject, resolve } = req; - const response = parseJsonRpcResponse(el); - if (isJsonRpcErrorResponse(response)) { - reject(new Error(JSON.stringify(response.error))); - } else { - resolve(response); - } - }); + http("POST", this.url, this.headers, requests).then( + (raw) => { + // Requests with a single entry return as an object + const arr = Array.isArray(raw) ? raw : [raw]; + + arr.forEach((el) => { + const req = batch.find((s) => s.request.id === el.id); + if (!req) return; + const { reject, resolve } = req; + const response = parseJsonRpcResponse(el); + if (isJsonRpcErrorResponse(response)) { + reject(new Error(JSON.stringify(response.error))); + } else { + resolve(response); + } + }); + }, + (error) => { + for (const requestId of requestIds) { + const req = batch.find((s) => s.request.id === requestId); + if (!req) return; + req.reject(error); + } + }, + ); } } diff --git a/packages/tendermint-rpc/src/rpcclients/index.ts b/packages/tendermint-rpc/src/rpcclients/index.ts index cf7c1a70..ac89b9bb 100644 --- a/packages/tendermint-rpc/src/rpcclients/index.ts +++ b/packages/tendermint-rpc/src/rpcclients/index.ts @@ -1,6 +1,6 @@ // This folder contains Tendermint-specific RPC clients -export { defaultHttpBatchClientOptions, HttpBatchClient } from "./httpbatchclient"; +export { HttpBatchClient, HttpBatchClientOptions } from "./httpbatchclient"; export { HttpClient, HttpEndpoint } from "./httpclient"; export { instanceOfRpcStreamingClient, RpcClient, RpcStreamingClient, SubscriptionEvent } from "./rpcclient"; export { WebsocketClient } from "./websocketclient";