Press n or j to go to the next uncovered block, b, p or k for the previous block.
// @ts-check
// SPDX-License-Identifier: Zlib
// SPDX-FileCopyrightText: 2024 CERN (home.cern)

import { EventEmitter } from "node:events";
import { isNil } from "@cern/nodash";

/**
 * @typedef {{ camera: CamExpress.Camera, duration?: number }} MuxCamera
 */

/**
 * Time (in milliseconds) a camera stays selected when its own `duration`
 * is missing or unusable.
 */
const DEFAULT_DURATION_MULTIPLEX = 1000;

/**
 * Consumer that cycles through a list of cameras, subscribing to one camera
 * at a time on the provided service and forwarding what it receives.
 *
 * The rotation starts as soon as the instance is built; errors are only
 * forwarded to listeners while the consumer is connected.
 *
 * @implements {CamExpress.Consumer}
 * @extends {EventEmitter}
 * @emits :
 *  - 'frame' when a new jpeg frame is received from source
 *  - 'error' when an error occurs
 *  - 'close' once the connection with source is closed
 */
class MultiplexConsumer extends EventEmitter {
  /**
   * @param {CamExpress.MultiplexConsumerOptions} options
   *  - `service`: object used to subscribe to / unsubscribe from cameras
   *  - `cameras`: ordered list of cameras to rotate through
   *  - `index`: position of the first camera to display (defaults to 0)
   */
  constructor(options) {
    super();

    this._connected = false;
    this._service = options.service;
    this._cameras = options.cameras;
    // `||` on purpose: a missing, zero or NaN index all start at 0
    this._index = options.index || 0;

    /** @type {MuxCamera|undefined} */
    this._reference = undefined;
    /** @type {number|undefined} */
    this._subscription = undefined;
    /** @type {NodeJS.Timeout|undefined} */
    this._timeoutHandler = undefined;

    // Errors are dropped while disconnected, so that an 'error' event is
    // never emitted before connect() or after disconnect().
    this.emitError = (err) => {
      if (this._connected) {
        this.emit("error", err);
      }
    };
    this.emitFrame = this.emit.bind(this, "frame");

    this.rotateChannels();
  }

  /**
   * Mark the consumer as connected. Does nothing if already connected.
   */
  connect() {
    if (this._connected) { return; }
    this._connected = true;
  }

  /**
   * Stop the rotation: clear the force-rotation timer, unsubscribe from the
   * current camera and emit 'close'. Does nothing if not connected.
   */
  disconnect() {
    if (!this._connected) { return; }
    this._connected = false;

    // clear the force rotation timeout
    clearTimeout(this._timeoutHandler);
    this._timeoutHandler = undefined;

    if (!isNil(this._reference) && !isNil(this._subscription)) {
      this._service.unsubscribe(this._reference.camera, this._subscription);
      // forget the id so that it can never be unsubscribed a second time
      this._subscription = undefined;
    }

    this.emit("close");
  }

  /**
   * Rotate between the camera references provided in the multiplex.
   * Handle errors and check for camera timeout (max allowed duration).
   *
   * Subscribes to the camera at the current index. If no camera exists at
   * that index, an error is reported and the rotation stops.
   */
  rotateChannels() {
    const startTime = Date.now();

    this._reference = this._cameras[this._index];
    if (isNil(this._reference)) {
      this.emitError(new Error("The referenced camera was not found"));
      return;
    }
    const reference = this._reference; // pointer to the current camera reference

    // If no duration, or badly declared: replace with default.
    // A zero or negative duration counts as badly declared: it would make
    // the timer below fire immediately and force-rotate in a tight loop.
    const declared = reference.duration;
    const duration =
      (declared !== undefined && Number.isFinite(declared) && declared > 0) ?
        declared :
        DEFAULT_DURATION_MULTIPLEX;

    // To avoid stalling, we force rotation if it took too long
    const MAX_DURATION = duration * 2;
    this._timeoutHandler = setTimeout(() => {
      this.emitError(new Error(
        `Camera '${reference.camera.name}' is stalling.` +
        " Force rotation to the next one"
      ));
      this.rotateNext(reference.camera);
    }, MAX_DURATION);

    // Because we only subscribe / unsubscribe, if an unsubscribeTimeout is
    // specified ( > duration), even if the multiplex duration is small,
    // there will be no opening delay for cameras websockets: they will not
    // be closed.
    this._subscription = this._service.subscribe(
      reference.camera,
      undefined,
      (data) => {
        // A falsy payload means the source is gone: close the consumer
        if (!data) {
          this.disconnect();
          return;
        }

        // Otherwise, forward the data
        if (Buffer.isBuffer(data)) {
          this.emitFrame(data);
        }
        else if (data instanceof Error) {
          this.emitError(data);
        }

        // Rotate to the next camera if the duration has passed
        if (Date.now() > startTime + duration) {
          this.rotateNext(reference.camera);
        }
      }
    );
  }

  /**
   * Increment the index, clean the current camera handlers
   * and connect to the next camera in the multiplex.
   * @param {CamExpress.Camera} camera camera to unsubscribe from
   */
  rotateNext(camera) {
    // clear the force rotation timeout
    clearTimeout(this._timeoutHandler);
    this._timeoutHandler = undefined;

    // unsubscribe from current camera
    if (!isNil(this._subscription)) {
      this._service.unsubscribe(camera, this._subscription);
      this._subscription = undefined;
    }

    // increment the index to the next one (rotation); with an empty list
    // `% 0` would yield NaN, so fall back to the first slot instead
    const count = this._cameras?.length ?? 0;
    this._index = (count > 0) ? (this._index + 1) % count : 0;

    // recursive call
    this.rotateChannels();
  }
}

export default MultiplexConsumer;