diff --git a/src/lib/components/admin/JobStatusBadge.svelte b/src/lib/components/admin/JobStatusBadge.svelte new file mode 100644 index 0000000..79ee3fe --- /dev/null +++ b/src/lib/components/admin/JobStatusBadge.svelte @@ -0,0 +1,22 @@ + + + + {config.label} + diff --git a/src/lib/server/db/schema.ts b/src/lib/server/db/schema.ts index a8d171f..889f217 100644 --- a/src/lib/server/db/schema.ts +++ b/src/lib/server/db/schema.ts @@ -132,7 +132,7 @@ export const indexingJobs = sqliteTable('indexing_jobs', { .references(() => repositories.id, { onDelete: 'cascade' }), versionId: text('version_id'), status: text('status', { - enum: ['queued', 'running', 'done', 'failed'] + enum: ['queued', 'running', 'paused', 'cancelled', 'done', 'failed'] }) .notNull() .default('queued'), diff --git a/src/lib/server/models/indexing-job.ts b/src/lib/server/models/indexing-job.ts index 4901700..b6ed82f 100644 --- a/src/lib/server/models/indexing-job.ts +++ b/src/lib/server/models/indexing-job.ts @@ -2,7 +2,7 @@ export interface IndexingJobEntityProps { id: string; repository_id: string; version_id: string | null; - status: 'queued' | 'running' | 'done' | 'failed'; + status: 'queued' | 'running' | 'paused' | 'cancelled' | 'done' | 'failed'; progress: number; total_files: number; processed_files: number; @@ -16,7 +16,7 @@ export class IndexingJobEntity { id: string; repository_id: string; version_id: string | null; - status: 'queued' | 'running' | 'done' | 'failed'; + status: 'queued' | 'running' | 'paused' | 'cancelled' | 'done' | 'failed'; progress: number; total_files: number; processed_files: number; @@ -44,7 +44,7 @@ export interface IndexingJobProps { id: string; repositoryId: string; versionId: string | null; - status: 'queued' | 'running' | 'done' | 'failed'; + status: 'queued' | 'running' | 'paused' | 'cancelled' | 'done' | 'failed'; progress: number; totalFiles: number; processedFiles: number; @@ -58,7 +58,7 @@ export class IndexingJob { id: string; repositoryId: string; versionId: string | null; - status: 'queued' | 'running' | 'done' | 'failed'; + status: 'queued' | 'running' | 'paused' | 'cancelled' | 'done' | 'failed'; progress: number; totalFiles: number; processedFiles: number; @@ -86,7 +86,7 @@ export interface IndexingJobDtoProps { id: string; repositoryId: string; versionId: string | null; - status: 'queued' | 'running' | 'done' | 'failed'; + status: 'queued' | 'running' | 'paused' | 'cancelled' | 'done' | 'failed'; progress: number; totalFiles: number; processedFiles: number; @@ -100,7 +100,7 @@ export class IndexingJobDto { id: string; repositoryId: string; versionId: string | null; - status: 'queued' | 'running' | 'done' | 'failed'; + status: 'queued' | 'running' | 'paused' | 'cancelled' | 'done' | 'failed'; progress: number; totalFiles: number; processedFiles: number; diff --git a/src/lib/server/pipeline/job-queue.ts b/src/lib/server/pipeline/job-queue.ts index adec1c0..6e1674a 100644 --- a/src/lib/server/pipeline/job-queue.ts +++ b/src/lib/server/pipeline/job-queue.ts @@ -209,9 +209,78 @@ export class JobQueue { params.push(options.status); } - const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND')}` : ''; const sql = `SELECT COUNT(*) as n FROM indexing_jobs ${where}`; const row = this.db.prepare(sql).get(...params); return row?.n ?? 0; } + + /** + * Pause a job that is currently queued or running. + * Returns true if the job was successfully paused, false otherwise. + */ + pauseJob(id: string): boolean { + const job = this.getJob(id); + if (!job) return false; + + // Only queued or running jobs can be paused + if (job.status !== 'queued' && job.status !== 'running') { + return false; + } + + this.db + .prepare(`UPDATE indexing_jobs SET status = 'paused' WHERE id = ?`) + .run(id); + + return true; + } + + /** + * Resume a paused job by changing its status back to 'queued' and + * triggering the queue drain. + * Returns true if the job was successfully resumed, false otherwise. + */ + resumeJob(id: string): boolean { + const job = this.getJob(id); + if (!job) return false; + + // Only paused jobs can be resumed + if (job.status !== 'paused') { + return false; + } + + this.db + .prepare(`UPDATE indexing_jobs SET status = 'queued' WHERE id = ?`) + .run(id); + + // Trigger queue processing in case the queue was idle + this.drainQueued(); + + return true; + } + + /** + * Cancel a job if it's not already completed. + * Returns true if the job was successfully cancelled, false otherwise. + */ + cancelJob(id: string): boolean { + const job = this.getJob(id); + if (!job) return false; + + // Can't cancel jobs that are already done or failed + if (job.status === 'done' || job.status === 'failed') { + return false; + } + + const now = Math.floor(Date.now() / 1000); + this.db + .prepare( + `UPDATE indexing_jobs + SET status = 'cancelled', completed_at = ? + WHERE id = ?` + ) + .run(now, id); + + return true; + } } diff --git a/src/routes/admin/jobs/+page.svelte b/src/routes/admin/jobs/+page.svelte new file mode 100644 index 0000000..33a93c3 --- /dev/null +++ b/src/routes/admin/jobs/+page.svelte @@ -0,0 +1,264 @@ + + + + Job Queue - TrueRef Admin + + +
+
+

Job Queue

+

Monitor and control indexing jobs

+
+ + {#if loading && jobs.length === 0} +
+
+
+

Loading jobs...

+
+
+ {:else if error && jobs.length === 0} +
+

Error: {error}

+
+ {:else if jobs.length === 0} +
+

No jobs found. Jobs will appear here when repositories are indexed.

+
+ {:else} +
+ + + + + + + + + + + + {#each jobs as job (job.id)} + + + + + + + + {/each} + +
+ Repository + + Status + + Progress + + Created + + Actions +
+ {job.repositoryId} + {#if job.versionId} + @{job.versionId} + {/if} + + + +
+ {job.progress}% +
+
+
+ {#if job.totalFiles > 0} + + {job.processedFiles}/{job.totalFiles} files + + {/if} +
+
+ {formatDate(job.createdAt)} + +
+ {#if canPause(job.status)} + + {/if} + {#if canResume(job.status)} + + {/if} + {#if canCancel(job.status)} + + {/if} + {#if !canPause(job.status) && !canResume(job.status) && !canCancel(job.status)} + + {/if} +
+
+
+ + {#if loading} +
+ Refreshing... +
+ {/if} + {/if} +
diff --git a/src/routes/api/v1/jobs/[id]/cancel/+server.ts b/src/routes/api/v1/jobs/[id]/cancel/+server.ts new file mode 100644 index 0000000..8444d20 --- /dev/null +++ b/src/routes/api/v1/jobs/[id]/cancel/+server.ts @@ -0,0 +1,45 @@ +/** + * POST /api/v1/jobs/:id/cancel — cancel a job. + */ + +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { handleServiceError, NotFoundError, InvalidInputError } from '$lib/server/utils/validation.js'; + +export const POST: RequestHandler = ({ params }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + + const job = queue.getJob(params.id); + if (!job) throw new NotFoundError(`Job ${params.id} not found`); + + const success = queue.cancelJob(params.id); + if (!success) { + throw new InvalidInputError( + `Cannot cancel job ${params.id} - job is already done or failed` + ); + } + + // Fetch updated job + const updated = queue.getJob(params.id)!; + + return json({ success: true, job: IndexingJobMapper.toDto(updated) }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'POST, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +}; diff --git a/src/routes/api/v1/jobs/[id]/pause/+server.ts b/src/routes/api/v1/jobs/[id]/pause/+server.ts new file mode 100644 index 0000000..0cf5b5b --- /dev/null +++ b/src/routes/api/v1/jobs/[id]/pause/+server.ts @@ -0,0 +1,45 @@ +/** + * POST /api/v1/jobs/:id/pause — pause a running or queued job. + */ + +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { handleServiceError, NotFoundError, InvalidInputError } from '$lib/server/utils/validation.js'; + +export const POST: RequestHandler = ({ params }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + + const job = queue.getJob(params.id); + if (!job) throw new NotFoundError(`Job ${params.id} not found`); + + const success = queue.pauseJob(params.id); + if (!success) { + throw new InvalidInputError( + `Cannot pause job ${params.id} - only queued or running jobs can be paused` + ); + } + + // Fetch updated job + const updated = queue.getJob(params.id)!; + + return json({ success: true, job: IndexingJobMapper.toDto(updated) }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'POST, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +}; diff --git a/src/routes/api/v1/jobs/[id]/resume/+server.ts b/src/routes/api/v1/jobs/[id]/resume/+server.ts new file mode 100644 index 0000000..3524bf4 --- /dev/null +++ b/src/routes/api/v1/jobs/[id]/resume/+server.ts @@ -0,0 +1,43 @@ +/** + * POST /api/v1/jobs/:id/resume — resume a paused job. + */ + +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { handleServiceError, NotFoundError, InvalidInputError } from '$lib/server/utils/validation.js'; + +export const POST: RequestHandler = ({ params }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + + const job = queue.getJob(params.id); + if (!job) throw new NotFoundError(`Job ${params.id} not found`); + + const success = queue.resumeJob(params.id); + if (!success) { + throw new InvalidInputError(`Cannot resume job ${params.id} - only paused jobs can be resumed`); + } + + // Fetch updated job + const updated = queue.getJob(params.id)!; + + return json({ success: true, job: IndexingJobMapper.toDto(updated) }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'POST, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +};