import { Observable } from 'rxjs';

export type ProgressHandler = (progress: number) => void;

/**
 * Accepts an array of observables and subscribes to all (but not exceeding the batch limit).
 * Useful if you need to queue several HTTP Requests and don't want to blow the connection limit.
 * @param args Observables to subscribe to
 * @param batchSize Maximum size of the batch (defaults to 40)
 * @returns A single observable containing an array of the results
 */

export function forkJoinBatch<T>(
  args: Observable<T>[],
  batchSize: number = 40,
  progressHandler?: ProgressHandler
): Observable<T[]> {
  const remainingSubs = [...args];
  const activeSubs: Observable<T>[] = [];
  const totalSubs = args.length;
  let completedCount = 0;

  const fireHandler = () => progressHandler && progressHandler((completedCount / totalSubs) * 100);

  const results: T[] = [];

  const result = new Observable<T[]>((sub) => {
    const ensureBatch = () => {
      fireHandler();

      // If there's no remaining subs and all have emitted a value, complete the forkJoin
      if (!remainingSubs.length && !activeSubs.length) {
        sub.next(results);
        sub.complete();
        return;
      }

      // Ensure that the batch is filled
      while (activeSubs.length <= batchSize && remainingSubs.length > 0) {
        // If the sub is closed, ignore everything
        if (sub.closed) {
          return;
        }

        const p = remainingSubs.pop();
        if (!p) {
          continue;
        }

        activeSubs.push(p);

        // Subscribe the inner sub
        const innerSub = p.subscribe(
          (j) => {
            // Remove the sub from the active subs
            activeSubs.splice(activeSubs.indexOf(p), 1);

            // Save it's value onto the results array (don't emit yet)
            results[args.indexOf(p)] = j;
            completedCount++;

            // Requeue to ensure the batch is full
            ensureBatch();
          },
          (error) => sub.error(error)
        );

        sub.add(innerSub);
      }

      fireHandler();
    };

    ensureBatch();
  });

  return result;
}
