Home Manual Reference Source Test

test/src/lib/server/QueueStream.spec.js

  1. import { StatusCodes } from 'node-opcua';
  2. import { spy, stub } from 'sinon';
  3. import expect from '../../../expect';
  4. import QueueStream from '../../../../src/lib/server/QueueStream';
  5.  
  6. function fakeQueueStream(
  7. err = null,
  8. status = StatusCodes.Good,
  9. onSuccess = (done) => done(),
  10. options = {}
  11. ) {
  12. return new (class FakeQueueStream extends QueueStream {
  13. processErrorMessage(chunk) {
  14. return `Error processing ${chunk}`;
  15. }
  16. processChunk(chunk, handle) {
  17. handle(err, status, onSuccess);
  18. }
  19. })(options);
  20. }
  21.  
  22. /** @test {QueueStream} */
  23. describe('QueueStream', function () {
  24. /** @test {QueueStream#constructor} */
  25. describe('#constructor', function () {
  26. it('should work without options', function () {
  27. expect(() => new QueueStream(), 'not to throw');
  28. });
  29.  
  30. it('should store maxParallel option', function () {
  31. expect(new QueueStream({ maxParallel: 99 })._maxParallel, 'to equal', 99);
  32. });
  33.  
  34. it('should store start date', function () {
  35. expect(new QueueStream()._start, 'to be a', 'number');
  36. });
  37.  
  38. it('should listen for processed-chunk events', function () {
  39. expect(new QueueStream().listenerCount('processed-chunk'), 'to be', 1);
  40. });
  41. });
  42.  
  43. /** @test {QueueStream#hasPending} */
  44. describe('#hasPending', function () {
  45. it('should return true if there are queued operations', function () {
  46. const stream = new QueueStream();
  47. stream._queued.push({});
  48.  
  49. expect(stream.hasPending, 'to be', true);
  50. });
  51.  
  52. it('should return true if there are running operations', function () {
  53. const stream = new QueueStream();
  54. stream._processing = 1;
  55.  
  56. expect(stream.hasPending, 'to be', true);
  57. });
  58.  
  59. it('should return false otherwise', function () {
  60. expect(new QueueStream().hasPending, 'to be', false);
  61. });
  62. });
  63.  
  64. /** @test {QueueStream#queueEmpty} */
  65. describe('#queueEmpty', function () {
  66. it('should return true if not operations are queued', function () {
  67. expect(new QueueStream().queueEmpty, 'to be', true);
  68. });
  69.  
  70. it('should return false if there are queued operations', function () {
  71. const stream = new QueueStream();
  72. stream._queued.push({});
  73.  
  74. expect(stream.queueEmpty, 'to be', false);
  75. });
  76. });
  77.  
  78. /** @test {QueueStream#processed} */
  79. describe('#processed', function () {
  80. it('should return the number of processed items', function () {
  81. const stream = new QueueStream();
  82. stream._processed = 13;
  83.  
  84. expect(stream.processed, 'to equal', 13);
  85. });
  86. });
  87.  
  88. /** @test {QueueStream#opsPerSecond} */
  89. describe('#opsPerSecond', function () {
  90. it('should return 0 right after creating the stream', function () {
  91. expect(new QueueStream().opsPerSecond, 'to be', 0);
  92. });
  93.  
  94. it('should return the number of processed items after one second', function () {
  95. const stream = new QueueStream();
  96. stream._start -= 1000;
  97. stream._processed = 13;
  98.  
  99. expect(stream.opsPerSecond, 'to be close to', 13, 0.5);
  100. });
  101. });
  102.  
  103. /** @test {QueueStream#processErrorMessage} */
  104. describe('#processErrorMessage', function () {
  105. it('should throw if not overridden', function () {
  106. expect(() => new QueueStream().processErrorMessage({}), 'to throw', /must be implemented/);
  107. });
  108. });
  109.  
  110. /** @test {QueueStream#processChunk} */
  111. describe('#processChunk', function () {
  112. it('should throw if not overridden', function () {
  113. expect(
  114. (cb) => new QueueStream().processChunk({}, cb),
  115. 'to call the callback with error',
  116. /must be implemented/
  117. );
  118. });
  119. });
  120.  
  121. /** @test {QueueStream#_processChunk} */
  122. describe('#_processChunk', function () {
  123. it('should increase #_processing', function () {
  124. const stream = fakeQueueStream();
  125.  
  126. expect(stream._processing, 'to equal', 0);
  127. stream._processChunk('item');
  128. expect(stream._processing, 'to equal', 0);
  129. });
  130.  
  131. it('should emit processed-chunk event', function () {
  132. const stream = fakeQueueStream();
  133. const listener = spy();
  134. stream.on('processed-chunk', listener);
  135.  
  136. stream._processChunk('item');
  137. expect(listener, 'was called once');
  138. expect(listener.lastCall.args[0], 'to equal', 'item');
  139. });
  140.  
  141. it('should emit errors', function () {
  142. const stream = fakeQueueStream(new Error('Test'));
  143. const listener = spy();
  144. stream.on('error', listener);
  145.  
  146. stream._processChunk('item');
  147. expect(listener, 'was called once');
  148. expect(listener.lastCall, 'to satisfy', [/Error processing item: Test/]);
  149. });
  150.  
  151. it('should emit error on invalid status', function () {
  152. const stream = fakeQueueStream(null, StatusCodes.BadAggregateInvalidInputs);
  153.  
  154. const listener = spy();
  155. stream.on('error', listener);
  156.  
  157. stream._processChunk('item');
  158. expect(listener, 'was called once');
  159. expect(listener.lastCall, 'to satisfy', [/invalid data inputs/]);
  160. });
  161. });
  162.  
  163. /** @test {QueueStream#_enqueueChunk} */
  164. describe('#_enqueueChunk', function () {
  165. it('should call _processChunk if allowed', function () {
  166. const stream = fakeQueueStream();
  167. stub(stream, '_processChunk');
  168.  
  169. stream._enqueueChunk('item');
  170. expect(stream._processChunk, 'was called once');
  171. expect(stream._processChunk.lastCall, 'to satisfy', ['item']);
  172. });
  173.  
  174. it('should add chunk to queue if maxParallel is reached', function () {
  175. const stream = fakeQueueStream();
  176. stream._processing = stream._maxParallel;
  177.  
  178. stream._enqueueChunk('item');
  179. expect(stream._queued, 'to have length', 1);
  180. expect(stream._queued, 'to contain', 'item');
  181. });
  182. });
  183.  
  184. /** @test {QueueStream#_transform} */
  185. describe('#_transform', function () {
  186. it('should wait for session to open', function () {
  187. const stream = fakeQueueStream();
  188. stub(stream, '_enqueueChunk');
  189.  
  190. stream._transform('item', 'utf8', () => {});
  191. expect(stream._enqueueChunk, 'was not called');
  192. });
  193.  
  194. it('should enqueue item once session is open', function (done) {
  195. const stream = fakeQueueStream();
  196. stub(stream, '_enqueueChunk');
  197.  
  198. stream.once('session-open', () => {
  199. stream._transform('item', 'utf8', () => {});
  200. expect(stream._enqueueChunk, 'was called once');
  201. expect(stream._enqueueChunk.lastCall, 'to satisfy', ['item']);
  202. done();
  203. });
  204. });
  205. });
  206.  
  207. /** @test {QueueStream#_flush} */
  208. describe('#_flush', function () {
  209. it('should wait for queue to drain', function () {
  210. const stream = fakeQueueStream();
  211. stream._processing = 1;
  212. const listener = spy();
  213. stream._flush(listener);
  214.  
  215. expect(listener, 'was not called');
  216.  
  217. stream.emit('drained');
  218. expect(listener, 'was called once');
  219. });
  220.  
  221. it('should flush instantly if queue is empty', function () {
  222. const stream = fakeQueueStream();
  223. const listener = spy();
  224. stream._flush(listener);
  225.  
  226. expect(listener, 'was called once');
  227. });
  228. });
  229.  
  230. context('when chunk has been processed', function () {
  231. context('and queue is empty', function () {
  232. it('should emit drained if not processing any items', function () {
  233. const stream = new QueueStream();
  234. const listener = spy();
  235. stream.on('drained', listener);
  236.  
  237. stream.emit('processed-chunk');
  238. expect(listener, 'was called once');
  239. });
  240.  
  241. it('should not emit drained if processing an items', function () {
  242. const stream = new QueueStream();
  243. stream._processing = 1;
  244. const listener = spy();
  245. stream.on('drained', listener);
  246.  
  247. stream.emit('processed-chunk');
  248. expect(listener, 'was not called');
  249. });
  250. });
  251.  
  252. context('and queue is not empty', function () {
  253. it('should call _processChunk with queued chunk', function () {
  254. const stream = new QueueStream();
  255. const item = {};
  256. stub(stream, '_processChunk');
  257. stream._queued = [item];
  258.  
  259. stream.emit('processed-chunk');
  260. expect(stream._processChunk, 'was called once');
  261. expect(stream._processChunk, 'was called with', item);
  262. });
  263. });
  264. });
  265. });