Click here to Skip to main content
15,884,917 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more: , +
I have been trying for a couple of weeks to create a chain of WebSocket clients that forms a middleware pattern, like in express/connect where you call `app.use(() => {...})`, but with WebSocket clients.

Illustration about the data/events flow: data-flow.jpg

The flow chart of the WebSocket server (more precisely, of the dispatcher) looks something like this: draw.io schema



The problem is that, as soon as I connect a second WebSocket client, the second client receives only "pong" events, which is wrong. As shown in the image, it should receive "ping" events.

What adds more complexity is that each client should also be able to trigger the "local hook chain" of the other connected clients. But I didn't get that far.

What I have tried:

index.js:
JavaScript
const { PassThrough, Transform, Duplex } = require("stream");
const WebSocket = require('ws');
const dispatcher = require("./dispatcher");


// WebSocket server listening on port 8080.
const wss = new WebSocket.Server({
    port: 8080
});

// Duplex streams of the connected clients (added on "connection", removed on "close").
const streams = new Set();
// Pipe bookkeeping: source stream -> destination stream. Filled by createPipeline(),
// which also walks it to unpipe the previously built chain.
const link = new Map();

// Fixed entry and exit points of the hook chain; createPipeline() wires
// whatever has to sit between them.
const input = new PassThrough();
const output = new PassThrough();

// Local stand-in for a client: parses the JSON message, marks it as
// answered (type "pong") and passes the re-serialized message on.
const trans = new Transform({
    transform: (raw, _encoding, done) => {
        const message = JSON.parse(raw.toString("utf8"));
        message.type = "pong";
        done(null, JSON.stringify(message));
    }
});

// Single handle for the whole chain: writes go into `input`,
// reads come out of `output`.
const duplex = Duplex.from({ writable: input, readable: output });


/**
 * (Re)builds the chain between `input` and `output`.
 *
 * Every pipe created by the previous call is removed first. Afterwards:
 *   - without client streams: input -> dispatcher(trans) -> output
 *   - with client streams:    input -> dispatcher(client 0) -> ... -> dispatcher(client n) -> output
 *
 * Each pipe that is created gets recorded in `link` (source -> destination)
 * so the next call can undo it again.
 */
function createPipeline() {

    // cleanup before creating new pipeline
    input.unpipe();
    output.unpipe();

    link.forEach((destination, source) => {
        source.unpipe();
        destination.unpipe();
    });

    // forget the old chain, otherwise stale streams pile up in the map
    link.clear();

    streams.forEach((stream) => {
        stream.unpipe();
    });

    // the chain is built from `streams`, so decide on `streams` as well
    if (streams.size <= 0) {

        console.log("Link local pipe, no clients to pipe through");

        const writer = dispatcher(trans, true);
        input.pipe(writer).pipe(output);

        link.set(input, writer);
        link.set(writer, output);

        return;

    }

    console.log("Create pipline with participant", streams.size);

    const clients = Array.from(streams);

    const tail = clients.reduce((prev, cur, i, arr) => {

        cur.index = i;
        const disp = dispatcher(cur, i + 1 === arr.length);

        prev.pipe(disp);
        link.set(prev, disp);

        return disp;

    }, input);

    // record the final pipe too, so the next rebuild detaches
    // the old chain from `output`
    tail.pipe(output);
    link.set(tail, output);

}

// Wire up the chain once at startup, before any client has connected.
createPipeline();

// Register every new client as a chain participant and rebuild the chain.
// When the socket closes, the client is removed and the chain is rebuilt again.
wss.on('connection', (ws) => {

    const stream = WebSocket.createWebSocketStream(ws);

    // Without an "error" listener a failing socket raises an unhandled
    // "error" event on the stream, which would terminate the server.
    stream.on("error", (err) => {
        console.error("Stream error", err);
    });

    streams.add(stream);

    ws.on("close", () => {

        console.log("Stream disconnected");

        stream.unpipe();
        stream.end();
        stream.destroy();

        streams.delete(stream);

        createPipeline();

    });


    createPipeline();

});


// NOTE(review): nothing visible here ever assigns `interval`, so the
// clearInterval() call below is effectively a no-op. Kept for compatibility.
let interval = null;


/**
 * Test driver: sends one hook message through the chain, waits for the
 * answer and schedules the next round one second later.
 *
 * The loop stops (is not rescheduled) when the write itself fails.
 */
function loop() {

    /**
     * Writes a single message into the chain and reports the first answer.
     * @param {function} cb Called as cb(null, args) for a "pong" answer,
     *                      or cb(err) for an unparsable/unexpected answer
     */
    function triggerWsHooks(cb) {

        const msg = {
            timestamp: Date.now(),
            component: "test",
            method: "add",
            hooks: "pre",
            args: ["input", { bar: "baz" }]
        };

        const listener = (data) => {

            let answer = null;

            try {
                answer = JSON.parse(data.toString());
            } catch (err) {
                cb(err);
                return;
            }

            if (answer.type === "pong") {
                cb(null, answer.args);
            } else {
                // always report back, otherwise the loop would silently stop
                cb(new Error(`Unexpected message type "${answer.type}"`));
            }

        };

        // listen before writing, so a fast answer can not slip through
        duplex.once("data", listener);

        const more = duplex.write(JSON.stringify(msg), (err) => {
            if (err) {

                console.log("2) Not flushed, stop call, clearInterval");

                duplex.removeListener("data", listener);

                clearInterval(interval);
                console.error(err);

            }
        });


        if (!more) {
            // The message is buffered and still gets delivered; the next round is
            // scheduled by its answer. Starting another loop() here would
            // duplicate the loop.
            duplex.once('drain', () => {
                console.log("Drained");
            });
        }

    }


    triggerWsHooks((err, data) => {

        console.log(err || "Chain done", data);

        setTimeout(loop, 1000);

    });

}

loop();


dispatcher.js:
JavaScript
const { Transform, pipeline, Duplex } = require("stream");
const { randomUUID } = require("crypto");

const _timeout = require("./timeout.js");

// `queue` holds the uuid of every message whose chain is not completed yet.
// Once a chain is completed, its uuid is meant to be removed from the set.
// A chain counts as completed when:
// last = true & msg.type = pong
let queue = new Set();

module.exports = (stream, last = false) => {

    stream.unpipe();

    //console.log("stream objects:", Object.getOwnPropertySymbols(stream))
    console.log("Dispatcher created, for stream index: %s, last:", stream["index"], last)

    let bypass = null;
    let output = null;
    let input = null;
    let caller = null;


    input = new Transform({
        transform(chunk, encoding, cb) {

            chunk = chunk.toString("utf8");
            chunk = JSON.parse(chunk);

            console.log("[input] RAW input:", chunk, queue);
            debugger;


            //console.log("queue size", queue.size)


            // create timeout for output stream
            // if caller is not called in x, push the origitinal input chunk
            // to the output chain & tell the stream: to f*** off, you are to slow
            caller = _timeout(1000 * 1, (timedout, duration, [chunk, cb]) => {
                if (timedout) {

                    debugger;

                    console.log("> Timedout", duration, stream.index)

                    // stream has to slow answered
                    // push original message to output chain
                    // set type=pong to "fake" client answer
                    if (last) {
                        bypass.type = "pong";
                    }

                    output.push(JSON.stringify(bypass));

                    // cleanup
                    bypass = null;

                } else {

                    // stream has answered in right time
                    // call transform callback on output stream
                    // with original chunk from middleware stream
                    cb(null, JSON.stringify(chunk));

                }
            });


            // dispatcher logic
            // if its not working
            // probably because of sh*t happens here...
            (() => {

                if (!chunk.uuid) {
                    chunk.uuid = randomUUID();
                    queue.add(chunk.uuid);
                }

                debugger;

                if (!chunk.type) {

                    chunk.type = "ping";

                } else {

                    debugger;

                    if (chunk.type === "ping") {
                        // invalid
                    } else if (chunk.type === "pong") {

                        debugger;

                        if (queue.has(chunk.uuid)) {

                            if (last) {
                                console.log("[input] remove uuid: %s, index: %d", chunk.uuid, stream.index);
                                queue.delete(chunk.uuid);
                            } else {
                                chunk.type = "ping";
                            }

                        } else {

                            // invalid
                            console.log("[input] invalid uuid for type pong received!", queue);

                        }

                        debugger;

                    } else {
                        // invalid
                    }

                }

            })();

            debugger;

            bypass = chunk;

            // pass to middleware stream
            cb(null, JSON.stringify(chunk));

        }
    });

    output = new Transform({
        transform(chunk, encoding, cb) {

            chunk = chunk.toString("utf8");
            chunk = JSON.parse(chunk);


            // dispatcher logic
            // if its not working
            // probably because of sh*t happens here...
            (() => {


                console.log("[output] RAW input:", chunk, queue);
                debugger;


                if (chunk.type == "ping") {
                    if (queue.has(chunk.uuid)) {

                        // invalid, type from clieitn not changed
                        console.log("[output] invalid type received, not changed?!");

                        //stream.emit("error", new Error("INVALID_TYPE_RECEIVED"));
                        //stream.emit("close")?!
                        // how to tell the webesocket client the reason for the closing?
                        // in combinantion with node.js streams: 
                        // https://github.com/websockets/ws/blob/HEAD/doc/ws.md#createwebsocketstreamwebsocket-options
                        stream.end();

                    } else {

                        // proceed local hooks
                        console.log("proceed local hooks");

                    }
                } else if (chunk.type === "pong") {

                    if (queue.has(chunk.uuid)) {

                        if (last) {
                            console.log("[output] delete uuid out of queue, index: %d,", stream.index, chunk.uuid);
                            //queue.delete(chunk.uuid);
                        }

                        caller(chunk, cb);

                    } else {
                        // invalid, answered to slow?
                        // perhaps timedout
                        console.log("[output] invalid uuid, to slow answered?!");
                    }

                } else {
                    // invalid
                    console.log("[output] invalid type received: %s", chunk.type)
                }


            })();


            //cb(null, JSON.stringify(chunk));
            //caller(chunk, cb);

        }
    });


    input.pipe(stream).pipe(output);


    return Duplex.from({
        writable: input,
        readable: output
    });

};


timeout.js:
JavaScript
/**
 * Races a trigger function against a timer; `cb` runs exactly once for
 * whichever comes first:
 *   - timer elapsed:  cb(true,  elapsedMs, [])
 *   - trigger called: cb(false, elapsedMs, [...triggerArgs])
 * Later invocations of the trigger are ignored.
 *
 * @param {Number} time Time in ms to wait before the timer reports a timeout
 * @param {function} cb Callback function, see above for its arguments
 * @returns {function} Trigger to call before the timeout is reached
 */
function timeout(time, cb) {

    const startedAt = Date.now();
    let settled = false;

    // shared exit for both paths, guarantees a single call of `cb`
    const settle = (timedout, args) => {

        if (settled) {
            return;
        }

        settled = true;
        cb(timedout, Date.now() - startedAt, args);

    };

    let handle = setTimeout(() => {
        settle(true, []);
    }, time);

    return (...args) => {

        clearTimeout(handle);
        handle = null;

        settle(false, [...args]);

    };

}

// timeout() is the only export of this module.
module.exports = timeout;


The clients are very simple: they only modify the message they receive and then send it back to the server.

client-1.js:
JavaScript
const { Transform } = require("stream");
const WebSocket = require('ws');

// Connect to ws://127.0.0.1:8080 and expose the socket as a Node.js duplex stream.
const ws = new WebSocket('ws://127.0.0.1:8080');
const duplex = WebSocket.createWebSocketStream(ws);

// Hook of this client. A "ping" message gets its first argument replaced by
// "ws", `websocket: true` merged into its second argument and is answered as
// "pong". Any other message is logged and sent back unchanged.
const transform = new Transform({
    transform(raw, _encoding, done) {

        const message = JSON.parse(raw.toString("utf8"));

        if (message.type !== "ping") {
            console.log("invalid type pong", message);
            done(null, JSON.stringify(message));
            return;
        }

        console.log(message);

        message.args[0] = "ws";
        message.args[1] = Object.assign(message.args[1], { websocket: true });
        message.type = "pong";

        console.log("Cb called, send back");
        done(null, JSON.stringify(message));

    }
});

// Every message received from the socket runs through `transform` and is written back to the socket.
duplex.pipe(transform).pipe(duplex);


client-2.js:
JavaScript
const { Transform } = require("stream");
const WebSocket = require('ws');

// Connect to ws://127.0.0.1:8080 and expose the socket as a Node.js duplex stream.
const ws = new WebSocket('ws://127.0.0.1:8080');
const duplex = WebSocket.createWebSocketStream(ws);

// Hook of this client. A "ping" message gets `override` and `arr` merged into
// its second argument and is answered as "pong". Any other message is logged
// and sent back unchanged.
const transform = new Transform({
    transform(raw, _encoding, done) {

        const message = JSON.parse(raw.toString("utf8"));

        if (message.type !== "ping") {
            console.log("invalid type pong", message);
            done(null, JSON.stringify(message));
            return;
        }

        console.log(message);

        const additions = {
            override: "yes",
            arr: [0, 1, 2, 3]
        };

        message.args[1] = Object.assign(message.args[1], additions);
        message.type = "pong";

        console.log("Cb called, send back");
        done(null, JSON.stringify(message));

    }
});

// Every message received from the socket runs through `transform` and is written back to the socket.
duplex.pipe(transform).pipe(duplex);




It works fine with one client connected, or with no client at all.
But as soon as a second or third client connects, it only receives pong events.

To run the code, create the files as I named them and execute `node index.js`. The two clients run separately: `node client-1.js` & `node client-2.js`.

I'm sorry for the long post, but that's the smallest reproducible example I can provide.
Posted

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900