Batch RPC requests 2 (#1303)
* WIP Batch RPC requests * Lint warnings * Match responses by ID * Review Comments * Lint * Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * Update packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * review updates * lint * check falsy values * CHANGELOG & export * Update CHANGELOG.md Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * Update CHANGELOG.md Co-authored-by: Simon Warta <2603011+webmaster128@users.noreply.github.com> * yarn format-text * Fix linter issues * Test batchSizeLimit for safe integer and improve error message * Move changelog to Unreleased section Co-authored-by: codehans <94654388+codehans@users.noreply.github.com>
This commit is contained in:
parent
bba7780de0
commit
55ca044b60
@ -6,6 +6,11 @@ and this project adheres to
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- @cosmjs/tendermint-rpc: Add `HttpBatchClient`, which implements `RpcClient`,
|
||||
supporting batch RPC requests ([#1300]).
|
||||
|
||||
## [0.29.2] - 2022-10-13
|
||||
|
||||
### Added
|
||||
@ -19,6 +24,8 @@ and this project adheres to
|
||||
- @cosmjs/stargate: Add missing `{is,}MsgBeginRedelegateEncodeObject`,
|
||||
`{is,MsgCreateValidatorEncodeObject}` and `{is,MsgEditValidatorEncodeObject}`.
|
||||
|
||||
[#1300]: https://github.com/cosmos/cosmjs/pull/1300
|
||||
|
||||
### Fixed
|
||||
|
||||
- @cosmjs/cosmwasm-stargate: Use type `JsonObject = any` for smart query
|
||||
|
||||
@ -0,0 +1,91 @@
|
||||
/* eslint-disable @typescript-eslint/naming-convention */
|
||||
import { createJsonRpcRequest } from "../jsonrpc";
|
||||
import { defaultInstance } from "../testutil.spec";
|
||||
import { HttpBatchClient } from "./httpbatchclient";
|
||||
import { http } from "./httpclient";
|
||||
|
||||
function pendingWithoutTendermint(): void {
|
||||
if (!process.env.TENDERMINT_ENABLED) {
|
||||
pending("Set TENDERMINT_ENABLED to enable Tendermint RPC tests");
|
||||
}
|
||||
}
|
||||
|
||||
function pendingWithoutHttpServer(): void {
|
||||
if (!process.env.HTTPSERVER_ENABLED) {
|
||||
pending("Set HTTPSERVER_ENABLED to enable HTTP tests");
|
||||
}
|
||||
}
|
||||
|
||||
const tendermintUrl = defaultInstance.url;
|
||||
const echoUrl = "http://localhost:5555/echo_headers";
|
||||
|
||||
describe("http", () => {
|
||||
it("can send a health request", async () => {
|
||||
pendingWithoutTendermint();
|
||||
const response = await http("POST", `http://${tendermintUrl}`, undefined, createJsonRpcRequest("health"));
|
||||
expect(response).toEqual(jasmine.objectContaining({ jsonrpc: "2.0" }));
|
||||
});
|
||||
|
||||
it("errors for non-open port", async () => {
|
||||
await expectAsync(
|
||||
http("POST", `http://localhost:56745`, undefined, createJsonRpcRequest("health")),
|
||||
).toBeRejectedWithError(/(ECONNREFUSED|Failed to fetch)/i);
|
||||
});
|
||||
|
||||
it("can send custom headers", async () => {
|
||||
pendingWithoutHttpServer();
|
||||
// Without custom headers
|
||||
const response1 = await http("POST", echoUrl, undefined, createJsonRpcRequest("health"));
|
||||
expect(response1).toEqual({
|
||||
request_headers: jasmine.objectContaining({
|
||||
// Basic headers from http client
|
||||
Accept: jasmine.any(String),
|
||||
"Content-Length": jasmine.any(String),
|
||||
"Content-Type": "application/json",
|
||||
Host: jasmine.any(String),
|
||||
"User-Agent": jasmine.any(String),
|
||||
}),
|
||||
});
|
||||
|
||||
// With custom headers
|
||||
const response2 = await http(
|
||||
"POST",
|
||||
echoUrl,
|
||||
{ foo: "bar123", Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz" },
|
||||
createJsonRpcRequest("health"),
|
||||
);
|
||||
expect(response2).toEqual({
|
||||
request_headers: jasmine.objectContaining({
|
||||
// Basic headers from http client
|
||||
"Content-Length": jasmine.any(String),
|
||||
"Content-Type": "application/json",
|
||||
Host: jasmine.any(String),
|
||||
"User-Agent": jasmine.any(String),
|
||||
// Custom headers
|
||||
foo: "bar123",
|
||||
Authorization: "Basic Z3Vlc3Q6bm9QYXNzMTIz",
|
||||
}),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("HttpBatchClient", () => {
|
||||
it("can make a simple call", async () => {
|
||||
pendingWithoutTendermint();
|
||||
const client = new HttpBatchClient(tendermintUrl);
|
||||
|
||||
const healthResponse = await client.execute(createJsonRpcRequest("health"));
|
||||
expect(healthResponse.result).toEqual({});
|
||||
|
||||
const statusResponse = await client.execute(createJsonRpcRequest("status"));
|
||||
expect(statusResponse.result).toBeTruthy();
|
||||
expect(statusResponse.result.node_info).toBeTruthy();
|
||||
|
||||
await client
|
||||
.execute(createJsonRpcRequest("no-such-method"))
|
||||
.then(() => fail("must not resolve"))
|
||||
.catch((error) => expect(error).toBeTruthy());
|
||||
|
||||
client.disconnect();
|
||||
});
|
||||
});
|
||||
93
packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts
Normal file
93
packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts
Normal file
@ -0,0 +1,93 @@
|
||||
import {
|
||||
isJsonRpcErrorResponse,
|
||||
JsonRpcRequest,
|
||||
JsonRpcSuccessResponse,
|
||||
parseJsonRpcResponse,
|
||||
} from "@cosmjs/json-rpc";
|
||||
|
||||
import { http, HttpEndpoint } from "./httpclient";
|
||||
import { hasProtocol, RpcClient } from "./rpcclient";
|
||||
|
||||
export interface HttpBatchClientOptions {
|
||||
dispatchInterval: number;
|
||||
batchSizeLimit: number;
|
||||
}
|
||||
|
||||
export const defaultHttpBatchClientOptions: HttpBatchClientOptions = {
|
||||
dispatchInterval: 20,
|
||||
batchSizeLimit: 20,
|
||||
};
|
||||
|
||||
export class HttpBatchClient implements RpcClient {
|
||||
protected readonly url: string;
|
||||
protected readonly headers: Record<string, string> | undefined;
|
||||
protected readonly options: HttpBatchClientOptions;
|
||||
private timer?: NodeJS.Timer;
|
||||
|
||||
private readonly queue: Array<{
|
||||
request: JsonRpcRequest;
|
||||
resolve: (a: JsonRpcSuccessResponse) => void;
|
||||
reject: (a: Error) => void;
|
||||
}> = [];
|
||||
|
||||
public constructor(
|
||||
endpoint: string | HttpEndpoint,
|
||||
options: HttpBatchClientOptions = defaultHttpBatchClientOptions,
|
||||
) {
|
||||
this.options = options;
|
||||
if (typeof endpoint === "string") {
|
||||
// accept host.name:port and assume http protocol
|
||||
this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
|
||||
} else {
|
||||
this.url = endpoint.url;
|
||||
this.headers = endpoint.headers;
|
||||
}
|
||||
this.timer = setInterval(() => this.tick(), options.dispatchInterval);
|
||||
this.validate();
|
||||
}
|
||||
|
||||
public disconnect(): void {
|
||||
this.timer && clearInterval(this.timer);
|
||||
this.timer = undefined;
|
||||
}
|
||||
|
||||
public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.queue.push({ request, resolve, reject });
|
||||
});
|
||||
}
|
||||
|
||||
private validate(): void {
|
||||
if (
|
||||
!this.options.batchSizeLimit ||
|
||||
!Number.isSafeInteger(this.options.batchSizeLimit) ||
|
||||
this.options.batchSizeLimit < 1
|
||||
) {
|
||||
throw new Error("batchSizeLimit must be a safe integer >= 1");
|
||||
}
|
||||
}
|
||||
|
||||
private async tick(): Promise<void> {
|
||||
// Avoid race conditions
|
||||
const queue = this.queue.splice(0, this.options.batchSizeLimit);
|
||||
|
||||
if (!queue.length) return;
|
||||
|
||||
const request = queue.map((s) => s.request);
|
||||
const raw = await http("POST", this.url, this.headers, request);
|
||||
// Requests with a single entry return as an object
|
||||
const arr = Array.isArray(raw) ? raw : [raw];
|
||||
|
||||
arr.forEach((el) => {
|
||||
const req = queue.find((s) => s.request.id === el.id);
|
||||
if (!req) return;
|
||||
const { reject, resolve } = req;
|
||||
const response = parseJsonRpcResponse(el);
|
||||
if (isJsonRpcErrorResponse(response)) {
|
||||
reject(new Error(JSON.stringify(response.error)));
|
||||
} else {
|
||||
resolve(response);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
// This folder contains Tendermint-specific RPC clients
|
||||
|
||||
export { defaultHttpBatchClientOptions, HttpBatchClient } from "./httpbatchclient";
|
||||
export { HttpClient, HttpEndpoint } from "./httpclient";
|
||||
export { instanceOfRpcStreamingClient, RpcClient, RpcStreamingClient, SubscriptionEvent } from "./rpcclient";
|
||||
export { WebsocketClient } from "./websocketclient";
|
||||
|
||||
Loading…
Reference in New Issue
Block a user