Home Manual Reference Source Test

src/lib/server/WaitingStream.js

import Logger from 'gulplog';
import ProjectConfig from '../../config/ProjectConfig';
import QueueStream from './QueueStream';

/**
 * A mixin that transforms a regular stream into a {@link WaitingStream}, which makes it wait for a
 * node's dependencies to be transformed.
 * @param {QueueStream} Stream The node stream to inherit from.
 * @return {Class<WaitingStream>} The resulting stream class.
 * @example <caption>Basic usage</caption>
 * import QueueStream from 'atscm/src/lib/server/QueueStream';
 * import { waitForDependencies } from 'atscm/src/lib/server/WaitingStream';
 *
 * class MyWaitingStream extends waitForDependencies(QueueStream) {
 *
 *   dependenciesFor(file) {
 *     return [
 *       // An array of node ids
 *     ];
 *   }
 *
 * }
 */
export const waitForDependencies = (Stream) =>
  class Waiting extends Stream {
    /**
     * Creates a new WaitingStream.
     * @param {Object} options Options passed to the underlying {@link Stream} subclass.
     */
    constructor(options = {}) {
      super(options);

      this._dependencies = {};
      this._waitingFor = {};
      this._finishedProcessing = {};

      this.on('processed-chunk', (file) => {
        const key = file.nodeId.toString();
        const dependents = this._waitingFor[key];
        delete this._waitingFor[key];
        this._finishedProcessing[key] = true;

        if (dependents) {
          dependents.forEach((d) => {
            const k = d.nodeId.toString();
            this._dependencies[k] -= 1;

            if (this._dependencies[k] === 0) {
              super._enqueueChunk(d);
              delete this._dependencies[k];
            }
          });
        }

        this.emit('finished-chunk');
      });
    }

    /**
     * **Must be implemented by all subclasses:** Returns the dependencies for a given file.
     * @typedef {function(file: AtviseFile): NodeId[]} WWaiting#dependenciesFor
     * @param {AtviseFile} file The file to get the dependencies for.
     * @return {NodeId[]} The file's dependencies.
     * @abstract
     */
    // eslint-disable-next-line no-unused-vars
    dependenciesFor(file) {
      throw new Error('#dependenciesFor must be implemented by all subclasses');
    }

    /**
     * Enqueues a file after it's dependencies.
     * @param {AtviseFile} file The file to process.
     */
    _enqueueChunk(file) {
      const dependencies = this.dependenciesFor(file)
        .filter((id) => {
          if (id.namespace === 0 || !id.value || ProjectConfig.nodes.includes(id)) {
            return false;
          }

          return (
            !id.value.match(/^(Object|Variable)Types$/) &&
            !id.value.match(/^(Object|Variable)Types\.ATVISE/) &&
            !id.value.match(/^SYSTEM\.LIBRARY\.ATVISE/)
          );
        })
        .map((id) => id.toString());

      if (dependencies && dependencies.length) {
        const needToWait = dependencies.reduce((wait, dependency) => {
          if (!this._finishedProcessing[dependency]) {
            const name = file.nodeId.toString();

            this._waitingFor[dependency] = (this._waitingFor[dependency] || []).concat(file);
            this._dependencies[name] = (this._dependencies[name] || 0) + 1;

            return true;
          }
          return wait;
        }, false);

        if (needToWait) {
          return;
        }
      }

      super._enqueueChunk(file);
    }

    /**
     * Delays the streams end until all chunks have been processed.
     * @param {function(err: Error)} callback Called once all chunks have been processed.
     */
    _flush(callback) {
      const checkProcessing = () => {
        if (this._processing || this.hasPending) {
          this.once('finished-chunk', () => checkProcessing());
        } else if (Object.keys(this._waitingFor).length > 0) {
          const first = Object.keys(this._waitingFor)[0];

          Logger.debug(`Missing dependency. Trying to process dependents of ${first}`);

          this.once('finished-chunk', () => checkProcessing());
          this.emit('processed-chunk', { nodeId: first });
        } else {
          super._flush(callback);
        }
      };

      checkProcessing();
    }
  };

/**
 * A {@link QueueStream} that waits for a file's dependencies to be processed before the file is
 * processed itself.
 * @abstract
 */
export default class WaitingStream extends waitForDependencies(QueueStream) {}