test/src/lib/server/QueueStream.spec.js
import { StatusCodes } from 'node-opcua';
import { spy, stub } from 'sinon';
import expect from '../../../expect';
import QueueStream from '../../../../src/lib/server/QueueStream';
function fakeQueueStream(
err = null,
status = StatusCodes.Good,
onSuccess = (done) => done(),
options = {}
) {
return new (class FakeQueueStream extends QueueStream {
processErrorMessage(chunk) {
return `Error processing ${chunk}`;
}
processChunk(chunk, handle) {
handle(err, status, onSuccess);
}
})(options);
}
/** @test {QueueStream} */
describe('QueueStream', function () {
/** @test {QueueStream#constructor} */
describe('#constructor', function () {
it('should work without options', function () {
expect(() => new QueueStream(), 'not to throw');
});
it('should store maxParallel option', function () {
expect(new QueueStream({ maxParallel: 99 })._maxParallel, 'to equal', 99);
});
it('should store start date', function () {
expect(new QueueStream()._start, 'to be a', 'number');
});
it('should listen for processed-chunk events', function () {
expect(new QueueStream().listenerCount('processed-chunk'), 'to be', 1);
});
});
/** @test {QueueStream#hasPending} */
describe('#hasPending', function () {
it('should return true if there are queued operations', function () {
const stream = new QueueStream();
stream._queued.push({});
expect(stream.hasPending, 'to be', true);
});
it('should return true if there are running operations', function () {
const stream = new QueueStream();
stream._processing = 1;
expect(stream.hasPending, 'to be', true);
});
it('should return false otherwise', function () {
expect(new QueueStream().hasPending, 'to be', false);
});
});
/** @test {QueueStream#queueEmpty} */
describe('#queueEmpty', function () {
it('should return true if not operations are queued', function () {
expect(new QueueStream().queueEmpty, 'to be', true);
});
it('should return false if there are queued operations', function () {
const stream = new QueueStream();
stream._queued.push({});
expect(stream.queueEmpty, 'to be', false);
});
});
/** @test {QueueStream#processed} */
describe('#processed', function () {
it('should return the number of processed items', function () {
const stream = new QueueStream();
stream._processed = 13;
expect(stream.processed, 'to equal', 13);
});
});
/** @test {QueueStream#opsPerSecond} */
describe('#opsPerSecond', function () {
it('should return 0 right after creating the stream', function () {
expect(new QueueStream().opsPerSecond, 'to be', 0);
});
it('should return the number of processed items after one second', function () {
const stream = new QueueStream();
stream._start -= 1000;
stream._processed = 13;
expect(stream.opsPerSecond, 'to be close to', 13, 0.5);
});
});
/** @test {QueueStream#processErrorMessage} */
describe('#processErrorMessage', function () {
it('should throw if not overridden', function () {
expect(() => new QueueStream().processErrorMessage({}), 'to throw', /must be implemented/);
});
});
/** @test {QueueStream#processChunk} */
describe('#processChunk', function () {
it('should throw if not overridden', function () {
expect(
(cb) => new QueueStream().processChunk({}, cb),
'to call the callback with error',
/must be implemented/
);
});
});
/** @test {QueueStream#_processChunk} */
describe('#_processChunk', function () {
it('should increase #_processing', function () {
const stream = fakeQueueStream();
expect(stream._processing, 'to equal', 0);
stream._processChunk('item');
expect(stream._processing, 'to equal', 0);
});
it('should emit processed-chunk event', function () {
const stream = fakeQueueStream();
const listener = spy();
stream.on('processed-chunk', listener);
stream._processChunk('item');
expect(listener, 'was called once');
expect(listener.lastCall.args[0], 'to equal', 'item');
});
it('should emit errors', function () {
const stream = fakeQueueStream(new Error('Test'));
const listener = spy();
stream.on('error', listener);
stream._processChunk('item');
expect(listener, 'was called once');
expect(listener.lastCall, 'to satisfy', [/Error processing item: Test/]);
});
it('should emit error on invalid status', function () {
const stream = fakeQueueStream(null, StatusCodes.BadAggregateInvalidInputs);
const listener = spy();
stream.on('error', listener);
stream._processChunk('item');
expect(listener, 'was called once');
expect(listener.lastCall, 'to satisfy', [/invalid data inputs/]);
});
});
/** @test {QueueStream#_enqueueChunk} */
describe('#_enqueueChunk', function () {
it('should call _processChunk if allowed', function () {
const stream = fakeQueueStream();
stub(stream, '_processChunk');
stream._enqueueChunk('item');
expect(stream._processChunk, 'was called once');
expect(stream._processChunk.lastCall, 'to satisfy', ['item']);
});
it('should add chunk to queue if maxParallel is reached', function () {
const stream = fakeQueueStream();
stream._processing = stream._maxParallel;
stream._enqueueChunk('item');
expect(stream._queued, 'to have length', 1);
expect(stream._queued, 'to contain', 'item');
});
});
/** @test {QueueStream#_transform} */
describe('#_transform', function () {
it('should wait for session to open', function () {
const stream = fakeQueueStream();
stub(stream, '_enqueueChunk');
stream._transform('item', 'utf8', () => {});
expect(stream._enqueueChunk, 'was not called');
});
it('should enqueue item once session is open', function (done) {
const stream = fakeQueueStream();
stub(stream, '_enqueueChunk');
stream.once('session-open', () => {
stream._transform('item', 'utf8', () => {});
expect(stream._enqueueChunk, 'was called once');
expect(stream._enqueueChunk.lastCall, 'to satisfy', ['item']);
done();
});
});
});
/** @test {QueueStream#_flush} */
describe('#_flush', function () {
it('should wait for queue to drain', function () {
const stream = fakeQueueStream();
stream._processing = 1;
const listener = spy();
stream._flush(listener);
expect(listener, 'was not called');
stream.emit('drained');
expect(listener, 'was called once');
});
it('should flush instantly if queue is empty', function () {
const stream = fakeQueueStream();
const listener = spy();
stream._flush(listener);
expect(listener, 'was called once');
});
});
context('when chunk has been processed', function () {
context('and queue is empty', function () {
it('should emit drained if not processing any items', function () {
const stream = new QueueStream();
const listener = spy();
stream.on('drained', listener);
stream.emit('processed-chunk');
expect(listener, 'was called once');
});
it('should not emit drained if processing an items', function () {
const stream = new QueueStream();
stream._processing = 1;
const listener = spy();
stream.on('drained', listener);
stream.emit('processed-chunk');
expect(listener, 'was not called');
});
});
context('and queue is not empty', function () {
it('should call _processChunk with queued chunk', function () {
const stream = new QueueStream();
const item = {};
stub(stream, '_processChunk');
stream._queued = [item];
stream.emit('processed-chunk');
expect(stream._processChunk, 'was called once');
expect(stream._processChunk, 'was called with', item);
});
});
});
});