stream: Fork @iov/stream

This commit is contained in:
willclarktech 2020-06-24 17:50:47 +02:00
parent 200a0f7fe6
commit ea68244a5f
No known key found for this signature in database
GPG Key ID: 551A86E2E398ADF7
83 changed files with 1571 additions and 0 deletions

View File

@ -0,0 +1 @@
../../.eslintignore

3
packages/stream/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build/
dist/
docs/

16
packages/stream/README.md Normal file
View File

@ -0,0 +1,16 @@
# @iov/stream
[![npm version](https://img.shields.io/npm/v/@iov/stream.svg)](https://www.npmjs.com/package/@iov/stream)
@iov/stream are some helper methods and classes to ceal with stream processing.
As BCP's `BlockchainConnection` exposes a Stream based event API, and
tendermint/RpcClient exposes Stream based subscriptions, there is common useful
functionality for creating and consuming streams both in applications as well as
in the test code.
## License
This package is part of the IOV-Core repository, licensed under the Apache
License 2.0 (see
[NOTICE](https://github.com/iov-one/iov-core/blob/master/NOTICE) and
[LICENSE](https://github.com/iov-one/iov-core/blob/master/LICENSE)).

View File

@ -0,0 +1,26 @@
#!/usr/bin/env node
require("source-map-support").install();
const defaultSpecReporterConfig = require("../../jasmine-spec-reporter.config.json");
// setup Jasmine
const Jasmine = require("jasmine");
const jasmine = new Jasmine();
jasmine.loadConfig({
spec_dir: "build",
spec_files: ["**/*.spec.js"],
helpers: [],
random: false,
seed: null,
stopSpecOnExpectationFailure: false,
});
jasmine.jasmine.DEFAULT_TIMEOUT_INTERVAL = 15 * 1000;
// setup reporter
const { SpecReporter } = require("jasmine-spec-reporter");
const reporter = new SpecReporter({ ...defaultSpecReporterConfig });
// initialize and execute
jasmine.env.clearReporters();
jasmine.addReporter(reporter);
jasmine.execute();

View File

@ -0,0 +1,47 @@
module.exports = function (config) {
config.set({
// base path that will be used to resolve all patterns (eg. files, exclude)
basePath: ".",
// frameworks to use
// available frameworks: https://npmjs.org/browse/keyword/karma-adapter
frameworks: ["jasmine"],
// list of files / patterns to load in the browser
files: ["dist/web/tests.js"],
client: {
jasmine: {
random: false,
timeoutInterval: 15000,
},
},
// test results reporter to use
// possible values: 'dots', 'progress'
// available reporters: https://npmjs.org/browse/keyword/karma-reporter
reporters: ["progress", "kjhtml"],
// web server port
port: 9876,
// enable / disable colors in the output (reporters and logs)
colors: true,
// level of logging
// possible values: config.LOG_DISABLE || config.LOG_ERROR || config.LOG_WARN || config.LOG_INFO || config.LOG_DEBUG
logLevel: config.LOG_INFO,
// enable / disable watching file and executing tests whenever any file changes
autoWatch: false,
// start these browsers
// available browser launchers: https://npmjs.org/browse/keyword/karma-launcher
browsers: ["Firefox"],
browserNoActivityTimeout: 90000,
// Keep brower open for debugging. This is overridden by yarn scripts
singleRun: false,
});
};

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

View File

@ -0,0 +1 @@
Directory used to trigger lerna package updates for all packages

View File

@ -0,0 +1,43 @@
{
"name": "@iov/stream",
"version": "2.3.2",
"description": "Utility functions for producing and consuming Streams",
"author": "IOV SAS <admin@iov.one>",
"license": "Apache-2.0",
"main": "build/index.js",
"types": "types/index.d.ts",
"files": [
"build/",
"types/",
"*.md",
"!*.spec.*",
"!**/testdata/"
],
"repository": {
"type": "git",
"url": "https://github.com/iov-one/iov-core/tree/master/packages/iov-stream"
},
"publishConfig": {
"access": "public"
},
"scripts": {
"docs": "shx rm -rf docs && typedoc --options typedoc.js",
"lint": "eslint --max-warnings 0 \"**/*.{js,ts}\" && tslint -t verbose --project .",
"format": "prettier --write --loglevel warn \"./src/**/*.ts\"",
"format-text": "prettier --write --prose-wrap always --print-width 80 \"./*.md\"",
"test-node": "node jasmine-testrunner.js",
"test-edge": "yarn pack-web && karma start --single-run --browsers Edge",
"test-firefox": "yarn pack-web && karma start --single-run --browsers Firefox",
"test-chrome": "yarn pack-web && karma start --single-run --browsers ChromeHeadless",
"test-safari": "yarn pack-web && karma start --single-run --browsers Safari",
"test": "yarn build-or-skip && yarn test-node",
"move-types": "shx rm -r ./types/* && shx mv build/types/* ./types && rm -rf ./types/testdata && shx rm -f ./types/*.spec.d.ts",
"format-types": "prettier --write --loglevel warn \"./types/**/*.d.ts\"",
"build": "shx rm -rf ./build && tsc && yarn move-types && yarn format-types",
"build-or-skip": "[ -n \"$SKIP_BUILD\" ] || yarn build",
"pack-web": "yarn build-or-skip && webpack --mode development --config webpack.web.config.js"
},
"dependencies": {
"xstream": "^11.10.0"
}
}

View File

@ -0,0 +1,203 @@
// tslint:disable:readonly-array no-submodule-imports
import { Producer, Stream } from "xstream";
import { concat } from "./concat";
async function producerIsStopped(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 50));
}
describe("concat", () => {
it("can concat 0 streams", (done) => {
const concatenatedStream = concat();
const expected: string[] = [];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("can concat 1 streams", (done) => {
const stream1 = Stream.of("1", "2", "3");
const concatenatedStream = concat(stream1);
const expected = ["1", "2", "3"];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("can concat 2 streams", (done) => {
const stream1 = Stream.of("1", "2", "3");
const stream2 = Stream.of("a", "b", "c");
const concatenatedStream = concat(stream1, stream2);
const expected = ["1", "2", "3", "a", "b", "c"];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("can concat 3 streams", (done) => {
const stream1 = Stream.of("1", "2", "3");
const stream2 = Stream.of("a", "b", "c");
const stream3 = Stream.of("X", "Y", "Z");
const concatenatedStream = concat(stream1, stream2, stream3);
const expected = ["1", "2", "3", "a", "b", "c", "X", "Y", "Z"];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("changes output order when order of streams switch", (done) => {
const stream1 = Stream.of("1", "2", "3");
const stream2 = Stream.of("a", "b", "c");
const concatenatedStream = concat(stream2, stream1);
const expected = ["a", "b", "c", "1", "2", "3"];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("should concat two asynchronous short streams together", (done) => {
const stream1 = Stream.periodic(25).take(3);
const stream2 = Stream.periodic(50).take(2);
const concatenatedStream = concat(stream1, stream2);
const expected = [0, 1, 2, 0, 1];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("should append a synchronous stream after an asynchronous stream", (done) => {
const stream1 = Stream.periodic(25).take(3);
const stream2 = Stream.of(30, 40, 50, 60);
const concatenatedStream = concat(stream1, stream2);
const expected = [0, 1, 2, 30, 40, 50, 60];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("buffers asynchronous events of second stream until first stream completes", (done) => {
const sourceStream = Stream.periodic(25);
const stream1 = sourceStream.take(3);
const stream2 = sourceStream.take(3);
const concatenatedStream = concat(stream1, stream2);
const expected = [0, 1, 2, 0, 1, 2];
concatenatedStream.addListener({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => {
expect(expected.length).toEqual(0);
done();
},
error: done.fail,
});
});
it("unsubscribes and re-subscribes from source streams", (done) => {
// For browsers and CI, clocks and runtimes are very unreliable.
// Especialls Mac+Firefox on Travis is makes big trouble. Thus we need huge intervals.
const intervalDuration = 1000;
const producerActiveLog = new Array<boolean>();
let producerInterval: NodeJS.Timeout;
let producerValue = 0;
const loggingProducer: Producer<string> = {
start: (listener) => {
producerInterval = setInterval(() => listener.next(`event${producerValue++}`), intervalDuration);
producerActiveLog.push(true);
},
stop: () => {
clearInterval(producerInterval);
producerActiveLog.push(false);
},
};
const stream1 = Stream.create(loggingProducer);
const concatenatedStream = concat(stream1);
const expected = ["event0", "event1", "event2", "event3", "event4", "event5"];
expect(producerActiveLog).toEqual([]);
const subscription = concatenatedStream.subscribe({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => done.fail(),
error: done.fail,
});
expect(producerActiveLog).toEqual([true]);
// unsubscribe
setTimeout(async () => {
expect(producerActiveLog).toEqual([true]);
subscription.unsubscribe();
await producerIsStopped();
expect(producerActiveLog).toEqual([true, false]);
}, 3.75 * intervalDuration);
// re-subscribe
setTimeout(() => {
expect(producerActiveLog).toEqual([true, false]);
const subscription2 = concatenatedStream.subscribe({
next: (value) => expect(value).toEqual(expected.shift()!),
complete: () => done.fail(),
error: done.fail,
});
expect(producerActiveLog).toEqual([true, false, true]);
// unsubscribe again
setTimeout(async () => {
expect(producerActiveLog).toEqual([true, false, true]);
subscription2.unsubscribe();
await producerIsStopped();
expect(producerActiveLog).toEqual([true, false, true, false]);
expect(expected.length).toEqual(0);
done();
}, 3.75 * intervalDuration);
}, 6 * intervalDuration);
});
});

View File

@ -0,0 +1,104 @@
// tslint:disable:readonly-array
import { Producer, Stream, Subscription } from "xstream";
/**
* An implementation of concat that buffers all source stream events
*
* Marble diagram:
*
* ```text
* --1--2---3---4-|
* -a--b-c--d-|
* --------X---------Y---------Z-
* concat
* --1--2---3---4-abcdXY-------Z-
* ```
*
* This is inspired by RxJS's concat as documented at http://rxmarbles.com/#concat and behaves
* differently than xstream's concat as discussed in https://github.com/staltz/xstream/issues/170.
*
*/
export function concat<T>(...streams: Stream<T>[]): Stream<T> {
const subscriptions = new Array<Subscription>();
const queues = new Array<T[]>(); // one queue per stream
const completedStreams = new Set<number>();
let activeStreamIndex = 0;
function reset(): void {
while (subscriptions.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const subscription = subscriptions.shift()!;
subscription.unsubscribe();
}
// tslint:disable-next-line:no-object-mutation
queues.length = 0;
completedStreams.clear();
activeStreamIndex = 0;
}
const producer: Producer<T> = {
start: (listener) => {
streams.forEach((_) => queues.push([]));
function emitAllQueuesEvents(streamIndex: number): void {
// eslint-disable-next-line no-constant-condition
while (true) {
const element = queues[streamIndex].shift();
if (element === undefined) {
return;
}
listener.next(element);
}
}
function isDone(): boolean {
return activeStreamIndex >= streams.length;
}
if (isDone()) {
listener.complete();
return;
}
streams.forEach((stream, index) => {
subscriptions.push(
stream.subscribe({
next: (value) => {
if (index === activeStreamIndex) {
listener.next(value);
} else {
queues[index].push(value);
}
},
complete: () => {
completedStreams.add(index);
while (completedStreams.has(activeStreamIndex)) {
// this stream completed: emit all and move on
emitAllQueuesEvents(activeStreamIndex);
activeStreamIndex++;
}
if (isDone()) {
listener.complete();
} else {
// now active stream can have some events queued but did not yet complete
emitAllQueuesEvents(activeStreamIndex);
}
},
error: (error) => {
listener.error(error);
reset();
},
}),
);
});
},
stop: () => {
reset();
},
};
return Stream.create(producer);
}

View File

@ -0,0 +1,113 @@
import { Stream } from "xstream";
import { DefaultValueProducer } from "./defaultvalueproducer";
async function oneTickLater(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 0));
}
describe("DefaultValueProducer", () => {
it("can be constructed", () => {
const producer = new DefaultValueProducer(1);
expect(producer.value).toEqual(1);
});
it("can be used as a stream backend", (done) => {
const producer = new DefaultValueProducer(42);
const stream = Stream.createWithMemory(producer);
stream.addListener({
next: (value) => {
expect(value).toEqual(42);
done();
},
error: done.fail,
complete: done.fail,
});
});
it("can send updates", (done) => {
const producer = new DefaultValueProducer(42);
const stream = Stream.createWithMemory(producer);
// tslint:disable-next-line:readonly-array
const events: number[] = [];
stream.addListener({
next: (value) => {
events.push(value);
if (events.length === 4) {
expect(events).toEqual([42, 43, 44, 45]);
done();
}
},
error: done.fail,
complete: done.fail,
});
producer.update(43);
producer.update(44);
producer.update(45);
});
it("can send errors", (done) => {
const producer = new DefaultValueProducer(42);
const stream = Stream.createWithMemory(producer);
stream.addListener({
error: (error) => {
expect(error).toEqual("oh no :(");
done();
},
complete: () => done.fail("Stream must not complete sucessfully"),
});
producer.update(1);
producer.update(2);
producer.update(3);
producer.error("oh no :(");
});
it("calls callbacks", async () => {
// tslint:disable-next-line:readonly-array
const producerActive: boolean[] = [];
const producer = new DefaultValueProducer(42, {
onStarted: () => producerActive.push(true),
onStop: () => producerActive.push(false),
});
const stream = Stream.createWithMemory(producer);
expect(producerActive).toEqual([]);
const subscription1 = stream.subscribe({});
expect(producerActive).toEqual([true]);
const subscription2 = stream.subscribe({});
expect(producerActive).toEqual([true]);
subscription2.unsubscribe();
expect(producerActive).toEqual([true]);
subscription1.unsubscribe();
await oneTickLater();
expect(producerActive).toEqual([true, false]);
const subscription3 = stream.subscribe({});
expect(producerActive).toEqual([true, false, true]);
subscription3.unsubscribe();
await oneTickLater();
expect(producerActive).toEqual([true, false, true, false]);
const subscriptionA = stream.subscribe({});
expect(producerActive).toEqual([true, false, true, false, true]);
// unsubscribe and re-subscribe does not deactivate the producer (which is a xstream feature)
subscriptionA.unsubscribe();
const subscriptionB = stream.subscribe({});
expect(producerActive).toEqual([true, false, true, false, true]);
// cleanup
subscriptionB.unsubscribe();
});
});

View File

@ -0,0 +1,72 @@
import { Listener, Producer } from "xstream";
export interface DefaultValueProducerCallsbacks {
readonly onStarted: () => void;
readonly onStop: () => void;
}
// allows pre-producing values before anyone is listening
export class DefaultValueProducer<T> implements Producer<T> {
public get value(): T {
return this.internalValue;
}
private readonly callbacks: DefaultValueProducerCallsbacks | undefined;
// tslint:disable-next-line:readonly-keyword
private internalValue: T;
// tslint:disable-next-line:readonly-keyword
private listener: Listener<T> | undefined;
public constructor(value: T, callbacks?: DefaultValueProducerCallsbacks) {
this.callbacks = callbacks;
this.internalValue = value;
}
/**
* Update the current value.
*
* If producer is active (i.e. someone is listening), this emits an event.
* If not, just the current value is updated.
*/
public update(value: T): void {
// tslint:disable-next-line:no-object-mutation
this.internalValue = value;
if (this.listener) {
this.listener.next(value);
}
}
/**
* Produce an error
*/
public error(error: any): void {
if (this.listener) {
this.listener.error(error);
}
}
/**
* Called by the stream. Do not call this directly.
*/
public start(listener: Listener<T>): void {
// tslint:disable-next-line:no-object-mutation
this.listener = listener;
listener.next(this.internalValue);
if (this.callbacks) {
this.callbacks.onStarted();
}
}
/**
* Called by the stream. Do not call this directly.
*/
public stop(): void {
if (this.callbacks) {
this.callbacks.onStop();
}
// tslint:disable-next-line:no-object-mutation
this.listener = undefined;
}
}

View File

@ -0,0 +1,95 @@
import { Stream } from "xstream";
import { dropDuplicates } from "./dropduplicates";
describe("dropDuplicates", () => {
it("can be created", () => {
const operand = dropDuplicates<number>((value) => `${value}`);
expect(operand).toBeTruthy();
});
it("passes unique values", (done) => {
const instream = Stream.fromArray([0, 1, 2, 3]);
const operand = dropDuplicates<number>((value) => `${value}`);
const events = new Array<number>();
instream.compose(operand).subscribe({
next: (value) => events.push(value),
complete: () => {
expect(events).toEqual([0, 1, 2, 3]);
done();
},
});
});
it("drops consecutive duplicates", (done) => {
const instream = Stream.fromArray([1, 2, 2, 3, 3, 3, 4, 4, 4, 4]);
const operand = dropDuplicates<number>((value) => `${value}`);
const events = new Array<number>();
instream.compose(operand).subscribe({
next: (value) => events.push(value),
complete: () => {
expect(events).toEqual([1, 2, 3, 4]);
done();
},
});
});
it("drops non-consecutive duplicates", (done) => {
const instream = Stream.fromArray([1, 2, 3, 4, 3, 2, 1]);
const operand = dropDuplicates<number>((value) => `${value}`);
const events = new Array<number>();
instream.compose(operand).subscribe({
next: (value) => events.push(value),
complete: () => {
expect(events).toEqual([1, 2, 3, 4]);
done();
},
});
});
it("uses value to key method for duplicate checks", (done) => {
const instream = Stream.fromArray([1, 10, 100, 2000, 2, 27, 1337, 3.14, 33]);
// use first character of native string representation
const valueToKey = (value: number): string => `${value}`.charAt(0);
const operand = dropDuplicates(valueToKey);
const events = new Array<number>();
instream.compose(operand).subscribe({
next: (value) => events.push(value),
complete: () => {
expect(events).toEqual([1, 2000, 3.14]);
done();
},
});
});
it("works for empty string keys", (done) => {
interface Name {
readonly first: string;
readonly last: string;
}
const instream = Stream.fromArray<Name>([
{ first: "Daria", last: "" },
{ first: "Sam", last: "" },
{ first: "Regina", last: "Mustermann" },
{ first: "Max", last: "Mustermann" },
]);
const operand = dropDuplicates((value: Name) => value.last);
const events = new Array<Name>();
instream.compose(operand).subscribe({
next: (value) => events.push(value),
complete: () => {
expect(events).toEqual([
{ first: "Daria", last: "" },
{ first: "Regina", last: "Mustermann" },
]);
done();
},
});
});
});

View File

@ -0,0 +1,38 @@
import { Stream } from "xstream";
/**
* The type that fits into Stream.compose() with input stream and output stream
* of the same type.
*/
export type SameTypeStreamOperator<T> = (ins: Stream<T>) => Stream<T>;
/**
* Drops duplicate values in a stream.
*
* Marble diagram:
*
* ```text
* -1-1-1-2-4-3-3-4--
* dropDuplicates
* -1-----2-4-3------
* ```
*
* Each value must be uniquely identified by a string given by
* valueToKey(value).
*
* Internally this maintains a set of keys that have been processed already,
* i.e. memory consumption and Set lookup times should be considered when
* using this function.
*/
export function dropDuplicates<T>(valueToKey: (x: T) => string): SameTypeStreamOperator<T> {
const operand: SameTypeStreamOperator<T> = (instream: Stream<T>): Stream<T> => {
const emittedKeys = new Set<string>();
const deduplicatedStream = instream
.filter((value) => !emittedKeys.has(valueToKey(value)))
.debug((value) => emittedKeys.add(valueToKey(value)));
return deduplicatedStream;
};
return operand;
}

View File

@ -0,0 +1,6 @@
export { DefaultValueProducer, DefaultValueProducerCallsbacks } from "./defaultvalueproducer";
export { concat } from "./concat";
export { dropDuplicates, SameTypeStreamOperator } from "./dropduplicates";
export { firstEvent, fromListPromise, toListPromise } from "./promise";
export * from "./reducer";
export { ValueAndUpdates } from "./valueandupdates";

View File

@ -0,0 +1,145 @@
// tslint:disable:readonly-array
import { Producer, Stream } from "xstream";
import { firstEvent, fromListPromise, toListPromise } from "./promise";
import { asArray, countStream } from "./reducer";
async function oneTickLater(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 0));
}
describe("promise", () => {
describe("fromListPromise", () => {
it("sends many values on a stream", async () => {
// create a promise that will resolve to an array of strings
const input = ["a", "fd", "fvss", "gs"];
const prom = Promise.resolve(input);
const stream = fromListPromise(prom);
// materialize stream into a counter, and wait for stream to complete
const counter = countStream(stream);
await counter.finished();
expect(counter.value()).toEqual(input.length);
});
it("works for iterables like Uint8Array", async () => {
const inputPromise = Promise.resolve(new Uint8Array([0x00, 0x11, 0x22]));
const stream = fromListPromise(inputPromise);
const reader = asArray<number>(stream);
await reader.finished();
expect(reader.value()).toEqual([0x00, 0x11, 0x22]);
});
it("works for delayed resolution", async () => {
const inputPromise = new Promise<number[]>((resolve) => {
// resolve after 50 ms
setTimeout(() => resolve([1, 2, 3]), 50);
});
const stream = fromListPromise(inputPromise);
const reader = asArray<number>(stream);
await reader.finished();
expect(reader.value()).toEqual([1, 2, 3]);
});
it("sends proper values", async () => {
const input = ["let", "us", "say", "something"];
const prom = Promise.resolve(input);
const stream = fromListPromise(prom);
// materialize stream into an array, and wait for stream to complete
const read = asArray<string>(stream);
await read.finished();
expect(read.value()).toEqual(input);
});
});
describe("toListPromise", () => {
it("works for simple stream with more events than count", async () => {
const events = await toListPromise(Stream.fromArray([1, 6, 92, 2, 9]), 3);
expect(events).toEqual([1, 6, 92]);
});
it("works for simple stream with equal events and count", async () => {
const events = await toListPromise(Stream.fromArray([1, 6, 92, 2, 9]), 5);
expect(events).toEqual([1, 6, 92, 2, 9]);
});
it("works for simple stream with zero count", async () => {
const events = await toListPromise(Stream.fromArray([1, 6, 92, 2, 9]), 0);
expect(events).toEqual([]);
});
it("works for empty stream with zero count", async () => {
const events = await toListPromise(Stream.fromArray([]), 0);
expect(events).toEqual([]);
});
it("rejects for simple stream with less events than count", async () => {
await toListPromise(Stream.fromArray([1, 6, 92]), 5)
.then(() => fail("must not resolve"))
.catch((error) => expect(error).toMatch(/stream completed before all events could be collected/i));
});
it("works for async stream", async () => {
let interval: NodeJS.Timeout;
let producerRunning = false;
const producer: Producer<number> = {
start: (listener) => {
producerRunning = true;
let nextElement = 0;
interval = setInterval(() => {
listener.next(nextElement++);
}, 20);
},
stop: () => {
clearInterval(interval);
producerRunning = false;
},
};
const events = await toListPromise(Stream.create(producer), 3);
expect(events).toEqual([0, 1, 2]);
await oneTickLater();
expect(producerRunning).toEqual(false);
});
});
describe("firstEvent", () => {
it("works for simple stream with more events than count", async () => {
const event = await firstEvent(Stream.fromArray([1, 6, 92, 2, 9]));
expect(event).toEqual(1);
});
it("rejects for stream with no events", async () => {
await firstEvent(Stream.fromArray([]))
.then(() => fail("must not resolve"))
.catch((error) => expect(error).toMatch(/stream completed before all events could be collected/i));
});
it("works for async stream", async () => {
let interval: NodeJS.Timeout;
let producerRunning = false;
const producer: Producer<number> = {
start: (listener) => {
producerRunning = true;
let nextElement = 0;
interval = setInterval(() => {
listener.next(nextElement++);
}, 20);
},
stop: () => {
clearInterval(interval);
producerRunning = false;
},
};
const event = await firstEvent(Stream.create(producer));
expect(event).toEqual(0);
await oneTickLater();
expect(producerRunning).toEqual(false);
});
});
});

View File

@ -0,0 +1,67 @@
import { Producer, Stream } from "xstream";
/**
* Emits one event for each list element as soon as the promise resolves
*/
export function fromListPromise<T>(promise: Promise<Iterable<T>>): Stream<T> {
const producer: Producer<T> = {
start: (listener) => {
// the code in `start` runs as soon as anyone listens to the stream
promise
.then((iterable) => {
for (const element of iterable) {
listener.next(element);
}
listener.complete();
})
.catch((error) => listener.error(error));
},
// eslint-disable-next-line @typescript-eslint/no-empty-function
stop: () => {},
};
return Stream.create(producer);
}
/**
* Listens to stream and collects events. When `count` events are collected,
* the promise resolves with an array of events.
*
* Rejects if stream completes before `count` events are collected.
*/
export async function toListPromise<T>(stream: Stream<T>, count: number): Promise<readonly T[]> {
return new Promise<readonly T[]>((resolve, reject) => {
if (count === 0) {
resolve([]);
return;
}
const events = new Array<T>();
// take() unsubscribes from source stream automatically
stream.take(count).subscribe({
next: (event) => {
events.push(event);
if (events.length === count) {
resolve(events);
}
},
complete: () => {
reject(
`Stream completed before all events could be collected. ` +
`Collected ${events.length}, expected ${count}`,
);
},
error: (error) => reject(error),
});
});
}
/**
* Listens to stream, collects one event and revolves.
*
* Rejects if stream completes before one event was fired.
*/
export async function firstEvent<T>(stream: Stream<T>): Promise<T> {
return (await toListPromise(stream, 1))[0];
}

View File

@ -0,0 +1,56 @@
// tslint:disable:readonly-array
import { Stream } from "xstream";
import { asArray, countStream, lastValue } from "./reducer";
describe("Test stream helpers", () => {
it("readIntoArray returns input", async () => {
const input = [1, 6, 92, 2, 9];
const stream = Stream.fromArray(input);
const result = asArray<number>(stream);
await result.finished();
expect(result.value()).toEqual(input);
// also handle empty properly
const result2 = asArray<number>(Stream.empty());
await result2.finished();
expect(result2.value()).toEqual([]);
});
it("countStream returns number of items", async () => {
const input = ["abc", "123", "def", "superstar!", "is"];
const stream = Stream.fromArray(input);
const result = countStream(stream);
await result.finished();
expect(result.value()).toEqual(input.length);
// also handle empty properly
const result2 = countStream(Stream.empty());
await result2.finished();
expect(result2.value()).toEqual(0);
});
it("lastValue returns input", async () => {
const input = ["Some", "people", "say", "there", "is", "something"];
const stream = Stream.fromArray(input);
const result = lastValue<string>(stream);
await result.finished();
expect(result.value()).toEqual("something");
// also handle empty properly (undefined)
const result2 = lastValue<number>(Stream.empty());
await result2.finished();
expect(result2.value()).toBeUndefined();
});
it("Reducer.finished throws error on stream error", async () => {
const stream = Stream.throw("error");
try {
const result = asArray<number>(stream);
await result.finished();
fail("This should have thrown an error");
} catch (err) {
expect(err).toEqual("error");
}
});
});

View File

@ -0,0 +1,87 @@
/*
This file maintains some stream helpers used in iov-bns, but which
may be useful other places, and should consider to be moved.
Reducer and related code works to maintain a current state
materialized by reducing all events on a stream. Similar
to ValueAndUpdate in keycontrol, but more general.
*/
// tslint:disable:readonly-keyword
// tslint:disable:no-object-mutation
import { Stream } from "xstream";
export type ReducerFunc<T, U> = (acc: U, evt: T) => U;
// Reducer takes a stream of events T and a ReducerFunc, that
// materializes a state of type U.
export class Reducer<T, U> {
private readonly stream: Stream<T>;
private readonly reducer: ReducerFunc<T, U>;
private state: U;
// completed maintains state of stream, resolves/rejects
// on complete or error
private readonly completed: Promise<void>;
public constructor(stream: Stream<T>, reducer: ReducerFunc<T, U>, initState: U) {
this.stream = stream;
this.reducer = reducer;
this.state = initState;
this.completed = new Promise<void>((resolve, reject) => {
const subscription = this.stream.subscribe({
next: (evt: T) => {
this.state = this.reducer(this.state, evt);
},
complete: () => {
resolve();
// this must happen after resolve, to ensure stream.subscribe() has finished
subscription.unsubscribe();
},
error: (err: any) => {
reject(err);
// the stream already closed on error, but unsubscribe to be safe
subscription.unsubscribe();
},
});
});
}
// value returns current materialized state
public value(): U {
return this.state;
}
// finished resolves on completed stream, rejects on stream error
public async finished(): Promise<void> {
return this.completed;
}
}
function increment<T>(sum: number, _: T): number {
return sum + 1;
}
// countStream returns a reducer that contains current count
// of events on the stream
export function countStream<T>(stream: Stream<T>): Reducer<T, number> {
return new Reducer(stream, increment, 0);
}
function append<T>(list: readonly T[], evt: T): readonly T[] {
return [...list, evt];
}
// asArray maintains an array containing all events that have
// occurred on the stream
export function asArray<T>(stream: Stream<T>): Reducer<T, readonly T[]> {
return new Reducer<T, readonly T[]>(stream, append, []);
}
function last<T>(_: any, event: T): T {
return event;
}
// lastValue returns the last value read from the stream, or undefined if no values sent
export function lastValue<T>(stream: Stream<T>): Reducer<T, T | undefined> {
return new Reducer<T, T | undefined>(stream, last, undefined);
}

View File

@ -0,0 +1,211 @@
import { Listener } from "xstream";
import { DefaultValueProducer } from "./defaultvalueproducer";
import { ValueAndUpdates } from "./valueandupdates";
describe("ValueAndUpdates", () => {
it("can be constructed", () => {
{
const vau = new ValueAndUpdates(new DefaultValueProducer("a"));
expect(vau).toBeTruthy();
}
{
const vau = new ValueAndUpdates(new DefaultValueProducer(123));
expect(vau).toBeTruthy();
}
{
const vau = new ValueAndUpdates(new DefaultValueProducer(true));
expect(vau).toBeTruthy();
}
{
const vau = new ValueAndUpdates(new DefaultValueProducer(null));
expect(vau).toBeTruthy();
}
});
it("contains initial value", () => {
{
const vau = new ValueAndUpdates(new DefaultValueProducer("a"));
expect(vau.value).toEqual("a");
}
{
const vau = new ValueAndUpdates(new DefaultValueProducer(123));
expect(vau.value).toEqual(123);
}
{
const vau = new ValueAndUpdates(new DefaultValueProducer(true));
expect(vau.value).toEqual(true);
}
{
const vau = new ValueAndUpdates(new DefaultValueProducer(null));
expect(vau.value).toEqual(null);
}
});
it("can be updated", () => {
{
const producer = new DefaultValueProducer("a");
const vau = new ValueAndUpdates(producer);
expect(vau.value).toEqual("a");
producer.update("b");
expect(vau.value).toEqual("b");
}
{
const producer = new DefaultValueProducer(123);
const vau = new ValueAndUpdates(producer);
expect(vau.value).toEqual(123);
producer.update(44);
expect(vau.value).toEqual(44);
}
{
const producer = new DefaultValueProducer(true);
const vau = new ValueAndUpdates(producer);
expect(vau.value).toEqual(true);
producer.update(false);
expect(vau.value).toEqual(false);
}
{
const producer = new DefaultValueProducer(null);
const vau = new ValueAndUpdates(producer);
expect(vau.value).toEqual(null);
producer.update(null);
expect(vau.value).toEqual(null);
}
});
it("emits initial value to new listeners", (done) => {
const vau = new ValueAndUpdates(new DefaultValueProducer(123));
const listener2: Listener<number> = {
next: (value) => {
expect(value).toEqual(123);
done();
},
complete: () => done.fail(".updates stream must not complete"),
error: (e) => done.fail(e),
};
const listener1: Listener<number> = {
next: (value) => {
expect(value).toEqual(123);
vau.updates.addListener(listener2);
},
complete: () => done.fail(".updates stream must not complete"),
error: (e) => done.fail(e),
};
vau.updates.addListener(listener1);
});
it("emits current value to new listeners", (done) => {
const producer = new DefaultValueProducer(123);
const vau = new ValueAndUpdates(producer);
producer.update(99);
const listener2: Listener<number> = {
next: (value) => {
expect(value).toEqual(99);
done();
},
complete: () => done.fail(".updates stream must not complete"),
error: (e) => done.fail(e),
};
const listener1: Listener<number> = {
next: (value) => {
expect(value).toEqual(99);
vau.updates.addListener(listener2);
},
complete: () => done.fail(".updates stream must not complete"),
error: (e) => done.fail(e),
};
vau.updates.addListener(listener1);
});
it("emits updates to listener", (done) => {
const producer = new DefaultValueProducer(11);
const vau = new ValueAndUpdates(producer);
let eventsCount = 0;
const emittedValues = new Array<number>();
vau.updates.addListener({
next: (value) => {
eventsCount++;
emittedValues.push(value);
if (eventsCount === 4) {
expect(emittedValues).toEqual([11, 22, 33, 44]);
done();
}
},
complete: () => done.fail(".updates stream must not complete"),
error: (e) => done.fail(e),
});
setTimeout(() => producer.update(22), 10);
setTimeout(() => producer.update(33), 20);
setTimeout(() => producer.update(44), 30);
});
it("can wait for value", async () => {
const producer = new DefaultValueProducer(11);
const vau = new ValueAndUpdates(producer);
setTimeout(() => producer.update(22), 10);
setTimeout(() => producer.update(33), 20);
setTimeout(() => producer.update(44), 30);
await vau.waitFor(33);
expect(vau.value).toEqual(33);
await vau.waitFor(44);
expect(vau.value).toEqual(44);
});
it("can wait for search function to return true", async () => {
const producer = new DefaultValueProducer(11);
const vau = new ValueAndUpdates(producer);
setTimeout(() => producer.update(22), 10);
setTimeout(() => producer.update(33), 20);
setTimeout(() => producer.update(44), 30);
await vau.waitFor((v) => v > 30);
expect(vau.value).toEqual(33);
await vau.waitFor((v) => v > 40);
expect(vau.value).toEqual(44);
});
it("gets the correct return value in waitFor", async () => {
const producer = new DefaultValueProducer(11);
const vau = new ValueAndUpdates(producer);
setTimeout(() => producer.update(22), 10);
setTimeout(() => producer.update(33), 20);
{
const result = await vau.waitFor(22);
expect(result).toEqual(22);
}
{
const result = await vau.waitFor((v) => v > 30);
expect(result).toEqual(33);
}
});
describe("waitFor", () => {
it("propagates error from stream", async () => {
const producer = new DefaultValueProducer(1);
const vau = new ValueAndUpdates(producer);
setTimeout(() => producer.error(new Error("something went wrong")), 10);
await vau.waitFor(3).then(
() => fail("must not resolve"),
(error) => expect(error).toMatch(/something went wrong/),
);
});
});
});

View File

@ -0,0 +1,57 @@
import { MemoryStream } from "xstream";
import { DefaultValueProducer } from "./defaultvalueproducer";
export type SearchFunction<T> = (value: T) => boolean;
/**
* A read only wrapper around DefaultValueProducer that allows
* to synchronously get the current value using the .value property
* and listen to to updates by suscribing to the .updates stream
*/
export class ValueAndUpdates<T> {
public readonly updates: MemoryStream<T>;
public get value(): T {
return this.producer.value;
}
private readonly producer: DefaultValueProducer<T>;
public constructor(producer: DefaultValueProducer<T>) {
this.producer = producer;
this.updates = MemoryStream.createWithMemory(this.producer);
}
/**
* Resolves as soon as search value is found.
*
* @param search either a value or a function that must return true when found
* @returns the value of the update that caused the search match
*/
public async waitFor(search: SearchFunction<T> | T): Promise<T> {
const searchImplementation: SearchFunction<T> =
typeof search === "function" ? (search as SearchFunction<T>) : (value: T): boolean => value === search;
return new Promise((resolve, reject) => {
const subscription = this.updates.subscribe({
next: (newValue) => {
if (searchImplementation(newValue)) {
resolve(newValue);
// MemoryStream.subscribe() calls next with the last value.
// Make async to ensure the subscription exists
setTimeout(() => subscription.unsubscribe(), 0);
}
},
complete: () => {
subscription.unsubscribe();
reject("Update stream completed without expected value");
},
error: (error) => {
reject(error);
},
});
});
}
}

View File

@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"baseUrl": ".",
"outDir": "build",
"declarationDir": "build/types",
"rootDir": "src"
},
"include": [
"src/**/*"
]
}

View File

@ -0,0 +1,3 @@
{
"extends": "../../tslint.json"
}

View File

@ -0,0 +1,14 @@
const packageJson = require("./package.json");
module.exports = {
src: ["./src"],
out: "docs",
exclude: "**/*.spec.ts",
target: "es6",
name: `${packageJson.name} Documentation`,
readme: "README.md",
mode: "file",
excludeExternals: true,
excludeNotExported: true,
excludePrivate: true,
};

19
packages/stream/types/concat.d.ts vendored Normal file
View File

@ -0,0 +1,19 @@
import { Stream } from "xstream";
/**
* An implementation of concat that buffers all source stream events
*
* Marble diagram:
*
* ```text
* --1--2---3---4-|
* -a--b-c--d-|
* --------X---------Y---------Z-
* concat
* --1--2---3---4-abcdXY-------Z-
* ```
*
* This is inspired by RxJS's concat as documented at http://rxmarbles.com/#concat and behaves
* differently than xstream's concat as discussed in https://github.com/staltz/xstream/issues/170.
*
*/
export declare function concat<T>(...streams: Stream<T>[]): Stream<T>;

View File

@ -0,0 +1,31 @@
import { Listener, Producer } from "xstream";
export interface DefaultValueProducerCallsbacks {
readonly onStarted: () => void;
readonly onStop: () => void;
}
export declare class DefaultValueProducer<T> implements Producer<T> {
get value(): T;
private readonly callbacks;
private internalValue;
private listener;
constructor(value: T, callbacks?: DefaultValueProducerCallsbacks);
/**
* Update the current value.
*
* If producer is active (i.e. someone is listening), this emits an event.
* If not, just the current value is updated.
*/
update(value: T): void;
/**
* Produce an error
*/
error(error: any): void;
/**
* Called by the stream. Do not call this directly.
*/
start(listener: Listener<T>): void;
/**
* Called by the stream. Do not call this directly.
*/
stop(): void;
}

View File

@ -0,0 +1,25 @@
import { Stream } from "xstream";
/**
* The type that fits into Stream.compose() with input stream and output stream
* of the same type.
*/
export declare type SameTypeStreamOperator<T> = (ins: Stream<T>) => Stream<T>;
/**
* Drops duplicate values in a stream.
*
* Marble diagram:
*
* ```text
* -1-1-1-2-4-3-3-4--
* dropDuplicates
* -1-----2-4-3------
* ```
*
* Each value must be uniquely identified by a string given by
* valueToKey(value).
*
* Internally this maintains a set of keys that have been processed already,
* i.e. memory consumption and Set lookup times should be considered when
* using this function.
*/
export declare function dropDuplicates<T>(valueToKey: (x: T) => string): SameTypeStreamOperator<T>;

6
packages/stream/types/index.d.ts vendored Normal file
View File

@ -0,0 +1,6 @@
export { DefaultValueProducer, DefaultValueProducerCallsbacks } from "./defaultvalueproducer";
export { concat } from "./concat";
export { dropDuplicates, SameTypeStreamOperator } from "./dropduplicates";
export { firstEvent, fromListPromise, toListPromise } from "./promise";
export * from "./reducer";
export { ValueAndUpdates } from "./valueandupdates";

18
packages/stream/types/promise.d.ts vendored Normal file
View File

@ -0,0 +1,18 @@
import { Stream } from "xstream";
/**
* Emits one event for each list element as soon as the promise resolves
*/
export declare function fromListPromise<T>(promise: Promise<Iterable<T>>): Stream<T>;
/**
* Listens to stream and collects events. When `count` events are collected,
* the promise resolves with an array of events.
*
* Rejects if stream completes before `count` events are collected.
*/
export declare function toListPromise<T>(stream: Stream<T>, count: number): Promise<readonly T[]>;
/**
* Listens to stream, collects one event and revolves.
*
* Rejects if stream completes before one event was fired.
*/
export declare function firstEvent<T>(stream: Stream<T>): Promise<T>;

14
packages/stream/types/reducer.d.ts vendored Normal file
View File

@ -0,0 +1,14 @@
import { Stream } from "xstream";
export declare type ReducerFunc<T, U> = (acc: U, evt: T) => U;
export declare class Reducer<T, U> {
private readonly stream;
private readonly reducer;
private state;
private readonly completed;
constructor(stream: Stream<T>, reducer: ReducerFunc<T, U>, initState: U);
value(): U;
finished(): Promise<void>;
}
export declare function countStream<T>(stream: Stream<T>): Reducer<T, number>;
export declare function asArray<T>(stream: Stream<T>): Reducer<T, readonly T[]>;
export declare function lastValue<T>(stream: Stream<T>): Reducer<T, T | undefined>;

View File

@ -0,0 +1,21 @@
import { MemoryStream } from "xstream";
import { DefaultValueProducer } from "./defaultvalueproducer";
export declare type SearchFunction<T> = (value: T) => boolean;
/**
* A read only wrapper around DefaultValueProducer that allows
* to synchronously get the current value using the .value property
* and listen to to updates by suscribing to the .updates stream
*/
export declare class ValueAndUpdates<T> {
readonly updates: MemoryStream<T>;
get value(): T;
private readonly producer;
constructor(producer: DefaultValueProducer<T>);
/**
* Resolves as soon as search value is found.
*
* @param search either a value or a function that must return true when found
* @returns the value of the update that caused the search match
*/
waitFor(search: SearchFunction<T> | T): Promise<T>;
}

View File

@ -0,0 +1,17 @@
const glob = require("glob");
const path = require("path");
const target = "web";
const distdir = path.join(__dirname, "dist", "web");
module.exports = [
{
// bundle used for Karma tests
target: target,
entry: glob.sync("./build/**/*.spec.js"),
output: {
path: distdir,
filename: "tests.js",
},
},
];