Home Manual Reference Source Test

test/src/lib/server/Watcher.spec.js

import Emitter from 'events';
import { spy, stub } from 'sinon';
import proxyquire from 'proxyquire';
import { resolveNodeId, NodeClass } from 'node-opcua';
import { ctor as throughStreamClass } from 'through2';
import expect from '../../../expect';
import Watcher, { SubscribeStream } from '../../../../src/lib/server/Watcher';

/**
 * Event emitter used in place of a monitored item.
 *
 * Ten milliseconds after construction it emits a single event that simulates the first
 * notification: `err` carrying `error` when a truthy `error` was passed, otherwise `changed`
 * carrying an empty object.
 */
class StubMonitoredItem extends Emitter {
  /**
   * @param {*} [error=false] Truthy value to emit with `err`; falsy to emit `changed`.
   */
  constructor(error = false) {
    super();

    const notify = () => {
      if (error) {
        this.emit('err', error);
        return;
      }

      this.emit('changed', {});
    };

    setTimeout(notify, 10);
  }
}

// Watcher loaded with its './NodeStream' dependency replaced by an object-mode through stream
// that ends itself 10 ms after construction.
const { default: StubWatcher } = proxyquire('../../../../src/lib/server/Watcher', {
  './NodeStream': {
    __esModule: true,
    default: class ServerStream extends throughStreamClass({ objectMode: true }) {
      constructor() {
        super();

        const finish = () => {
          this.end();
        };

        setTimeout(finish, 10);
      }
    },
  },
});

// SubscribeStream loaded with node-opcua's ClientSubscription replaced by a stub that emits
// a `failure` event 10 ms after construction.
const { SubscribeStream: FailingSubscribeStream } = proxyquire(
  '../../../../src/lib/server/Watcher',
  {
    'node-opcua/lib/client/client_subscription': {
      ClientSubscription: class StubClientSubscription extends Emitter {
        constructor() {
          super();

          // Every monitored node gets a stub item that reports a first notification.
          this.monitor = spy(() => new StubMonitoredItem());

          const fail = () => {
            this.emit('failure', new Error('ClientSubscription failure'));
          };

          setTimeout(fail, 10);
        }
      },
    },
  }
);

/** @test {SubscribeStream} */
describe.skip('SubscribeStream', function () {
  /**
   * Checks the state a SubscribeStream has right after construction and that it creates its
   * subscription when the session opens.
   * @test {SubscribeStream#constructor}
   */
  describe('#constructor', function () {
    it('should apply keepSessionAlive option', function () {
      const stream = new SubscribeStream();
      stream.end();

      expect(stream._keepSessionAlive, 'to be', true);
    });

    it('should not track changes instantly', function () {
      const stream = new SubscribeStream();

      // Read the flag straight after construction, then end the only stream created here.
      // Asserting on a second `new SubscribeStream()` would leave that instance un-ended.
      const trackChanges = stream._trackChanges;
      stream.end();

      expect(trackChanges, 'to be', false);
    });

    context('once session is opened', function () {
      it('should create subscription once session is opened', function (done) {
        const stream = new SubscribeStream();

        // Spy is installed after construction; the assertion runs when `session-open` fires.
        spy(stream, 'createSubscription');

        stream.once('session-open', () => {
          expect(stream.createSubscription, 'was called once');
          done();
        });
      });
    });
  });

  /**
   * Covers error forwarding, the `subscription-started` event and the `subscription` property.
   * @test {SubscribeStream#createSubscription}
   */
  describe('#createSubscription', function () {
    it('should forward errors while creating subscription', function () {
      // The stubbed ClientSubscription behind FailingSubscribeStream emits `failure`.
      return expect(new FailingSubscribeStream(), 'to error with', /ClientSubscription failure/);
    });

    it('should emit `subscription-started`', function (done) {
      const subscribeStream = new SubscribeStream();

      // Event arguments are deliberately not passed on to `done`.
      subscribeStream.on('subscription-started', () => {
        done();
      });
    });

    it('should set subscription property', function (done) {
      const subscribeStream = new SubscribeStream();

      subscribeStream.on('subscription-started', (started) => {
        expect(subscribeStream.subscription, 'to be defined');
        expect(subscribeStream.subscription, 'to be', started);
        done();
      });
    });
  });

  /**
   * Checks that the error message for a chunk mentions the chunk's node id.
   * @test {SubscribeStream#processErrorMessage}
   */
  describe('#processErrorMessage', function () {
    it('should contain node id', function () {
      const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');

      // Invoked on the prototype, so no stream instance is created for this check.
      const message = SubscribeStream.prototype.processErrorMessage({ nodeId });

      expect(message, 'to contain', nodeId.toString());
    });
  });

  /**
   * Pipes variable-node chunks through a SubscribeStream whose subscription's `monitor` method
   * is stubbed, and checks monitoring calls, error forwarding and change/delete events.
   * @test {SubscribeStream#processChunk}
   */
  describe('#processChunk', function () {
    // Once the stream's subscription has started, replaces its `monitor` method with a stub
    // that delegates to `createItem`.
    function stubMonitor(subscribeStream, createItem) {
      subscribeStream.once('subscription-started', (subscription) => {
        stub(subscription, 'monitor').callsFake(createItem);
      });
    }

    it('should call ClientSubscription#monitor', function () {
      const subscribeStream = new SubscribeStream();
      const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');

      // NOTE(review): this chunk carries the id as `specialNodeId` while the other tests in
      // this suite pass `nodeId` — confirm which key the stream reads.
      const chunks = [{ specialNodeId: nodeId, nodeClass: NodeClass.Variable }];

      stubMonitor(subscribeStream, () => new StubMonitoredItem());

      return expect(
        chunks,
        'when piped through',
        subscribeStream,
        'to yield objects satisfying',
        'to have length',
        0
      ).then(() => {
        const { monitor } = subscribeStream.subscription;

        expect(monitor, 'was called once');
        expect(monitor.lastCall.args[0], 'to have properties', { nodeId });
      });
    });

    it('should forward MonitoredItem errors', function () {
      const subscribeStream = new SubscribeStream();
      const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');
      const chunks = [{ nodeId, nodeClass: NodeClass.Variable }];

      stubMonitor(subscribeStream, () => new StubMonitoredItem(new Error('item error')));

      return expect(chunks, 'when piped through', subscribeStream, 'to error with', /item error/);
    });

    it('should forward MonitoredItem errors when given as string', function () {
      const subscribeStream = new SubscribeStream();
      const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');
      const chunks = [{ nodeId, nodeClass: NodeClass.Variable }];

      stubMonitor(subscribeStream, () => new StubMonitoredItem('item error'));

      return expect(chunks, 'when piped through', subscribeStream, 'to error with', /item error/);
    });

    it('should forward change events', function () {
      const subscribeStream = new SubscribeStream();
      const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');
      const onChange = spy();
      subscribeStream.on('change', onChange);

      const changeData = { value: 'value', serverTimestamp: new Date() };

      // Captured so a further `changed` event can be emitted after the pipe has completed.
      let monitoredItem;

      stubMonitor(subscribeStream, () => {
        monitoredItem = new StubMonitoredItem(false);
        return monitoredItem;
      });

      const chunks = [{ nodeId, nodeClass: NodeClass.Variable, references: {} }];

      return expect(
        chunks,
        'when piped through',
        subscribeStream,
        'to yield objects satisfying',
        'to have length',
        0
      )
        .then(() => monitoredItem.emit('changed', changeData))
        .then(() => {
          expect(onChange, 'was called once');
          expect(onChange.lastCall, 'to satisfy', [
            {
              nodeId,
              value: changeData.value,
              references: {},
              mtime: changeData.serverTimestamp,
            },
          ]);
        });
    });

    it('should emit delete events without a value', function () {
      const subscribeStream = new SubscribeStream();
      const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');
      const onDelete = spy();
      subscribeStream.on('delete', onDelete);

      // A notification that has a timestamp but no `value` property.
      const valuelessEvent = { serverTimestamp: new Date() };

      let monitoredItem;

      stubMonitor(subscribeStream, () => {
        monitoredItem = new StubMonitoredItem(false);
        return monitoredItem;
      });

      const chunks = [{ nodeId, nodeClass: NodeClass.Variable, references: {} }];

      return expect(
        chunks,
        'when piped through',
        subscribeStream,
        'to yield objects satisfying',
        'to have length',
        0
      )
        .then(() => monitoredItem.emit('changed', valuelessEvent))
        .then(() => {
          expect(onDelete, 'was called once');
          expect(onDelete.lastCall, 'to satisfy', [{ nodeId, references: {} }]);
        });
    });
  });

  /**
   * Checks which chunks `_transform` hands to `_enqueueChunk`, and when, with `_enqueueChunk`
   * replaced by a no-op stub.
   * @test {SubscribeStream#_transform}
   */
  describe('#_transform', function () {
    const variableChunk = { nodeId: 'chunk', nodeClass: NodeClass.Variable };
    const noop = () => {};

    it('should skip non-variable nodes', function (done) {
      const subscribeStream = new SubscribeStream();
      stub(subscribeStream, '_enqueueChunk').callsFake(() => {});

      subscribeStream.once('subscription-started', (started) => {
        expect(subscribeStream.subscription, 'to be', started);

        subscribeStream._transform({ nodeClass: NodeClass.Object }, 'utf8', noop);

        expect(subscribeStream._enqueueChunk, 'was not called');
        done();
      });
    });

    it('should call enqueue immediately if subscription started', function (done) {
      const subscribeStream = new SubscribeStream();
      stub(subscribeStream, '_enqueueChunk').callsFake(() => {});

      subscribeStream.once('subscription-started', (started) => {
        expect(subscribeStream.subscription, 'to be', started);

        subscribeStream._transform(variableChunk, 'utf8', noop);

        expect(subscribeStream._enqueueChunk, 'was called once');
        expect(subscribeStream._enqueueChunk.lastCall, 'to satisfy', [variableChunk]);
        done();
      });
    });

    it('should wait for subscription to start before calling enqueue', function (done) {
      const subscribeStream = new SubscribeStream();
      stub(subscribeStream, '_enqueueChunk').callsFake(() => {});

      // Transform is invoked before `subscription-started` has been emitted.
      subscribeStream._transform(variableChunk, 'utf8', noop);
      expect(subscribeStream._enqueueChunk, 'was not called');

      subscribeStream.once('subscription-started', () => {
        expect(subscribeStream._enqueueChunk, 'was called once');
        expect(subscribeStream._enqueueChunk.lastCall, 'to satisfy', [variableChunk]);
        done();
      });
    });
  });
});

/** @test {Watcher} */
describe.skip('Watcher', function () {
  /**
   * Checks construction, the `ready` event and (currently skipped) event/error forwarding.
   * @test {Watcher#constructor}
   */
  describe('#constructor', function () {
    // Creates a Watcher for the single node id used by the tests below.
    const watchMainDisplay = () => new Watcher([resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main')]);

    it('should work without arguments', function () {
      let created;

      expect(() => {
        created = new StubWatcher();
      }, 'not to throw');

      created.close();
    });

    it('should emit ready event once subscribe stream finished', function (done) {
      const watcher = watchMainDisplay();

      watcher.once('ready', () => {
        watcher.close();
        done();
      });
    });

    it.skip('should forward change events', function (done) {
      const watcher = watchMainDisplay();

      watcher.on('ready', () => {
        const event = {};

        watcher.on('change', (received) => {
          expect(received, 'to be', event);
          watcher.close();
          done();
        });

        // Emit on the internal subscribe stream; the watcher is expected to re-emit it.
        watcher._subscribeStream.emit('change', event);
      });
    });

    it.skip('should forward NodeStream errors', function (done) {
      const watcher = watchMainDisplay();

      watcher.on('error', (error) => {
        expect(error, 'to have message', 'Test');
        done();
      });

      watcher.on('ready', () => {
        watcher._nodeStream.emit('error', new Error('Test'));
      });
    });

    it.skip('should forward SubscribeStream errors', function (done) {
      const watcher = watchMainDisplay();

      watcher.on('error', (error) => {
        expect(error, 'to have message', 'Test');
        done();
      });

      watcher.on('ready', () => {
        watcher._subscribeStream.emit('error', new Error('Test'));
      });
    });
  });

  /**
   * Checks that errors raised while closing are emitted as `error` events.
   * @test {Watcher#close}
   */
  describe('#close', function () {
    it('should forward errors', function (done) {
      const watcher = new Watcher([resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main')]);

      watcher.on('ready', () => {
        // Swap the session for an empty object before closing; the test expects this to
        // surface a 'session is required' error.
        watcher._session = {};

        watcher.close();
      });

      watcher.on('error', (error) => {
        expect(error, 'to have message', 'session is required');
        done();
      });
    });
  });
});