Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 | 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 25x 25x 25x 25x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 3x 3x 3x 3x 1x 2x 2x 2x 8x 8x 8x 11x 2x 2x 2x 2x 8x 8x 8x 8x 8x 8x 2x 2x 2x 2x 2x 2x 2x 2x 2x 8x 8x 2x 8x 8x 8x 6x 6x 9x 9x 9x 9x 9x 6x 6x 2x 2x 2x 2x 2x 2x 2x 2x 7x 7x 7x 6x 1x 1x 5x 5x 5x 12x 12x 8x 8x 12x 5x 8x 5x 5x 8x 6x 5x 6x 6x 8x 2x 4x 3x 4x 2x 2x 24x 19x 19x 8x 8x 2x 2x 6x 6x 6x 3x 6x 1x 1x 19x 19x 4x 4x 4x 4x 4x 2x 1x 19x 19x 4x 4x 4x 4x 4x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 19x 2x 2x 19x 4x 4x 4x 4x 4x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 19x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 24x 2x 2x 2x 2x 2x | // src/services/backgroundJobService.ts
import cron from 'node-cron';
import type { Logger } from 'pino';
import type { Queue } from 'bullmq';
import { formatCurrency } from '../utils/formatUtils';
import { getSimpleWeekAndYear, getCurrentDateISOString } from '../utils/dateUtils';
import type { Notification, WatchedItemDeal } from '../types';
// Import types for repositories from their source files
import type { PersonalizationRepository } from './db/personalization.db';
import type { NotificationRepository } from './db/notification.db';
import { analyticsQueue, weeklyAnalyticsQueue, tokenCleanupQueue } from './queueService.server';
type UserDealGroup = {
userProfile: { user_id: string; email: string; full_name: string | null };
deals: WatchedItemDeal[];
};
interface EmailJobData {
to: string;
subject: string;
html: string;
text: string;
}
export class BackgroundJobService {
constructor(
private personalizationRepo: PersonalizationRepository,
private notificationRepo: NotificationRepository, // Use the imported type here
private emailQueue: Queue<EmailJobData>,
private logger: Logger,
) {}
public async triggerAnalyticsReport(): Promise<string> {
const reportDate = getCurrentDateISOString(); // YYYY-MM-DD
const jobId = `manual-report-${reportDate}-${Date.now()}`;
const job = await analyticsQueue.add('generate-daily-report', { reportDate }, { jobId });
Iif (!job.id) {
throw new Error('Failed to enqueue daily report job: No job ID returned');
}
return job.id;
}
public async triggerWeeklyAnalyticsReport(): Promise<string> {
const { year: reportYear, week: reportWeek } = getSimpleWeekAndYear();
const jobId = `manual-weekly-report-${reportYear}-${reportWeek}-${Date.now()}`;
const job = await weeklyAnalyticsQueue.add(
'generate-weekly-report',
{ reportYear, reportWeek },
{ jobId },
);
if (!job.id) {
throw new Error('Failed to enqueue weekly report job: No job ID returned');
}
return job.id;
}
public async triggerTokenCleanup(): Promise<string> {
const timestamp = new Date().toISOString();
const jobId = `manual-token-cleanup-${Date.now()}`;
const job = await tokenCleanupQueue.add('cleanup-tokens', { timestamp }, { jobId });
if (!job.id) {
throw new Error('Failed to enqueue token cleanup job: No job ID returned');
}
return job.id;
}
/**
* Prepares the data for an email notification job based on a user's deals.
* @param user The user to whom the email will be sent.
* @param deals The list of deals found for the user.
* @returns An object containing the email job data and a unique job ID.
*/
private _prepareDealEmail(
userProfile: { user_id: string; email: string; full_name: string | null },
deals: WatchedItemDeal[],
): { jobData: EmailJobData; jobId: string } {
const recipientName = userProfile.full_name || 'there';
const subject = `New Deals Found on Your Watched Items!`;
const dealsListHtml = deals
.map(
(deal) =>
`<li><strong>${deal.item_name}</strong> is on sale for <strong>${formatCurrency(
deal.best_price_in_cents,
)}</strong> at ${deal.store.name}!</li>`,
)
.join('');
const html = `<p>Hi ${recipientName},</p><p>We found some great deals on items you're watching:</p><ul>${dealsListHtml}</ul>`;
const text = `Hi ${recipientName},\n\nWe found some great deals on items you're watching. Visit the deals page on the site to learn more.`;
// Use a predictable Job ID to prevent duplicate email notifications for the same user on the same day.
const today = getCurrentDateISOString();
const jobId = `deal-email-${userProfile.user_id}-${today}`;
return {
jobData: { to: userProfile.email, subject, html, text },
jobId,
};
}
/**
* Prepares the data for an in-app notification.
* @param userId The ID of the user to notify.
* @param dealCount The number of deals found.
* @returns The notification object ready for database insertion.
*/
private _prepareInAppNotification(
userId: string,
dealCount: number,
): Omit<Notification, 'notification_id' | 'is_read' | 'created_at' | 'updated_at'> {
return {
user_id: userId,
content: `You have ${dealCount} new deal(s) on your watched items!`,
link_url: '/dashboard/deals', // A link to the future "My Deals" page
};
}
private async _processDealsForUser({
userProfile,
deals,
}: UserDealGroup): Promise<Omit<
Notification,
'notification_id' | 'is_read' | 'created_at' | 'updated_at'
> | null> {
try {
this.logger.info(
`[BackgroundJob] Found ${deals.length} deals for user ${userProfile.user_id}.`,
);
// Prepare in-app and email notifications.
const notification = this._prepareInAppNotification(userProfile.user_id, deals.length);
const { jobData, jobId } = this._prepareDealEmail(userProfile, deals);
// Enqueue an email notification job.
await this.emailQueue.add('send-deal-notification', jobData, { jobId });
// Send real-time WebSocket notification (ADR-022)
const { websocketService } = await import('./websocketService.server');
websocketService.broadcastDealNotification(userProfile.user_id, {
user_id: userProfile.user_id,
deals: deals.map((deal) => ({
item_name: deal.item_name,
best_price_in_cents: deal.best_price_in_cents,
store_name: deal.store.name,
store_id: deal.store.store_id,
})),
message: `You have ${deals.length} new deal(s) on your watched items!`,
});
this.logger.info(
`[BackgroundJob] Sent WebSocket notification to user ${userProfile.user_id}`,
);
// Return the notification to be collected for bulk insertion.
return notification;
} catch (userError) {
this.logger.error(
{ err: userError },
`[BackgroundJob] Failed to process deals for user ${userProfile.user_id}`,
);
return null; // Return null on error for this user.
}
}
/**
* Checks for new deals on watched items for all users and sends notifications.
* This function is designed to be run periodically (e.g., daily).
*/
async runDailyDealCheck(): Promise<void> {
this.logger.info('[BackgroundJob] Starting daily deal check for all users...');
try {
// 1. Get all deals for all users in a single, efficient query.
const allDeals = await this.personalizationRepo.getBestSalePricesForAllUsers(this.logger);
if (allDeals.length === 0) {
this.logger.info('[BackgroundJob] No deals found for any watched items. Skipping.');
return;
}
this.logger.info(`[BackgroundJob] Found ${allDeals.length} total deals across all users.`);
// 2. Group deals by user in memory.
const dealsByUser = new Map<string, UserDealGroup>();
for (const deal of allDeals) {
let userGroup = dealsByUser.get(deal.user_id);
if (!userGroup) {
userGroup = {
userProfile: { user_id: deal.user_id, email: deal.email, full_name: deal.full_name },
deals: [],
};
dealsByUser.set(deal.user_id, userGroup);
}
userGroup.deals.push(deal);
}
// 3. Process each user's deals in parallel.
const userProcessingPromises = Array.from(dealsByUser.values()).map((userGroup) =>
this._processDealsForUser(userGroup),
);
// Wait for all user processing to complete.
const results = await Promise.allSettled(userProcessingPromises);
// 6. Collect all successfully created notifications.
const successfulNotifications = results
.filter(
(
result,
): result is PromiseFulfilledResult<
Omit<Notification, 'notification_id' | 'is_read' | 'created_at' | 'updated_at'>
> => result.status === 'fulfilled' && !!result.value,
)
.map((result) => result.value);
// 7. Bulk insert all in-app notifications in a single query.
if (successfulNotifications.length > 0) {
const notificationsForDb = successfulNotifications.map((n) => ({
...n,
updated_at: new Date().toISOString(),
}));
await this.notificationRepo.createBulkNotifications(notificationsForDb, this.logger);
this.logger.info(
`[BackgroundJob] Successfully created ${successfulNotifications.length} in-app notifications.`,
);
}
this.logger.info('[BackgroundJob] Daily deal check completed successfully.');
} catch (error) {
this.logger.error(
{ err: error },
'[BackgroundJob] A critical error occurred during the daily deal check',
);
// Re-throw the error so the cron wrapper knows it failed.
throw error;
}
}
}
// A simple in-memory lock to prevent job overlaps.
let isDailyDealCheckRunning = false;
/**
* Initializes and starts the cron job for daily deal checks.
* This should be called once when the server starts.
* @param backgroundJobService An instance of BackgroundJobService.
* @param analyticsQueue An instance of the BullMQ analytics queue.
*/
export function startBackgroundJobs(
backgroundJobService: BackgroundJobService,
analyticsQueue: Queue,
weeklyAnalyticsQueue: Queue,
tokenCleanupQueue: Queue,
logger: Logger,
): void {
try {
// Schedule the deal check job to run once every day at 2:00 AM.
cron.schedule('0 2 * * *', () => {
// Self-invoking async function to handle the promise and errors gracefully.
(async () => {
if (isDailyDealCheckRunning) {
logger.warn(
'[BackgroundJob] Daily deal check is already running. Skipping this scheduled run.',
);
return;
}
isDailyDealCheckRunning = true;
try {
await backgroundJobService.runDailyDealCheck();
} catch (error) {
// The method itself logs details, this is a final catch-all.
logger.error(
{ err: error },
'[BackgroundJob] Cron job for daily deal check failed unexpectedly.',
);
} finally {
isDailyDealCheckRunning = false;
}
})().catch((error: unknown) => {
// This catch is for unhandled promise rejections from the async wrapper itself.
logger.error(
{ err: error },
'[BackgroundJob] Unhandled rejection in daily deal check cron wrapper.',
);
isDailyDealCheckRunning = false;
});
});
logger.info('[BackgroundJob] Cron job for daily deal checks has been scheduled.');
// Schedule the analytics report generation job to run at 3:00 AM.
cron.schedule('0 3 * * *', () => {
(async () => {
logger.info('[BackgroundJob] Enqueuing daily analytics report generation job.');
try {
const reportDate = getCurrentDateISOString(); // YYYY-MM-DD
// We use a unique job ID to prevent duplicate jobs for the same day if the scheduler restarts.
await analyticsQueue.add(
'generate-daily-report',
{ reportDate },
{
jobId: `daily-report-${reportDate}`,
},
);
} catch (error) {
logger.error({ err: error }, '[BackgroundJob] Failed to enqueue daily analytics job.');
}
})().catch((error: unknown) => {
logger.error(
{ err: error },
'[BackgroundJob] Unhandled rejection in analytics report cron wrapper.',
);
});
});
logger.info('[BackgroundJob] Cron job for daily analytics reports has been scheduled.');
// Schedule the weekly analytics report generation job to run every Sunday at 4:00 AM.
cron.schedule('0 4 * * 0', () => {
// 0 4 * * 0 means 4:00 AM on Sunday
(async () => {
logger.info('[BackgroundJob] Enqueuing weekly analytics report generation job.');
try {
const { year: reportYear, week: reportWeek } = getSimpleWeekAndYear();
await weeklyAnalyticsQueue.add(
'generate-weekly-report',
{ reportYear, reportWeek },
{
jobId: `weekly-report-${reportYear}-${reportWeek}`,
},
);
} catch (error) {
logger.error({ err: error }, '[BackgroundJob] Failed to enqueue weekly analytics job.');
}
})().catch((error: unknown) => {
logger.error(
{ err: error },
'[BackgroundJob] Unhandled rejection in weekly analytics report cron wrapper.',
);
});
});
logger.info('[BackgroundJob] Cron job for weekly analytics reports has been scheduled.');
// Schedule the expired token cleanup job to run every day at 5:00 AM.
cron.schedule('0 5 * * *', () => {
(async () => {
logger.info('[BackgroundJob] Enqueuing expired password reset token cleanup job.');
try {
const timestamp = new Date().toISOString();
await tokenCleanupQueue.add(
'cleanup-tokens',
{ timestamp },
{
jobId: `token-cleanup-${timestamp.split('T')[0]}`,
},
);
} catch (error) {
logger.error({ err: error }, '[BackgroundJob] Failed to enqueue token cleanup job.');
}
})().catch((error: unknown) => {
logger.error(
{ err: error },
'[BackgroundJob] Unhandled rejection in token cleanup cron wrapper.',
);
});
});
logger.info('[BackgroundJob] Cron job for expired token cleanup has been scheduled.');
} catch (error) {
logger.error(
{ err: error },
'[BackgroundJob] Failed to schedule a cron job. This is a critical setup error.',
);
}
}
// Instantiate the service with its real dependencies for use in the application.
import { personalizationRepo, notificationRepo } from './db/index.db';
import { logger } from './logger.server';
import { emailQueue } from './queueService.server';
export const backgroundJobService = new BackgroundJobService(
personalizationRepo,
notificationRepo,
emailQueue,
logger,
);
|