import QueueStream from 'atscm/src/lib/server/QueueStream.js'
public class

QueueStream

Extends:

Stream → QueueStream

Direct Subclass:

CallMethodStream

A stream that processes atvise server requests in parallel.

Constructor Summary

Public Constructor
public

constructor(options: Object)

Creates a new QueueStream with the given options.

Member Summary

Public Members
public get

hasPending: boolean

true if there are queued operations or an operation is running right now.

public get

opsPerSecond: number

The number of processed chunks per second.

public get

processed: number

The number of chunks already processed.

public get

queueEmpty: boolean

true if there are no queued operations.

Private Members
private

_maxParallel: number

The maximum number of parallel tasks to execute.

private

_processed: Number

The number of chunks processed so far.

private

_processing: Number

The number of running operations.

private

_queued: *[]

The queued chunks.

private

_start: Number

The timestamp of when the stream was created.

Method Summary

Public Methods
public abstract

processChunk(chunk: *, handleErrors: function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function))

The function to call when a chunk is ready to be processed.

public abstract

processErrorMessage(chunk: *): string

The error message to use when processing a chunk fails.

Private Methods
private

_enqueueChunk(chunk: *)

Enqueues the given chunk for processing.

private

_flush(callback: Function)

Waits for pending operations to complete.

private

_processChunk(chunk: *)

Calls QueueStream#processChunk and handles errors and invalid status codes.

private

_transform(chunk: *, enc: string, callback: Function)

Calls QueueStream#_enqueueChunk as soon as the stream's session is opened.

Inherited Summary

From class Stream
private

true if the stream's atvise server session should be kept alive once the stream ends.

private

_flush(callback: function(err: ?Error, data: Object))

Called just before the stream is closed: Closes the open session.

Public Constructors

public constructor(options: Object)

Creates a new QueueStream with the given options.

Override:

Stream#constructor

Params:

Name | Type | Attribute | Description
options | Object | optional | The options to use.
options.maxParallel | number | optional | The maximum number of parallel tasks to execute.
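The effect of a maxParallel limit can be illustrated with a small, dependency-free sketch. The `BoundedQueue` class, its `worker` option and its field names are hypothetical stand-ins, not atscm's implementation; the sketch only shows the general idea of running at most a fixed number of tasks at once and queueing the rest.

```javascript
// Hypothetical stand-in for illustration only; not atscm's QueueStream.
class BoundedQueue {
  constructor({ maxParallel = 2, worker } = {}) {
    this.maxParallel = maxParallel; // upper bound on concurrently running tasks
    this.worker = worker; // (chunk, done) => void
    this.queued = [];
    this.processing = 0;
    this.processed = 0;
  }

  // Runs the chunk right away if a slot is free, otherwise queues it.
  enqueue(chunk) {
    if (this.processing < this.maxParallel) {
      this.run(chunk);
    } else {
      this.queued.push(chunk);
    }
  }

  run(chunk) {
    this.processing += 1;
    this.worker(chunk, () => {
      this.processing -= 1;
      this.processed += 1;
      // A slot became free: start the next queued chunk, if any.
      if (this.queued.length > 0) {
        this.run(this.queued.shift());
      }
    });
  }
}

// Collect the `done` callbacks so completion can be triggered by hand.
const pendingDone = [];
const queue = new BoundedQueue({
  maxParallel: 2,
  worker: (chunk, done) => pendingDone.push(done),
});

['a', 'b', 'c'].forEach(chunk => queue.enqueue(chunk));
const snapshotBefore = { processing: queue.processing, queued: queue.queued.length };

pendingDone.shift()(); // finish 'a', which lets 'c' start
const snapshotAfter = { processing: queue.processing, queued: queue.queued.length };
```

With a limit of two, the third chunk waits in the queue until one of the first two completes.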

Public Members

public get hasPending: boolean

true if there are queued operations or an operation is running right now.

public get opsPerSecond: number

The number of processed chunks per second.

public get processed: number

The number of chunks already processed.

public get queueEmpty: boolean

true if there are no queued operations.

Private Members

private _maxParallel: number

The maximum number of parallel tasks to execute.

private _processed: Number

The number of chunks processed so far.

private _processing: Number

The number of running operations.

private _queued: *[]

The queued chunks.

private _start: Number

The timestamp of when the stream was created.
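As a rough sketch of how the public getters could be derived from these private counters: the `describeQueue` helper below is hypothetical, and the opsPerSecond formula (processed chunks divided by seconds elapsed since creation) is an assumption based on the descriptions above, not a copy of atscm's source.

```javascript
// Hypothetical helper mirroring the documented getters; not atscm's source.
function describeQueue({ queued, processing, processed, start }, now = Date.now()) {
  const elapsedSeconds = (now - start) / 1000;
  return {
    hasPending: processing > 0 || queued.length > 0,
    queueEmpty: queued.length === 0,
    processed,
    // Assumed formula: chunks processed divided by seconds since creation.
    opsPerSecond: elapsedSeconds > 0 ? processed / elapsedSeconds : 0,
  };
}

const state = describeQueue(
  { queued: [], processing: 1, processed: 50, start: 0 },
  2000 // two seconds after `start`
);
```

In this state the queue is empty while one operation is still running, so queueEmpty and hasPending are both true.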

Public Methods

public abstract processChunk(chunk: *, handleErrors: function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function))

The function to call when a chunk is ready to be processed. Must be overridden by all subclasses.

Params:

Name | Type | Attribute | Description
chunk | * | | The chunk to process.
handleErrors | function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function) | | Call this function to handle errors and bad status codes. When no error occurred and the status code received is fine, onSuccess is called. Further processing of valid chunks, for example recursion, should happen in onSuccess. Note that onSuccess is an asynchronous function that takes a callback as its argument.

Example:

Basic implementation:

class MyQueueStream extends QueueStream {
  // ...
  processChunk(chunk, handle) {
    client.session.doSomething((err, result, statusCode) => handle(err, statusCode, done => {
      // This is called if err is falsy and the status code is node-opcua~StatusCodes.Good
      doSomethingWith(result);
      done();
    }));
  }
  // ...
}

Implement a recursion:

class RecursiveQueueStream extends QueueStream {
  // ...
  processChunk(chunk, handle) {
    client.session.doSomething((err, result, statusCode) => handle(err, statusCode, done => {
      // Write the result back to the stream.
      // This means that `result` will be queued and, as soon as possible, #processChunk will
      // be called with `result` as the `chunk` argument.
      this.write(result, null, done);
    }));
  }
  // ...
}

Allowing some invalid status codes:

import { StatusCodes } from 'node-opcua';

class FriendlyQueueStream extends QueueStream {
  // ...
  processChunk(chunk, handle) {
    client.session.doSomething((err, result, statusCode) => {
      if (statusCode === StatusCodes.BadUserAccessDenied) {
        // Treat this status as success, but log that it was ignored.
        Logger.warn(`Ignored invalid status: ${statusCode.description}`);
        handle(err, StatusCodes.Good, done => done());
      } else {
        handle(err, statusCode, done => done());
      }
    });
  }
  // ...
}
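To make the handleErrors contract more concrete, here is a minimal sketch of what such a handler might look like from the inside. The `createHandler` factory, the `Good` constant and the callback names are hypothetical; `Good` merely stands in for node-opcua's StatusCodes.Good so that the sketch runs without dependencies. How atscm actually builds this handler is not shown in this reference.

```javascript
// Stand-in for node-opcua's StatusCodes.Good so the sketch runs without dependencies.
const Good = { name: 'Good' };

// Hypothetical factory: builds a handler with the documented
// (err, statusCode, onSuccess) signature.
function createHandler(errorPrefix, onError, onDone) {
  return (err, statusCode, onSuccess) => {
    if (err) {
      onError(new Error(`${errorPrefix}: ${err.message}`));
    } else if (statusCode !== Good) {
      onError(new Error(`${errorPrefix}: ${statusCode.name}`));
    } else {
      // onSuccess is asynchronous: it receives a callback to signal completion.
      onSuccess(onDone);
    }
  };
}

const errors = [];
let finished = 0;
const handle = createHandler(
  'Error processing chunk',
  error => errors.push(error.message),
  () => { finished += 1; }
);

handle(null, Good, done => done()); // success path
handle(null, { name: 'BadNodeIdUnknown' }, done => done()); // bad status code
handle(new Error('Connection lost'), undefined, done => done()); // error
```

Only the first call reaches onSuccess; the other two are reported as errors and their onSuccess callbacks never run.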

public abstract processErrorMessage(chunk: *): string

The error message to use when processing a chunk fails. Must be overridden by all subclasses.

Params:

Name | Type | Attribute | Description
chunk | * | | The chunk being processed.

Return:

string

The error message to use.
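A subclass override could look roughly like the following. `QueueStreamStub` is a hypothetical stand-in base class so the sketch runs without atscm installed, and the `nodeId` property on the chunk is an assumption made for illustration only.

```javascript
// Minimal stand-in base class so the sketch runs without atscm installed.
class QueueStreamStub {
  processErrorMessage(chunk) {
    throw new Error('processErrorMessage must be overridden by subclasses');
  }
}

class ExampleReadStream extends QueueStreamStub {
  // `chunk.nodeId` is an assumed property for illustration.
  processErrorMessage(chunk) {
    return `Error reading ${chunk.nodeId}`;
  }
}

const message = new ExampleReadStream().processErrorMessage({ nodeId: 'ns=1;s=Example.Node' });
```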

Private Methods

private _enqueueChunk(chunk: *)

Enqueues the given chunk for processing.

Params:

Name | Type | Attribute | Description
chunk | * | | The chunk to enqueue.

private _flush(callback: Function)

Waits for pending operations to complete.

Override:

Stream#_flush

Params:

Name | Type | Attribute | Description
callback | Function | | Called once all queued chunks have been processed.
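One way to wait for pending operations is to listen for the processed-chunk event until nothing is pending. The sketch below assumes that approach; the `PendingTracker` class and its `start`, `finish` and `flush` methods are hypothetical, and only the event name is taken from this reference.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in; only models the pending counter and the event.
class PendingTracker extends EventEmitter {
  constructor() {
    super();
    this.processing = 0;
  }

  get hasPending() {
    return this.processing > 0;
  }

  start() {
    this.processing += 1;
  }

  finish() {
    this.processing -= 1;
    this.emit('processed-chunk');
  }

  // Calls `callback` once no operations are pending.
  flush(callback) {
    if (!this.hasPending) {
      callback();
      return;
    }
    const onProcessed = () => {
      if (!this.hasPending) {
        this.removeListener('processed-chunk', onProcessed);
        callback();
      }
    };
    this.on('processed-chunk', onProcessed);
  }
}

const tracker = new PendingTracker();
let flushed = false;
tracker.start();
tracker.start();
tracker.flush(() => { flushed = true; });

tracker.finish(); // one operation still running
const flushedAfterFirst = flushed;
tracker.finish(); // last operation done, the flush callback runs
const flushedAfterSecond = flushed;
```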

private _processChunk(chunk: *)

Calls QueueStream#processChunk and handles errors and invalid status codes.

Params:

Name | Type | Attribute | Description
chunk | * | | The chunk to process.

Emit:

*

Emits a processed-chunk event once a chunk has been processed.

private _transform(chunk: *, enc: string, callback: Function)

Calls QueueStream#_enqueueChunk as soon as the stream's session is opened.

Params:

Name | Type | Attribute | Description
chunk | * | | The chunk to transform.
enc | string | | The encoding used.
callback | Function | | Called once the chunk has been enqueued.
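Deferring work until a session is available can be sketched with a plain Node.js Transform stream. Everything specific here is an assumption made for illustration: the `SessionGatedStream` class, its `session` property, the `openSession` method and the 'session-open' event name are not taken from this reference.

```javascript
const { Transform } = require('stream');

// Hypothetical stand-in: `session`, `openSession` and the 'session-open' event
// are assumptions made for this sketch.
class SessionGatedStream extends Transform {
  constructor() {
    super({ objectMode: true });
    this.session = null;
    this.queued = [];
  }

  openSession() {
    this.session = {};
    this.emit('session-open');
  }

  _transform(chunk, enc, callback) {
    const enqueue = () => {
      this.queued.push(chunk);
      callback(); // signal that the chunk has been enqueued
    };
    if (this.session) {
      enqueue();
    } else {
      // Hold the chunk back until the session is available.
      this.once('session-open', enqueue);
    }
  }
}

const gated = new SessionGatedStream();
gated.write('first');
const queuedBeforeOpen = gated.queued.length;
gated.openSession();
const queuedAfterOpen = gated.queued.slice();
```

Because the callback is only invoked after enqueueing, the stream does not hand over further chunks until the session exists.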
