All files / src/services monitoringService.server.ts

68.9% Statements 113/164
92.85% Branches 13/14
77.77% Functions 7/9
74.45% Lines 102/137

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 1382x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x       2x 2x 3x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 6x 8x 8x 2x 2x 2x 2x 2x 2x       2x               2x 12x 12x 12x 12x 12x 12x 14x 14x 14x 14x 2x 2x 2x 2x 2x 2x 1x 1x 1x 1x 1x 5x 1x 1x 6x 6x 6x 6x 6x 6x 5x 5x 2x 2x 2x 4x 4x 2x     2x 2x 1x     1x 1x                             14x 14x 3x   11x 11x 11x 11x 11x       24x  
// src/services/monitoringService.server.ts
import {
  flyerQueue,
  emailQueue,
  analyticsQueue,
  cleanupQueue,
  weeklyAnalyticsQueue,
  tokenCleanupQueue,
} from './queues.server';
import {
  analyticsWorker,
  cleanupWorker,
  emailWorker,
  flyerWorker,
  weeklyAnalyticsWorker,
  tokenCleanupWorker,
  flyerProcessingService,
} from './workers.server';
import type { Queue } from 'bullmq';
 
// Re-export flyerProcessingService for integration tests that need to inject mocks.
// This ensures tests get the SAME instance that the workers use, rather than creating
// a new instance by importing workers.server.ts directly.
export { flyerProcessingService };
import { NotFoundError, ValidationError } from './db/errors.db';
import { logger } from './logger.server';
 
/**
 * Read/administration facade over the application's BullMQ queues and workers.
 * It reports worker and queue health, lets an operator retry a failed job, and
 * exposes the status of a single flyer-processing job.
 */
class MonitoringService {
  /**
   * Retrieves the current running status of all registered BullMQ workers.
   * @returns A promise that resolves to one `{ name, isRunning }` entry per worker,
   * in the order the workers are listed below.
   */
  async getWorkerStatuses() {
    const workers = [
      flyerWorker,
      emailWorker,
      analyticsWorker,
      cleanupWorker,
      weeklyAnalyticsWorker,
      tokenCleanupWorker,
    ];
    // `isRunning()` is used synchronously, so no per-worker promise is needed;
    // the async method wraps the resulting array in a promise.
    return workers.map((worker) => ({
      name: worker.name,
      isRunning: worker.isRunning(),
    }));
  }

  /**
   * Retrieves job counts for all registered BullMQ queues.
   * Counts are requested for the 'waiting', 'active', 'completed', 'failed',
   * 'delayed' and 'paused' states; all queues are queried concurrently.
   * @returns A promise that resolves to one `{ name, counts }` entry per queue.
   */
  async getQueueStatuses() {
    const queues = [
      flyerQueue,
      emailQueue,
      analyticsQueue,
      cleanupQueue,
      weeklyAnalyticsQueue,
      tokenCleanupQueue,
    ];
    return Promise.all(
      queues.map(async (queue) => ({
        name: queue.name,
        counts: await queue.getJobCounts(
          'waiting',
          'active',
          'completed',
          'failed',
          'delayed',
          'paused',
        ),
      })),
    );
  }

  /**
   * Retries a specific failed job in a given queue.
   * @param queueName The name of the queue.
   * @param jobId The ID of the job to retry.
   * @param userId The ID of the user initiating the retry (used only for the audit log line).
   * @throws NotFoundError If `queueName` is not a known queue, or the queue has no job with `jobId`.
   * @throws ValidationError If the job exists but is not currently in the 'failed' state.
   */
  async retryFailedJob(queueName: string, jobId: string, userId: string): Promise<void> {
    // A Map (rather than an object literal) is used so that caller-supplied names such as
    // 'constructor', 'toString' or '__proto__' cannot resolve to an inherited
    // Object.prototype member and slip past the "queue not found" guard.
    const queuesByName = new Map<string, Queue>([
      ['flyer-processing', flyerQueue],
      ['email-sending', emailQueue],
      ['analytics-reporting', analyticsQueue],
      ['file-cleanup', cleanupQueue],
      ['weekly-analytics-reporting', weeklyAnalyticsQueue],
      ['token-cleanup', tokenCleanupQueue],
    ]);

    const queue = queuesByName.get(queueName);
    if (!queue) {
      throw new NotFoundError(`Queue '${queueName}' not found.`);
    }

    const job = await queue.getJob(jobId);
    if (!job) {
      throw new NotFoundError(`Job with ID '${jobId}' not found in queue '${queueName}'.`);
    }

    // Only jobs that are currently failed may be retried; report the actual state otherwise.
    const jobState = await job.getState();
    if (jobState !== 'failed') {
      throw new ValidationError([], `Job is not in a 'failed' state. Current state: ${jobState}.`);
    }

    await job.retry();
    // Logged only after the retry call has resolved, so the line records a successful request.
    logger.info(`[Admin] User ${userId} manually retried job ${jobId} in queue ${queueName}.`);
  }

  /**
   * Retrieves the status of a single job from the flyer processing queue.
   * @param jobId The ID of the job to retrieve.
   * @returns A promise that resolves to a simplified job status object.
   * @throws NotFoundError If the flyer queue has no job with `jobId`.
   */
  async getFlyerJobStatus(jobId: string): Promise<{
    id: string;
    state: string;
    progress: number | object | string | boolean;
    returnValue: unknown;
    failedReason: string | null;
  }> {
    const job = await flyerQueue.getJob(jobId);
    if (!job) {
      throw new NotFoundError('Job not found.');
    }
    const state = await job.getState();
    return {
      // The job was looked up by `jobId`, so fall back to it instead of asserting non-null.
      id: job.id ?? jobId,
      state,
      progress: job.progress,
      returnValue: job.returnvalue,
      failedReason: job.failedReason,
    };
  }
}

// Module-level MonitoringService instance. The class itself is not exported,
// so this export is how other modules access the service.
export const monitoringService = new MonitoringService();