From 200a0f7fe67719f10ce87292fe599574127752fe Mon Sep 17 00:00:00 2001 From: Will Clark Date: Thu, 25 Jun 2020 16:01:41 +0200 Subject: [PATCH] Fork @iov/socket (#250) * socket: Fork @iov/socket * socket: Remove nonces * socket: Update package.json * socket: Update README * socket: Remove tslint * socket: Fix lint warnings * scripts: Fork socketserver from IOV Core * root: Update NOTICE for socket * tendermint-rpc: Replace @iov/socket dependency with @cosmjs/socket * root: Update CI config for tendermint/socket * scripts: Add wait to tendermint start script * socket: Add coverage --- .circleci/config.yml | 277 ++++++++++-------- NOTICE | 2 + packages/socket/.eslintignore | 1 + packages/socket/.gitignore | 3 + packages/socket/.nycrc.yml | 2 + packages/socket/README.md | 10 + packages/socket/jasmine-testrunner.js | 32 ++ packages/socket/karma.conf.js | 47 +++ packages/socket/nonces/README.txt | 1 + packages/socket/package.json | 55 ++++ packages/socket/src/index.ts | 9 + .../src/queueingstreamingsocket.spec.ts | 155 ++++++++++ .../socket/src/queueingstreamingsocket.ts | 118 ++++++++ .../socket/src/reconnectingsocket.spec.ts | 194 ++++++++++++ packages/socket/src/reconnectingsocket.ts | 92 ++++++ packages/socket/src/socketwrapper.spec.ts | 248 ++++++++++++++++ packages/socket/src/socketwrapper.ts | 203 +++++++++++++ packages/socket/src/streamingsocket.spec.ts | 96 ++++++ packages/socket/src/streamingsocket.ts | 63 ++++ packages/socket/tsconfig.json | 12 + packages/socket/typedoc.js | 14 + packages/socket/types/index.d.ts | 9 + .../socket/types/queueingstreamingsocket.d.ts | 31 ++ packages/socket/types/reconnectingsocket.d.ts | 23 ++ packages/socket/types/socketwrapper.d.ts | 59 ++++ packages/socket/types/streamingsocket.d.ts | 17 ++ packages/socket/webpack.web.config.js | 19 ++ packages/tendermint-rpc/package.json | 2 +- .../src/rpcclients/websocketclient.ts | 2 +- scripts/socketserver/Dockerfile | 8 + scripts/socketserver/echo.py | 50 ++++ scripts/socketserver/start.sh | 37 +++ scripts/socketserver/stop.sh | 6 + scripts/tendermint/start.sh | 8 + yarn.lock | 17 +- 35 files changed, 1792 insertions(+), 130 deletions(-) create mode 120000 packages/socket/.eslintignore create mode 100644 packages/socket/.gitignore create mode 100644 packages/socket/.nycrc.yml create mode 100644 packages/socket/README.md create mode 100755 packages/socket/jasmine-testrunner.js create mode 100644 packages/socket/karma.conf.js create mode 100644 packages/socket/nonces/README.txt create mode 100644 packages/socket/package.json create mode 100644 packages/socket/src/index.ts create mode 100644 packages/socket/src/queueingstreamingsocket.spec.ts create mode 100644 packages/socket/src/queueingstreamingsocket.ts create mode 100644 packages/socket/src/reconnectingsocket.spec.ts create mode 100644 packages/socket/src/reconnectingsocket.ts create mode 100644 packages/socket/src/socketwrapper.spec.ts create mode 100644 packages/socket/src/socketwrapper.ts create mode 100644 packages/socket/src/streamingsocket.spec.ts create mode 100644 packages/socket/src/streamingsocket.ts create mode 100644 packages/socket/tsconfig.json create mode 100644 packages/socket/typedoc.js create mode 100644 packages/socket/types/index.d.ts create mode 100644 packages/socket/types/queueingstreamingsocket.d.ts create mode 100644 packages/socket/types/reconnectingsocket.d.ts create mode 100644 packages/socket/types/socketwrapper.d.ts create mode 100644 packages/socket/types/streamingsocket.d.ts create mode 100644 packages/socket/webpack.web.config.js create mode 100644 scripts/socketserver/Dockerfile create mode 100755 scripts/socketserver/echo.py create mode 100755 scripts/socketserver/start.sh create mode 100755 scripts/socketserver/stop.sh diff --git a/.circleci/config.yml b/.circleci/config.yml index 152dc1a1..a5d44254 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,124 +43,6 @@ jobs: paths: - packages/*/build/* test: - machine: - # We can't use a containerized environment since it requires remote docker to start custom containers. - # However, we can't access the remote docker's network from the primary container. This is a - # feature, as documented in https://circleci.com/docs/2.0/building-docker-images/#separation-of-environments - # As a consequence, we cannot use the circleci CLI for this job because "You cannot use the machine - # executor in local jobs." (https://circleci.com/docs/2.0/local-cli/#limitations-of-running-jobs-locally) - # - # Available images: https://circleci.com/docs/2.0/configuration-reference/#available-machine-images - image: ubuntu-1604:202004-01 - steps: - - checkout - - run: # start early for less wait time below - command: ./scripts/wasmd/start.sh - background: true - - attach_workspace: - at: /tmp/builds - - run: - name: Merge build folders into project (merge with hardlinks) - command: cp --recursive --link /tmp/builds/* . - - run: - # The images ubuntu-1604:201903-01 comes with preinstalled nvm, which does not work well with non-login shells - name: Uninstall nvm - command: rm -rf "$NVM_DIR" ~/.npm ~/.bower - - run: - name: Install nodejs and yarn - command: | - curl -sL https://deb.nodesource.com/setup_10.x | sudo -E bash - - curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add - - echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list - sudo apt update && sudo apt install nodejs yarn - - run: - name: Version information - command: echo "node $(node --version)"; echo "yarn $(yarn --version)" - - restore_cache: - name: Restore Yarn Package Cache - keys: - - yarn-packages-{{ checksum "yarn.lock" }} - - run: - name: Install Dependencies - command: yarn install --frozen-lockfile - - save_cache: - name: Save Yarn Package Cache - key: yarn-packages-{{ checksum "yarn.lock" }} - paths: - - ~/.cache/yarn - - run: - name: Initialize blockchain (deploy contracts and friends) - command: ./scripts/wasmd/init.sh - - run: - environment: - WASMD_ENABLED: 1 - SKIP_BUILD: 1 - command: yarn test --stream - - run: - name: Run CLI selftest - working_directory: packages/cli - environment: - SKIP_BUILD: 1 - command: yarn selftest - - run: - command: ./scripts/wasmd/stop.sh - test-chrome: - machine: - # We can't use a containerized environment since it requires remote docker to start custom containers. - # However, we can't access the remote docker's network from the primary container. This is a - # feature, as documented in https://circleci.com/docs/2.0/building-docker-images/#separation-of-environments - # As a consequence, we cannot use the circleci CLI for this job because "You cannot use the machine - # executor in local jobs." (https://circleci.com/docs/2.0/local-cli/#limitations-of-running-jobs-locally) - # - # Available images: https://circleci.com/docs/2.0/configuration-reference/#available-machine-images - image: ubuntu-1604:202004-01 - steps: - - checkout - - run: # start early for less wait time below - command: ./scripts/wasmd/start.sh - background: true - - attach_workspace: - at: /tmp/builds - - run: - name: Merge build folders into project (merge with hardlinks) - command: cp --recursive --link /tmp/builds/* . - - run: - # The images ubuntu-1604:201903-01 comes with preinstalled nvm, which does not work well with non-login shells - name: Uninstall nvm - command: rm -rf "$NVM_DIR" ~/.npm ~/.bower - - run: - name: Install nodejs and yarn - command: | - curl -sL https://deb.nodesource.com/setup_10.x | sudo -E bash - - curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add - - echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list - sudo apt update && sudo apt install nodejs yarn - - run: - name: Version information - command: echo "node $(node --version)"; echo "yarn $(yarn --version)" - - restore_cache: - name: Restore Yarn Package Cache - keys: - - yarn-packages-{{ checksum "yarn.lock" }} - - run: - name: Install Dependencies - command: yarn install --frozen-lockfile - - save_cache: - name: Save Yarn Package Cache - key: yarn-packages-{{ checksum "yarn.lock" }} - paths: - - ~/.cache/yarn - - run: - name: Initialize blockchain (deploy contracts and friends) - command: ./scripts/wasmd/init.sh - - run: - environment: - WASMD_ENABLED: 1 - SKIP_BUILD: 1 - command: yarn test-chrome - - run: - command: ./scripts/wasmd/stop.sh - coverage: machine: # We can't use a containerized environment since it requires remote docker to start custom containers. # However, we can't access the remote docker's network from the primary container. This is a @@ -176,6 +58,10 @@ jobs: name: Start wasmd command: ./scripts/wasmd/start.sh background: true + - run: + name: Start Tendermint blockchains + command: ./scripts/tendermint/all_start.sh + background: true - attach_workspace: at: /tmp/builds - run: @@ -210,9 +96,160 @@ jobs: - run: name: Initialize wasmd (deploy contracts and friends) command: ./scripts/wasmd/init.sh + - run: + name: Start socket server + command: ./scripts/socketserver/start.sh - run: environment: WASMD_ENABLED: 1 + TENDERMINT_ENABLED: 1 + SOCKETSERVER_ENABLED: 1 + SKIP_BUILD: 1 + command: yarn test --stream + - run: + name: Run CLI selftest + working_directory: packages/cli + environment: + SKIP_BUILD: 1 + command: yarn selftest + - run: + command: ./scripts/wasmd/stop.sh + - run: + command: ./scripts/tendermint/all_stop.sh + - run: + command: ./scripts/socketserver/stop.sh + test-chrome: + machine: + # We can't use a containerized environment since it requires remote docker to start custom containers. + # However, we can't access the remote docker's network from the primary container. This is a + # feature, as documented in https://circleci.com/docs/2.0/building-docker-images/#separation-of-environments + # As a consequence, we cannot use the circleci CLI for this job because "You cannot use the machine + # executor in local jobs." (https://circleci.com/docs/2.0/local-cli/#limitations-of-running-jobs-locally) + # + # Available images: https://circleci.com/docs/2.0/configuration-reference/#available-machine-images + image: ubuntu-1604:202004-01 + steps: + - checkout + - run: # start early for less wait time below + name: Start wasmd + command: ./scripts/wasmd/start.sh + background: true + - run: + name: Start Tendermint blockchains + command: ./scripts/tendermint/all_start.sh + background: true + - attach_workspace: + at: /tmp/builds + - run: + name: Merge build folders into project (merge with hardlinks) + command: cp --recursive --link /tmp/builds/* . + - run: + # The images ubuntu-1604:201903-01 comes with preinstalled nvm, which does not work well with non-login shells + name: Uninstall nvm + command: rm -rf "$NVM_DIR" ~/.npm ~/.bower + - run: + name: Install nodejs and yarn + command: | + curl -sL https://deb.nodesource.com/setup_10.x | sudo -E bash - + curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add - + echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list + sudo apt update && sudo apt install nodejs yarn + - run: + name: Version information + command: echo "node $(node --version)"; echo "yarn $(yarn --version)" + - restore_cache: + name: Restore Yarn Package Cache + keys: + - yarn-packages-{{ checksum "yarn.lock" }} + - run: + name: Install Dependencies + command: yarn install --frozen-lockfile + - save_cache: + name: Save Yarn Package Cache + key: yarn-packages-{{ checksum "yarn.lock" }} + paths: + - ~/.cache/yarn + - run: + name: Initialize wasmd (deploy contracts and friends) + command: ./scripts/wasmd/init.sh + - run: + name: Start socket server + command: ./scripts/socketserver/start.sh + - run: + environment: + WASMD_ENABLED: 1 + TENDERMINT_ENABLED: 1 + SOCKETSERVER_ENABLED: 1 + SKIP_BUILD: 1 + command: yarn test-chrome + - run: + command: ./scripts/wasmd/stop.sh + - run: + command: ./scripts/tendermint/all_stop.sh + - run: + command: ./scripts/socketserver/stop.sh + coverage: + machine: + # We can't use a containerized environment since it requires remote docker to start custom containers. + # However, we can't access the remote docker's network from the primary container. This is a + # feature, as documented in https://circleci.com/docs/2.0/building-docker-images/#separation-of-environments + # As a consequence, we cannot use the circleci CLI for this job because "You cannot use the machine + # executor in local jobs." (https://circleci.com/docs/2.0/local-cli/#limitations-of-running-jobs-locally) + # + # Available images: https://circleci.com/docs/2.0/configuration-reference/#available-machine-images + image: ubuntu-1604:202004-01 + steps: + - checkout + - run: # start early for less wait time below + name: Start wasmd + command: ./scripts/wasmd/start.sh + background: true + - run: + name: Start Tendermint blockchains + command: ./scripts/tendermint/all_start.sh + background: true + - attach_workspace: + at: /tmp/builds + - run: + name: Merge build folders into project (merge with hardlinks) + command: cp --recursive --link /tmp/builds/* . + - run: + # The images ubuntu-1604:201903-01 comes with preinstalled nvm, which does not work well with non-login shells + name: Uninstall nvm + command: rm -rf "$NVM_DIR" ~/.npm ~/.bower + - run: + name: Install nodejs and yarn + command: | + curl -sL https://deb.nodesource.com/setup_10.x | sudo -E bash - + curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add - + echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list + sudo apt update && sudo apt install nodejs yarn + - run: + name: Version information + command: echo "node $(node --version)"; echo "yarn $(yarn --version)" + - restore_cache: + name: Restore Yarn Package Cache + keys: + - yarn-packages-{{ checksum "yarn.lock" }} + - run: + name: Install Dependencies + command: yarn install --frozen-lockfile + - save_cache: + name: Save Yarn Package Cache + key: yarn-packages-{{ checksum "yarn.lock" }} + paths: + - ~/.cache/yarn + - run: + name: Initialize wasmd (deploy contracts and friends) + command: ./scripts/wasmd/init.sh + - run: + name: Start socket server + command: ./scripts/socketserver/start.sh + - run: + environment: + WASMD_ENABLED: 1 + TENDERMINT_ENABLED: 1 + SOCKETSERVER_ENABLED: 1 SKIP_BUILD: 1 command: yarn coverage - run: @@ -220,6 +257,10 @@ jobs: command: npx codecov - run: command: ./scripts/wasmd/stop.sh + - run: + command: ./scripts/tendermint/all_stop.sh + - run: + command: ./scripts/socketserver/stop.sh lint: docker: - image: circleci/node:10 diff --git a/NOTICE b/NOTICE index 4e23c6ce..c030dd7a 100644 --- a/NOTICE +++ b/NOTICE @@ -21,6 +21,8 @@ The code in packages/tendermint-rpc and scripts/tendermint was forked from the f The code in packages/json-rpc was forked from https://github.com/iov-one/iov-core/tree/v2.5.0/packages/iov-jsonrpc, with additional code from https://github.com/iov-one/iov-core/tree/v2.5.0/packages/iov-encoding on 2020-06-24. +The code in packages/socket and scripts/socketserver was forked from the folders packages/iov-socket and scripts/socketserver respectively of https://github.com/iov-one/iov-core at tag v2.5.0 on 2020-06-24. + Copyright 2018-2020 IOV SAS Copyright 2020 Confio UO Copyright 2020 Simon Warta diff --git a/packages/socket/.eslintignore b/packages/socket/.eslintignore new file mode 120000 index 00000000..86039baf --- /dev/null +++ b/packages/socket/.eslintignore @@ -0,0 +1 @@ +../../.eslintignore \ No newline at end of file diff --git a/packages/socket/.gitignore b/packages/socket/.gitignore new file mode 100644 index 00000000..68bf3735 --- /dev/null +++ b/packages/socket/.gitignore @@ -0,0 +1,3 @@ +build/ +dist/ +docs/ diff --git a/packages/socket/.nycrc.yml b/packages/socket/.nycrc.yml new file mode 100644 index 00000000..26e92b5a --- /dev/null +++ b/packages/socket/.nycrc.yml @@ -0,0 +1,2 @@ +extends: "@istanbuljs/nyc-config-typescript" +include: [build/**, 'src/**'] diff --git a/packages/socket/README.md b/packages/socket/README.md new file mode 100644 index 00000000..114e32b3 --- /dev/null +++ b/packages/socket/README.md @@ -0,0 +1,10 @@ +# @cosmjs/socket + +[![npm version](https://img.shields.io/npm/v/@cosmjs/socket.svg)](https://www.npmjs.com/package/@cosmjs/socket) + +@cosmjs/socket is a collection of helper methods and classes for working with WebSockets. + +## License + +This package is part of the cosmjs repository, licensed under the Apache License 2.0 +(see [NOTICE](https://github.com/CosmWasm/cosmjs/blob/master/NOTICE) and [LICENSE](https://github.com/CosmWasm/cosmjs/blob/master/LICENSE)). diff --git a/packages/socket/jasmine-testrunner.js b/packages/socket/jasmine-testrunner.js new file mode 100755 index 00000000..a04dbed9 --- /dev/null +++ b/packages/socket/jasmine-testrunner.js @@ -0,0 +1,32 @@ +#!/usr/bin/env node + +require("source-map-support").install(); +const defaultSpecReporterConfig = require("../../jasmine-spec-reporter.config.json"); + +// setup Jasmine +const Jasmine = require("jasmine"); +const jasmine = new Jasmine(); +jasmine.loadConfig({ + spec_dir: "build", + spec_files: ["**/*.spec.js"], + helpers: [], + random: false, + seed: null, + stopSpecOnExpectationFailure: false, +}); +jasmine.jasmine.DEFAULT_TIMEOUT_INTERVAL = 15 * 1000; + +// setup reporter +const { SpecReporter } = require("jasmine-spec-reporter"); +const reporter = new SpecReporter({ + ...defaultSpecReporterConfig, + spec: { + ...defaultSpecReporterConfig.spec, + displaySuccessful: !process.argv.includes("--quiet"), + }, +}); + +// initialize and execute +jasmine.env.clearReporters(); +jasmine.addReporter(reporter); +jasmine.execute(); diff --git a/packages/socket/karma.conf.js b/packages/socket/karma.conf.js new file mode 100644 index 00000000..006da5fe --- /dev/null +++ b/packages/socket/karma.conf.js @@ -0,0 +1,47 @@ +module.exports = function (config) { + config.set({ + // base path that will be used to resolve all patterns (eg. files, exclude) + basePath: ".", + + // frameworks to use + // available frameworks: https://npmjs.org/browse/keyword/karma-adapter + frameworks: ["jasmine"], + + // list of files / patterns to load in the browser + files: ["dist/web/tests.js"], + + client: { + jasmine: { + random: false, + timeoutInterval: 15000, + }, + }, + + // test results reporter to use + // possible values: 'dots', 'progress' + // available reporters: https://npmjs.org/browse/keyword/karma-reporter + reporters: ["progress", "kjhtml"], + + // web server port + port: 9876, + + // enable / disable colors in the output (reporters and logs) + colors: true, + + // level of logging + // possible values: config.LOG_DISABLE || config.LOG_ERROR || config.LOG_WARN || config.LOG_INFO || config.LOG_DEBUG + logLevel: config.LOG_INFO, + + // enable / disable watching file and executing tests whenever any file changes + autoWatch: false, + + // start these browsers + // available browser launchers: https://npmjs.org/browse/keyword/karma-launcher + browsers: ["Firefox"], + + browserNoActivityTimeout: 90000, + + // Keep brower open for debugging. This is overridden by yarn scripts + singleRun: false, + }); +}; diff --git a/packages/socket/nonces/README.txt b/packages/socket/nonces/README.txt new file mode 100644 index 00000000..092fe732 --- /dev/null +++ b/packages/socket/nonces/README.txt @@ -0,0 +1 @@ +Directory used to trigger lerna package updates for all packages diff --git a/packages/socket/package.json b/packages/socket/package.json new file mode 100644 index 00000000..ed08b727 --- /dev/null +++ b/packages/socket/package.json @@ -0,0 +1,55 @@ +{ + "name": "@cosmjs/socket", + "version": "0.20.0", + "description": "Utility functions for working with WebSockets", + "contributors": [ + "IOV SAS ", + "Confio UO ", + "Will Clark " + ], + "license": "Apache-2.0", + "main": "build/index.js", + "types": "types/index.d.ts", + "files": [ + "build/", + "types/", + "*.md", + "!*.spec.*", + "!**/testdata/" + ], + "repository": { + "type": "git", + "url": "https://github.com/CosmWasm/cosmjs/tree/master/packages/socket" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "docs": "shx rm -rf docs && typedoc --options typedoc.js", + "lint": "eslint --max-warnings 0 \"**/*.{js,ts}\"", + "lint-fix": "eslint --max-warnings 0 \"**/*.{js,ts}\" --fix", + "format": "prettier --write --loglevel warn \"./src/**/*.ts\"", + "format-text": "prettier --write --prose-wrap always --print-width 80 \"./*.md\"", + "test-node": "node jasmine-testrunner.js", + "test-edge": "yarn pack-web && karma start --single-run --browsers Edge", + "test-firefox": "yarn pack-web && karma start --single-run --browsers Firefox", + "test-chrome": "yarn pack-web && karma start --single-run --browsers ChromeHeadless", + "test-safari": "yarn pack-web && karma start --single-run --browsers Safari", + "test": "yarn build-or-skip && yarn test-node", + "coverage": "nyc --reporter=text --reporter=lcov yarn test --quiet", + "move-types": "shx rm -r ./types/* && shx mv build/types/* ./types && rm -rf ./types/testdata && shx rm -f ./types/*.spec.d.ts", + "format-types": "prettier --write --loglevel warn \"./types/**/*.d.ts\"", + "build": "shx rm -rf ./build && tsc && yarn move-types && yarn format-types", + "build-or-skip": "[ -n \"$SKIP_BUILD\" ] || yarn build", + "pack-web": "yarn build-or-skip && webpack --mode development --config webpack.web.config.js" + }, + "dependencies": { + "@iov/stream": "^2.3.2", + "isomorphic-ws": "^4.0.1", + "ws": "^6.2.0", + "xstream": "^11.10.0" + }, + "devDependencies": { + "@types/ws": "^6.0.1" + } +} diff --git a/packages/socket/src/index.ts b/packages/socket/src/index.ts new file mode 100644 index 00000000..25f0df12 --- /dev/null +++ b/packages/socket/src/index.ts @@ -0,0 +1,9 @@ +export { ConnectionStatus, QueueingStreamingSocket } from "./queueingstreamingsocket"; +export { ReconnectingSocket } from "./reconnectingsocket"; +export { + SocketWrapper, + SocketWrapperCloseEvent, + SocketWrapperErrorEvent, + SocketWrapperMessageEvent, +} from "./socketwrapper"; +export { StreamingSocket } from "./streamingsocket"; diff --git a/packages/socket/src/queueingstreamingsocket.spec.ts b/packages/socket/src/queueingstreamingsocket.spec.ts new file mode 100644 index 00000000..0da5a682 --- /dev/null +++ b/packages/socket/src/queueingstreamingsocket.spec.ts @@ -0,0 +1,155 @@ +import { ConnectionStatus, QueueingStreamingSocket } from "./queueingstreamingsocket"; + +function pendingWithoutSocketServer(): void { + if (!process.env.SOCKETSERVER_ENABLED) { + pending("Set SOCKETSERVER_ENABLED to enable socket tests"); + } +} + +describe("QueueingStreamingSocket", () => { + const socketServerUrl = "ws://localhost:4444/websocket"; + + it("can be constructed", () => { + const socket = new QueueingStreamingSocket(socketServerUrl); + expect(socket).toBeTruthy(); + }); + + describe("queueRequest", () => { + it("can queue and process requests with a connection", (done) => { + pendingWithoutSocketServer(); + const socket = new QueueingStreamingSocket(socketServerUrl); + const requests = ["request 1", "request 2", "request 3"] as const; + let eventsSeen = 0; + socket.events.subscribe({ + next: (event) => { + expect(event.data).toEqual(requests[eventsSeen++]); + if (eventsSeen === requests.length) { + expect(socket.getQueueLength()).toEqual(0); + socket.disconnect(); + done(); + } + }, + }); + + socket.connect(); + requests.forEach((request) => socket.queueRequest(request)); + }); + + it("can queue requests without a connection and process them later", (done) => { + pendingWithoutSocketServer(); + const socket = new QueueingStreamingSocket(socketServerUrl); + const requests = ["request 1", "request 2", "request 3"] as const; + let eventsSeen = 0; + socket.events.subscribe({ + next: (event) => { + expect(event.data).toEqual(requests[eventsSeen++]); + if (eventsSeen === requests.length) { + expect(socket.getQueueLength()).toEqual(0); + socket.disconnect(); + done(); + } + }, + }); + + requests.forEach((request) => socket.queueRequest(request)); + setTimeout(() => { + expect(socket.getQueueLength()).toEqual(3); + socket.connect(); + }, 5_000); + }); + }); + + describe("reconnect", () => { + it("does not emit a completed event when disconnected", (done) => { + pendingWithoutSocketServer(); + const request = "request"; + const socket = new QueueingStreamingSocket(socketServerUrl); + socket.events.subscribe({ + next: ({ data }) => { + if (data === request) { + socket.disconnect(); + done(); + } + }, + complete: () => done.fail("Stream completed"), + }); + + socket.connect(); + socket.disconnect(); + socket.reconnect(); + socket.queueRequest(request); + }); + + it("can reconnect and process remaining queue", (done) => { + pendingWithoutSocketServer(); + const socket = new QueueingStreamingSocket(socketServerUrl); + const requests = ["request 1", "request 2", "request 3"] as const; + let eventsSeen = 0; + + socket.connect(); + socket.disconnect(); + + requests.forEach((request) => socket.queueRequest(request)); + + socket.events.subscribe({ + next: (event) => { + expect(event.data).toEqual(requests[eventsSeen++]); + if (eventsSeen === requests.length) { + expect(socket.getQueueLength()).toEqual(0); + socket.disconnect(); + done(); + } + }, + }); + socket.reconnect(); + }); + + it("notifies on reconnection via a callback", (done) => { + pendingWithoutSocketServer(); + const socket = new QueueingStreamingSocket(socketServerUrl, undefined, done); + + socket.reconnect(); + }); + }); + + describe("connectionStatus", () => { + it("exposes connection status", (done) => { + pendingWithoutSocketServer(); + const socket = new QueueingStreamingSocket(socketServerUrl); + let statusChangesSeen = 0; + socket.connectionStatus.updates.subscribe({ + next: (status) => { + switch (statusChangesSeen++) { + case 0: + expect(status).toEqual(ConnectionStatus.Unconnected); + break; + case 1: + case 4: + expect(status).toEqual(ConnectionStatus.Connecting); + break; + case 2: + case 5: + expect(status).toEqual(ConnectionStatus.Connected); + break; + case 3: + case 6: + expect(status).toEqual(ConnectionStatus.Disconnected); + break; + default: + done.fail("Got too many status changes"); + } + if (statusChangesSeen === 7) { + done(); + } + }, + }); + + socket.connect(); + setTimeout(() => { + socket.disconnect(); + socket.reconnect(); + setTimeout(() => socket.disconnect(), 1000); + }, 1000); + }); + }); +}); diff --git a/packages/socket/src/queueingstreamingsocket.ts b/packages/socket/src/queueingstreamingsocket.ts new file mode 100644 index 00000000..8bd106bc --- /dev/null +++ b/packages/socket/src/queueingstreamingsocket.ts @@ -0,0 +1,118 @@ +import { DefaultValueProducer, ValueAndUpdates } from "@iov/stream"; +import { Listener, Producer, Stream } from "xstream"; + +import { SocketWrapperMessageEvent } from "./socketwrapper"; +import { StreamingSocket } from "./streamingsocket"; + +export enum ConnectionStatus { + Unconnected, + Connecting, + Connected, + Disconnected, +} + +/** + * A wrapper around StreamingSocket that can queue requests. + */ +export class QueueingStreamingSocket { + public readonly connectionStatus: ValueAndUpdates; + public readonly events: Stream; + + private readonly url: string; + private readonly timeout: number; + private readonly queue: string[] = []; + private socket: StreamingSocket; + private isProcessingQueue = false; + private eventProducerListener: Listener | undefined; + private readonly connectionStatusProducer: DefaultValueProducer; + private readonly reconnectedHandler?: () => void; + + public constructor(url: string, timeout = 10_000, reconnectedHandler?: () => void) { + this.url = url; + this.timeout = timeout; + this.reconnectedHandler = reconnectedHandler; + + const eventProducer: Producer = { + start: (listener) => (this.eventProducerListener = listener), + stop: () => (this.eventProducerListener = undefined), + }; + this.events = Stream.create(eventProducer); + this.connectionStatusProducer = new DefaultValueProducer(ConnectionStatus.Unconnected); + this.connectionStatus = new ValueAndUpdates(this.connectionStatusProducer); + + this.socket = new StreamingSocket(this.url, this.timeout); + this.socket.events.subscribe({ + next: (event) => { + if (!this.eventProducerListener) throw new Error("No event producer listener set"); + this.eventProducerListener.next(event); + }, + error: () => this.connectionStatusProducer.update(ConnectionStatus.Disconnected), + }); + } + + public connect(): void { + this.connectionStatusProducer.update(ConnectionStatus.Connecting); + this.socket.connected.then( + async () => { + this.connectionStatusProducer.update(ConnectionStatus.Connected); + return this.processQueue(); + }, + () => this.connectionStatusProducer.update(ConnectionStatus.Disconnected), + ); + this.socket.connect(); + } + + public disconnect(): void { + this.connectionStatusProducer.update(ConnectionStatus.Disconnected); + this.socket.disconnect(); + } + + public reconnect(): void { + this.socket = new StreamingSocket(this.url, this.timeout); + this.socket.events.subscribe({ + next: (event) => { + if (!this.eventProducerListener) throw new Error("No event producer listener set"); + this.eventProducerListener.next(event); + }, + error: () => this.connectionStatusProducer.update(ConnectionStatus.Disconnected), + }); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.socket.connected.then(() => { + if (this.reconnectedHandler) { + this.reconnectedHandler(); + } + }); + this.connect(); + } + + public getQueueLength(): number { + return this.queue.length; + } + + public queueRequest(request: string): void { + this.queue.push(request); + // We don’t need to wait for the queue to be processed. + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.processQueue(); + } + + private async processQueue(): Promise { + if (this.isProcessingQueue || this.connectionStatus.value !== ConnectionStatus.Connected) { + return; + } + this.isProcessingQueue = true; + + let request: string | undefined; + while ((request = this.queue.shift())) { + try { + await this.socket.send(request); + this.isProcessingQueue = false; + } catch (error) { + // Probably the connection is down; will try again automatically when reconnected. + this.queue.unshift(request); + this.isProcessingQueue = false; + return; + } + } + } +} diff --git a/packages/socket/src/reconnectingsocket.spec.ts b/packages/socket/src/reconnectingsocket.spec.ts new file mode 100644 index 00000000..4dbc6245 --- /dev/null +++ b/packages/socket/src/reconnectingsocket.spec.ts @@ -0,0 +1,194 @@ +import assert from "assert"; + +import { ReconnectingSocket } from "./reconnectingsocket"; + +/** @see https://nodejs.org/api/child_process.html#child_process_child_process_exec_command_options_callback */ +type Exec = (command: string, callback: (error: null | (Error & { readonly code: number })) => void) => void; + +let exec: Exec | undefined; +let childProcessAvailable: boolean; + +try { + exec = require("child_process").exec; + assert.strict(typeof exec === "function"); + childProcessAvailable = true; +} catch { + childProcessAvailable = false; +} + +function pendingWithoutSocketServer(): void { + if (!process.env.SOCKETSERVER_ENABLED) { + pending("Set SOCKETSERVER_ENABLED to enable socket tests"); + } +} + +function pendingWithoutChildProcess(): void { + if (!childProcessAvailable) { + pending("Run test in an environment which supports child processes to enable socket tests"); + } +} + +describe("ReconnectingSocket", () => { + const socketServerUrl = "ws://localhost:4444/websocket"; + + it("can be constructed", () => { + const socket = new ReconnectingSocket(socketServerUrl); + expect(socket).toBeTruthy(); + }); + + describe("connect", () => { + it("cannot connect after being connected", (done) => { + pendingWithoutSocketServer(); + const socket = new ReconnectingSocket(socketServerUrl); + // Necessary otherwise the producer doesn’t start + socket.events.subscribe({}); + + socket.connect(); + + setTimeout(() => { + expect(() => socket.connect()).toThrowError(/cannot connect/i); + done(); + }, 1000); + }); + }); + + describe("disconnect", () => { + it("ends the events stream", (done) => { + pendingWithoutSocketServer(); + const socket = new ReconnectingSocket(socketServerUrl); + socket.events.subscribe({ + complete: done, + }); + + socket.connect(); + + setTimeout(() => socket.disconnect(), 1000); + }); + + it("cannot connect after being disconnected", (done) => { + pendingWithoutSocketServer(); + const socket = new ReconnectingSocket(socketServerUrl); + // Necessary otherwise the producer doesn’t start + socket.events.subscribe({}); + + socket.connect(); + + setTimeout(() => { + socket.disconnect(); + expect(() => socket.connect()).toThrowError(/cannot connect/i); + done(); + }, 1000); + }); + + it("can disconnect without waiting for open", () => { + pendingWithoutSocketServer(); + const socket = new ReconnectingSocket(socketServerUrl); + expect(() => { + socket.connect(); + socket.disconnect(); + }).not.toThrow(); + }); + }); + + describe("reconnection", () => { + const dirPath = "../../scripts/socketserver"; + const PKILL_NO_PROCESSES_MATCHED = 1; + const startServerCmd = `${dirPath}/start.sh`; + const stopServerCmd = `${dirPath}/stop.sh`; + + it("automatically reconnects if no connection can be established at init", (done) => { + pendingWithoutChildProcess(); + pendingWithoutSocketServer(); + + exec!(stopServerCmd, (stopError) => { + if (stopError && stopError.code !== PKILL_NO_PROCESSES_MATCHED) { + done.fail(stopError); + } + + const socket = new ReconnectingSocket(socketServerUrl); + const requests = ["request 1", "request 2", "request 3"] as const; + let eventsSeen = 0; + socket.events.subscribe({ + next: ({ data }) => { + expect(data).toEqual(requests[eventsSeen++]); + if (eventsSeen === requests.length) { + socket.disconnect(); + } + }, + complete: () => { + // Make sure we don't get a completion unexpectedly + expect(eventsSeen).toEqual(requests.length); + done(); + }, + }); + + socket.connect(); + requests.forEach((request) => socket.queueRequest(request)); + + setTimeout( + () => + exec!(startServerCmd, (startError) => { + if (startError) { + done.fail(startError); + } + }), + 2000, + ); + }); + }); + + it("automatically reconnects if the connection is broken off", (done) => { + pendingWithoutChildProcess(); + pendingWithoutSocketServer(); + + const socket = new ReconnectingSocket(socketServerUrl); + const requests = ["request 1", "request 2", "request 3"] as const; + let eventsSeen = 0; + socket.events.subscribe({ + next: ({ data }) => { + expect(data).toEqual(requests[eventsSeen++]); + if (eventsSeen === requests.length) { + socket.disconnect(); + } + }, + complete: () => { + // Make sure we don't get a completion unexpectedly + expect(eventsSeen).toEqual(requests.length); + done(); + }, + }); + + socket.connect(); + socket.queueRequest(requests[0]); + + setTimeout( + () => + exec!(stopServerCmd, (stopError) => { + if (stopError && stopError.code !== PKILL_NO_PROCESSES_MATCHED) { + done.fail(stopError); + } + + // TODO: This timeout is here to avoid an edge case where if a request + // is sent just as a disconnection occurs, then the websocket’s `send` + // method may not error even though the request is never sent. + // Ideally we would have a way to cover this edge case and the timeout + // would not be necessary for this test to pass. + setTimeout(() => { + requests.slice(1).forEach((request) => socket.queueRequest(request)); + + setTimeout( + () => + exec!(startServerCmd, (startError) => { + if (startError) { + done.fail(startError); + } + }), + 2000, + ); + }, 2000); + }), + 1000, + ); + }); + }); +}); diff --git a/packages/socket/src/reconnectingsocket.ts b/packages/socket/src/reconnectingsocket.ts new file mode 100644 index 00000000..9141bb85 --- /dev/null +++ b/packages/socket/src/reconnectingsocket.ts @@ -0,0 +1,92 @@ +import { ValueAndUpdates } from "@iov/stream"; +import { Listener, Producer, Stream } from "xstream"; + +import { ConnectionStatus, QueueingStreamingSocket } from "./queueingstreamingsocket"; +import { SocketWrapperMessageEvent } from "./socketwrapper"; + +/** + * A wrapper around QueueingStreamingSocket that reconnects automatically. + */ +export class ReconnectingSocket { + /** Starts with a 0.1 second timeout, then doubles every attempt with a maximum timeout of 5 seconds. */ + private static calculateTimeout(index: number): number { + return Math.min(2 ** index * 100, 5_000); + } + + public readonly connectionStatus: ValueAndUpdates; + public readonly events: Stream; + + private readonly socket: QueueingStreamingSocket; + private eventProducerListener: Listener | undefined; + private unconnected = true; + private disconnected = false; + private timeoutIndex = 0; + private reconnectTimeout: NodeJS.Timeout | null = null; + + public constructor(url: string, timeout = 10_000, reconnectedHandler?: () => void) { + const eventProducer: Producer = { + start: (listener) => (this.eventProducerListener = listener), + stop: () => (this.eventProducerListener = undefined), + }; + this.events = Stream.create(eventProducer); + + this.socket = new QueueingStreamingSocket(url, timeout, reconnectedHandler); + this.socket.events.subscribe({ + next: (event) => { + if (this.eventProducerListener) { + this.eventProducerListener.next(event); + } + }, + error: (error) => { + if (this.eventProducerListener) { + this.eventProducerListener.error(error); + } + }, + }); + + this.connectionStatus = this.socket.connectionStatus; + this.connectionStatus.updates.subscribe({ + next: (status) => { + if (status === ConnectionStatus.Connected) { + this.timeoutIndex = 0; + } + if (status === ConnectionStatus.Disconnected) { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + this.reconnectTimeout = setTimeout( + () => this.socket.reconnect(), + ReconnectingSocket.calculateTimeout(this.timeoutIndex++), + ); + } + }, + }); + } + + public connect(): void { + if (!this.unconnected) { + throw new Error("Cannot connect: socket has already connected"); + } + this.socket.connect(); + this.unconnected = false; + } + + public disconnect(): void { + if (this.unconnected) { + throw new Error("Cannot disconnect: socket has not yet connected"); + } + this.socket.disconnect(); + if (this.eventProducerListener) { + this.eventProducerListener.complete(); + } + this.disconnected = true; + } + + public queueRequest(request: string): void { + if (this.disconnected) { + throw new Error("Cannot queue request: socket has disconnected"); + } + this.socket.queueRequest(request); + } +} diff --git a/packages/socket/src/socketwrapper.spec.ts b/packages/socket/src/socketwrapper.spec.ts new file mode 100644 index 00000000..52a6897b --- /dev/null +++ b/packages/socket/src/socketwrapper.spec.ts @@ -0,0 +1,248 @@ +import { SocketWrapper } from "./socketwrapper"; + +function pendingWithoutSocketServer(): void { + if (!process.env.SOCKETSERVER_ENABLED) { + pending("Set SOCKETSERVER_ENABLED to enable socket tests"); + } +} + +describe("SocketWrapper", () => { + const socketServerUrlNonExisting = "ws://localhost:4443/websocket"; + const socketServerUrl = "ws://localhost:4444/websocket"; + const socketServerUrlSlow = "ws://localhost:4445/websocket"; + + it("can be constructed", () => { + const socket = new SocketWrapper(socketServerUrl, fail, fail); + expect(socket).toBeTruthy(); + }); + + it("can connect", (done) => { + pendingWithoutSocketServer(); + + const socket = new SocketWrapper( + socketServerUrl, + () => done.fail("Got unexpected message event"), + (error) => done.fail(error.message || "Unknown socket error"), + () => { + socket.disconnect(); + done(); + }, + ); + expect(socket).toBeTruthy(); + socket.connect(); + }); + + it("fails to connect to non-existing server", (done) => { + pendingWithoutSocketServer(); + + const socket = new SocketWrapper( + socketServerUrlNonExisting, + () => done.fail("Got unexpected message event"), + (error) => { + if (error.message) { + // error message only available in nodejs + expect(error.message).toMatch(/ECONNREFUSED/i); + } + done(); + }, + () => done.fail("Got unexpected open event"), + ); + expect(socket).toBeTruthy(); + socket.connect(); + }); + + it("fails to connect to non-existing server but timeout is not triggered", (done) => { + pendingWithoutSocketServer(); + const timeout = 1200; // ms + + const socket = new SocketWrapper( + socketServerUrlNonExisting, + () => done.fail("Got unexpected message event"), + (error) => { + expect(error).toBeTruthy(); + + // All done. Delay test end to ensure the timeout is not triggered + setTimeout(done, timeout * 1.3); + }, + () => done.fail("Got unexpected open event"), + () => 0, + timeout, + ); + expect(socket).toBeTruthy(); + socket.connect(); + }); + + it("can connect to slow server", (done) => { + pendingWithoutSocketServer(); + + const socket = new SocketWrapper( + socketServerUrlSlow, + () => done.fail("Got unexpected message event"), + (error) => done.fail(error.message || "Unknown socket error"), + () => { + socket.disconnect(); + done(); + }, + ); + expect(socket).toBeTruthy(); + socket.connect(); + }); + + it("times out when establishing connection takes too long", async () => { + pendingWithoutSocketServer(); + + const socket = new SocketWrapper( + socketServerUrlSlow, + () => fail("Got unexpected message event"), + (error) => fail(error.message || "Unknown socket error"), + () => fail("Got unexpected opened event"), + () => fail("Got unexpected closed event"), + 2_000, + ); + socket.connect(); + + await socket.connected + .then(() => fail("must not resolve")) + .catch((error) => expect(error).toMatch(/connection attempt timed out/i)); + }); + + it("can connect and disconnect", (done) => { + pendingWithoutSocketServer(); + + let opened = 0; + + const socket = new SocketWrapper( + socketServerUrl, + () => done.fail("Got unexpected message event"), + (error) => done.fail(error.message || "Unknown socket error"), + () => { + opened += 1; + socket.disconnect(); + }, + (closeEvent) => { + expect(closeEvent.wasClean).toEqual(true); + expect(closeEvent.code).toEqual(1000 /* Normal Closure */); + + expect(opened).toEqual(1); + done(); + }, + ); + socket.connect(); + }); + + it("can disconnect before waiting for open", (done) => { + pendingWithoutSocketServer(); + + const socket = new SocketWrapper( + socketServerUrl, + () => done.fail("Got unexpected message event"), + (error) => done.fail(error.message || "Unknown socket error"), + () => done.fail("Got unexpected open event"), + (closeEvent) => { + expect(closeEvent.wasClean).toEqual(false); + expect(closeEvent.code).toEqual(4001); + done(); + }, + ); + socket.connect(); + socket.disconnect(); + }); + + it("can disconnect before waiting for open and timeout will not be triggered", (done) => { + pendingWithoutSocketServer(); + const timeout = 500; // ms + + const socket = new SocketWrapper( + socketServerUrl, + () => done.fail("Got unexpected message event"), + (error) => done.fail(error.message || "Unknown socket error"), + () => done.fail("Got unexpected open event"), + (closeEvent) => { + expect(closeEvent.wasClean).toEqual(false); + expect(closeEvent.code).toEqual(4001); + + // All done. Delay test end to ensure the timeout is not triggered + setTimeout(done, timeout * 1.3); + }, + timeout, + ); + socket.connect(); + socket.disconnect(); + }); + + it("can send events when connected", (done) => { + pendingWithoutSocketServer(); + + const responseMessages = new Array(); + + const socket = new SocketWrapper( + socketServerUrl, + (response) => { + expect(response.type).toEqual("message"); + responseMessages.push(response.data); + + if (responseMessages.length === 3) { + socket.disconnect(); + } + }, + (error) => done.fail(error.message || "Unknown socket error"), + async () => { + await socket.send("aabbccdd"); + await socket.send("whatever"); + await socket.send("lalala"); + }, + () => { + expect(responseMessages.length).toEqual(3); + done(); + }, + ); + socket.connect(); + }); + + it("can send events after timeout period", (done) => { + pendingWithoutSocketServer(); + + // The "timeout period" is the period in which a timeout could potentially be triggered + + const timeoutPeriodLength = 1_500; + + const socket = new SocketWrapper( + socketServerUrl, + (response) => { + expect(response.type).toEqual("message"); + expect(response.data).toEqual("Hello world"); + socket.disconnect(); + }, + (error) => done.fail(error.message || "Unknown socket error"), + undefined, + () => done(), + timeoutPeriodLength, + ); + socket.connect(); + + setTimeout(() => socket.send("Hello world"), 2 * timeoutPeriodLength); + }); + + it("cannot send on a disconnect socket (it will never come back)", (done) => { + pendingWithoutSocketServer(); + + const socket = new SocketWrapper( + socketServerUrl, + () => done.fail("Got unexpected message event"), + (error) => done.fail(error.message || "Unknown socket error"), + () => { + socket.disconnect(); + }, + () => { + socket + .send("la li lu") + .then(() => done.fail("must not resolve")) + .catch((error) => { + expect(error).toMatch(/socket was closed/i); + done(); + }); + }, + ); + socket.connect(); + }); +}); diff --git a/packages/socket/src/socketwrapper.ts b/packages/socket/src/socketwrapper.ts new file mode 100644 index 00000000..5c5c06ae --- /dev/null +++ b/packages/socket/src/socketwrapper.ts @@ -0,0 +1,203 @@ +import WebSocket from "isomorphic-ws"; + +function environmentIsNodeJs(): boolean { + return ( + typeof process !== "undefined" && + typeof process.versions !== "undefined" && + typeof process.versions.node !== "undefined" + ); +} + +export interface SocketWrapperCloseEvent { + readonly wasClean: boolean; + readonly code: number; +} + +export interface SocketWrapperErrorEvent { + // fields available in browsers + readonly isTrusted?: boolean; + + // fields available in node + readonly type?: string; + readonly message?: string; +} + +export interface SocketWrapperMessageEvent { + readonly data: string; + readonly type: string; +} + +/** + * A thin wrapper around isomorphic-ws' WebSocket class that adds + * - constant message/error/open/close handlers + * - explict connection via a connect() method + * - type support for events + * - handling of corner cases in the open and close behaviour + */ +export class SocketWrapper { + public readonly connected: Promise; + + private connectedResolver: (() => void) | undefined; + private connectedRejecter: ((reason: any) => void) | undefined; + private socket: WebSocket | undefined; + private timeoutId: NodeJS.Timeout | undefined; + private closed = false; + private readonly url: string; + private readonly messageHandler: (event: SocketWrapperMessageEvent) => void; + private readonly errorHandler: (event: SocketWrapperErrorEvent) => void; + private readonly openHandler?: () => void; + private readonly closeHandler?: (event: SocketWrapperCloseEvent) => void; + private readonly timeout: number; + + public constructor( + url: string, + messageHandler: (event: SocketWrapperMessageEvent) => void, + errorHandler: (event: SocketWrapperErrorEvent) => void, + openHandler?: () => void, + closeHandler?: (event: SocketWrapperCloseEvent) => void, + timeout = 10_000, + ) { + this.connected = new Promise((resolve, reject) => { + this.connectedResolver = resolve; + this.connectedRejecter = reject; + }); + + this.url = url; + this.messageHandler = messageHandler; + this.errorHandler = errorHandler; + this.openHandler = openHandler; + this.closeHandler = closeHandler; + this.timeout = timeout; + } + + /** + * returns a promise that resolves when connection is open + */ + public connect(): void { + const socket = new WebSocket(this.url); + + socket.onerror = (error) => { + this.clearTimeout(); + if (this.errorHandler) { + this.errorHandler(error); + } + }; + socket.onmessage = (messageEvent) => { + this.messageHandler({ + type: messageEvent.type, + data: messageEvent.data as string, + }); + }; + socket.onopen = (_) => { + this.clearTimeout(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.connectedResolver!(); + + if (this.openHandler) { + this.openHandler(); + } + }; + socket.onclose = (closeEvent) => { + this.closed = true; + if (this.closeHandler) { + this.closeHandler(closeEvent); + } + }; + + const started = Date.now(); + this.timeoutId = setTimeout(() => { + socket.onmessage = () => 0; + socket.onerror = () => 0; + socket.onopen = () => 0; + socket.onclose = () => 0; + socket.close(); + this.socket = undefined; + + const elapsed = Math.floor(Date.now() - started); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.connectedRejecter!(`Connection attempt timed out after ${elapsed} ms`); + }, this.timeout); + + this.socket = socket; + } + + /** + * Closes an established connection and aborts other connection states + */ + public disconnect(): void { + if (!this.socket) { + throw new Error("Socket undefined. This must be called after connecting."); + } + + this.clearTimeout(); + + switch (this.socket.readyState) { + case WebSocket.OPEN: + this.socket.close(1000 /* Normal Closure */); + break; + case WebSocket.CLOSED: + // nothing to be done + break; + case WebSocket.CONNECTING: + // imitate missing abort API + this.socket.onopen = () => 0; + this.socket.onclose = () => 0; + this.socket.onerror = () => 0; + this.socket.onmessage = () => 0; + this.socket = undefined; + if (this.closeHandler) { + this.closeHandler({ wasClean: false, code: 4001 }); + } + break; + case WebSocket.CLOSING: + // already closing. Let it proceed + break; + default: + throw new Error(`Unknown readyState: ${this.socket.readyState}`); + } + } + + public async send(data: string): Promise { + return new Promise((resolve, reject) => { + if (!this.socket) { + throw new Error("Socket undefined. This must be called after connecting."); + } + + if (this.closed) { + throw new Error("Socket was closed, so no data can be sent anymore."); + } + + // this exception should be thrown by send() automatically according to + // https://developer.mozilla.org/de/docs/Web/API/WebSocket#send() but it does not work in browsers + if (this.socket.readyState !== WebSocket.OPEN) { + throw new Error("Websocket is not open"); + } + + if (environmentIsNodeJs()) { + this.socket.send(data, (err) => (err ? reject(err) : resolve())); + } else { + // Browser websocket send method does not accept a callback + this.socket.send(data); + resolve(); + } + }); + } + + /** + * Clears the timeout function, such that no timeout error will be raised anymore. This should be + * called when the connection is established, a connection error occurred or the socket is disconnected. + * + * This method must not be called before `connect()`. + * This method is idempotent. + */ + private clearTimeout(): void { + if (!this.timeoutId) { + throw new Error( + "Timeout ID not set. This should not happen and usually means connect() was not called.", + ); + } + + // Note: do not unset this.timeoutId to allow multiple calls to this function + clearTimeout(this.timeoutId); + } +} diff --git a/packages/socket/src/streamingsocket.spec.ts b/packages/socket/src/streamingsocket.spec.ts new file mode 100644 index 00000000..7682025b --- /dev/null +++ b/packages/socket/src/streamingsocket.spec.ts @@ -0,0 +1,96 @@ +import { toListPromise } from "@iov/stream"; + +import { StreamingSocket } from "./streamingsocket"; + +function skipTests(): boolean { + return !process.env.SOCKETSERVER_ENABLED; +} + +function pendingWithoutSocketServer(): void { + if (skipTests()) { + pending("Set SOCKETSERVER_ENABLED to enable socket tests"); + } +} + +describe("StreamingSocket", () => { + const socketServerUrl = "ws://localhost:4444/websocket"; + const socketServerUrlSlow = "ws://localhost:4445/websocket"; + + it("can be constructed", () => { + const socket = new StreamingSocket(socketServerUrl); + expect(socket).toBeTruthy(); + }); + + it("can connect", async () => { + pendingWithoutSocketServer(); + + const socket = new StreamingSocket(socketServerUrl); + expect(socket).toBeTruthy(); + socket.connect(); + await socket.connected; + socket.disconnect(); + }); + + it("can connect to slow server", async () => { + pendingWithoutSocketServer(); + + const socket = new StreamingSocket(socketServerUrlSlow); + expect(socket).toBeTruthy(); + socket.connect(); + await socket.connected; + socket.disconnect(); + }); + + it("times out when establishing connection takes too long", async () => { + pendingWithoutSocketServer(); + + const socket = new StreamingSocket(socketServerUrlSlow, 2_000); + socket.connect(); + + await socket.connected + .then(() => fail("must not resolve")) + .catch((error) => expect(error).toMatch(/connection attempt timed out/i)); + }); + + it("can send events when connected", async () => { + pendingWithoutSocketServer(); + + const socket = new StreamingSocket(socketServerUrl); + + const responsePromise = toListPromise(socket.events, 3); + + socket.connect(); + await socket.connected; + + await socket.send("aabbccdd"); + await socket.send("whatever"); + await socket.send("lalala"); + + const response = await responsePromise; + expect(response.length).toEqual(3); + + socket.disconnect(); + }); + + it("completes stream when disconnected", (done) => { + pendingWithoutSocketServer(); + + const socket = new StreamingSocket(socketServerUrl); + expect(socket).toBeTruthy(); + const subscription = socket.events.subscribe({ + complete: () => { + subscription.unsubscribe(); + done(); + }, + }); + + (async () => { + socket.connect(); + await socket.connected; + await socket.send("aabbccdd"); + await socket.send("whatever"); + await socket.send("lalala"); + socket.disconnect(); + })().catch(done.fail); + }); +}); diff --git a/packages/socket/src/streamingsocket.ts b/packages/socket/src/streamingsocket.ts new file mode 100644 index 00000000..a0e0805a --- /dev/null +++ b/packages/socket/src/streamingsocket.ts @@ -0,0 +1,63 @@ +import { Listener, Producer, Stream } from "xstream"; + +import { SocketWrapper, SocketWrapperMessageEvent } from "./socketwrapper"; + +/** + * A WebSocket wrapper that exposes all events as a stream. + * + * This underlying socket will not be closed when the stream has no listeners + */ +export class StreamingSocket { + public readonly connected: Promise; + public readonly events: Stream; + private eventProducerListener: Listener | undefined; + private readonly socket: SocketWrapper; + + public constructor(url: string, timeout = 10_000) { + this.socket = new SocketWrapper( + url, + (event) => { + if (this.eventProducerListener) { + this.eventProducerListener.next(event); + } + }, + (errorEvent) => { + if (this.eventProducerListener) { + this.eventProducerListener.error(errorEvent); + } + }, + () => { + // socket opened + }, + (closeEvent) => { + if (this.eventProducerListener) { + if (closeEvent.wasClean) { + this.eventProducerListener.complete(); + } else { + this.eventProducerListener.error("Socket was closed unclean"); + } + } + }, + timeout, + ); + this.connected = this.socket.connected; + + const eventProducer: Producer = { + start: (listener) => (this.eventProducerListener = listener), + stop: () => (this.eventProducerListener = undefined), + }; + this.events = Stream.create(eventProducer); + } + + public connect(): void { + this.socket.connect(); + } + + public disconnect(): void { + this.socket.disconnect(); + } + + public async send(data: string): Promise { + return this.socket.send(data); + } +} diff --git a/packages/socket/tsconfig.json b/packages/socket/tsconfig.json new file mode 100644 index 00000000..167e8c02 --- /dev/null +++ b/packages/socket/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "baseUrl": ".", + "outDir": "build", + "declarationDir": "build/types", + "rootDir": "src" + }, + "include": [ + "src/**/*" + ] +} diff --git a/packages/socket/typedoc.js b/packages/socket/typedoc.js new file mode 100644 index 00000000..e2387c7d --- /dev/null +++ b/packages/socket/typedoc.js @@ -0,0 +1,14 @@ +const packageJson = require("./package.json"); + +module.exports = { + src: ["./src"], + out: "docs", + exclude: "**/*.spec.ts", + target: "es6", + name: `${packageJson.name} Documentation`, + readme: "README.md", + mode: "file", + excludeExternals: true, + excludeNotExported: true, + excludePrivate: true, +}; diff --git a/packages/socket/types/index.d.ts b/packages/socket/types/index.d.ts new file mode 100644 index 00000000..25f0df12 --- /dev/null +++ b/packages/socket/types/index.d.ts @@ -0,0 +1,9 @@ +export { ConnectionStatus, QueueingStreamingSocket } from "./queueingstreamingsocket"; +export { ReconnectingSocket } from "./reconnectingsocket"; +export { + SocketWrapper, + SocketWrapperCloseEvent, + SocketWrapperErrorEvent, + SocketWrapperMessageEvent, +} from "./socketwrapper"; +export { StreamingSocket } from "./streamingsocket"; diff --git a/packages/socket/types/queueingstreamingsocket.d.ts b/packages/socket/types/queueingstreamingsocket.d.ts new file mode 100644 index 00000000..b599e63f --- /dev/null +++ b/packages/socket/types/queueingstreamingsocket.d.ts @@ -0,0 +1,31 @@ +import { ValueAndUpdates } from "@iov/stream"; +import { Stream } from "xstream"; +import { SocketWrapperMessageEvent } from "./socketwrapper"; +export declare enum ConnectionStatus { + Unconnected = 0, + Connecting = 1, + Connected = 2, + Disconnected = 3, +} +/** + * A wrapper around StreamingSocket that can queue requests. + */ +export declare class QueueingStreamingSocket { + readonly connectionStatus: ValueAndUpdates; + readonly events: Stream; + private readonly url; + private readonly timeout; + private readonly queue; + private socket; + private isProcessingQueue; + private eventProducerListener; + private readonly connectionStatusProducer; + private readonly reconnectedHandler?; + constructor(url: string, timeout?: number, reconnectedHandler?: () => void); + connect(): void; + disconnect(): void; + reconnect(): void; + getQueueLength(): number; + queueRequest(request: string): void; + private processQueue; +} diff --git a/packages/socket/types/reconnectingsocket.d.ts b/packages/socket/types/reconnectingsocket.d.ts new file mode 100644 index 00000000..f76ae3c3 --- /dev/null +++ b/packages/socket/types/reconnectingsocket.d.ts @@ -0,0 +1,23 @@ +import { ValueAndUpdates } from "@iov/stream"; +import { Stream } from "xstream"; +import { ConnectionStatus } from "./queueingstreamingsocket"; +import { SocketWrapperMessageEvent } from "./socketwrapper"; +/** + * A wrapper around QueueingStreamingSocket that reconnects automatically. + */ +export declare class ReconnectingSocket { + /** Starts with a 0.1 second timeout, then doubles every attempt with a maximum timeout of 5 seconds. */ + private static calculateTimeout; + readonly connectionStatus: ValueAndUpdates; + readonly events: Stream; + private readonly socket; + private eventProducerListener; + private unconnected; + private disconnected; + private timeoutIndex; + private reconnectTimeout; + constructor(url: string, timeout?: number, reconnectedHandler?: () => void); + connect(): void; + disconnect(): void; + queueRequest(request: string): void; +} diff --git a/packages/socket/types/socketwrapper.d.ts b/packages/socket/types/socketwrapper.d.ts new file mode 100644 index 00000000..dc77176f --- /dev/null +++ b/packages/socket/types/socketwrapper.d.ts @@ -0,0 +1,59 @@ +export interface SocketWrapperCloseEvent { + readonly wasClean: boolean; + readonly code: number; +} +export interface SocketWrapperErrorEvent { + readonly isTrusted?: boolean; + readonly type?: string; + readonly message?: string; +} +export interface SocketWrapperMessageEvent { + readonly data: string; + readonly type: string; +} +/** + * A thin wrapper around isomorphic-ws' WebSocket class that adds + * - constant message/error/open/close handlers + * - explict connection via a connect() method + * - type support for events + * - handling of corner cases in the open and close behaviour + */ +export declare class SocketWrapper { + readonly connected: Promise; + private connectedResolver; + private connectedRejecter; + private socket; + private timeoutId; + private closed; + private readonly url; + private readonly messageHandler; + private readonly errorHandler; + private readonly openHandler?; + private readonly closeHandler?; + private readonly timeout; + constructor( + url: string, + messageHandler: (event: SocketWrapperMessageEvent) => void, + errorHandler: (event: SocketWrapperErrorEvent) => void, + openHandler?: () => void, + closeHandler?: (event: SocketWrapperCloseEvent) => void, + timeout?: number, + ); + /** + * returns a promise that resolves when connection is open + */ + connect(): void; + /** + * Closes an established connection and aborts other connection states + */ + disconnect(): void; + send(data: string): Promise; + /** + * Clears the timeout function, such that no timeout error will be raised anymore. This should be + * called when the connection is established, a connection error occurred or the socket is disconnected. + * + * This method must not be called before `connect()`. + * This method is idempotent. + */ + private clearTimeout; +} diff --git a/packages/socket/types/streamingsocket.d.ts b/packages/socket/types/streamingsocket.d.ts new file mode 100644 index 00000000..6c850e6a --- /dev/null +++ b/packages/socket/types/streamingsocket.d.ts @@ -0,0 +1,17 @@ +import { Stream } from "xstream"; +import { SocketWrapperMessageEvent } from "./socketwrapper"; +/** + * A WebSocket wrapper that exposes all events as a stream. + * + * This underlying socket will not be closed when the stream has no listeners + */ +export declare class StreamingSocket { + readonly connected: Promise; + readonly events: Stream; + private eventProducerListener; + private readonly socket; + constructor(url: string, timeout?: number); + connect(): void; + disconnect(): void; + send(data: string): Promise; +} diff --git a/packages/socket/webpack.web.config.js b/packages/socket/webpack.web.config.js new file mode 100644 index 00000000..41eac744 --- /dev/null +++ b/packages/socket/webpack.web.config.js @@ -0,0 +1,19 @@ +const glob = require("glob"); +const path = require("path"); +const webpack = require("webpack"); + +const target = "web"; +const distdir = path.join(__dirname, "dist", "web"); + +module.exports = [ + { + // bundle used for Karma tests + target: target, + entry: glob.sync("./build/**/*.spec.js"), + output: { + path: distdir, + filename: "tests.js", + }, + plugins: [new webpack.EnvironmentPlugin(["SOCKETSERVER_ENABLED"])], + }, +]; diff --git a/packages/tendermint-rpc/package.json b/packages/tendermint-rpc/package.json index a2553c2c..794dc1d2 100644 --- a/packages/tendermint-rpc/package.json +++ b/packages/tendermint-rpc/package.json @@ -48,7 +48,7 @@ "@cosmjs/encoding": "^0.20.0", "@cosmjs/json-rpc": "^0.20.0", "@cosmjs/math": "^0.20.0", - "@iov/socket": "^2.3.2", + "@cosmjs/socket": "^0.20.0", "axios": "^0.19.0", "readonly-date": "^1.0.0", "type-tagger": "^1.0.0", diff --git a/packages/tendermint-rpc/src/rpcclients/websocketclient.ts b/packages/tendermint-rpc/src/rpcclients/websocketclient.ts index 5b12912d..f6909cfd 100644 --- a/packages/tendermint-rpc/src/rpcclients/websocketclient.ts +++ b/packages/tendermint-rpc/src/rpcclients/websocketclient.ts @@ -6,7 +6,7 @@ import { JsonRpcSuccessResponse, parseJsonRpcResponse, } from "@cosmjs/json-rpc"; -import { ConnectionStatus, ReconnectingSocket, SocketWrapperMessageEvent } from "@iov/socket"; +import { ConnectionStatus, ReconnectingSocket, SocketWrapperMessageEvent } from "@cosmjs/socket"; import { firstEvent } from "@iov/stream"; import { Listener, Producer, Stream, Subscription } from "xstream"; diff --git a/scripts/socketserver/Dockerfile b/scripts/socketserver/Dockerfile new file mode 100644 index 00000000..844961f9 --- /dev/null +++ b/scripts/socketserver/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.7-alpine + +WORKDIR /usr/src/app + +COPY echo.py ./ +RUN pip install websockets + +ENTRYPOINT ["python", "./echo.py"] diff --git a/scripts/socketserver/echo.py b/scripts/socketserver/echo.py new file mode 100755 index 00000000..352cc5db --- /dev/null +++ b/scripts/socketserver/echo.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +#pylint:disable=missing-docstring,invalid-name + +import argparse +import asyncio +import websockets +import sys +import time + +HOST = "0.0.0.0" + +def log(data): + print(data, flush=True) + +@asyncio.coroutine +def connection_handler(connection, path): + connection_id = hex(id(connection)) + log("{} opened connection via {}".format(connection_id, path)) + try: + while True: + incoming_message = yield from connection.recv() + log("< {}".format(incoming_message)) + outgoing_message = incoming_message + yield from connection.send(outgoing_message) + log("> {}".format(outgoing_message)) + except websockets.exceptions.ConnectionClosed: + log("{} closed connection".format(connection_id)) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--port", + help="Port to listen on", + type=int, + default=4000) + parser.add_argument("--delay", + help="Time in seconds that a connection will be delayed before establishing it", + type=int, + default=0) + args = parser.parse_args() + + def delaying_process_request(path, request_headers): + time.sleep(args.delay) + return None + + log("Starting server at {}:{}".format(HOST, args.port)) + server = websockets.serve(connection_handler, HOST, args.port, process_request=delaying_process_request) + log("Running now.") + + asyncio.get_event_loop().run_until_complete(server) + asyncio.get_event_loop().run_forever() diff --git a/scripts/socketserver/start.sh b/scripts/socketserver/start.sh new file mode 100755 index 00000000..3b8a6412 --- /dev/null +++ b/scripts/socketserver/start.sh @@ -0,0 +1,37 @@ +#!/bin/bash +set -o errexit -o nounset -o pipefail +command -v shellcheck > /dev/null && shellcheck "$0" + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +SOCKETSERVER_DIR=$(mktemp -d "${TMPDIR:-/tmp}/socketserver.XXXXXXXXX") +export SOCKETSERVER_DIR +echo "SOCKETSERVER_DIR = $SOCKETSERVER_DIR" + +NAME_DEFAULT="socketserver-default" +NAME_SLOW="socketserver-slow" + +LOGFILE_DEFAULT="${SOCKETSERVER_DIR}/socketserver_4444.log" +LOGFILE_SLOW="${SOCKETSERVER_DIR}/socketserver_4445.log" + +docker build -t "socketserver:local" "$SCRIPT_DIR" + +docker run --rm \ + --user="$UID" \ + --name "$NAME_DEFAULT" \ + -p "4444:4000" \ + socketserver:local \ + --delay 0 \ + > "$LOGFILE_DEFAULT" & +docker run --rm \ + --user="$UID" \ + --name "$NAME_SLOW" \ + -p "4445:4000" \ + socketserver:local \ + --delay 5 \ + > "$LOGFILE_SLOW" & + +# Debug start +sleep 3 +cat "$LOGFILE_DEFAULT" +cat "$LOGFILE_SLOW" diff --git a/scripts/socketserver/stop.sh b/scripts/socketserver/stop.sh new file mode 100755 index 00000000..39acd770 --- /dev/null +++ b/scripts/socketserver/stop.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -o errexit -o nounset -o pipefail +command -v shellcheck > /dev/null && shellcheck "$0" + +echo "Killing socketserver containers ..." +docker container kill "socketserver-default" "socketserver-slow" diff --git a/scripts/tendermint/start.sh b/scripts/tendermint/start.sh index 16ac73c0..e30098dd 100755 --- a/scripts/tendermint/start.sh +++ b/scripts/tendermint/start.sh @@ -41,3 +41,11 @@ docker run --rm \ > "$LOGFILE" & echo "Tendermint running and logging into $LOGFILE" + +# Give REST server some time to come alive. No idea why this helps. Needed for CI. +if [ -n "${CI:-}" ]; then + sleep 0.5 + + # Follow the logs in CI's background job + tail -f "$LOGFILE" +fi diff --git a/yarn.lock b/yarn.lock index 5e14479b..45665944 100644 --- a/yarn.lock +++ b/yarn.lock @@ -251,16 +251,6 @@ unique-filename "^1.1.1" which "^1.3.1" -"@iov/socket@^2.3.2": - version "2.3.2" - resolved "https://registry.yarnpkg.com/@iov/socket/-/socket-2.3.2.tgz#adc8ef389bafc5380e1c7415fb21f9a890d79195" - integrity sha512-LMIVGhYNvEdctHwjprVYv5QnDxXLNNZ9ASU0IRlsjHsMNR1Tcy+FsdqqQ5PJlW1r/r92dHLCfV9IKXucg/h8rQ== - dependencies: - "@iov/stream" "^2.3.2" - isomorphic-ws "^4.0.1" - ws "^6.2.0" - xstream "^11.10.0" - "@iov/stream@^2.3.2": version "2.3.2" resolved "https://registry.yarnpkg.com/@iov/stream/-/stream-2.3.2.tgz#472063f3a4fcd1e97de0ae99189f98b94825afac" @@ -1383,6 +1373,13 @@ resolved "https://registry.yarnpkg.com/@types/unorm/-/unorm-1.3.28.tgz#580141162f2fd221faae2b2d68da6c839402c375" integrity sha512-l3uh18vcvkQ964HSK7Tx0YbhxN/Hj+k1w3nLT08n770lngqVKljmF7Ht4e7elFbx6L2WYse97whtpJOo8MHtxQ== +"@types/ws@^6.0.1": + version "6.0.4" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-6.0.4.tgz#7797707c8acce8f76d8c34b370d4645b70421ff1" + integrity sha512-PpPrX7SZW9re6+Ha8ojZG4Se8AZXgf0GK6zmfqEuCsY49LFDNXO3SByp44X3dFEqtB73lkCDAdUazhAjVPiNwg== + dependencies: + "@types/node" "*" + "@types/yargs-parser@*": version "15.0.0" resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-15.0.0.tgz#cb3f9f741869e20cce330ffbeb9271590483882d"