cosmjs-util/packages/socket/src/queueingstreamingsocket.spec.ts
Will Clark 200a0f7fe6
Fork @iov/socket (#250)
* socket: Fork @iov/socket

* socket: Remove nonces

* socket: Update package.json

* socket: Update README

* socket: Remove tslint

* socket: Fix lint warnings

* scripts: Fork socketserver from IOV Core

* root: Update NOTICE for socket

* tendermint-rpc: Replace @iov/socket dependency with @cosmjs/socket

* root: Update CI config for tendermint/socket

* scripts: Add wait to tendermint start script

* socket: Add coverage
2020-06-25 16:01:41 +02:00

156 lines
4.7 KiB
TypeScript

import { ConnectionStatus, QueueingStreamingSocket } from "./queueingstreamingsocket";
/**
 * Guard for specs that need the socket server.
 *
 * Calls `pending` with a hint on how to enable the tests whenever the
 * `SOCKETSERVER_ENABLED` environment variable is unset or empty; otherwise
 * it does nothing.
 */
function pendingWithoutSocketServer(): void {
  const socketServerEnabled = Boolean(process.env.SOCKETSERVER_ENABLED);
  if (socketServerEnabled) {
    return;
  }
  pending("Set SOCKETSERVER_ENABLED to enable socket tests");
}
describe("QueueingStreamingSocket", () => {
  const socketServerUrl = "ws://localhost:4444/websocket";

  /**
   * Subscribes to `socket.events` and expects the n-th event's `data` to
   * equal the n-th entry of `expected`. Once every entry has been matched it
   * checks that the request queue is empty, disconnects the socket and calls
   * `done`.
   *
   * @param socket the socket whose event stream is observed
   * @param expected the event payloads, in the order they must arrive
   * @param done callback invoked after the last expected event
   */
  function expectEventsInOrder(
    socket: QueueingStreamingSocket,
    expected: readonly string[],
    done: () => void,
  ): void {
    let eventsSeen = 0;
    socket.events.subscribe({
      next: (event) => {
        expect(event.data).toEqual(expected[eventsSeen]);
        eventsSeen += 1;
        if (eventsSeen === expected.length) {
          expect(socket.getQueueLength()).toEqual(0);
          socket.disconnect();
          done();
        }
      },
    });
  }

  it("can be constructed", () => {
    const socket = new QueueingStreamingSocket(socketServerUrl);
    expect(socket).toBeTruthy();
  });

  describe("queueRequest", () => {
    it("can queue and process requests with a connection", (done) => {
      pendingWithoutSocketServer();
      const socket = new QueueingStreamingSocket(socketServerUrl);
      const requests = ["request 1", "request 2", "request 3"] as const;

      // Subscribe before connecting so no event can be missed.
      expectEventsInOrder(socket, requests, done);

      socket.connect();
      for (const request of requests) {
        socket.queueRequest(request);
      }
    });

    it("can queue requests without a connection and process them later", (done) => {
      pendingWithoutSocketServer();
      const socket = new QueueingStreamingSocket(socketServerUrl);
      const requests = ["request 1", "request 2", "request 3"] as const;

      expectEventsInOrder(socket, requests, done);

      for (const request of requests) {
        socket.queueRequest(request);
      }
      // NOTE(review): this waits 5 s before connecting, so the runner's spec
      // timeout must be longer than that — confirm against the Jasmine config.
      setTimeout(() => {
        // Nothing may have left the queue while there was no connection.
        expect(socket.getQueueLength()).toEqual(requests.length);
        socket.connect();
      }, 5_000);
    });
  });

  describe("reconnect", () => {
    it("does not emit a completed event when disconnected", (done) => {
      pendingWithoutSocketServer();
      const request = "request";
      const socket = new QueueingStreamingSocket(socketServerUrl);

      socket.events.subscribe({
        next: ({ data }) => {
          if (data === request) {
            socket.disconnect();
            done();
          }
        },
        // Completion of the stream at any point fails the spec.
        complete: () => done.fail("Stream completed"),
      });

      socket.connect();
      socket.disconnect();
      socket.reconnect();
      socket.queueRequest(request);
    });

    it("can reconnect and process remaining queue", (done) => {
      pendingWithoutSocketServer();
      const socket = new QueueingStreamingSocket(socketServerUrl);
      const requests = ["request 1", "request 2", "request 3"] as const;

      socket.connect();
      socket.disconnect();
      // Requests are queued while disconnected and observed after reconnect().
      for (const request of requests) {
        socket.queueRequest(request);
      }

      expectEventsInOrder(socket, requests, done);

      socket.reconnect();
    });

    it("notifies on reconnection via a callback", (done) => {
      pendingWithoutSocketServer();
      // Disconnect before finishing so the socket is not left open, matching
      // the cleanup done by the other specs in this suite.
      // NOTE(review): assumes the handler is invoked once the reconnected
      // socket is established — confirm against QueueingStreamingSocket.
      const socket = new QueueingStreamingSocket(socketServerUrl, undefined, () => {
        socket.disconnect();
        done();
      });
      socket.reconnect();
    });
  });

  describe("connectionStatus", () => {
    it("exposes connection status", (done) => {
      pendingWithoutSocketServer();
      const socket = new QueueingStreamingSocket(socketServerUrl);

      // Initial value, then one connect/disconnect cycle for connect() and
      // one for reconnect().
      const expectedStatuses = [
        ConnectionStatus.Unconnected,
        ConnectionStatus.Connecting,
        ConnectionStatus.Connected,
        ConnectionStatus.Disconnected,
        ConnectionStatus.Connecting,
        ConnectionStatus.Connected,
        ConnectionStatus.Disconnected,
      ];
      let statusChangesSeen = 0;

      socket.connectionStatus.updates.subscribe({
        next: (status) => {
          if (statusChangesSeen < expectedStatuses.length) {
            expect(status).toEqual(expectedStatuses[statusChangesSeen]);
          } else {
            done.fail("Got too many status changes");
          }
          statusChangesSeen += 1;
          if (statusChangesSeen === expectedStatuses.length) {
            done();
          }
        },
      });

      socket.connect();
      setTimeout(() => {
        socket.disconnect();
        socket.reconnect();
        setTimeout(() => socket.disconnect(), 1000);
      }, 1000);
    });
  });
});