fix(pipeline): indexing jobs never started due to missing queue.enqueue() calls
All three trigger-indexing routes were calling service.createIndexingJob() directly which only inserts the DB record but never calls processNext(). Fixed to route through getQueue().enqueue() so the job queue actually picks up and runs the job immediately. Affected routes: - POST /api/v1/libs (autoIndex on add) - POST /api/v1/libs/:id/index - POST /api/v1/libs/:id/versions/:tag/index Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import { json } from '@sveltejs/kit';
|
|||||||
import type { RequestHandler } from './$types';
|
import type { RequestHandler } from './$types';
|
||||||
import { getClient } from '$lib/server/db/client';
|
import { getClient } from '$lib/server/db/client';
|
||||||
import { RepositoryService } from '$lib/server/services/repository.service';
|
import { RepositoryService } from '$lib/server/services/repository.service';
|
||||||
|
import { getQueue } from '$lib/server/pipeline/startup';
|
||||||
import { handleServiceError } from '$lib/server/utils/validation';
|
import { handleServiceError } from '$lib/server/utils/validation';
|
||||||
|
|
||||||
function getService() {
|
function getService() {
|
||||||
@@ -55,7 +56,10 @@ export const POST: RequestHandler = async ({ request }) => {
|
|||||||
|
|
||||||
let jobResponse: { id: string; status: string } | null = null;
|
let jobResponse: { id: string; status: string } | null = null;
|
||||||
if (body.autoIndex !== false) {
|
if (body.autoIndex !== false) {
|
||||||
const job = service.createIndexingJob(repo.id);
|
const queue = getQueue();
|
||||||
|
const job = queue
|
||||||
|
? queue.enqueue(repo.id)
|
||||||
|
: service.createIndexingJob(repo.id);
|
||||||
jobResponse = { id: job.id, status: job.status };
|
jobResponse = { id: job.id, status: job.status };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { json } from '@sveltejs/kit';
|
|||||||
import type { RequestHandler } from './$types';
|
import type { RequestHandler } from './$types';
|
||||||
import { getClient } from '$lib/server/db/client';
|
import { getClient } from '$lib/server/db/client';
|
||||||
import { RepositoryService } from '$lib/server/services/repository.service';
|
import { RepositoryService } from '$lib/server/services/repository.service';
|
||||||
|
import { getQueue } from '$lib/server/pipeline/startup';
|
||||||
import { handleServiceError, NotFoundError } from '$lib/server/utils/validation';
|
import { handleServiceError, NotFoundError } from '$lib/server/utils/validation';
|
||||||
|
|
||||||
export const POST: RequestHandler = async ({ params, request }) => {
|
export const POST: RequestHandler = async ({ params, request }) => {
|
||||||
@@ -23,7 +24,12 @@ export const POST: RequestHandler = async ({ params, request }) => {
|
|||||||
// body is optional
|
// body is optional
|
||||||
}
|
}
|
||||||
|
|
||||||
const job = service.createIndexingJob(id, versionId);
|
// Use the queue so processNext() is triggered immediately.
|
||||||
|
// Falls back to direct DB insert if the queue isn't initialised yet.
|
||||||
|
const queue = getQueue();
|
||||||
|
const job = queue
|
||||||
|
? queue.enqueue(id, versionId)
|
||||||
|
: service.createIndexingJob(id, versionId);
|
||||||
|
|
||||||
return json({ job }, { status: 202 });
|
return json({ job }, { status: 202 });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import type { RequestHandler } from './$types';
|
|||||||
import { getClient } from '$lib/server/db/client';
|
import { getClient } from '$lib/server/db/client';
|
||||||
import { RepositoryService } from '$lib/server/services/repository.service';
|
import { RepositoryService } from '$lib/server/services/repository.service';
|
||||||
import { VersionService } from '$lib/server/services/version.service';
|
import { VersionService } from '$lib/server/services/version.service';
|
||||||
|
import { getQueue } from '$lib/server/pipeline/startup';
|
||||||
import { handleServiceError, NotFoundError } from '$lib/server/utils/validation';
|
import { handleServiceError, NotFoundError } from '$lib/server/utils/validation';
|
||||||
|
|
||||||
function getServices() {
|
function getServices() {
|
||||||
@@ -37,7 +38,10 @@ export const POST: RequestHandler = ({ params }) => {
|
|||||||
throw new NotFoundError(`Version ${tag} not found for repository ${repositoryId}`);
|
throw new NotFoundError(`Version ${tag} not found for repository ${repositoryId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const job = repoService.createIndexingJob(repositoryId, version.id);
|
const queue = getQueue();
|
||||||
|
const job = queue
|
||||||
|
? queue.enqueue(repositoryId, version.id)
|
||||||
|
: repoService.createIndexingJob(repositoryId, version.id);
|
||||||
return json({ job }, { status: 202 });
|
return json({ job }, { status: 202 });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return handleServiceError(err);
|
return handleServiceError(err);
|
||||||
|
|||||||
Reference in New Issue
Block a user