Home Manual Reference Source Test

test/src/lib/server/WaitingStream.spec.js

import Emitter from 'events';
import { spy } from 'sinon';
import expect from '../../../expect';
import WaitingStream, { waitForDependencies } from '../../../../src/lib/server/WaitingStream';
import QueueStream from '../../../../src/lib/server/QueueStream';
import NodeId from '../../../../src/lib/model/opcua/NodeId';

class StubStream extends Emitter {
  constructor() {
    super();

    this.spies = {
      _enqueueChunk: spy(),
      _flush: spy(),
    };
  }

  _enqueueChunk(...args) {
    this.spies._enqueueChunk(...args);
    this.emit('processed-chunk', ...args);
  }

  _flush(callback) {
    this.spies._flush(callback);

    callback(null);
  }
}

class NoDeps extends waitForDependencies(StubStream) {
  dependenciesFor() {
    return [];
  }
}

class WithDeps extends waitForDependencies(StubStream) {
  constructor(deps = [new NodeId('Testing')]) {
    super();
    this._deps = deps;
  }

  dependenciesFor() {
    const deps = this._deps;

    this._deps = [];

    return deps;
  }
}

/** @test {WaitingStream} */
describe('WaitingStream', function () {
  /** @test {WaitingStream#constructor} */
  describe('#constructor', function () {
    it('should return a QueueStream, mixed with waitForDependencies', function () {
      const stream = new WaitingStream();

      expect(stream, 'to be a', QueueStream);
      expect(stream.dependenciesFor, 'to be defined');
    });
  });

  /** @test {WaitingStream#dependenciesFor} */
  describe('#dependenciesFor', function () {
    it('should throw if not overridden', function () {
      return expect(WaitingStream.prototype.dependenciesFor, 'to throw', /must be implemented/);
    });
  });

  /** @test {WaitingStream#_enqueueChunk} */
  describe('#_enqueueChunk', function () {
    context('when a dependency-free file is pushed', function () {
      it('should just call super', function () {
        const stream = new NoDeps();
        stream._enqueueChunk({ nodeId: new NodeId('chunk') });

        return expect(stream.spies._enqueueChunk, 'was called once');
      });
    });

    context('when a file with dependencies is pushed', function () {
      it('should not call super', function () {
        const stream = new WithDeps();
        stream._enqueueChunk({ nodeId: new NodeId('Source') });

        return expect(stream.spies._enqueueChunk, 'was not called');
      });

      it('should add it to the dependencies map', function () {
        const stream = new WithDeps();
        const chunk = { nodeId: new NodeId('Source') };
        stream._enqueueChunk(chunk);

        expect(stream._waitingFor['ns=1;s=Testing'], 'to be defined');
        expect(stream._waitingFor['ns=1;s=Testing'], 'to have an item satisfying', 'to be', chunk);
      });

      it('should ignore system types', function () {
        const stream = new WithDeps([new NodeId(NodeId.NodeIdType.STRING, 'System', 0)]);
        stream._enqueueChunk({ nodeId: new NodeId('Source') });

        expect(stream._waitingFor['ns=1;s=Testing'], 'to be undefined');
        return expect(stream.spies._enqueueChunk, 'was called');
      });

      it('should ignore atserver types', function () {
        const stream = new WithDeps([new NodeId('VariableTypes.ATVISE.Display')]);
        stream._enqueueChunk({ nodeId: new NodeId('Source') });

        expect(stream._waitingFor['ns=1;s=Testing'], 'to be undefined');
        return expect(stream.spies._enqueueChunk, 'was called');
      });

      it('should ignore already processed nodes', function () {
        const dep = new NodeId('TestDep');
        const stream = new WithDeps([dep]);
        stream._finishedProcessing[dep.toString()] = true;
        stream._enqueueChunk({ nodeId: new NodeId('Source') });

        expect(stream._waitingFor['ns=1;s=Testing'], 'to be undefined');
        return expect(stream.spies._enqueueChunk, 'was called');
      });

      it('should process files once dependencies are finished', function (done) {
        const dep = new NodeId('TestDep');
        const dep2 = new NodeId('TestDep2');
        const source = new NodeId('Source');
        const stream = new WithDeps([dep, dep2]);

        stream._enqueueChunk({ nodeId: source });
        expect(stream.spies._enqueueChunk, 'was not called');

        stream._enqueueChunk({ nodeId: dep });
        expect(stream.spies._enqueueChunk, 'was called once');

        stream.once('processed-chunk', async ({ nodeId }) => {
          try {
            await expect(nodeId, 'to be', source);
            await expect(stream.spies._enqueueChunk, 'to have calls satisfying', [
              { args: [{ nodeId: dep }] },
              { args: [{ nodeId: dep2 }] },
              { args: [{ nodeId: source }] },
            ]);
            done();
          } catch (e) {
            done(e);
          }
        });

        stream._enqueueChunk({ nodeId: dep2 });
      });
    });
  });

  /** @test {WaitingStream#_flush} */
  describe('#_flush', function () {
    context('without pending operations', function () {
      it('should call super', function () {
        const stream = new NoDeps();
        stream._flush(() => {});
        return expect(stream.spies._flush, 'was called once');
      });
    });

    context('with pending operations', function () {
      it('should try to process dependents', function () {
        const stream = new WithDeps([new NodeId('Dep')]);
        stream._enqueueChunk({ nodeId: new NodeId('Depending') });

        return expect((cb) => stream._flush(cb), 'to call the callback without error');
      });
    });
  });
});