From ea68244a5f66bf228e799a59d814c9e32f2ffe86 Mon Sep 17 00:00:00 2001 From: willclarktech Date: Wed, 24 Jun 2020 17:50:47 +0200 Subject: [PATCH] stream: Fork @iov/stream --- packages/stream/.eslintignore | 1 + packages/stream/.gitignore | 3 + packages/stream/README.md | 16 ++ packages/stream/jasmine-testrunner.js | 26 +++ packages/stream/karma.conf.js | 47 ++++ packages/stream/nonces/1552895838 | 0 packages/stream/nonces/1553085893 | 0 packages/stream/nonces/1554721221 | 0 packages/stream/nonces/1554724217 | 0 packages/stream/nonces/1554737147 | 0 packages/stream/nonces/1555314694 | 0 packages/stream/nonces/1556008552 | 0 packages/stream/nonces/1556028926 | 0 packages/stream/nonces/1556095341 | 0 packages/stream/nonces/1556616100 | 0 packages/stream/nonces/1557811966 | 0 packages/stream/nonces/1558346811 | 0 packages/stream/nonces/1558456815 | 0 packages/stream/nonces/1558460837 | 0 packages/stream/nonces/1559802671 | 0 packages/stream/nonces/1561970534 | 0 packages/stream/nonces/1562080432 | 0 packages/stream/nonces/1563468776 | 0 packages/stream/nonces/1563887488 | 0 packages/stream/nonces/1563960408 | 0 packages/stream/nonces/1563981076 | 0 packages/stream/nonces/1564503008 | 0 packages/stream/nonces/1564651088 | 0 packages/stream/nonces/1565101189 | 0 packages/stream/nonces/1565595547 | 0 packages/stream/nonces/1565876849 | 0 packages/stream/nonces/1566487600 | 0 packages/stream/nonces/1567435567 | 0 packages/stream/nonces/1567608963 | 0 packages/stream/nonces/1567694160 | 0 packages/stream/nonces/1568039925 | 0 packages/stream/nonces/1568116477 | 0 packages/stream/nonces/1568786866 | 0 packages/stream/nonces/1568910632 | 0 packages/stream/nonces/1569319493 | 0 packages/stream/nonces/1569487848 | 0 packages/stream/nonces/1569929617 | 0 packages/stream/nonces/1570527883 | 0 packages/stream/nonces/1573026590 | 0 packages/stream/nonces/1574869843 | 0 packages/stream/nonces/1576569788 | 0 packages/stream/nonces/1576595306 | 0 packages/stream/nonces/1576678551 | 0 packages/stream/nonces/1576746493 | 0 packages/stream/nonces/1576760285 | 0 packages/stream/nonces/1576767119 | 0 packages/stream/nonces/1579019908 | 0 packages/stream/nonces/1581606289 | 0 packages/stream/nonces/1581681020 | 0 packages/stream/nonces/1584038020 | 0 packages/stream/nonces/1588011428 | 0 packages/stream/nonces/1591293896 | 0 packages/stream/nonces/README.txt | 1 + packages/stream/package.json | 43 ++++ packages/stream/src/concat.spec.ts | 203 +++++++++++++++++ packages/stream/src/concat.ts | 104 +++++++++ .../stream/src/defaultvalueproducer.spec.ts | 113 ++++++++++ packages/stream/src/defaultvalueproducer.ts | 72 ++++++ packages/stream/src/dropduplicates.spec.ts | 95 ++++++++ packages/stream/src/dropduplicates.ts | 38 ++++ packages/stream/src/index.ts | 6 + packages/stream/src/promise.spec.ts | 145 ++++++++++++ packages/stream/src/promise.ts | 67 ++++++ packages/stream/src/reducer.spec.ts | 56 +++++ packages/stream/src/reducer.ts | 87 ++++++++ packages/stream/src/valueandupdates.spec.ts | 211 ++++++++++++++++++ packages/stream/src/valueandupdates.ts | 57 +++++ packages/stream/tsconfig.json | 12 + packages/stream/tslint.json | 3 + packages/stream/typedoc.js | 14 ++ packages/stream/types/concat.d.ts | 19 ++ .../stream/types/defaultvalueproducer.d.ts | 31 +++ packages/stream/types/dropduplicates.d.ts | 25 +++ packages/stream/types/index.d.ts | 6 + packages/stream/types/promise.d.ts | 18 ++ packages/stream/types/reducer.d.ts | 14 ++ packages/stream/types/valueandupdates.d.ts | 21 ++ packages/stream/webpack.web.config.js | 17 ++ 83 files changed, 1571 insertions(+) create mode 120000 packages/stream/.eslintignore create mode 100644 packages/stream/.gitignore create mode 100644 packages/stream/README.md create mode 100644 packages/stream/jasmine-testrunner.js create mode 100644 packages/stream/karma.conf.js create mode 100644 packages/stream/nonces/1552895838 create mode 100644 packages/stream/nonces/1553085893 create mode 100644 packages/stream/nonces/1554721221 create mode 100644 packages/stream/nonces/1554724217 create mode 100644 packages/stream/nonces/1554737147 create mode 100644 packages/stream/nonces/1555314694 create mode 100644 packages/stream/nonces/1556008552 create mode 100644 packages/stream/nonces/1556028926 create mode 100644 packages/stream/nonces/1556095341 create mode 100644 packages/stream/nonces/1556616100 create mode 100644 packages/stream/nonces/1557811966 create mode 100644 packages/stream/nonces/1558346811 create mode 100644 packages/stream/nonces/1558456815 create mode 100644 packages/stream/nonces/1558460837 create mode 100644 packages/stream/nonces/1559802671 create mode 100644 packages/stream/nonces/1561970534 create mode 100644 packages/stream/nonces/1562080432 create mode 100644 packages/stream/nonces/1563468776 create mode 100644 packages/stream/nonces/1563887488 create mode 100644 packages/stream/nonces/1563960408 create mode 100644 packages/stream/nonces/1563981076 create mode 100644 packages/stream/nonces/1564503008 create mode 100644 packages/stream/nonces/1564651088 create mode 100644 packages/stream/nonces/1565101189 create mode 100644 packages/stream/nonces/1565595547 create mode 100644 packages/stream/nonces/1565876849 create mode 100644 packages/stream/nonces/1566487600 create mode 100644 packages/stream/nonces/1567435567 create mode 100644 packages/stream/nonces/1567608963 create mode 100644 packages/stream/nonces/1567694160 create mode 100644 packages/stream/nonces/1568039925 create mode 100644 packages/stream/nonces/1568116477 create mode 100644 packages/stream/nonces/1568786866 create mode 100644 packages/stream/nonces/1568910632 create mode 100644 packages/stream/nonces/1569319493 create mode 100644 packages/stream/nonces/1569487848 create mode 100644 packages/stream/nonces/1569929617 create mode 100644 packages/stream/nonces/1570527883 create mode 100644 packages/stream/nonces/1573026590 create mode 100644 packages/stream/nonces/1574869843 create mode 100644 packages/stream/nonces/1576569788 create mode 100644 packages/stream/nonces/1576595306 create mode 100644 packages/stream/nonces/1576678551 create mode 100644 packages/stream/nonces/1576746493 create mode 100644 packages/stream/nonces/1576760285 create mode 100644 packages/stream/nonces/1576767119 create mode 100644 packages/stream/nonces/1579019908 create mode 100644 packages/stream/nonces/1581606289 create mode 100644 packages/stream/nonces/1581681020 create mode 100644 packages/stream/nonces/1584038020 create mode 100644 packages/stream/nonces/1588011428 create mode 100644 packages/stream/nonces/1591293896 create mode 100644 packages/stream/nonces/README.txt create mode 100644 packages/stream/package.json create mode 100644 packages/stream/src/concat.spec.ts create mode 100644 packages/stream/src/concat.ts create mode 100644 packages/stream/src/defaultvalueproducer.spec.ts create mode 100644 packages/stream/src/defaultvalueproducer.ts create mode 100644 packages/stream/src/dropduplicates.spec.ts create mode 100644 packages/stream/src/dropduplicates.ts create mode 100644 packages/stream/src/index.ts create mode 100644 packages/stream/src/promise.spec.ts create mode 100644 packages/stream/src/promise.ts create mode 100644 packages/stream/src/reducer.spec.ts create mode 100644 packages/stream/src/reducer.ts create mode 100644 packages/stream/src/valueandupdates.spec.ts create mode 100644 packages/stream/src/valueandupdates.ts create mode 100644 packages/stream/tsconfig.json create mode 100644 packages/stream/tslint.json create mode 100644 packages/stream/typedoc.js create mode 100644 packages/stream/types/concat.d.ts create mode 100644 packages/stream/types/defaultvalueproducer.d.ts create mode 100644 packages/stream/types/dropduplicates.d.ts create mode 100644 packages/stream/types/index.d.ts create mode 100644 packages/stream/types/promise.d.ts create mode 100644 packages/stream/types/reducer.d.ts create mode 100644 packages/stream/types/valueandupdates.d.ts create mode 100644 packages/stream/webpack.web.config.js diff --git a/packages/stream/.eslintignore b/packages/stream/.eslintignore new file mode 120000 index 00000000..86039baf --- /dev/null +++ b/packages/stream/.eslintignore @@ -0,0 +1 @@ +../../.eslintignore \ No newline at end of file diff --git a/packages/stream/.gitignore b/packages/stream/.gitignore new file mode 100644 index 00000000..68bf3735 --- /dev/null +++ b/packages/stream/.gitignore @@ -0,0 +1,3 @@ +build/ +dist/ +docs/ diff --git a/packages/stream/README.md b/packages/stream/README.md new file mode 100644 index 00000000..5c364e5e --- /dev/null +++ b/packages/stream/README.md @@ -0,0 +1,16 @@ +# @iov/stream + +[![npm version](https://img.shields.io/npm/v/@iov/stream.svg)](https://www.npmjs.com/package/@iov/stream) + +@iov/stream are some helper methods and classes to ceal with stream processing. +As BCP's `BlockchainConnection` exposes a Stream based event API, and +tendermint/RpcClient exposes Stream based subscriptions, there is common useful +functionality for creating and consuming streams both in applications as well as +in the test code. + +## License + +This package is part of the IOV-Core repository, licensed under the Apache +License 2.0 (see +[NOTICE](https://github.com/iov-one/iov-core/blob/master/NOTICE) and +[LICENSE](https://github.com/iov-one/iov-core/blob/master/LICENSE)). diff --git a/packages/stream/jasmine-testrunner.js b/packages/stream/jasmine-testrunner.js new file mode 100644 index 00000000..9fada59b --- /dev/null +++ b/packages/stream/jasmine-testrunner.js @@ -0,0 +1,26 @@ +#!/usr/bin/env node + +require("source-map-support").install(); +const defaultSpecReporterConfig = require("../../jasmine-spec-reporter.config.json"); + +// setup Jasmine +const Jasmine = require("jasmine"); +const jasmine = new Jasmine(); +jasmine.loadConfig({ + spec_dir: "build", + spec_files: ["**/*.spec.js"], + helpers: [], + random: false, + seed: null, + stopSpecOnExpectationFailure: false, +}); +jasmine.jasmine.DEFAULT_TIMEOUT_INTERVAL = 15 * 1000; + +// setup reporter +const { SpecReporter } = require("jasmine-spec-reporter"); +const reporter = new SpecReporter({ ...defaultSpecReporterConfig }); + +// initialize and execute +jasmine.env.clearReporters(); +jasmine.addReporter(reporter); +jasmine.execute(); diff --git a/packages/stream/karma.conf.js b/packages/stream/karma.conf.js new file mode 100644 index 00000000..006da5fe --- /dev/null +++ b/packages/stream/karma.conf.js @@ -0,0 +1,47 @@ +module.exports = function (config) { + config.set({ + // base path that will be used to resolve all patterns (eg. files, exclude) + basePath: ".", + + // frameworks to use + // available frameworks: https://npmjs.org/browse/keyword/karma-adapter + frameworks: ["jasmine"], + + // list of files / patterns to load in the browser + files: ["dist/web/tests.js"], + + client: { + jasmine: { + random: false, + timeoutInterval: 15000, + }, + }, + + // test results reporter to use + // possible values: 'dots', 'progress' + // available reporters: https://npmjs.org/browse/keyword/karma-reporter + reporters: ["progress", "kjhtml"], + + // web server port + port: 9876, + + // enable / disable colors in the output (reporters and logs) + colors: true, + + // level of logging + // possible values: config.LOG_DISABLE || config.LOG_ERROR || config.LOG_WARN || config.LOG_INFO || config.LOG_DEBUG + logLevel: config.LOG_INFO, + + // enable / disable watching file and executing tests whenever any file changes + autoWatch: false, + + // start these browsers + // available browser launchers: https://npmjs.org/browse/keyword/karma-launcher + browsers: ["Firefox"], + + browserNoActivityTimeout: 90000, + + // Keep brower open for debugging. This is overridden by yarn scripts + singleRun: false, + }); +}; diff --git a/packages/stream/nonces/1552895838 b/packages/stream/nonces/1552895838 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1553085893 b/packages/stream/nonces/1553085893 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1554721221 b/packages/stream/nonces/1554721221 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1554724217 b/packages/stream/nonces/1554724217 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1554737147 b/packages/stream/nonces/1554737147 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1555314694 b/packages/stream/nonces/1555314694 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1556008552 b/packages/stream/nonces/1556008552 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1556028926 b/packages/stream/nonces/1556028926 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1556095341 b/packages/stream/nonces/1556095341 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1556616100 b/packages/stream/nonces/1556616100 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1557811966 b/packages/stream/nonces/1557811966 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1558346811 b/packages/stream/nonces/1558346811 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1558456815 b/packages/stream/nonces/1558456815 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1558460837 b/packages/stream/nonces/1558460837 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1559802671 b/packages/stream/nonces/1559802671 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1561970534 b/packages/stream/nonces/1561970534 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1562080432 b/packages/stream/nonces/1562080432 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1563468776 b/packages/stream/nonces/1563468776 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1563887488 b/packages/stream/nonces/1563887488 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1563960408 b/packages/stream/nonces/1563960408 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1563981076 b/packages/stream/nonces/1563981076 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1564503008 b/packages/stream/nonces/1564503008 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1564651088 b/packages/stream/nonces/1564651088 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1565101189 b/packages/stream/nonces/1565101189 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1565595547 b/packages/stream/nonces/1565595547 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1565876849 b/packages/stream/nonces/1565876849 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1566487600 b/packages/stream/nonces/1566487600 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1567435567 b/packages/stream/nonces/1567435567 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1567608963 b/packages/stream/nonces/1567608963 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1567694160 b/packages/stream/nonces/1567694160 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1568039925 b/packages/stream/nonces/1568039925 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1568116477 b/packages/stream/nonces/1568116477 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1568786866 b/packages/stream/nonces/1568786866 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1568910632 b/packages/stream/nonces/1568910632 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1569319493 b/packages/stream/nonces/1569319493 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1569487848 b/packages/stream/nonces/1569487848 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1569929617 b/packages/stream/nonces/1569929617 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1570527883 b/packages/stream/nonces/1570527883 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1573026590 b/packages/stream/nonces/1573026590 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1574869843 b/packages/stream/nonces/1574869843 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1576569788 b/packages/stream/nonces/1576569788 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1576595306 b/packages/stream/nonces/1576595306 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1576678551 b/packages/stream/nonces/1576678551 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1576746493 b/packages/stream/nonces/1576746493 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1576760285 b/packages/stream/nonces/1576760285 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1576767119 b/packages/stream/nonces/1576767119 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1579019908 b/packages/stream/nonces/1579019908 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1581606289 b/packages/stream/nonces/1581606289 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1581681020 b/packages/stream/nonces/1581681020 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1584038020 b/packages/stream/nonces/1584038020 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1588011428 b/packages/stream/nonces/1588011428 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/1591293896 b/packages/stream/nonces/1591293896 new file mode 100644 index 00000000..e69de29b diff --git a/packages/stream/nonces/README.txt b/packages/stream/nonces/README.txt new file mode 100644 index 00000000..092fe732 --- /dev/null +++ b/packages/stream/nonces/README.txt @@ -0,0 +1 @@ +Directory used to trigger lerna package updates for all packages diff --git a/packages/stream/package.json b/packages/stream/package.json new file mode 100644 index 00000000..89958189 --- /dev/null +++ b/packages/stream/package.json @@ -0,0 +1,43 @@ +{ + "name": "@iov/stream", + "version": "2.3.2", + "description": "Utility functions for producing and consuming Streams", + "author": "IOV SAS ", + "license": "Apache-2.0", + "main": "build/index.js", + "types": "types/index.d.ts", + "files": [ + "build/", + "types/", + "*.md", + "!*.spec.*", + "!**/testdata/" + ], + "repository": { + "type": "git", + "url": "https://github.com/iov-one/iov-core/tree/master/packages/iov-stream" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "docs": "shx rm -rf docs && typedoc --options typedoc.js", + "lint": "eslint --max-warnings 0 \"**/*.{js,ts}\" && tslint -t verbose --project .", + "format": "prettier --write --loglevel warn \"./src/**/*.ts\"", + "format-text": "prettier --write --prose-wrap always --print-width 80 \"./*.md\"", + "test-node": "node jasmine-testrunner.js", + "test-edge": "yarn pack-web && karma start --single-run --browsers Edge", + "test-firefox": "yarn pack-web && karma start --single-run --browsers Firefox", + "test-chrome": "yarn pack-web && karma start --single-run --browsers ChromeHeadless", + "test-safari": "yarn pack-web && karma start --single-run --browsers Safari", + "test": "yarn build-or-skip && yarn test-node", + "move-types": "shx rm -r ./types/* && shx mv build/types/* ./types && rm -rf ./types/testdata && shx rm -f ./types/*.spec.d.ts", + "format-types": "prettier --write --loglevel warn \"./types/**/*.d.ts\"", + "build": "shx rm -rf ./build && tsc && yarn move-types && yarn format-types", + "build-or-skip": "[ -n \"$SKIP_BUILD\" ] || yarn build", + "pack-web": "yarn build-or-skip && webpack --mode development --config webpack.web.config.js" + }, + "dependencies": { + "xstream": "^11.10.0" + } +} diff --git a/packages/stream/src/concat.spec.ts b/packages/stream/src/concat.spec.ts new file mode 100644 index 00000000..6653e45b --- /dev/null +++ b/packages/stream/src/concat.spec.ts @@ -0,0 +1,203 @@ +// tslint:disable:readonly-array no-submodule-imports +import { Producer, Stream } from "xstream"; + +import { concat } from "./concat"; + +async function producerIsStopped(): Promise { + return new Promise((resolve) => setTimeout(resolve, 50)); +} + +describe("concat", () => { + it("can concat 0 streams", (done) => { + const concatenatedStream = concat(); + const expected: string[] = []; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("can concat 1 streams", (done) => { + const stream1 = Stream.of("1", "2", "3"); + const concatenatedStream = concat(stream1); + const expected = ["1", "2", "3"]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("can concat 2 streams", (done) => { + const stream1 = Stream.of("1", "2", "3"); + const stream2 = Stream.of("a", "b", "c"); + const concatenatedStream = concat(stream1, stream2); + const expected = ["1", "2", "3", "a", "b", "c"]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("can concat 3 streams", (done) => { + const stream1 = Stream.of("1", "2", "3"); + const stream2 = Stream.of("a", "b", "c"); + const stream3 = Stream.of("X", "Y", "Z"); + const concatenatedStream = concat(stream1, stream2, stream3); + const expected = ["1", "2", "3", "a", "b", "c", "X", "Y", "Z"]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("changes output order when order of streams switch", (done) => { + const stream1 = Stream.of("1", "2", "3"); + const stream2 = Stream.of("a", "b", "c"); + const concatenatedStream = concat(stream2, stream1); + const expected = ["a", "b", "c", "1", "2", "3"]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("should concat two asynchronous short streams together", (done) => { + const stream1 = Stream.periodic(25).take(3); + const stream2 = Stream.periodic(50).take(2); + const concatenatedStream = concat(stream1, stream2); + const expected = [0, 1, 2, 0, 1]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("should append a synchronous stream after an asynchronous stream", (done) => { + const stream1 = Stream.periodic(25).take(3); + const stream2 = Stream.of(30, 40, 50, 60); + const concatenatedStream = concat(stream1, stream2); + const expected = [0, 1, 2, 30, 40, 50, 60]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("buffers asynchronous events of second stream until first stream completes", (done) => { + const sourceStream = Stream.periodic(25); + const stream1 = sourceStream.take(3); + const stream2 = sourceStream.take(3); + const concatenatedStream = concat(stream1, stream2); + const expected = [0, 1, 2, 0, 1, 2]; + + concatenatedStream.addListener({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => { + expect(expected.length).toEqual(0); + done(); + }, + error: done.fail, + }); + }); + + it("unsubscribes and re-subscribes from source streams", (done) => { + // For browsers and CI, clocks and runtimes are very unreliable. + // Especialls Mac+Firefox on Travis is makes big trouble. Thus we need huge intervals. + const intervalDuration = 1000; + const producerActiveLog = new Array(); + + let producerInterval: NodeJS.Timeout; + let producerValue = 0; + const loggingProducer: Producer = { + start: (listener) => { + producerInterval = setInterval(() => listener.next(`event${producerValue++}`), intervalDuration); + producerActiveLog.push(true); + }, + stop: () => { + clearInterval(producerInterval); + producerActiveLog.push(false); + }, + }; + + const stream1 = Stream.create(loggingProducer); + const concatenatedStream = concat(stream1); + const expected = ["event0", "event1", "event2", "event3", "event4", "event5"]; + + expect(producerActiveLog).toEqual([]); + + const subscription = concatenatedStream.subscribe({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => done.fail(), + error: done.fail, + }); + + expect(producerActiveLog).toEqual([true]); + + // unsubscribe + setTimeout(async () => { + expect(producerActiveLog).toEqual([true]); + subscription.unsubscribe(); + await producerIsStopped(); + expect(producerActiveLog).toEqual([true, false]); + }, 3.75 * intervalDuration); + + // re-subscribe + setTimeout(() => { + expect(producerActiveLog).toEqual([true, false]); + + const subscription2 = concatenatedStream.subscribe({ + next: (value) => expect(value).toEqual(expected.shift()!), + complete: () => done.fail(), + error: done.fail, + }); + + expect(producerActiveLog).toEqual([true, false, true]); + + // unsubscribe again + setTimeout(async () => { + expect(producerActiveLog).toEqual([true, false, true]); + subscription2.unsubscribe(); + await producerIsStopped(); + expect(producerActiveLog).toEqual([true, false, true, false]); + + expect(expected.length).toEqual(0); + done(); + }, 3.75 * intervalDuration); + }, 6 * intervalDuration); + }); +}); diff --git a/packages/stream/src/concat.ts b/packages/stream/src/concat.ts new file mode 100644 index 00000000..56befc7c --- /dev/null +++ b/packages/stream/src/concat.ts @@ -0,0 +1,104 @@ +// tslint:disable:readonly-array +import { Producer, Stream, Subscription } from "xstream"; + +/** + * An implementation of concat that buffers all source stream events + * + * Marble diagram: + * + * ```text + * --1--2---3---4-| + * -a--b-c--d-| + * --------X---------Y---------Z- + * concat + * --1--2---3---4-abcdXY-------Z- + * ``` + * + * This is inspired by RxJS's concat as documented at http://rxmarbles.com/#concat and behaves + * differently than xstream's concat as discussed in https://github.com/staltz/xstream/issues/170. + * + */ +export function concat(...streams: Stream[]): Stream { + const subscriptions = new Array(); + const queues = new Array(); // one queue per stream + const completedStreams = new Set(); + let activeStreamIndex = 0; + + function reset(): void { + while (subscriptions.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const subscription = subscriptions.shift()!; + subscription.unsubscribe(); + } + + // tslint:disable-next-line:no-object-mutation + queues.length = 0; + completedStreams.clear(); + activeStreamIndex = 0; + } + + const producer: Producer = { + start: (listener) => { + streams.forEach((_) => queues.push([])); + + function emitAllQueuesEvents(streamIndex: number): void { + // eslint-disable-next-line no-constant-condition + while (true) { + const element = queues[streamIndex].shift(); + if (element === undefined) { + return; + } + listener.next(element); + } + } + + function isDone(): boolean { + return activeStreamIndex >= streams.length; + } + + if (isDone()) { + listener.complete(); + return; + } + + streams.forEach((stream, index) => { + subscriptions.push( + stream.subscribe({ + next: (value) => { + if (index === activeStreamIndex) { + listener.next(value); + } else { + queues[index].push(value); + } + }, + complete: () => { + completedStreams.add(index); + + while (completedStreams.has(activeStreamIndex)) { + // this stream completed: emit all and move on + emitAllQueuesEvents(activeStreamIndex); + activeStreamIndex++; + } + + if (isDone()) { + listener.complete(); + } else { + // now active stream can have some events queued but did not yet complete + emitAllQueuesEvents(activeStreamIndex); + } + }, + error: (error) => { + listener.error(error); + reset(); + }, + }), + ); + }); + }, + stop: () => { + reset(); + }, + }; + + return Stream.create(producer); +} diff --git a/packages/stream/src/defaultvalueproducer.spec.ts b/packages/stream/src/defaultvalueproducer.spec.ts new file mode 100644 index 00000000..2c2de53f --- /dev/null +++ b/packages/stream/src/defaultvalueproducer.spec.ts @@ -0,0 +1,113 @@ +import { Stream } from "xstream"; + +import { DefaultValueProducer } from "./defaultvalueproducer"; + +async function oneTickLater(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + +describe("DefaultValueProducer", () => { + it("can be constructed", () => { + const producer = new DefaultValueProducer(1); + expect(producer.value).toEqual(1); + }); + + it("can be used as a stream backend", (done) => { + const producer = new DefaultValueProducer(42); + const stream = Stream.createWithMemory(producer); + stream.addListener({ + next: (value) => { + expect(value).toEqual(42); + done(); + }, + error: done.fail, + complete: done.fail, + }); + }); + + it("can send updates", (done) => { + const producer = new DefaultValueProducer(42); + const stream = Stream.createWithMemory(producer); + + // tslint:disable-next-line:readonly-array + const events: number[] = []; + stream.addListener({ + next: (value) => { + events.push(value); + + if (events.length === 4) { + expect(events).toEqual([42, 43, 44, 45]); + done(); + } + }, + error: done.fail, + complete: done.fail, + }); + + producer.update(43); + producer.update(44); + producer.update(45); + }); + + it("can send errors", (done) => { + const producer = new DefaultValueProducer(42); + const stream = Stream.createWithMemory(producer); + + stream.addListener({ + error: (error) => { + expect(error).toEqual("oh no :("); + done(); + }, + complete: () => done.fail("Stream must not complete sucessfully"), + }); + + producer.update(1); + producer.update(2); + producer.update(3); + producer.error("oh no :("); + }); + + it("calls callbacks", async () => { + // tslint:disable-next-line:readonly-array + const producerActive: boolean[] = []; + + const producer = new DefaultValueProducer(42, { + onStarted: () => producerActive.push(true), + onStop: () => producerActive.push(false), + }); + const stream = Stream.createWithMemory(producer); + + expect(producerActive).toEqual([]); + + const subscription1 = stream.subscribe({}); + expect(producerActive).toEqual([true]); + + const subscription2 = stream.subscribe({}); + expect(producerActive).toEqual([true]); + + subscription2.unsubscribe(); + expect(producerActive).toEqual([true]); + + subscription1.unsubscribe(); + await oneTickLater(); + expect(producerActive).toEqual([true, false]); + + const subscription3 = stream.subscribe({}); + expect(producerActive).toEqual([true, false, true]); + + subscription3.unsubscribe(); + await oneTickLater(); + expect(producerActive).toEqual([true, false, true, false]); + + const subscriptionA = stream.subscribe({}); + expect(producerActive).toEqual([true, false, true, false, true]); + + // unsubscribe and re-subscribe does not deactivate the producer (which is a xstream feature) + subscriptionA.unsubscribe(); + const subscriptionB = stream.subscribe({}); + expect(producerActive).toEqual([true, false, true, false, true]); + + // cleanup + subscriptionB.unsubscribe(); + }); +}); diff --git a/packages/stream/src/defaultvalueproducer.ts b/packages/stream/src/defaultvalueproducer.ts new file mode 100644 index 00000000..8b04e4ab --- /dev/null +++ b/packages/stream/src/defaultvalueproducer.ts @@ -0,0 +1,72 @@ +import { Listener, Producer } from "xstream"; + +export interface DefaultValueProducerCallsbacks { + readonly onStarted: () => void; + readonly onStop: () => void; +} + +// allows pre-producing values before anyone is listening +export class DefaultValueProducer implements Producer { + public get value(): T { + return this.internalValue; + } + + private readonly callbacks: DefaultValueProducerCallsbacks | undefined; + // tslint:disable-next-line:readonly-keyword + private internalValue: T; + // tslint:disable-next-line:readonly-keyword + private listener: Listener | undefined; + + public constructor(value: T, callbacks?: DefaultValueProducerCallsbacks) { + this.callbacks = callbacks; + this.internalValue = value; + } + + /** + * Update the current value. + * + * If producer is active (i.e. someone is listening), this emits an event. + * If not, just the current value is updated. + */ + public update(value: T): void { + // tslint:disable-next-line:no-object-mutation + this.internalValue = value; + if (this.listener) { + this.listener.next(value); + } + } + + /** + * Produce an error + */ + public error(error: any): void { + if (this.listener) { + this.listener.error(error); + } + } + + /** + * Called by the stream. Do not call this directly. + */ + public start(listener: Listener): void { + // tslint:disable-next-line:no-object-mutation + this.listener = listener; + listener.next(this.internalValue); + + if (this.callbacks) { + this.callbacks.onStarted(); + } + } + + /** + * Called by the stream. Do not call this directly. + */ + public stop(): void { + if (this.callbacks) { + this.callbacks.onStop(); + } + + // tslint:disable-next-line:no-object-mutation + this.listener = undefined; + } +} diff --git a/packages/stream/src/dropduplicates.spec.ts b/packages/stream/src/dropduplicates.spec.ts new file mode 100644 index 00000000..70cabf93 --- /dev/null +++ b/packages/stream/src/dropduplicates.spec.ts @@ -0,0 +1,95 @@ +import { Stream } from "xstream"; + +import { dropDuplicates } from "./dropduplicates"; + +describe("dropDuplicates", () => { + it("can be created", () => { + const operand = dropDuplicates((value) => `${value}`); + expect(operand).toBeTruthy(); + }); + + it("passes unique values", (done) => { + const instream = Stream.fromArray([0, 1, 2, 3]); + const operand = dropDuplicates((value) => `${value}`); + + const events = new Array(); + instream.compose(operand).subscribe({ + next: (value) => events.push(value), + complete: () => { + expect(events).toEqual([0, 1, 2, 3]); + done(); + }, + }); + }); + + it("drops consecutive duplicates", (done) => { + const instream = Stream.fromArray([1, 2, 2, 3, 3, 3, 4, 4, 4, 4]); + const operand = dropDuplicates((value) => `${value}`); + + const events = new Array(); + instream.compose(operand).subscribe({ + next: (value) => events.push(value), + complete: () => { + expect(events).toEqual([1, 2, 3, 4]); + done(); + }, + }); + }); + + it("drops non-consecutive duplicates", (done) => { + const instream = Stream.fromArray([1, 2, 3, 4, 3, 2, 1]); + const operand = dropDuplicates((value) => `${value}`); + + const events = new Array(); + instream.compose(operand).subscribe({ + next: (value) => events.push(value), + complete: () => { + expect(events).toEqual([1, 2, 3, 4]); + done(); + }, + }); + }); + + it("uses value to key method for duplicate checks", (done) => { + const instream = Stream.fromArray([1, 10, 100, 2000, 2, 27, 1337, 3.14, 33]); + // use first character of native string representation + const valueToKey = (value: number): string => `${value}`.charAt(0); + const operand = dropDuplicates(valueToKey); + + const events = new Array(); + instream.compose(operand).subscribe({ + next: (value) => events.push(value), + complete: () => { + expect(events).toEqual([1, 2000, 3.14]); + done(); + }, + }); + }); + + it("works for empty string keys", (done) => { + interface Name { + readonly first: string; + readonly last: string; + } + + const instream = Stream.fromArray([ + { first: "Daria", last: "" }, + { first: "Sam", last: "" }, + { first: "Regina", last: "Mustermann" }, + { first: "Max", last: "Mustermann" }, + ]); + const operand = dropDuplicates((value: Name) => value.last); + + const events = new Array(); + instream.compose(operand).subscribe({ + next: (value) => events.push(value), + complete: () => { + expect(events).toEqual([ + { first: "Daria", last: "" }, + { first: "Regina", last: "Mustermann" }, + ]); + done(); + }, + }); + }); +}); diff --git a/packages/stream/src/dropduplicates.ts b/packages/stream/src/dropduplicates.ts new file mode 100644 index 00000000..b0b53c69 --- /dev/null +++ b/packages/stream/src/dropduplicates.ts @@ -0,0 +1,38 @@ +import { Stream } from "xstream"; + +/** + * The type that fits into Stream.compose() with input stream and output stream + * of the same type. + */ +export type SameTypeStreamOperator = (ins: Stream) => Stream; + +/** + * Drops duplicate values in a stream. + * + * Marble diagram: + * + * ```text + * -1-1-1-2-4-3-3-4-- + * dropDuplicates + * -1-----2-4-3------ + * ``` + * + * Each value must be uniquely identified by a string given by + * valueToKey(value). + * + * Internally this maintains a set of keys that have been processed already, + * i.e. memory consumption and Set lookup times should be considered when + * using this function. + */ +export function dropDuplicates(valueToKey: (x: T) => string): SameTypeStreamOperator { + const operand: SameTypeStreamOperator = (instream: Stream): Stream => { + const emittedKeys = new Set(); + + const deduplicatedStream = instream + .filter((value) => !emittedKeys.has(valueToKey(value))) + .debug((value) => emittedKeys.add(valueToKey(value))); + + return deduplicatedStream; + }; + return operand; +} diff --git a/packages/stream/src/index.ts b/packages/stream/src/index.ts new file mode 100644 index 00000000..562930b4 --- /dev/null +++ b/packages/stream/src/index.ts @@ -0,0 +1,6 @@ +export { DefaultValueProducer, DefaultValueProducerCallsbacks } from "./defaultvalueproducer"; +export { concat } from "./concat"; +export { dropDuplicates, SameTypeStreamOperator } from "./dropduplicates"; +export { firstEvent, fromListPromise, toListPromise } from "./promise"; +export * from "./reducer"; +export { ValueAndUpdates } from "./valueandupdates"; diff --git a/packages/stream/src/promise.spec.ts b/packages/stream/src/promise.spec.ts new file mode 100644 index 00000000..8c6696ea --- /dev/null +++ b/packages/stream/src/promise.spec.ts @@ -0,0 +1,145 @@ +// tslint:disable:readonly-array + +import { Producer, Stream } from "xstream"; + +import { firstEvent, fromListPromise, toListPromise } from "./promise"; +import { asArray, countStream } from "./reducer"; + +async function oneTickLater(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + +describe("promise", () => { + describe("fromListPromise", () => { + it("sends many values on a stream", async () => { + // create a promise that will resolve to an array of strings + const input = ["a", "fd", "fvss", "gs"]; + const prom = Promise.resolve(input); + const stream = fromListPromise(prom); + + // materialize stream into a counter, and wait for stream to complete + const counter = countStream(stream); + await counter.finished(); + expect(counter.value()).toEqual(input.length); + }); + + it("works for iterables like Uint8Array", async () => { + const inputPromise = Promise.resolve(new Uint8Array([0x00, 0x11, 0x22])); + const stream = fromListPromise(inputPromise); + + const reader = asArray(stream); + await reader.finished(); + expect(reader.value()).toEqual([0x00, 0x11, 0x22]); + }); + + it("works for delayed resolution", async () => { + const inputPromise = new Promise((resolve) => { + // resolve after 50 ms + setTimeout(() => resolve([1, 2, 3]), 50); + }); + const stream = fromListPromise(inputPromise); + + const reader = asArray(stream); + await reader.finished(); + expect(reader.value()).toEqual([1, 2, 3]); + }); + + it("sends proper values", async () => { + const input = ["let", "us", "say", "something"]; + const prom = Promise.resolve(input); + const stream = fromListPromise(prom); + + // materialize stream into an array, and wait for stream to complete + const read = asArray(stream); + await read.finished(); + expect(read.value()).toEqual(input); + }); + }); + + describe("toListPromise", () => { + it("works for simple stream with more events than count", async () => { + const events = await toListPromise(Stream.fromArray([1, 6, 92, 2, 9]), 3); + expect(events).toEqual([1, 6, 92]); + }); + + it("works for simple stream with equal events and count", async () => { + const events = await toListPromise(Stream.fromArray([1, 6, 92, 2, 9]), 5); + expect(events).toEqual([1, 6, 92, 2, 9]); + }); + + it("works for simple stream with zero count", async () => { + const events = await toListPromise(Stream.fromArray([1, 6, 92, 2, 9]), 0); + expect(events).toEqual([]); + }); + + it("works for empty stream with zero count", async () => { + const events = await toListPromise(Stream.fromArray([]), 0); + expect(events).toEqual([]); + }); + + it("rejects for simple stream with less events than count", async () => { + await toListPromise(Stream.fromArray([1, 6, 92]), 5) + .then(() => fail("must not resolve")) + .catch((error) => expect(error).toMatch(/stream completed before all events could be collected/i)); + }); + + it("works for async stream", async () => { + let interval: NodeJS.Timeout; + let producerRunning = false; + const producer: Producer = { + start: (listener) => { + producerRunning = true; + let nextElement = 0; + interval = setInterval(() => { + listener.next(nextElement++); + }, 20); + }, + stop: () => { + clearInterval(interval); + producerRunning = false; + }, + }; + + const events = await toListPromise(Stream.create(producer), 3); + expect(events).toEqual([0, 1, 2]); + await oneTickLater(); + expect(producerRunning).toEqual(false); + }); + }); + + describe("firstEvent", () => { + it("works for simple stream with more events than count", async () => { + const event = await firstEvent(Stream.fromArray([1, 6, 92, 2, 9])); + expect(event).toEqual(1); + }); + + it("rejects for stream with no events", async () => { + await firstEvent(Stream.fromArray([])) + .then(() => fail("must not resolve")) + .catch((error) => expect(error).toMatch(/stream completed before all events could be collected/i)); + }); + + it("works for async stream", async () => { + let interval: NodeJS.Timeout; + let producerRunning = false; + const producer: Producer = { + start: (listener) => { + producerRunning = true; + let nextElement = 0; + interval = setInterval(() => { + listener.next(nextElement++); + }, 20); + }, + stop: () => { + clearInterval(interval); + producerRunning = false; + }, + }; + + const event = await firstEvent(Stream.create(producer)); + expect(event).toEqual(0); + await oneTickLater(); + expect(producerRunning).toEqual(false); + }); + }); +}); diff --git a/packages/stream/src/promise.ts b/packages/stream/src/promise.ts new file mode 100644 index 00000000..77724ccb --- /dev/null +++ b/packages/stream/src/promise.ts @@ -0,0 +1,67 @@ +import { Producer, Stream } from "xstream"; + +/** + * Emits one event for each list element as soon as the promise resolves + */ +export function fromListPromise(promise: Promise>): Stream { + const producer: Producer = { + start: (listener) => { + // the code in `start` runs as soon as anyone listens to the stream + promise + .then((iterable) => { + for (const element of iterable) { + listener.next(element); + } + listener.complete(); + }) + .catch((error) => listener.error(error)); + }, + // eslint-disable-next-line @typescript-eslint/no-empty-function + stop: () => {}, + }; + + return Stream.create(producer); +} + +/** + * Listens to stream and collects events. When `count` events are collected, + * the promise resolves with an array of events. + * + * Rejects if stream completes before `count` events are collected. + */ +export async function toListPromise(stream: Stream, count: number): Promise { + return new Promise((resolve, reject) => { + if (count === 0) { + resolve([]); + return; + } + + const events = new Array(); + // take() unsubscribes from source stream automatically + stream.take(count).subscribe({ + next: (event) => { + events.push(event); + + if (events.length === count) { + resolve(events); + } + }, + complete: () => { + reject( + `Stream completed before all events could be collected. ` + + `Collected ${events.length}, expected ${count}`, + ); + }, + error: (error) => reject(error), + }); + }); +} + +/** + * Listens to stream, collects one event and revolves. + * + * Rejects if stream completes before one event was fired. + */ +export async function firstEvent(stream: Stream): Promise { + return (await toListPromise(stream, 1))[0]; +} diff --git a/packages/stream/src/reducer.spec.ts b/packages/stream/src/reducer.spec.ts new file mode 100644 index 00000000..e71755f9 --- /dev/null +++ b/packages/stream/src/reducer.spec.ts @@ -0,0 +1,56 @@ +// tslint:disable:readonly-array +import { Stream } from "xstream"; + +import { asArray, countStream, lastValue } from "./reducer"; + +describe("Test stream helpers", () => { + it("readIntoArray returns input", async () => { + const input = [1, 6, 92, 2, 9]; + const stream = Stream.fromArray(input); + const result = asArray(stream); + await result.finished(); + expect(result.value()).toEqual(input); + + // also handle empty properly + const result2 = asArray(Stream.empty()); + await result2.finished(); + expect(result2.value()).toEqual([]); + }); + + it("countStream returns number of items", async () => { + const input = ["abc", "123", "def", "superstar!", "is"]; + const stream = Stream.fromArray(input); + const result = countStream(stream); + await result.finished(); + expect(result.value()).toEqual(input.length); + + // also handle empty properly + const result2 = countStream(Stream.empty()); + await result2.finished(); + expect(result2.value()).toEqual(0); + }); + + it("lastValue returns input", async () => { + const input = ["Some", "people", "say", "there", "is", "something"]; + const stream = Stream.fromArray(input); + const result = lastValue(stream); + await result.finished(); + expect(result.value()).toEqual("something"); + + // also handle empty properly (undefined) + const result2 = lastValue(Stream.empty()); + await result2.finished(); + expect(result2.value()).toBeUndefined(); + }); + + it("Reducer.finished throws error on stream error", async () => { + const stream = Stream.throw("error"); + try { + const result = asArray(stream); + await result.finished(); + fail("This should have thrown an error"); + } catch (err) { + expect(err).toEqual("error"); + } + }); +}); diff --git a/packages/stream/src/reducer.ts b/packages/stream/src/reducer.ts new file mode 100644 index 00000000..9dd7848a --- /dev/null +++ b/packages/stream/src/reducer.ts @@ -0,0 +1,87 @@ +/* +This file maintains some stream helpers used in iov-bns, but which +may be useful other places, and should consider to be moved. + +Reducer and related code works to maintain a current state +materialized by reducing all events on a stream. Similar +to ValueAndUpdate in keycontrol, but more general. +*/ + +// tslint:disable:readonly-keyword +// tslint:disable:no-object-mutation +import { Stream } from "xstream"; + +export type ReducerFunc = (acc: U, evt: T) => U; + +// Reducer takes a stream of events T and a ReducerFunc, that +// materializes a state of type U. +export class Reducer { + private readonly stream: Stream; + private readonly reducer: ReducerFunc; + private state: U; + // completed maintains state of stream, resolves/rejects + // on complete or error + private readonly completed: Promise; + + public constructor(stream: Stream, reducer: ReducerFunc, initState: U) { + this.stream = stream; + this.reducer = reducer; + this.state = initState; + this.completed = new Promise((resolve, reject) => { + const subscription = this.stream.subscribe({ + next: (evt: T) => { + this.state = this.reducer(this.state, evt); + }, + complete: () => { + resolve(); + // this must happen after resolve, to ensure stream.subscribe() has finished + subscription.unsubscribe(); + }, + error: (err: any) => { + reject(err); + // the stream already closed on error, but unsubscribe to be safe + subscription.unsubscribe(); + }, + }); + }); + } + + // value returns current materialized state + public value(): U { + return this.state; + } + + // finished resolves on completed stream, rejects on stream error + public async finished(): Promise { + return this.completed; + } +} + +function increment(sum: number, _: T): number { + return sum + 1; +} + +// countStream returns a reducer that contains current count +// of events on the stream +export function countStream(stream: Stream): Reducer { + return new Reducer(stream, increment, 0); +} + +function append(list: readonly T[], evt: T): readonly T[] { + return [...list, evt]; +} + +// asArray maintains an array containing all events that have +// occurred on the stream +export function asArray(stream: Stream): Reducer { + return new Reducer(stream, append, []); +} + +function last(_: any, event: T): T { + return event; +} + +// lastValue returns the last value read from the stream, or undefined if no values sent +export function lastValue(stream: Stream): Reducer { + return new Reducer(stream, last, undefined); +} diff --git a/packages/stream/src/valueandupdates.spec.ts b/packages/stream/src/valueandupdates.spec.ts new file mode 100644 index 00000000..2b5d2dc4 --- /dev/null +++ b/packages/stream/src/valueandupdates.spec.ts @@ -0,0 +1,211 @@ +import { Listener } from "xstream"; + +import { DefaultValueProducer } from "./defaultvalueproducer"; +import { ValueAndUpdates } from "./valueandupdates"; + +describe("ValueAndUpdates", () => { + it("can be constructed", () => { + { + const vau = new ValueAndUpdates(new DefaultValueProducer("a")); + expect(vau).toBeTruthy(); + } + { + const vau = new ValueAndUpdates(new DefaultValueProducer(123)); + expect(vau).toBeTruthy(); + } + { + const vau = new ValueAndUpdates(new DefaultValueProducer(true)); + expect(vau).toBeTruthy(); + } + { + const vau = new ValueAndUpdates(new DefaultValueProducer(null)); + expect(vau).toBeTruthy(); + } + }); + + it("contains initial value", () => { + { + const vau = new ValueAndUpdates(new DefaultValueProducer("a")); + expect(vau.value).toEqual("a"); + } + { + const vau = new ValueAndUpdates(new DefaultValueProducer(123)); + expect(vau.value).toEqual(123); + } + { + const vau = new ValueAndUpdates(new DefaultValueProducer(true)); + expect(vau.value).toEqual(true); + } + { + const vau = new ValueAndUpdates(new DefaultValueProducer(null)); + expect(vau.value).toEqual(null); + } + }); + + it("can be updated", () => { + { + const producer = new DefaultValueProducer("a"); + const vau = new ValueAndUpdates(producer); + expect(vau.value).toEqual("a"); + producer.update("b"); + expect(vau.value).toEqual("b"); + } + { + const producer = new DefaultValueProducer(123); + const vau = new ValueAndUpdates(producer); + expect(vau.value).toEqual(123); + producer.update(44); + expect(vau.value).toEqual(44); + } + { + const producer = new DefaultValueProducer(true); + const vau = new ValueAndUpdates(producer); + expect(vau.value).toEqual(true); + producer.update(false); + expect(vau.value).toEqual(false); + } + { + const producer = new DefaultValueProducer(null); + const vau = new ValueAndUpdates(producer); + expect(vau.value).toEqual(null); + producer.update(null); + expect(vau.value).toEqual(null); + } + }); + + it("emits initial value to new listeners", (done) => { + const vau = new ValueAndUpdates(new DefaultValueProducer(123)); + + const listener2: Listener = { + next: (value) => { + expect(value).toEqual(123); + done(); + }, + complete: () => done.fail(".updates stream must not complete"), + error: (e) => done.fail(e), + }; + + const listener1: Listener = { + next: (value) => { + expect(value).toEqual(123); + vau.updates.addListener(listener2); + }, + complete: () => done.fail(".updates stream must not complete"), + error: (e) => done.fail(e), + }; + + vau.updates.addListener(listener1); + }); + + it("emits current value to new listeners", (done) => { + const producer = new DefaultValueProducer(123); + const vau = new ValueAndUpdates(producer); + producer.update(99); + + const listener2: Listener = { + next: (value) => { + expect(value).toEqual(99); + done(); + }, + complete: () => done.fail(".updates stream must not complete"), + error: (e) => done.fail(e), + }; + + const listener1: Listener = { + next: (value) => { + expect(value).toEqual(99); + vau.updates.addListener(listener2); + }, + complete: () => done.fail(".updates stream must not complete"), + error: (e) => done.fail(e), + }; + + vau.updates.addListener(listener1); + }); + + it("emits updates to listener", (done) => { + const producer = new DefaultValueProducer(11); + const vau = new ValueAndUpdates(producer); + + let eventsCount = 0; + const emittedValues = new Array(); + + vau.updates.addListener({ + next: (value) => { + eventsCount++; + emittedValues.push(value); + + if (eventsCount === 4) { + expect(emittedValues).toEqual([11, 22, 33, 44]); + done(); + } + }, + complete: () => done.fail(".updates stream must not complete"), + error: (e) => done.fail(e), + }); + + setTimeout(() => producer.update(22), 10); + setTimeout(() => producer.update(33), 20); + setTimeout(() => producer.update(44), 30); + }); + + it("can wait for value", async () => { + const producer = new DefaultValueProducer(11); + const vau = new ValueAndUpdates(producer); + + setTimeout(() => producer.update(22), 10); + setTimeout(() => producer.update(33), 20); + setTimeout(() => producer.update(44), 30); + + await vau.waitFor(33); + expect(vau.value).toEqual(33); + + await vau.waitFor(44); + expect(vau.value).toEqual(44); + }); + + it("can wait for search function to return true", async () => { + const producer = new DefaultValueProducer(11); + const vau = new ValueAndUpdates(producer); + + setTimeout(() => producer.update(22), 10); + setTimeout(() => producer.update(33), 20); + setTimeout(() => producer.update(44), 30); + + await vau.waitFor((v) => v > 30); + expect(vau.value).toEqual(33); + + await vau.waitFor((v) => v > 40); + expect(vau.value).toEqual(44); + }); + + it("gets the correct return value in waitFor", async () => { + const producer = new DefaultValueProducer(11); + const vau = new ValueAndUpdates(producer); + + setTimeout(() => producer.update(22), 10); + setTimeout(() => producer.update(33), 20); + + { + const result = await vau.waitFor(22); + expect(result).toEqual(22); + } + + { + const result = await vau.waitFor((v) => v > 30); + expect(result).toEqual(33); + } + }); + + describe("waitFor", () => { + it("propagates error from stream", async () => { + const producer = new DefaultValueProducer(1); + const vau = new ValueAndUpdates(producer); + setTimeout(() => producer.error(new Error("something went wrong")), 10); + await vau.waitFor(3).then( + () => fail("must not resolve"), + (error) => expect(error).toMatch(/something went wrong/), + ); + }); + }); +}); diff --git a/packages/stream/src/valueandupdates.ts b/packages/stream/src/valueandupdates.ts new file mode 100644 index 00000000..d42317a4 --- /dev/null +++ b/packages/stream/src/valueandupdates.ts @@ -0,0 +1,57 @@ +import { MemoryStream } from "xstream"; + +import { DefaultValueProducer } from "./defaultvalueproducer"; + +export type SearchFunction = (value: T) => boolean; + +/** + * A read only wrapper around DefaultValueProducer that allows + * to synchronously get the current value using the .value property + * and listen to to updates by suscribing to the .updates stream + */ +export class ValueAndUpdates { + public readonly updates: MemoryStream; + + public get value(): T { + return this.producer.value; + } + + private readonly producer: DefaultValueProducer; + + public constructor(producer: DefaultValueProducer) { + this.producer = producer; + this.updates = MemoryStream.createWithMemory(this.producer); + } + + /** + * Resolves as soon as search value is found. + * + * @param search either a value or a function that must return true when found + * @returns the value of the update that caused the search match + */ + public async waitFor(search: SearchFunction | T): Promise { + const searchImplementation: SearchFunction = + typeof search === "function" ? (search as SearchFunction) : (value: T): boolean => value === search; + + return new Promise((resolve, reject) => { + const subscription = this.updates.subscribe({ + next: (newValue) => { + if (searchImplementation(newValue)) { + resolve(newValue); + + // MemoryStream.subscribe() calls next with the last value. + // Make async to ensure the subscription exists + setTimeout(() => subscription.unsubscribe(), 0); + } + }, + complete: () => { + subscription.unsubscribe(); + reject("Update stream completed without expected value"); + }, + error: (error) => { + reject(error); + }, + }); + }); + } +} diff --git a/packages/stream/tsconfig.json b/packages/stream/tsconfig.json new file mode 100644 index 00000000..167e8c02 --- /dev/null +++ b/packages/stream/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "baseUrl": ".", + "outDir": "build", + "declarationDir": "build/types", + "rootDir": "src" + }, + "include": [ + "src/**/*" + ] +} diff --git a/packages/stream/tslint.json b/packages/stream/tslint.json new file mode 100644 index 00000000..0946f209 --- /dev/null +++ b/packages/stream/tslint.json @@ -0,0 +1,3 @@ +{ + "extends": "../../tslint.json" +} diff --git a/packages/stream/typedoc.js b/packages/stream/typedoc.js new file mode 100644 index 00000000..e2387c7d --- /dev/null +++ b/packages/stream/typedoc.js @@ -0,0 +1,14 @@ +const packageJson = require("./package.json"); + +module.exports = { + src: ["./src"], + out: "docs", + exclude: "**/*.spec.ts", + target: "es6", + name: `${packageJson.name} Documentation`, + readme: "README.md", + mode: "file", + excludeExternals: true, + excludeNotExported: true, + excludePrivate: true, +}; diff --git a/packages/stream/types/concat.d.ts b/packages/stream/types/concat.d.ts new file mode 100644 index 00000000..cd1ea4a5 --- /dev/null +++ b/packages/stream/types/concat.d.ts @@ -0,0 +1,19 @@ +import { Stream } from "xstream"; +/** + * An implementation of concat that buffers all source stream events + * + * Marble diagram: + * + * ```text + * --1--2---3---4-| + * -a--b-c--d-| + * --------X---------Y---------Z- + * concat + * --1--2---3---4-abcdXY-------Z- + * ``` + * + * This is inspired by RxJS's concat as documented at http://rxmarbles.com/#concat and behaves + * differently than xstream's concat as discussed in https://github.com/staltz/xstream/issues/170. + * + */ +export declare function concat(...streams: Stream[]): Stream; diff --git a/packages/stream/types/defaultvalueproducer.d.ts b/packages/stream/types/defaultvalueproducer.d.ts new file mode 100644 index 00000000..e8359790 --- /dev/null +++ b/packages/stream/types/defaultvalueproducer.d.ts @@ -0,0 +1,31 @@ +import { Listener, Producer } from "xstream"; +export interface DefaultValueProducerCallsbacks { + readonly onStarted: () => void; + readonly onStop: () => void; +} +export declare class DefaultValueProducer implements Producer { + get value(): T; + private readonly callbacks; + private internalValue; + private listener; + constructor(value: T, callbacks?: DefaultValueProducerCallsbacks); + /** + * Update the current value. + * + * If producer is active (i.e. someone is listening), this emits an event. + * If not, just the current value is updated. + */ + update(value: T): void; + /** + * Produce an error + */ + error(error: any): void; + /** + * Called by the stream. Do not call this directly. + */ + start(listener: Listener): void; + /** + * Called by the stream. Do not call this directly. + */ + stop(): void; +} diff --git a/packages/stream/types/dropduplicates.d.ts b/packages/stream/types/dropduplicates.d.ts new file mode 100644 index 00000000..3a31cdd0 --- /dev/null +++ b/packages/stream/types/dropduplicates.d.ts @@ -0,0 +1,25 @@ +import { Stream } from "xstream"; +/** + * The type that fits into Stream.compose() with input stream and output stream + * of the same type. + */ +export declare type SameTypeStreamOperator = (ins: Stream) => Stream; +/** + * Drops duplicate values in a stream. + * + * Marble diagram: + * + * ```text + * -1-1-1-2-4-3-3-4-- + * dropDuplicates + * -1-----2-4-3------ + * ``` + * + * Each value must be uniquely identified by a string given by + * valueToKey(value). + * + * Internally this maintains a set of keys that have been processed already, + * i.e. memory consumption and Set lookup times should be considered when + * using this function. + */ +export declare function dropDuplicates(valueToKey: (x: T) => string): SameTypeStreamOperator; diff --git a/packages/stream/types/index.d.ts b/packages/stream/types/index.d.ts new file mode 100644 index 00000000..562930b4 --- /dev/null +++ b/packages/stream/types/index.d.ts @@ -0,0 +1,6 @@ +export { DefaultValueProducer, DefaultValueProducerCallsbacks } from "./defaultvalueproducer"; +export { concat } from "./concat"; +export { dropDuplicates, SameTypeStreamOperator } from "./dropduplicates"; +export { firstEvent, fromListPromise, toListPromise } from "./promise"; +export * from "./reducer"; +export { ValueAndUpdates } from "./valueandupdates"; diff --git a/packages/stream/types/promise.d.ts b/packages/stream/types/promise.d.ts new file mode 100644 index 00000000..7c29bb1a --- /dev/null +++ b/packages/stream/types/promise.d.ts @@ -0,0 +1,18 @@ +import { Stream } from "xstream"; +/** + * Emits one event for each list element as soon as the promise resolves + */ +export declare function fromListPromise(promise: Promise>): Stream; +/** + * Listens to stream and collects events. When `count` events are collected, + * the promise resolves with an array of events. + * + * Rejects if stream completes before `count` events are collected. + */ +export declare function toListPromise(stream: Stream, count: number): Promise; +/** + * Listens to stream, collects one event and revolves. + * + * Rejects if stream completes before one event was fired. + */ +export declare function firstEvent(stream: Stream): Promise; diff --git a/packages/stream/types/reducer.d.ts b/packages/stream/types/reducer.d.ts new file mode 100644 index 00000000..731c709d --- /dev/null +++ b/packages/stream/types/reducer.d.ts @@ -0,0 +1,14 @@ +import { Stream } from "xstream"; +export declare type ReducerFunc = (acc: U, evt: T) => U; +export declare class Reducer { + private readonly stream; + private readonly reducer; + private state; + private readonly completed; + constructor(stream: Stream, reducer: ReducerFunc, initState: U); + value(): U; + finished(): Promise; +} +export declare function countStream(stream: Stream): Reducer; +export declare function asArray(stream: Stream): Reducer; +export declare function lastValue(stream: Stream): Reducer; diff --git a/packages/stream/types/valueandupdates.d.ts b/packages/stream/types/valueandupdates.d.ts new file mode 100644 index 00000000..1745b020 --- /dev/null +++ b/packages/stream/types/valueandupdates.d.ts @@ -0,0 +1,21 @@ +import { MemoryStream } from "xstream"; +import { DefaultValueProducer } from "./defaultvalueproducer"; +export declare type SearchFunction = (value: T) => boolean; +/** + * A read only wrapper around DefaultValueProducer that allows + * to synchronously get the current value using the .value property + * and listen to to updates by suscribing to the .updates stream + */ +export declare class ValueAndUpdates { + readonly updates: MemoryStream; + get value(): T; + private readonly producer; + constructor(producer: DefaultValueProducer); + /** + * Resolves as soon as search value is found. + * + * @param search either a value or a function that must return true when found + * @returns the value of the update that caused the search match + */ + waitFor(search: SearchFunction | T): Promise; +} diff --git a/packages/stream/webpack.web.config.js b/packages/stream/webpack.web.config.js new file mode 100644 index 00000000..9d5836a8 --- /dev/null +++ b/packages/stream/webpack.web.config.js @@ -0,0 +1,17 @@ +const glob = require("glob"); +const path = require("path"); + +const target = "web"; +const distdir = path.join(__dirname, "dist", "web"); + +module.exports = [ + { + // bundle used for Karma tests + target: target, + entry: glob.sync("./build/**/*.spec.js"), + output: { + path: distdir, + filename: "tests.js", + }, + }, +];