All files / src/consumers MultiplexConsumer.js

96.29% Statements 156/162
75% Branches 15/20
100% Functions 6/6
96.29% Lines 156/162

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 1631x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 3x 1x 1x 1x 3x 3x 3x 3x 3x 1x 1x 3x 3x 3x 1x 1x 5x 3x 3x 3x 3x 3x 3x 5x 3x 3x 3x 3x 5x 1x 1x 1x 1x 1x 1x 12x 12x 12x 12x 12x       12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 12x 1x 1x 1x 1x 1x 1x 12x 12x 12x 12x 12x 12x 12x 11x 11x 11x 3x 3x 3x 8x 8x 8x 8x 8x       8x 8x 8x 8x 8x 11x 12x 12x 1x 1x 1x 1x 1x 1x 1x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 9x 1x 1x 1x 1x  
// @ts-check
// SPDX-License-Identifier: Zlib
// SPDX-FileCopyrightText: 2024 CERN (home.cern)
 
import { EventEmitter } from "node:events";
import { isNil } from "@cern/nodash";
 
/**
 * @typedef {{ camera: CamExpress.Camera, duration?: number }} MuxCamera
 */
 
const DEFAULT_DURATION_MULTIPLEX = 1000;
 
/**
 * Consumer that multiplexes several cameras into a single stream: it
 * subscribes to one camera at a time through the provided service and moves
 * on to the next entry once the entry's duration has elapsed.
 *
 * The rotation starts as soon as the instance is built. Errors are only
 * forwarded to listeners while the consumer is connected.
 *
 * @implements {CamExpress.Consumer}
 * @extends {EventEmitter}
 * @emits :
 *  - 'frame' when a new jpeg frame is received from source
 *  - 'error' when an error occurs
 *  - 'close' once the connection with source is closed
 */
class MultiplexConsumer extends EventEmitter {

  /**
   * @param {CamExpress.MultiplexConsumerOptions} options
   *  - `service`: object exposing `subscribe` / `unsubscribe`
   *  - `cameras`: list of {@link MuxCamera} entries to rotate through
   *  - `index`: position of the first entry to use (defaults to 0)
   */
  constructor(options) {
    super();

    this._connected = false;

    this._service = options.service;
    this._cameras = options.cameras;
    this._index = options.index || 0;

    /** @type {MuxCamera|undefined} */
    this._reference = undefined;
    /** @type {number|undefined} */
    this._subscription = undefined;

    /** @type {NodeJS.Timeout|undefined} */
    this._timeoutHandler = undefined;

    /**
     * Forward an error to the listeners, but only while connected.
     * @param {Error} err
     */
    this.emitError = (err) => {
      if (this._connected) {
        this.emit("error", err);
      }
    };
    this.emitFrame = this.emit.bind(this, "frame");

    this.rotateChannels();
  }

  /**
   * Mark the consumer as connected so that errors get forwarded.
   * Calling it while already connected is a no-op.
   */
  connect() {
    if (this._connected) { return; }
    this._connected = true;
  }

  /**
   * Stop the rotation: cancel the stall timer, release the current
   * subscription and emit 'close'. No-op when not connected.
   */
  disconnect() {
    if (!this._connected) { return; }
    this._connected = false;

    // clear the force rotation timeout
    clearTimeout(this._timeoutHandler);
    this._timeoutHandler = undefined;

    if (!isNil(this._reference) && !isNil(this._subscription)) {
      this._service.unsubscribe(this._reference.camera, this._subscription);
    }
    // the id is released: never hand it to unsubscribe a second time
    this._subscription = undefined;

    this.emit("close");
  }

  /**
   * Rotate between the cameras references provided in the multiplex.
   * Handle errors and check for camera timeout (max allowed duration).
   */
  rotateChannels() {
    const startTime = Date.now();

    this._reference = this._cameras[this._index];

    if (isNil(this._reference)) {
      this.emitError(new Error("The referenced camera was not found"));
      return;
    }

    const reference = this._reference; // pointer to the current camera reference

    // If no duration, or badly declared (not a finite, strictly positive
    // number): replace with default. A zero or negative value would make the
    // stall timer below fire right away and loop on forced rotations.
    const declared = reference.duration;
    const duration =
      (typeof declared === "number" &&
        Number.isFinite(declared) && declared > 0) ?
        declared :
        DEFAULT_DURATION_MULTIPLEX;

    // To avoid stalling, we force rotation if it took too long
    const MAX_DURATION = duration * 2;

    // The timer handle doubles as the identity of this rotation step: once
    // `_timeoutHandler` holds something else, this step is over (the consumer
    // was disconnected or already rotated) and must not rotate again.
    const timer = setTimeout(() => {
      this.emitError(new Error(
        `Camera '${reference.camera.name}' is stalling.` +
        " Force rotation to the next one"
      ));

      // an 'error' listener may have disconnected the consumer
      if (this._timeoutHandler !== timer) { return; }

      this.rotateNext(reference.camera);
    }, MAX_DURATION);
    this._timeoutHandler = timer;

    // Because we only subscribe / unsubscribe, if a unsubscribeTimeout is
    // specified ( > duration), even if the multiplex duration is small,
    // there will be no opening delay for cameras websockets: they will not be closed.
    this._subscription = this._service.subscribe(
      reference.camera, undefined, (data) => {

        // Check if we want to close the socket
        if (!data) {
          this.disconnect();
          return;
        }

        // Otherwise, we send the datas
        if (Buffer.isBuffer(data)) {
          this.emitFrame(data);
        }
        else if (data instanceof Error) {
          this.emitError(data);
        }

        // a listener may have disconnected the consumer while handling the
        // event above: do not open a new subscription behind its back
        if (this._timeoutHandler !== timer) { return; }

        // Rotate to the next camera if the duration has passed
        if (Date.now() > startTime + duration) {
          this.rotateNext(reference.camera);
        }
      }
    );
  }

  /**
   * Increment the index, clean the current camera handlers
   * and connect to the next camera in the multiplex
   * @param {CamExpress.Camera} camera camera of the entry being left
   */
  rotateNext(camera) {

    // clear the force rotation timeout
    clearTimeout(this._timeoutHandler);
    this._timeoutHandler = undefined;

    // unsubscribe from current camera
    if (!isNil(this._subscription)) {
      this._service.unsubscribe(camera, this._subscription);
      this._subscription = undefined;
    }

    // increment the index to the next one (rotation); an empty list would
    // turn the modulo into NaN, so fall back to the first slot instead
    const count = this._cameras?.length ?? 0;
    this._index = count > 0 ? (this._index + 1) % count : 0;

    // recursive call
    this.rotateChannels();
  }

}
 
export default MultiplexConsumer;