All files / src/consumers MultiplexConsumer.js

96.29% Statements 156/162
75% Branches 15/20
100% Functions 6/6
96.29% Lines 156/162

Press n or j to go to the next uncovered block, b, p or k for the previous block.

x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 1x 1x 1x 3x 3x 3x 3x 3x 1x 1x 3x 3x 3x 1x 1x 5x 3x 3x 3x 3x 3x 3x 5x 3x 3x 3x 3x 5x 1x 1x 1x 1x 1x 1x 12x 12x 12x 12x 12x       12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 1x 1x 1x 1x 1x 1x 12x 12x 12x 12x 12x 12x 12x 11x 11x 11x 3x 3x 3x 8x 8x 8x 8x 8x       8x 8x 8x 8x 8x 11x 12x 12x 1x 1x 1x 1x 1x 1x 1x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 1x 1x 1x 1x  
// @ts-check
// SPDX-License-Identifier: Zlib
// SPDX-FileCopyrightText: 2024 CERN (home.cern)
 
import { EventEmitter } from "node:events";
import { isNil } from "@cern/nodash";
 
/**
 * @typedef {{ camera: CamExpress.Camera, duration?: number }} MuxCamera
 */
 
// Fallback time slice for one camera, in milliseconds (it is compared against
// Date.now() deltas and used to derive a setTimeout delay). Applied when a
// multiplex entry does not declare a usable `duration`.
const DEFAULT_DURATION_MULTIPLEX = 1000;
 
/**
 * Consumer that cycles through a list of cameras, subscribing to one camera
 * at a time on the shared service and forwarding what it receives.
 *
 * Rotation starts as soon as the instance is constructed; errors are only
 * forwarded to listeners while the consumer is connected.
 *
 * @implements {CamExpress.Consumer}
 * @extends {EventEmitter}
 * @emits :
 *  - 'frame' when a new jpeg frame is received from source
 *  - 'error' when an error occurs
 *  - 'close' once the connection with source is closed
 */
class MultiplexConsumer extends EventEmitter {

  /**
   * @param {CamExpress.MultiplexConsumerOptions} options
   */
  constructor(options) {
    super();

    this._connected = false;

    this._service = options.service;
    this._cameras = options.cameras;
    // A missing (or otherwise falsy) start index means "start at the first camera"
    this._index = options.index || 0;

    /** @type {MuxCamera|undefined} */
    this._reference = undefined;
    /** @type {number|undefined} */
    this._subscription = undefined;

    /** @type {NodeJS.Timeout|undefined} */
    this._timeoutHandler = undefined;

    // Errors are dropped while disconnected: an 'error' event without a
    // listener would otherwise throw from the EventEmitter.
    this.emitError = (err) => {
      if (this._connected) {
        this.emit("error", err);
      }
    };
    this.emitFrame = this.emit.bind(this, "frame");

    this.rotateChannels();
  }

  /**
   * Mark the consumer as connected so that errors are forwarded to listeners.
   * Calling it while already connected is a no-op.
   */
  connect() {
    if (this._connected) { return; }
    this._connected = true;
  }

  /**
   * Stop the rotation: cancel the stall timer, unsubscribe from the current
   * camera and emit 'close'. Does nothing if the consumer is not connected.
   */
  disconnect() {
    if (!this._connected) { return; }
    this._connected = false;

    // clear the force rotation timeout
    clearTimeout(this._timeoutHandler);
    this._timeoutHandler = undefined;

    if (!isNil(this._reference) && !isNil(this._subscription)) {
      this._service.unsubscribe(this._reference.camera, this._subscription);
    }
    // forget the released subscription so it can never be unsubscribed twice
    this._subscription = undefined;

    this.emit("close");
  }

  /**
   * Rotate between the camera references provided in the multiplex.
   * Subscribes to the camera at the current index, forwards its frames and
   * errors, and arms a timer that forces the rotation if the camera stalls
   * (no rotation after twice its allotted duration).
   */
  rotateChannels() {
    const startTime = Date.now();

    this._reference = this._cameras[this._index];

    if (isNil(this._reference)) {
      this.emitError(new Error("The referenced camera was not found"));
      return;
    }

    const reference = this._reference; // pointer to the current camera reference

    // If no duration, or badly declared (not a finite, strictly positive
    // number): replace with default. A zero or negative duration would make
    // the stall timer fire immediately and rotate on every single frame.
    const declared = reference.duration;
    const duration =
      (typeof declared === "number" &&
        Number.isFinite(declared) && declared > 0) ?
        declared :
        DEFAULT_DURATION_MULTIPLEX;

    // To avoid stalling, we force rotation if it took too long
    const MAX_DURATION = duration * 2;

    this._timeoutHandler = setTimeout(() => {
      this.emitError(new Error(
        `Camera '${reference.camera.name}' is stalling.` +
        " Force rotation to the next one"
      ));

      this.rotateNext(reference.camera);
    }, MAX_DURATION);

    // Because we only subscribe / unsubscribe, if an unsubscribeTimeout is
    // specified ( > duration), even if the multiplex duration is small,
    // there will be no opening delay for cameras websockets: they will not be closed.
    this._subscription = this._service.subscribe(
      reference.camera, undefined, (data) => {

        // An empty payload means the source wants the socket closed
        if (!data) {
          this.disconnect();
          return;
        }

        // Otherwise, we forward the data
        if (Buffer.isBuffer(data)) {
          this.emitFrame(data);
        }
        else if (data instanceof Error) {
          this.emitError(data);
        }

        // Rotate to the next camera if the duration has passed
        if (Date.now() > startTime + duration) {
          this.rotateNext(reference.camera);
        }
      }
    );
  }

  /**
   * Increment the index, clean the current camera handlers
   * and connect to the next camera in the multiplex
   * @param {CamExpress.Camera} camera camera to unsubscribe from
   */
  rotateNext(camera) {

    // clear the force rotation timeout
    clearTimeout(this._timeoutHandler);
    this._timeoutHandler = undefined;

    // unsubscribe from current camera
    if (!isNil(this._subscription)) {
      this._service.unsubscribe(camera, this._subscription);
      this._subscription = undefined;
    }

    // increment the index to the next one (rotation); with an empty list
    // `x % 0` would yield NaN and poison the index, so fall back to 0
    const count = this._cameras?.length ?? 0;
    this._index = count > 0 ? (this._index + 1) % count : 0;

    // recursive call
    this.rotateChannels();
  }

}
 
export default MultiplexConsumer;