src/lib/server/NodeBrowser.js
import { ObjectIds } from 'node-opcua/lib/opcua_node_ids.js';
import { BrowseDirection } from 'node-opcua/lib/services/browse_service.js';
import { AttributeIds } from 'node-opcua/lib/services/read_service';
import { VariantArrayType, DataType } from 'node-opcua/lib/datamodel/variant';
import Logger from 'gulplog';
import PromiseQueue from 'p-queue';
import ProjectConfig from '../../config/ProjectConfig';
import NodeId from '../model/opcua/NodeId';
import { ServerNode, ReferenceTypeIds, ReferenceTypeNames } from '../model/Node';
import Session from './Session';
/**
* A set of all hierarchical reference types.
* @type {Set<number>}
*/
const HierachicalReferencesTypeIds = new Set([
ReferenceTypeIds.HasChild,
ReferenceTypeIds.Aggregates,
ReferenceTypeIds.HasComponent,
ReferenceTypeIds.HasOrderedComponent,
ReferenceTypeIds.HasHistoricalConfiguration,
ReferenceTypeIds.HasProperty,
ReferenceTypeIds.HasSubtype,
ReferenceTypeIds.HasEventSource,
ReferenceTypeIds.HasNotifier,
ReferenceTypeIds.Organizes,
]);
/**
* A node discovered while browsing the server's database.
*/
export class BrowsedNode extends ServerNode {
/**
* Creates a new node.
* @param {Object} options The options to use.
* @param {?BrowsedNode} options.parent The parent node.
* @param {Object} options.reference The reference to pick metadata from.
*/
constructor({ parent, reference, nodeClass, name }) {
super({
parent,
nodeClass: reference ? reference.nodeClass : nodeClass,
name: reference ? reference.browseName.name : name,
});
if (reference) {
// NOTE: You should always provide reference, this only for #createChild
this.addReference(ReferenceTypeIds.toParent, reference.referenceTypeId.value);
/** The node's id. @type {NodeId} */
this.id = reference.nodeId;
}
/** The node's value
* @type {node-opcua~Variant} */
this.value = {};
}
/**
* Add multiple references at once.
* @param {Object[]} references The references to add.
*/
addReferences(references) {
references.forEach((reference) => {
this.addReference(reference.referenceTypeId.value, reference.nodeId.value);
});
}
/**
* Creates new child node.
* @param {Object} options The options to use.
* @see {Node#createChild}
*/
createChild(options) {
const node = super.createChild(options);
node.id = this.id;
return node;
}
}
/**
* Browses the server database.
*/
export default class NodeBrowser {
/**
* Creates a new node browser.
* @param {Object} options The options to use.
* @param {number} [options.concurrency=250] The maximum of nodes to process in parallel.
* @param {function(node: BrowsedNode): Promise<any>} options.handleNode A custom node handler.
* @param {boolean} [options.recursive] If the whole node tree should be processed.
*/
constructor({
concurrency = 250,
ignoreNodes = ProjectConfig.ignoreNodes,
handleNode,
recursive = true,
} = {}) {
/** The queue used to process nodes in parallel
* @type {p-queue~PQueue} */
this.queue = new PromiseQueue({
// autoStart: false,
concurrency,
});
/** A map of nodes already handled. Keys are ids, values are `true` if the node was already
* pushed and `false` otherwise.
* @type {Map<string, boolean>}
* */
this._handled = new Map();
this._waitingFor = {};
/** A regular expression matching all ignored nodes. @type {RegExp} */
this._ignoreNodesRegExp = new RegExp(`^(${ignoreNodes.map((n) => n.value).join('|')})`);
/** If the browser should recurse. @type {boolean} */
this._recursive = recursive;
/** If a warning should be printed for attempting to pull sort order nodes
* @type {boolean} */
this._printSortOrderWarning = recursive;
/** The custom node handler. @type {function(node: BrowsedNode): Promise<any>} */
this._handleNode = handleNode;
/** The number of pushed (discovered and handled) nodes. @type {number} */
this._pushed = 0;
/** A map that maps node ids against their discovered hierarchical parent nodes. Used to detect
* reference conflicts.
* @type {Map<string, string>} */
this.parentNode = new Map();
this.ensureHandled = new Set();
}
/**
* Reads the given node's value.
* @param {BrowsedNode} node The node to read.
*/
_readValue(node) {
if (!node.isVariable) {
return null;
}
return new Promise((resolve, reject) => {
this._session.readVariableValue(node.id, (err, result) => {
if (err) {
return reject(err);
}
return resolve(result && result.value);
});
}).then((value) => {
if (node.nodeId.endsWith('.SortOrder')) {
const removed = [];
const siblings = node.parent.children.map((c) => c.idName).filter((n) => n !== 'SortOrder');
const existing = value.value
.map(({ name }) => name)
.filter((name) => {
const exists = siblings.find((c) => c === name);
if (exists) {
return true;
}
removed.push(name);
return false;
});
// We could also add missing references here...
if (removed.length) {
// eslint-disable-next-line no-param-reassign
value.value = existing.map((name) => ({ namespaceIndex: 1, name }));
Logger.warn(`Removed ${removed.length} invalid references from '${node.nodeId}'`);
}
}
if (value) {
return value;
}
// Node is a variable but has no value -> Need to read dataType and arrayType directly.
return new Promise((resolve, reject) => {
const toRead = [AttributeIds.DataType, AttributeIds.ValueRank].map((attributeId) => ({
nodeId: node.id,
attributeId,
}));
this._session.read(
toRead,
(
err,
_,
[
{
value: { value: dataType },
},
{
value: { value: valueRank },
},
] = []
) => {
if (err) return reject(err);
// FIXME: valueRank -2 (Any) and -3 (ScalarOrOneDimension) are not handled properly here
const arrayType = valueRank < 0 ? VariantArrayType.Scalar : VariantArrayType.Array;
return resolve({
dataType: DataType[dataType.value],
arrayType,
value: null,
});
}
);
});
});
}
// FIXME: Debounce á la https://runkit.com/5c347d277da2ad00125b6bc2/5c50161cbc21520012c42290
// FIXME: Move to api
/**
* Browses the server address space at the given node id.
* @param {Object} options The options to use.
*/
_browse({ nodeId, browseDirection = BrowseDirection.Forward, resultMask = 63 }) {
return new Promise((resolve, reject) => {
this._session.browse({ nodeId, browseDirection, resultMask }, (err, data = []) =>
err ? reject(err) : resolve(data[0].references)
);
});
}
/**
* Browses a node.
* @param {BrowsedNode} node The node to browse.
*/
_browseNode(node) {
return this._browse({ nodeId: node.id }).then((allReferences) => {
const children = [];
const references = [];
const typeDefinitionReference = allReferences.find(
(ref) => ref.referenceTypeId.value === ReferenceTypeIds.HasTypeDefinition
);
const isUserGroup =
typeDefinitionReference &&
typeDefinitionReference.nodeId.value === 'ObjectTypes.ATVISE.Group';
allReferences.forEach((reference) => {
// "Cast" ref.nodeId to NodeId
Object.setPrototypeOf(reference.nodeId, NodeId.prototype);
const ignored = this._ignoreNodesRegExp.test(reference.nodeId.value);
const external = this._isExternalReference(reference.nodeId.value);
if (
HierachicalReferencesTypeIds.has(reference.referenceTypeId.value) &&
!ignored &&
!external
) {
if (
reference.referenceTypeId.value === ReferenceTypeIds.HasHistoricalConfiguration ||
(isUserGroup && reference.nodeId.value.split(node.nodeId).length === 1)
) {
references.push(reference);
return;
}
if (
!ProjectConfig.preserveSortOrderNodes &&
reference.nodeId.value.endsWith('.SortOrder')
) {
if (this._printSortOrderWarning) {
Logger.warn(`Skipped pulling an atvise builder sort order node.
- Reason: These nodes are not consistent across pulls.
- You can force pulling them by setting Atviseproject.preserveSortOrderNodes.`);
this._printSortOrderWarning = false;
}
return;
}
const [prefix, subPath] = reference.nodeId.value.split(node.id.value);
if (!subPath || prefix !== '') {
if (!ProjectConfig.isExternal(reference.nodeId.parent.value)) {
references.push(reference);
if (this._handled.get(reference.nodeId.value) === undefined) {
this.ensureHandled.add(reference.nodeId.value);
}
return;
}
}
const earlierParent = this.parentNode.get(reference.nodeId.value);
if (earlierParent) {
Logger.warn(`'${
reference.nodeId.value
}' was discovered as a child node of both '${earlierParent}' and '${node.id.value}'.
- Reference type (to the latter): ${ReferenceTypeNames[reference.referenceTypeId.value]} (${
reference.referenceTypeId.value
})`);
}
if (this._handled.get(reference.nodeId.value) === undefined) {
this.parentNode.set(reference.nodeId.value, node.id.value);
children.push(
new BrowsedNode({
parent: node,
reference,
})
);
} // else node is already handled
} else if (reference.referenceTypeId.value !== 50) {
// Added by atvise builder
// 'HasModelParent' reference set in atvise 3.3.2+
// Always points to parent node, which atscm already handles
if (
reference.referenceTypeId.value === 334 &&
reference.nodeId.value === (node.parent && node.parent.id.value)
) {
return;
}
// Do not add ignored
if (!ignored) {
references.push(reference);
} else {
Logger.debug(
`Ignored reference from ${node.id.value} (${
ReferenceTypeNames[reference.referenceTypeId.value]
}) to ${reference.nodeId.value}`
);
}
}
});
// eslint-disable-next-line no-param-reassign
node.children = children;
node.addReferences(references);
return { children, references };
});
}
/**
* Finishes processing a given node: After calling {@link NodeBrowser#_handleNode}, it resolves
* is's dependencies.
* @param {BrowsedNode} node The node handled.
*/
async _push(node) {
if (this._handled.get(node.id.value)) {
Logger.error('Prevented duplicate handling of', node.id.value);
return;
}
// Prevent duplicate pushes while reading value file
this._handled.set(node.id.value, 'processing');
// eslint-disable-next-line no-param-reassign
node.value = (await this._readValue(node)) || node.value;
// TODO: Remove additional properties (children, ...) for better memory-usage
const originalId = node.id.value;
await this._handleNode(node);
this._pushed += 1;
// Do not proceed if queue is stopped (because an error occured)
if (!this._recursive || this.queue.isPaused) {
// Queue is stopped, not adding...
return;
}
this.queue.addAll(node.children.map((child) => () => this._process(child)));
const idValue = node.id.value;
this._handled.set(idValue, true);
this.ensureHandled.delete(originalId);
// Handle dependencies
if (this._waitingFor[idValue]) {
this._waitingFor[idValue].forEach((dep) => {
// eslint-disable-next-line no-param-reassign
if (--dep.dependencies === 0) {
// Adding as dependencies are resolved
this.queue.add(() => this._push(dep)).catch(this._reject);
}
});
delete this._waitingFor[idValue];
}
}
/**
* Instructs the browser to handle a node that would otherwise be queued behind others (eg: its
* parent node).
* @param {BrowsedNode} node The node to add.
* @return {Promise<?BrowsedNode>} The fully processed node.
*/
addNode(node) {
if (this.queue.isPaused) {
Logger.debug('Queue is stopped, not adding...');
return Promise.resolve();
}
return this.queue.add(() => this._handleNode(node, { transform: false })).catch(this._reject);
}
/**
* Returns `true` for node ids that should be treated as external references.
* @param {string|number} idValue Value of the id to check.
* @return {boolean} If the id should be treated as external.
*/
_isExternalReference(idValue) {
// FIXME: Allow plugins
return typeof idValue !== 'string' || !this._sourceNodesRegExp.test(idValue);
}
/**
* Returns `true` if a node has dependencies it should be queued behind.
* @param {BrowsedNode} node The node to check.
*/
_hasDependencies(node) {
let dependencyCount = 0;
for (const references of node.references.values()) {
for (const reference of references) {
if (
this._handled.get(reference) !== true &&
!this._isExternalReference(reference) &&
!this._ignoreNodesRegExp.test(reference)
) {
dependencyCount++;
this._waitingFor[reference] = (this._waitingFor[reference] || []).concat(node);
}
}
}
// eslint-disable-next-line no-param-reassign
node.dependencies = dependencyCount;
return dependencyCount > 0;
}
/**
* Processes a single node: Requires special error handling.
* @param {BrowsedNode} node The node to process.
* @return {Promise<?BrowsedNode>} The fully processed node.
*/
async _process(node) {
try {
if (this._handled.has(node.id.value)) {
// Already queued
return undefined;
}
this._handled.set(node.id.value, false);
await this._browseNode(node);
if (!this._hasDependencies(node)) {
await this._push(node);
}
} catch (err) {
this._reject(err);
}
return node;
}
/**
* Discovers and browses the source nodes.
* @param {Array<string, NodeId>} nodeIds The source ids.
* @return {Promise<Node[]>} Resolved once finished.
*/
_getSourceNodes(nodeIds) {
const browseUp = ({ nodeId, path = [] }) =>
this._browse({
nodeId,
browseDirection: BrowseDirection.Inverse,
}).then((references) => {
for (const reference of references) {
if (HierachicalReferencesTypeIds.has(reference.referenceTypeId.value)) {
path.unshift(reference.nodeId);
return reference.nodeId.value === ObjectIds.RootFolder
? path
: browseUp({ nodeId: reference.nodeId, path });
}
}
throw new Error(`Unable to find parent node of ${nodeId}`);
});
const browseDown = (path, target) =>
Promise.all(
path.map((nodeId, i) =>
this._browse({ nodeId }).then((references) =>
references.find(
(ref) => ref.nodeId.value === (path[i + 1] ? path[i + 1].value : target.value)
)
)
)
);
return Promise.all(
nodeIds.map((nodeId) =>
browseUp({ nodeId })
.then((path) => browseDown(path, nodeId))
.then((pathDown) =>
pathDown.reduce((parent, reference) => new BrowsedNode({ parent, reference }), null)
)
)
);
}
/**
* Starts the browser of the given nodes.
* @param {NodeId[]} nodeIds The nodes to browse.
* @return {Promise<any>} Resolved once all nodes are finished.
*/
async browse(nodeIds) {
this._sourceNodesRegExp = new RegExp(
`^(${nodeIds.map(({ value }) => `${value.replace(/\./g, '\\.')}`).join('|')})`
);
this._session = await Session.create();
// Add source nodes
const nodes = await this._getSourceNodes(nodeIds);
this.queue.addAll(nodes.map((node) => () => this._process(node)));
// Queue error handling
let processError = null;
this._reject = (err) => {
if (processError) {
// Multiple errors occured. In most cases this means, that the server connection was closed
// after the first error.
Logger.debug('Additional error', err);
return;
}
processError = err;
this.queue.pause();
this.queue.clear();
};
return new Promise((resolve, reject) => {
this.queue.onIdle().then(async () => {
await Session.close(this._session);
if (processError) {
reject(processError);
return;
}
if (Object.keys(this._waitingFor).length) {
const unresolved = Object.entries(this._waitingFor).reduce(
(all, [to, children]) =>
all.concat(
children.map((c) => ({
from: c.id.value,
to,
type:
ReferenceTypeNames[
Array.from(c.references).find(([, refs]) => refs.has(to))[0]
],
}))
),
[]
);
reject(
new Error(`Unable to resolve reference${unresolved.length > 1 ? 's' : ''}:
${unresolved.map(({ from, type, to }) => `${from} → (${type}) → ${to}`).join('\n ')}
`)
);
return;
}
if (this.ensureHandled.size) {
reject(
new Error(`Some referenced nodes were not handled,
- ${Array.from(this.ensureHandled).join('\n - ')}`)
);
return;
}
if (Array.from(this._handled).find(([, pushed]) => !pushed)) {
throw new Error('A node was processed, but not pushed');
}
resolve();
});
});
}
}