Im trying since a couple of weeks to create a chain of WebSockets clients, to form a middleware pattern. Like in express/connect where you call `app.use(() => {...})` but with WebSocket clients.
Illustration about the data/events flow:
data-flow.jpg
The flow chart of the WebSocket server, respectively the dispatcher looks like something like this:
draw.io schema
The problem is, as soon as i connect a second WebSocket client, the second client receives only "pong" events which is wrong. It should received as seen on the image "ping" events.
What gain more complexity is, that even each client can trigger the "local hook chain" of the other connected clients. But i didnt get that far.
What I have tried:
index.js:
const { PassThrough, Transform, Duplex } = require("stream");
const WebSocket = require('ws');
const dispatcher = require("./dispatcher");
const wss = new WebSocket.Server({
port: 8080
});
const streams = new Set();
const link = new Map();
const input = new PassThrough();
const output = new PassThrough();
const trans = new Transform({
transform(chunk, encoding, cb) {
chunk = chunk.toString("utf8");
chunk = JSON.parse(chunk);
chunk.type = "pong";
cb(null, JSON.stringify(chunk));
}
});
const duplex = Duplex.from({
readable: output,
writable: input
});
function createPipeline() {
input.unpipe();
output.unpipe();
link.forEach((value, key) => {
key.unpipe();
value.unpipe();
});
Array.from(streams).forEach((stream) => {
stream.unpipe();
});
if (wss.clients.size <= 0) {
console.log("Link local pipe, no clients to pipe through");
let writer = dispatcher(trans, true);
input.pipe(writer).pipe(output);
link.set(input, writer);
link.set(writer, output);
} else {
console.log("Create pipline with participant", streams.size);
let clients = Array.from(streams);
input.unpipe();
output.unpipe();
clients.reduce((prev, cur, i, arr) => {
cur.index = i;
let disp = dispatcher(cur, i + 1 === arr.length);
prev.pipe(disp);
link.set(prev, disp);
return disp;
}, input).pipe(output);
}
}
createPipeline();
wss.on('connection', (ws) => {
let stream = WebSocket.createWebSocketStream(ws);
streams.add(stream);
ws.on("close", () => {
console.log("Stream disconnected");
stream.unpipe();
stream.end();
stream.destroy();
streams.delete(stream);
createPipeline();
});
createPipeline();
});
let interval = null;
function loop() {
function triggerWsHooks(cb) {
let msg = {
timestamp: Date.now(),
component: "test",
method: "add",
hooks: "pre",
args: ["input", { bar: "baz" }]
};
let listener = (data) => {
data = JSON.parse(data.toString());
if (data.type === "pong") {
cb(null, data.args)
}
duplex.removeListener("data", listener);
};
let more = duplex.write(JSON.stringify(msg), (err) => {
if (err) {
console.log("2) Not flushed, stop call, clearInterval");
clearInterval(interval);
console.error(err)
} else {
duplex.once("data", listener);
}
});
if (!more) {
duplex.once('drain', () => {
console.log("Drained")
loop();
});
}
}
triggerWsHooks((err, data) => {
console.log(err || "Chain done", data)
setTimeout(loop, 1000);
});
}
loop();
dispatcher.js:
const { Transform, pipeline, Duplex } = require("stream");
const { randomUUID } = require("crypto");
const _timeout = require("./timeout.js");
let queue = new Set();
module.exports = (stream, last = false) => {
stream.unpipe();
console.log("Dispatcher created, for stream index: %s, last:", stream["index"], last)
let bypass = null;
let output = null;
let input = null;
let caller = null;
input = new Transform({
transform(chunk, encoding, cb) {
chunk = chunk.toString("utf8");
chunk = JSON.parse(chunk);
console.log("[input] RAW input:", chunk, queue);
debugger;
caller = _timeout(1000 * 1, (timedout, duration, [chunk, cb]) => {
if (timedout) {
debugger;
console.log("> Timedout", duration, stream.index)
if (last) {
bypass.type = "pong";
}
output.push(JSON.stringify(bypass));
bypass = null;
} else {
cb(null, JSON.stringify(chunk));
}
});
(() => {
if (!chunk.uuid) {
chunk.uuid = randomUUID();
queue.add(chunk.uuid);
}
debugger;
if (!chunk.type) {
chunk.type = "ping";
} else {
debugger;
if (chunk.type === "ping") {
} else if (chunk.type === "pong") {
debugger;
if (queue.has(chunk.uuid)) {
if (last) {
console.log("[input] remove uuid: %s, index: %d", chunk.uuid, stream.index);
queue.delete(chunk.uuid);
} else {
chunk.type = "ping";
}
} else {
console.log("[input] invalid uuid for type pong received!", queue);
}
debugger;
} else {
}
}
})();
debugger;
bypass = chunk;
cb(null, JSON.stringify(chunk));
}
});
output = new Transform({
transform(chunk, encoding, cb) {
chunk = chunk.toString("utf8");
chunk = JSON.parse(chunk);
(() => {
console.log("[output] RAW input:", chunk, queue);
debugger;
if (chunk.type == "ping") {
if (queue.has(chunk.uuid)) {
console.log("[output] invalid type received, not changed?!");
stream.end();
} else {
console.log("proceed local hooks");
}
} else if (chunk.type === "pong") {
if (queue.has(chunk.uuid)) {
if (last) {
console.log("[output] delete uuid out of queue, index: %d,", stream.index, chunk.uuid);
}
caller(chunk, cb);
} else {
console.log("[output] invalid uuid, to slow answered?!");
}
} else {
console.log("[output] invalid type received: %s", chunk.type)
}
})();
}
});
input.pipe(stream).pipe(output);
return Duplex.from({
writable: input,
readable: output
});
};
timeout.js:
function timeout(time, cb) {
let called = false;
let start = Date.now();
let timer = setTimeout(() => {
if (!called) {
called = true;
cb(true, Date.now() - start, []);
}
}, time);
return (...args) => {
clearTimeout(timer);
timer = null;
if (!called) {
called = true;
cb(false, Date.now() - start, [...args]);
}
};
}
module.exports = timeout;
The clients are very simple & modify only the message they receive and send it then back to the server.
client-1.js:
const { Transform } = require("stream");
const WebSocket = require('ws');
const ws = new WebSocket('ws://127.0.0.1:8080');
const duplex = WebSocket.createWebSocketStream(ws);
const transform = new Transform({
transform(chunk, enc, cb) {
chunk = chunk.toString("utf8");
chunk = JSON.parse(chunk);
if (chunk.type === "ping") {
console.log(chunk)
chunk.args[0] = "ws";
chunk.args[1] = Object.assign(chunk.args[1], {
websocket: true
})
chunk.type = "pong";
console.log("Cb called, send back")
cb(null, JSON.stringify(chunk));
} else {
console.log("invalid type pong", chunk);
cb(null, JSON.stringify(chunk));
}
}
});
duplex.pipe(transform).pipe(duplex);
client-2.js:
const { Transform } = require("stream");
const WebSocket = require('ws');
const ws = new WebSocket('ws://127.0.0.1:8080');
const duplex = WebSocket.createWebSocketStream(ws);
const transform = new Transform({
transform(chunk, enc, cb) {
chunk = chunk.toString("utf8");
chunk = JSON.parse(chunk);
if (chunk.type === "ping") {
console.log(chunk)
chunk.args[1] = Object.assign(chunk.args[1], {
override: "yes",
arr: [0, 1, 2, 3]
});
chunk.type = "pong";
console.log("Cb called, send back")
cb(null, JSON.stringify(chunk));
} else {
console.log("invalid type pong", chunk);
cb(null, JSON.stringify(chunk));
}
}
});
duplex.pipe(transform).pipe(duplex);
It works fine with one client connected, or no client.
But as soon as the second or third clients are connecting, they only receive pong events.
To run the code, create files like i named them, and execute `node index.js`. The two clients run seperatly, `node client-1.js` & `node client-2.js`
Im sorry for the long post, but thats the smalest reproduceable exmaple i can provide.