cosmjs-util/packages/socket/src/queueingstreamingsocket.ts
Will Clark 200a0f7fe6
Fork @iov/socket (#250)
* socket: Fork @iov/socket

* socket: Remove nonces

* socket: Update package.json

* socket: Update README

* socket: Remove tslint

* socket: Fix lint warnings

* scripts: Fork socketserver from IOV Core

* root: Update NOTICE for socket

* tendermint-rpc: Replace @iov/socket dependency with @cosmjs/socket

* root: Update CI config for tendermint/socket

* scripts: Add wait to tendermint start script

* socket: Add coverage
2020-06-25 16:01:41 +02:00

119 lines
3.9 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { DefaultValueProducer, ValueAndUpdates } from "@iov/stream";
import { Listener, Producer, Stream } from "xstream";
import { SocketWrapperMessageEvent } from "./socketwrapper";
import { StreamingSocket } from "./streamingsocket";
/**
 * Connection states reported by QueueingStreamingSocket.
 *
 * The numeric values are spelled out so they stay stable if members are
 * ever reordered.
 */
export enum ConnectionStatus {
  /** Initial status of a freshly constructed QueueingStreamingSocket. */
  Unconnected = 0,
  /** Set when connect() has been called and the outcome is still pending. */
  Connecting = 1,
  /** Set once the underlying socket reports a successful connection. */
  Connected = 2,
  /** Set on disconnect(), on a failed connection attempt or on a socket event stream error. */
  Disconnected = 3,
}
/**
 * A wrapper around StreamingSocket that can queue requests.
 *
 * Requests passed to `queueRequest` are kept in a FIFO queue and sent over the
 * underlying socket while the connection status is `Connected`. A request
 * whose send fails is put back at the front of the queue and retried the next
 * time the queue is processed (e.g. after a successful `reconnect`).
 */
export class QueueingStreamingSocket {
  /** Observable connection status; `value` holds the current status. */
  public readonly connectionStatus: ValueAndUpdates<ConnectionStatus>;
  /**
   * Message events forwarded from the current underlying socket.
   *
   * Note: an incoming socket event throws "No event producer listener set"
   * inside the forwarding listener if nobody is subscribed to this stream.
   */
  public readonly events: Stream<SocketWrapperMessageEvent>;

  private readonly url: string;
  private readonly timeout: number;
  /** Requests waiting to be sent, oldest first. */
  private readonly queue: string[] = [];
  /** The current underlying socket; replaced by `reconnect`. */
  private socket: StreamingSocket;
  /** True while a `processQueue` run is draining the queue. */
  private isProcessingQueue = false;
  private eventProducerListener: Listener<SocketWrapperMessageEvent> | undefined;
  private readonly connectionStatusProducer: DefaultValueProducer<ConnectionStatus>;
  private readonly reconnectedHandler?: () => void;

  /**
   * @param url Passed through to every StreamingSocket this wrapper creates.
   * @param timeout Passed through to StreamingSocket (presumably a connection
   *   timeout in milliseconds; confirm against StreamingSocket).
   * @param reconnectedHandler Called after a socket created by `reconnect`
   *   has connected successfully.
   */
  public constructor(url: string, timeout = 10_000, reconnectedHandler?: () => void) {
    this.url = url;
    this.timeout = timeout;
    this.reconnectedHandler = reconnectedHandler;

    const eventProducer: Producer<SocketWrapperMessageEvent> = {
      start: (listener) => {
        this.eventProducerListener = listener;
      },
      stop: () => {
        this.eventProducerListener = undefined;
      },
    };
    this.events = Stream.create(eventProducer);
    this.connectionStatusProducer = new DefaultValueProducer<ConnectionStatus>(ConnectionStatus.Unconnected);
    this.connectionStatus = new ValueAndUpdates(this.connectionStatusProducer);

    this.socket = this.createSocket();
  }

  /**
   * Starts connecting the current socket.
   *
   * Sets the status to `Connecting`, then to `Connected` (followed by a queue
   * run) on success or to `Disconnected` on failure.
   */
  public connect(): void {
    this.connectionStatusProducer.update(ConnectionStatus.Connecting);
    this.socket.connected.then(
      async () => {
        this.connectionStatusProducer.update(ConnectionStatus.Connected);
        return this.processQueue();
      },
      () => this.connectionStatusProducer.update(ConnectionStatus.Disconnected),
    );
    this.socket.connect();
  }

  /** Sets the status to `Disconnected` and disconnects the current socket. */
  public disconnect(): void {
    this.connectionStatusProducer.update(ConnectionStatus.Disconnected);
    this.socket.disconnect();
  }

  /**
   * Replaces the underlying socket with a fresh one and connects it.
   *
   * The `reconnectedHandler` given to the constructor (if any) is called once
   * the new socket has connected.
   */
  public reconnect(): void {
    this.socket = this.createSocket();
    this.socket.connected.then(
      () => {
        if (this.reconnectedHandler) {
          this.reconnectedHandler();
        }
      },
      () => {
        // A failed connection is already handled by the rejection handler
        // registered in connect(); swallow it here so this derived promise
        // does not surface as an unhandled rejection.
      },
    );
    this.connect();
  }

  /** Returns the number of requests that are still waiting to be sent. */
  public getQueueLength(): number {
    return this.queue.length;
  }

  /**
   * Appends a request to the queue and triggers queue processing.
   *
   * The request is sent right away if the socket is connected; otherwise it
   * stays queued until the next successful connection.
   */
  public queueRequest(request: string): void {
    this.queue.push(request);
    // We don't need to wait for the queue to be processed.
    // eslint-disable-next-line @typescript-eslint/no-floating-promises
    this.processQueue();
  }

  /**
   * Creates a StreamingSocket for this wrapper's URL and timeout and forwards
   * its events to `events`. A socket event stream error sets the status to
   * `Disconnected`.
   */
  private createSocket(): StreamingSocket {
    const socket = new StreamingSocket(this.url, this.timeout);
    socket.events.subscribe({
      next: (event) => {
        if (!this.eventProducerListener) throw new Error("No event producer listener set");
        this.eventProducerListener.next(event);
      },
      error: () => this.connectionStatusProducer.update(ConnectionStatus.Disconnected),
    });
    return socket;
  }

  /**
   * Sends queued requests in order until the queue is empty or a send fails.
   *
   * Does nothing if another run is in progress or the status is not
   * `Connected`. Never rejects: a failed request is re-queued at the front.
   */
  private async processQueue(): Promise<void> {
    if (this.isProcessingQueue || this.connectionStatus.value !== ConnectionStatus.Connected) {
      return;
    }

    this.isProcessingQueue = true;
    try {
      let request: string | undefined;
      // Compare with undefined (not truthiness) so an empty-string request is
      // sent instead of being dropped and ending the loop early.
      while ((request = this.queue.shift()) !== undefined) {
        try {
          await this.socket.send(request);
        } catch {
          // Probably the connection is down; will try again automatically when reconnected.
          this.queue.unshift(request);
          return;
        }
      }
    } finally {
      // Always release the flag, including when the queue was empty on entry;
      // otherwise later requests would never be processed.
      this.isProcessingQueue = false;
    }
  }
}