* socket: Fork @iov/socket * socket: Remove nonces * socket: Update package.json * socket: Update README * socket: Remove tslint * socket: Fix lint warnings * scripts: Fork socketserver from IOV Core * root: Update NOTICE for socket * tendermint-rpc: Replace @iov/socket dependency with @cosmjs/socket * root: Update CI config for tendermint/socket * scripts: Add wait to tendermint start script * socket: Add coverage
64 lines
1.7 KiB
TypeScript
64 lines
1.7 KiB
TypeScript
import { Listener, Producer, Stream } from "xstream";
|
|
|
|
import { SocketWrapper, SocketWrapperMessageEvent } from "./socketwrapper";
|
|
|
|
/**
 * A WebSocket wrapper that exposes all events as a stream.
 *
 * The underlying socket will not be closed when the stream has no listeners;
 * use `disconnect()` to close it explicitly. Socket events that arrive while
 * the stream has no listener are not forwarded by this class.
 */
export class StreamingSocket {
  /** The `connected` promise of the wrapped `SocketWrapper`, re-exposed unchanged. */
  public readonly connected: Promise<void>;

  /**
   * Stream of socket events.
   *
   * - every message event is emitted via `next`
   * - a socket error event is emitted via `error`
   * - a clean close completes the stream
   * - an unclean close errors the stream with the string "Socket was closed unclean"
   */
  public readonly events: Stream<SocketWrapperMessageEvent>;

  /**
   * The listener handed over by the stream's producer, or `undefined` while the
   * producer is stopped. All socket callbacks check this before forwarding.
   */
  private eventProducerListener: Listener<SocketWrapperMessageEvent> | undefined;
  private readonly socket: SocketWrapper;

  /**
   * Creates the wrapper and wires its callbacks to the `events` stream.
   * The connection is not started here; call `connect()` for that.
   *
   * @param url address handed to the underlying `SocketWrapper`
   * @param timeout passed through to `SocketWrapper` unchanged
   *   (NOTE(review): presumably a connection timeout in milliseconds - confirm against SocketWrapper)
   */
  public constructor(url: string, timeout = 10_000) {
    this.socket = new SocketWrapper(
      url,
      (event) => {
        if (this.eventProducerListener) {
          this.eventProducerListener.next(event);
        }
      },
      (errorEvent) => {
        if (this.eventProducerListener) {
          this.eventProducerListener.error(errorEvent);
        }
      },
      () => {
        // socket opened
      },
      (closeEvent) => {
        const listener = this.eventProducerListener;
        if (!listener) {
          return;
        }

        if (closeEvent.wasClean) {
          listener.complete();
        } else {
          listener.error("Socket was closed unclean");
        }
      },
      timeout,
    );
    this.connected = this.socket.connected;

    // The producer only records/clears the listener. Stopping it deliberately
    // does not touch the socket, so the connection outlives the listeners.
    const eventProducer: Producer<SocketWrapperMessageEvent> = {
      start: (listener) => {
        this.eventProducerListener = listener;
      },
      stop: () => {
        this.eventProducerListener = undefined;
      },
    };
    this.events = Stream.create(eventProducer);
  }

  /** Delegates to `SocketWrapper.connect()`. */
  public connect(): void {
    this.socket.connect();
  }

  /** Delegates to `SocketWrapper.disconnect()`. */
  public disconnect(): void {
    this.socket.disconnect();
  }

  /**
   * Sends `data` through the underlying socket.
   *
   * @returns the promise returned by `SocketWrapper.send()`
   */
  public async send(data: string): Promise<void> {
    return this.socket.send(data);
  }
}
|