QueueStream
Extends:
Direct Subclass:
Indirect Subclass:
A stream that process atvise server requests in parallel.
Test:
Constructor Summary
Public Constructor | ||
public |
constructor(options: Object) Creates a new QueueStream with the given options. |
Member Summary
Public Members | ||
public get |
|
|
public get |
The number of processed chunks per second. |
|
public get |
The number of chunks already processed. |
|
public get |
|
Private Members | ||
private |
The maximum of parallel tasks to execute |
|
private |
The number of chunks processed so far. |
|
private |
The number of running operations. |
|
private |
_queued: *[] The queued chunks. |
|
private |
The timestamp of the date when the stream was created. |
Method Summary
Public Methods | ||
public abstract |
processChunk(chunk: *, handleErrors: function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function)) The function to call when a chunk is ready to be processed. |
|
public abstract |
processErrorMessage(chunk: *): string The error message to use when processing a chunk fails. |
Private Methods | ||
private |
_enqueueChunk(chunk: *) Enqueues the given chunk for processing. |
|
private |
Waits for pending operations to complete. |
|
private |
_processChunk(chunk: *) Calls QueueStream#processChunk and handles errors and invalid status codes. |
|
private |
_transform(chunk: *, enc: string, callback: Function) Calls QueueStream#_enqueueChunk as soon as the stream's session is opened. |
Inherited Summary
From class Stream | ||
private |
|
|
private |
Called just before the stream is closed: Closes the open session. |
Public Constructors
public constructor(options: Object) source
Creates a new QueueStream with the given options.
Override:
Stream#constructorTest:
Public Members
public get hasPending: boolean source
true
if there are queued operations or an operation is running right now.
Test:
Private Members
Public Methods
public abstract processChunk(chunk: *, handleErrors: function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function)) source
The function to call when a chunk is ready to be processed. Must be overridden by all subclasses..
Params:
Name | Type | Attribute | Description |
chunk | * | The chunk to process. |
|
handleErrors | function(err: Error, statusCode: node-opcua~StatusCodes, onSuccess: function) | Call this function to handle errors and bad status codes. When no error occured
and the status code received is fine, |
Example:
class MyQueueStream extends QueueStream {
...
processChunk(chunk, handle) {
client.session.doSomething((err, result, statusCode) => handle(err, statusCode, done => {
// This is called if err is falsy and status code is node-opcua~StatusCodes.Good
doSomethingWith(result);
done();
}));
}
...
}
class RecursiveQueueStream extends QueueStream {
...
processChunk(chunk, handle) {
client.session.doSomething((err, result, statusCode) => handle(err, statusCode, done => {
// Write the result back to the stream.
// This means, that `result` will be queued and, as soon as possible, #processChunk will
// be called with `result` as the `chunk` argument.
this.write(result, null, done);
}));
}
...
}
import { StatusCodes } from 'node-opcua';
class FriendlyQueueStream extends QueueStream {
...
processChunk(chunk, handle) {
client.session.doSomething((err, result, statusCode) => {
if (statusCode === StatusCodes.BadUserAccessDenied) {
Logger.warn(`Ignored invalid status: ${statusCode.description}`);
handle(err, StatusCodes.Good, done => done());
} else {
handle(err, statusCode, done => done());
}
});
}
...
}
public abstract processErrorMessage(chunk: *): string source
The error message to use when processing a chunk fails. Must be overridden by all subclasses!.
Params:
Name | Type | Attribute | Description |
chunk | * | The chunk being processed. |
Private Methods
private _enqueueChunk(chunk: *) source
Enqueues the given chunk for processing.
Params:
Name | Type | Attribute | Description |
chunk | * | The chunk to enqueue. |
private _flush(callback: Function) source
Waits for pending operations to complete.
Override:
Stream#_flushParams:
Name | Type | Attribute | Description |
callback | Function | Called once all queued chunks have been processed. |
Test:
private _processChunk(chunk: *) source
Calls QueueStream#processChunk and handles errors and invalid status codes.
Params:
Name | Type | Attribute | Description |
chunk | * | The chunk to process. |
Emit:
* |
Emits a |
private _transform(chunk: *, enc: string, callback: Function) source
Calls QueueStream#_enqueueChunk as soon as the stream's session is opened.