mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-07 20:08:06 +00:00
32c78a907c
* Add support for setting gossipsub direct peers * Use string array for direct peers config * Handle empty nitro and consensus config * Increase consensus write retry attempts * Rename config option for watcher party peers file * Handle directPeers array being passed undefined * Upgrade package versions --------- Co-authored-by: Shreerang Kale <shreerangkale@gmail.com>
239 lines
7.1 KiB
TypeScript
239 lines
7.1 KiB
TypeScript
//
|
|
// Copyright 2023 Vulcanize, Inc.
|
|
//
|
|
|
|
import fs from 'fs';
|
|
import path from 'path';
|
|
import assert from 'assert';
|
|
import debug from 'debug';
|
|
import * as bunyan from 'bunyan';
|
|
import { Mokka } from 'mokka';
|
|
import * as MokkaStates from 'mokka/dist/consensus/constants/NodeStates';
|
|
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
|
|
import type { Pushable } from 'it-pushable';
|
|
|
|
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
|
|
import type { Peer } from '@cerc-io/peer';
|
|
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
|
|
import type { Stream as P2PStream } from '@libp2p/interface-connection';
|
|
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
|
|
import type { PeerId } from '@libp2p/interface-peer-id';
|
|
|
|
const LOG_NAMESPACE = 'laconic:consensus';
|
|
const CONSENSUS_LOG_LEVEL = 'debug';
|
|
const log = debug(LOG_NAMESPACE);
|
|
|
|
const CONSENSUS_PROTOCOL = '/consensus/1.0.0';
|
|
|
|
const NUM_WRITE_ATTEMPTS = 25;
|
|
const RETRY_SLEEP_DURATION = 15 * 1000; // 15 seconds
|
|
|
|
const DEFAULT_HEARTBEAT = 300;
|
|
const DEFAULT_ELECTION_TIMEOUT = 1000;
|
|
const DEFAULT_PROOF_EXPIRATION = 20000;
|
|
const DEFAULT_CRASH_MODEL = 'BFT';
|
|
|
|
const consensusStates: Record<number, string> = {
|
|
[MokkaStates.default.STOPPED]: 'STOPPED',
|
|
[MokkaStates.default.LEADER]: 'LEADER',
|
|
[MokkaStates.default.CANDIDATE]: 'CANDIDATE',
|
|
[MokkaStates.default.FOLLOWER]: 'FOLLOWER'
|
|
};
|
|
|
|
export interface PartyPeer {
|
|
peerId: string;
|
|
publicKey: string;
|
|
}
|
|
|
|
export interface ConsensusOptions {
|
|
peer: Peer;
|
|
|
|
publicKey: string;
|
|
privateKey: string;
|
|
|
|
partyPeers: PartyPeer[];
|
|
|
|
// For Mokka options (ISettingsInterface)
|
|
heartbeat?: number;
|
|
electionTimeout?: number;
|
|
proofExpiration?: number;
|
|
crashModel?: 'CFT' | 'BFT';
|
|
}
|
|
|
|
export class Consensus extends Mokka {
|
|
peer: Peer;
|
|
partyPeers: PartyPeer[];
|
|
|
|
private messageStreamMap: Map<string, Pushable<any>> = new Map();
|
|
|
|
constructor (options: ConsensusOptions) {
|
|
const heartbeat = options.heartbeat ?? DEFAULT_HEARTBEAT;
|
|
const electionTimeout = options.electionTimeout ?? DEFAULT_ELECTION_TIMEOUT;
|
|
const proofExpiration = options.proofExpiration ?? DEFAULT_PROOF_EXPIRATION;
|
|
const crashModel = options.crashModel ?? DEFAULT_CRASH_MODEL;
|
|
|
|
// address format: 'libp2p_peerid/node_publickey'
|
|
// See:
|
|
// https://github.com/ega-forever/mokka#new-mokka-options
|
|
// https://github.com/ega-forever/mokka/blob/master/src/consensus/models/NodeModel.ts#L46
|
|
const peerId = options.peer.peerId;
|
|
const address = `${peerId?.toString()}/${options.publicKey}`;
|
|
|
|
const logger = bunyan.createLogger({ name: LOG_NAMESPACE, level: CONSENSUS_LOG_LEVEL });
|
|
|
|
super({
|
|
address,
|
|
privateKey: options.privateKey,
|
|
heartbeat,
|
|
electionTimeout,
|
|
proofExpiration,
|
|
crashModel,
|
|
// TODO: Improve logging
|
|
logger
|
|
});
|
|
|
|
// Subscribe to state changes
|
|
this.on('state', () => {
|
|
log(`State changed to ${this.state} (${consensusStates[this.state]}) with term ${this.term}`);
|
|
});
|
|
|
|
this.peer = options.peer;
|
|
this.partyPeers = options.partyPeers;
|
|
|
|
// Add peer nodes in the party
|
|
// TODO: Skip initialization if party not provided?
|
|
for (const partyPeer of options.partyPeers) {
|
|
const address = `${partyPeer.peerId}/${partyPeer.publicKey}`;
|
|
this.nodeApi.join(address);
|
|
}
|
|
}
|
|
|
|
isLeader (): boolean {
|
|
return this.state === MokkaStates.default.LEADER;
|
|
}
|
|
|
|
initialize (): void {
|
|
assert(this.peer.node);
|
|
|
|
this.peer.node.handle(CONSENSUS_PROTOCOL, async ({ stream, connection }) => {
|
|
// Setup send and receive pipes
|
|
this.handleStream(connection.remotePeer, stream);
|
|
});
|
|
}
|
|
|
|
connect (): void {
|
|
this.initialize();
|
|
|
|
// TODO: Handle errors when peers don't join
|
|
super.connect();
|
|
}
|
|
|
|
async write (address: string, packet: Buffer): Promise<void> {
|
|
assert(this.peer.node);
|
|
|
|
// TODO: Use a different strategy for retries?
|
|
for (let i = 0; i < NUM_WRITE_ATTEMPTS; i += 1) {
|
|
try {
|
|
let messageStream = this.messageStreamMap.get(address);
|
|
if (!messageStream) {
|
|
const { peerIdFromString } = await import('@libp2p/peer-id');
|
|
const peerId = peerIdFromString(address);
|
|
|
|
// Dial to the peer over consensus protocol
|
|
const p2pStream = await this.peer.node.dialProtocol(peerId, CONSENSUS_PROTOCOL);
|
|
|
|
// Setup send and receive pipes
|
|
messageStream = await this.handleStream(peerId, p2pStream);
|
|
}
|
|
|
|
messageStream.push(packet);
|
|
return;
|
|
} catch (err) {
|
|
log(`Attempt ${i} - Could not open consensus stream to ${address}: ${err}`);
|
|
if (i === NUM_WRITE_ATTEMPTS) {
|
|
log('Write attempts exhausted');
|
|
}
|
|
|
|
// Retry after a set interval
|
|
await new Promise((resolve) => { setTimeout(resolve, RETRY_SLEEP_DURATION); });
|
|
}
|
|
}
|
|
}
|
|
|
|
async disconnect (): Promise<void> {
|
|
assert(this.peer.node);
|
|
await super.disconnect();
|
|
|
|
const { peerIdFromString } = await import('@libp2p/peer-id');
|
|
|
|
// Close all consensus protocol streams
|
|
for (const partyPeer of this.partyPeers) {
|
|
for (const conn of this.peer.node.getConnections(peerIdFromString(partyPeer.peerId))) {
|
|
conn.streams.forEach(stream => {
|
|
if (stream.stat.protocol === CONSENSUS_PROTOCOL) {
|
|
stream.close();
|
|
}
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
private async handleStream (peerId: PeerId, stream: P2PStream): Promise<Pushable<any>> {
|
|
const { pushable } = await import('it-pushable');
|
|
const { pipe } = await import('it-pipe');
|
|
const lp = await import('it-length-prefixed');
|
|
|
|
const messageStream = pushable({});
|
|
|
|
try {
|
|
// Send message to stream
|
|
pipe(
|
|
// Read from messageStream (the source)
|
|
messageStream,
|
|
// Encode with length prefix (so receiving side knows how much data is coming)
|
|
lp.encode(),
|
|
// Write to the stream (the sink)
|
|
stream.sink
|
|
);
|
|
} catch (err) {
|
|
// TODO: Implement retries / handle breakage
|
|
log(`Could not send consensus message to ${peerId.toString()}: ${err}`);
|
|
}
|
|
|
|
try {
|
|
// Handle message from stream
|
|
pipe(
|
|
// Read from the stream (the source)
|
|
stream.source,
|
|
// Decode length-prefixed data
|
|
lp.decode(),
|
|
// Sink function
|
|
async (source) => {
|
|
// For each chunk of data
|
|
for await (const msg of source) {
|
|
await this.emitPacket(Buffer.from(msg.subarray()));
|
|
}
|
|
}
|
|
);
|
|
} catch (err) {
|
|
log(`Error on consensus message from ${peerId.toString()}: ${err}`);
|
|
}
|
|
|
|
this.messageStreamMap.set(peerId.toString(), messageStream);
|
|
return messageStream;
|
|
}
|
|
}
|
|
|
|
export const readParty = (filePath: string): PartyPeer[] => {
|
|
if (!filePath || filePath === '') {
|
|
log('Party peers file path not provided');
|
|
return [];
|
|
}
|
|
|
|
const partyFilePath = path.resolve(filePath);
|
|
log(`Reading party peers from file ${partyFilePath}`);
|
|
|
|
const partyJson = fs.readFileSync(partyFilePath, 'utf-8');
|
|
return JSON.parse(partyJson);
|
|
};
|