test/src/lib/server/NodeStream.spec.js
import { resolveNodeId, StatusCodes, NodeClass, ReferenceTypeIds } from 'node-opcua';
import { spy } from 'sinon';
import Logger from 'gulplog';
import expect from '../../../expect';
import { waitForEvent } from '../../../helpers/Emitter';
import NodeStream from '../../../../src/lib/server/NodeStream';
import NodeId from '../../../../src/lib/model/opcua/NodeId';
class StubNodeStream extends NodeStream {
_transform(chunk, enc, callback) {
callback(null, chunk);
}
}
class NoWriteNodeStream extends StubNodeStream {
// eslint-disable-next-line @typescript-eslint/no-empty-function
_writeNodesToBrowse() {}
}
class NonRecursive extends NodeStream {
processChunk(chunk, handleErrors) {
setImmediate(() => {
handleErrors(null, StatusCodes.Good, (done) => {
this.push(chunk);
done();
});
});
}
}
/** @test {NodeStream} */
describe.skip('NodeStream', function () {
const testNodes = [new NodeId('ns=1;s=AGENT.DISPLAYS'), new NodeId('ns=1;s=AGENT.OBJECTS')];
function referenceTypeId(referenceType) {
return new NodeId(NodeId.NodeIdType.NUMERIC, referenceType);
}
/** @test {NodeStream#constructor} */
describe('#constructor', function () {
it('should fail without nodesToBrowse', function () {
expect(() => new NodeStream(), 'to throw', /nodesToBrowse is required/);
});
it('should throw with invalid ignoreNodes', function () {
expect(
() => new NodeStream(testNodes, { ignoreNodes: 'test' }),
'to throw',
/ignoreNodes must be an array/
);
});
it('should store "recursive" option', function () {
expect(new NoWriteNodeStream(testNodes, { recursive: false }).recursive, 'to be', false);
});
it('should create ignoredRexExp', function () {
expect(
new NoWriteNodeStream(testNodes, {
ignoreNodes: [new NodeId('Test.Node')],
}).ignoredRegExp,
'to equal',
/^(ns=1;s=Test.Node)/
);
});
it('should warn on ignored nodes', function (done) {
Logger.once('warn', (message) => {
expect(message, 'to match', /ignored/);
done();
});
expect(
new NoWriteNodeStream(testNodes, {
ignoreNodes: [testNodes[0]],
}),
'to be defined'
);
});
it('should error without nodes to browse', function () {
expect(
() => new StubNodeStream(testNodes, { ignoreNodes: testNodes }),
'to throw error',
/Nothing to browse/
);
});
/* it('should listen to drained events', function() {
expect((new StubNodeStream(testNodes)).listenerCount('drained'), 'to equal', 1);
}); */
});
/* @test {NodeStream#_writeNodesToBrowse} */
describe('#_writeNodesToBrowse', function () {
it('should forward read errors', function () {
const stream = new StubNodeStream(testNodes);
stream.prependOnceListener('session-open', () => {
stream.session.read = (_, cb) => cb(new Error('Read error'));
});
return expect(stream, 'to error with', 'Read error');
});
it('should error on bad status code', function () {
const node = new NodeId('Does.Not.Exist');
const stream = new StubNodeStream([testNodes[0], node, testNodes[1]]);
return expect(stream, 'to error with', new RegExp(`^Error reading ${node}`));
});
it('should write read results', async function () {
const stream = new NonRecursive(testNodes);
spy(stream, 'write');
await expect(stream, 'to yield objects satisfying', 'to have length', 2);
return expect(stream.write, 'was called times', 2);
});
it('should call #end on drained', async function () {
const stream = new NonRecursive(testNodes);
spy(stream, 'end');
await waitForEvent(stream, 'initial-read-complete');
await waitForEvent(stream, 'drained');
return expect(stream.end, 'was called once');
});
});
/** @test {NodeStream#processErrorMessage} */
describe('#processErrorMessage', function () {
it('should decorate error message', function () {
const nodeId = resolveNodeId('ns=1;s=AGENT.DISPLAYS.Main');
expect(NodeStream.prototype.processErrorMessage({ nodeId }), 'to contain', nodeId.toString());
});
});
/** @test {NodeStream#processChunk} */
describe('#processChunk', function () {
it('should forward browse errors', function () {
const stream = new NodeStream(testNodes).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(new Error('Browse error'));
};
});
return expect(stream, 'to error with', /Browse error/);
});
it('should emit error without results', function () {
const stream = new NodeStream(testNodes).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(null, null);
};
});
return expect(stream, 'to error with', /No results/);
});
it('should emit error with empty results', function () {
const stream = new NodeStream(testNodes).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(null, []);
};
});
return expect(stream, 'to error with', /No results/);
});
it('should emit error with empty results after initial write', function () {
const stream = new NodeStream(testNodes).prependOnceListener('initial-read-complete', () => {
stream.session.browse = (options, callback) => {
callback(null, []);
};
});
return expect(stream, 'to error with', /No results/);
});
it('should not push parent nodes', function () {
const stream = new NodeStream(testNodes, { recursive: false }).prependOnceListener(
'session-open',
() => {
stream.session.browse = (options, callback) => {
callback(null, [
{
statusCode: StatusCodes.Good,
references: [
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent), // anything valid
$nodeClass: NodeClass.Variable,
nodeId: new NodeId('ns=1;s=AGENT'),
},
],
},
]);
};
}
);
return expect(stream, 'to yield objects satisfying', 'to have length', testNodes.length);
});
it('should not push ignored nodes', function () {
const stream = new NodeStream(testNodes, {
recursive: false,
ignoreNodes: [new NodeId('ns=1;s=AGENT.DISPLAYS.Main')],
}).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(null, [
{
statusCode: StatusCodes.Good,
references: [
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent),
$nodeClass: NodeClass.Variable,
nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.Main'),
},
],
},
]);
};
});
return expect(stream, 'to yield objects satisfying', 'to have length', testNodes.length);
});
it('should push nodes to browse', async function () {
const stream = new NodeStream(testNodes, { recursive: false });
return expect(
stream,
'to yield objects satisfying',
'to have items satisfying',
(item, i) => {
expect(item, 'to have property', 'nodeId', testNodes[i]);
expect(item, 'to have property', 'nodeClass', NodeClass.Object);
}
);
});
it('should browse discovered nodes', async function () {
const stream = new NodeStream([testNodes[0]]).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(null, [
{
statusCode: StatusCodes.Good,
references: [
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent),
$nodeClass: NodeClass.Object,
nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.MAIN'),
},
],
},
]);
};
});
return expect(stream, 'to yield objects satisfying', 'to have length', 2);
});
it('should push discovered object nodes', function () {
const stream = new NodeStream(testNodes).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(null, [
{
statusCode: StatusCodes.Good,
references: [
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent),
$nodeClass: NodeClass.Object,
nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.MAIN'),
},
],
},
]);
};
});
return expect(stream, 'to yield objects satisfying', 'to have length', 3);
});
it('should push discovered variable nodes', async function () {
const stream = new NodeStream(testNodes).prependOnceListener('session-open', () => {
stream.session.browse = (options, callback) => {
callback(null, [
{
statusCode: StatusCodes.Good,
references: [
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent),
$nodeClass: NodeClass.Variable,
nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.Main'),
},
],
},
]);
};
});
spy(stream, 'write');
await expect(stream, 'to yield objects satisfying', 'to have length', 3);
return expect(stream.write, 'was called times', 3);
});
it('should write discovered nodes if recursive', function () {
let called = 0;
const stream = new NodeStream([testNodes[0]], { recursive: true }).prependOnceListener(
'session-open',
() => {
stream.session.browse = (options, callback) => {
if (called === 2) {
callback(null, [
{
statusCode: StatusCodes.Good,
references: [],
},
]);
} else {
called++;
callback(null, [
{
statusCode: StatusCodes.Good,
references: [
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent),
$nodeClass: NodeClass.Variable,
nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.Main'),
},
{
referenceTypeId: referenceTypeId(ReferenceTypeIds.HasComponent),
$nodeClass: NodeClass.Object,
nodeId: new NodeId('ns=1;s=AGENT.DISPLAYS.MAIN'),
},
],
},
]);
}
};
}
);
spy(stream, 'write');
return expect(stream, 'to yield objects satisfying', 'to have length', 3).then(() =>
expect(stream.write, 'was called times', 3)
);
});
});
});