Home Manual Reference Source Test

src/lib/server/Watcher.js

/* eslint-plugin-jsdoc does not recognize the "emits" tag */
/* eslint-disable jsdoc/check-tag-names */

/* Needed as long as https://github.com/gajus/eslint-plugin-jsdoc/issues/56 is open */
/* eslint-disable jsdoc/check-param-names */

import Emitter from 'events';
import { ClientSubscription } from 'node-opcua/lib/client/client_subscription';
import { AttributeIds } from 'node-opcua/lib/services/read_service';
import { NodeClass } from 'node-opcua/lib/datamodel/nodeclass';
import Logger from 'gulplog';
import ProjectConfig from '../../config/ProjectConfig';
import { reportProgress } from '../helpers/log';
import Session from './Session';
import NodeBrowser from './NodeBrowser';

/**
 * Watches the given nodes for value changes.
 * @emit {ReadStream.ReadResult} Emits `change` events when a watched node changes.
 */
export default class Watcher extends Emitter {
  /**
   * Creates a new Watcher with the given nodes.
   * @param {NodeId[]} nodes The nodes to watch (recursively).
   */
  constructor(nodes = ProjectConfig.nodesToWatch) {
    super();

    /**
     * The browser used to subscribe to server nodes.
     * @type {NodeBrowser}
     */
    this._nodeBrowser = new NodeBrowser({
      handleNode: this._subscribe.bind(this),
    });

    reportProgress(this._nodeBrowser.browse(nodes), {
      getter: () => this._nodeBrowser._pushed,
      formatter: (count) => `Subscribed to ${count} nodes`,
    })
      .then(() => this.emit('ready'))
      .catch((err) => this.emit('error', err));

    /**
     * Resolved once the server subscription is set up.
     * @type {Promise<any>}
     */
    this.subscriptionStarted = this._setupSubscription().catch((err) => this.emit('error', err));
  }

  /**
   * Initializes a server subscription.
   * @return {Promise<node-opcua~ClientSubscription>} A setup subscription.
   */
  _setupSubscription() {
    return Session.create().then(
      (session) =>
        new Promise((resolve, reject) => {
          /** The current session, if connected @type {Session} */
          this._session = session;

          const subscription = new ClientSubscription(session, {
            requestedPublishingInterval: 100,
            requestedLifetimeCount: 1000,
            requestedMaxKeepAliveCount: 12,
            maxNotificationsPerPublish: 10,
            publishingEnabled: true,
            priority: 10,
          });

          subscription.on('started', () => resolve(subscription));
          subscription.on('failure', (err) => reject(err));
        })
    );
  }

  /**
   * Subscribes to value changes of a single node.
   * @param {BrowsedNode} node A browsed node.
   */
  async _subscribe(node) {
    if (node.nodeClass !== NodeClass.Variable) {
      return undefined;
    }
    const subscription = await this.subscriptionStarted;

    const nodeId = node.id;

    const item = subscription.monitor(
      {
        nodeId,
        attributeId: AttributeIds.Value,
      },
      {
        clientHandle: 13,
        samplingInterval: 250,
        queueSize: 123,
        discardOldest: true,
      }
    );

    return new Promise((resolve, reject) => {
      // Sometimes the changed event is not emitted...
      // Fixes #202
      const timeout = setTimeout(() => {
        Logger.debug(`Error monitoring '${nodeId.value}': Did not receive initial value. Retry...`);

        try {
          item.terminate();
        } catch (e) {
          Logger.warn('Failed to terminate subscription', e);
        }

        return this._subscribe(node).then(resolve, reject);
      }, 1000);

      item.once('changed', () => {
        clearTimeout(timeout);
        item.on(
          'changed',
          this._handleChange.bind(this, {
            nodeId,
          })
        );

        resolve();
      });
      item.on('err', (err) => {
        clearTimeout(timeout);
        reject(err instanceof Error ? err : new Error(err));
      });
    }).catch((err) => {
      throw Object.assign(err, { node });
    });
  }

  /**
   * Called once a change has been detected and emits a 'change' or 'delete' event.
   * @param {Object} node The node that changed.
   * @param {?node-opcua~Variant} dataValue The current value.
   */
  _handleChange({ nodeId }, dataValue) {
    this.emit(dataValue.value ? 'change' : 'delete', {
      // nodeClass,
      nodeId,
      value: dataValue.value,
      mtime: dataValue.serverTimestamp,
    });
  }

  /**
   * Ends monitoring nodes.
   */
  close() {
    if (this._session) {
      Session.close(this._session).catch((err) => this.emit('error', err));
    }
  }
}