Compare commits
2 Commits
e63279fcf6
...
6f3f4db19b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f3f4db19b | ||
|
|
f4fe8c6043 |
113
docs/features/TRUEREF-0021.md
Normal file
113
docs/features/TRUEREF-0021.md
Normal file
@@ -0,0 +1,113 @@
|
||||
# TRUEREF-0021 — Differential Tag Indexing
|
||||
|
||||
**Priority:** P1
|
||||
**Status:** Implemented
|
||||
**Depends On:** TRUEREF-0014, TRUEREF-0017, TRUEREF-0019
|
||||
**Blocks:** —
|
||||
|
||||
---
|
||||
|
||||
## Problem Statement
|
||||
|
||||
Repositories with many version tags (e.g. hundreds or thousands, as seen in projects like RWC
|
||||
UXFramework) make full re-indexing prohibitively expensive. Between consecutive semver tags the
|
||||
overwhelming majority of files are unchanged — often only dependency manifests (`package.json`,
|
||||
`*.lock`) differ. Indexing the complete file tree for every tag wastes compute time, GitHub API
|
||||
quota, and embedding credits.
|
||||
|
||||
---
|
||||
|
||||
## Solution
|
||||
|
||||
Differential tag indexing detects when an already-indexed ancestor version exists for a given
|
||||
target tag, determines exactly which files changed, and:
|
||||
|
||||
1. **Clones** unchanged document rows, snippet rows, and embedding rows from the ancestor version
|
||||
into the target version in a single SQLite transaction (`cloneFromAncestor`).
|
||||
2. **Crawls** only the changed (added / modified) files, parses and embeds them normally.
|
||||
3. **Skips** deleted files (not cloned, not crawled).
|
||||
4. **Falls back** silently to a full crawl when no indexed ancestor can be found or any step fails.
|
||||
|
||||
---
|
||||
|
||||
## Algorithm
|
||||
|
||||
### Stage 0 — Differential Plan (`buildDifferentialPlan`)
|
||||
|
||||
Executed in `IndexingPipeline.run()` before the crawl, when the job has a `versionId`:
|
||||
|
||||
1. **Ancestor selection** (`findBestAncestorVersion` in `tag-order.ts`): Loads all `indexed`
|
||||
versions for the repository, parses their tags as semver, and returns the closest predecessor
|
||||
to the target tag. Falls back to creation-timestamp ordering for non-semver tags.
|
||||
|
||||
2. **Changed-file detection**: For GitHub repositories, calls the GitHub Compare API
|
||||
(`fetchGitHubChangedFiles` in `github-compare.ts`). For local repositories, uses
|
||||
`git diff --name-status` via `getChangedFilesBetweenRefs` in `git.ts` (implemented with
|
||||
`execFileSync` — not `execSync` — to prevent shell-injection attacks on branch/tag names
|
||||
containing shell metacharacters).
|
||||
|
||||
3. **Path partitioning**: The changed-file list is split into `changedPaths` (added + modified
|
||||
+ renamed-destination) and `deletedPaths`. `unchangedPaths` is derived as
|
||||
`ancestorFilePaths − changedPaths − deletedPaths`.
|
||||
|
||||
4. **Guard**: Returns `null` when no indexed ancestor exists, when the ancestor has no indexed
|
||||
documents, or when all files changed (nothing to clone).
|
||||
|
||||
### Stage 0.5 — Clone Unchanged Files (`cloneFromAncestor`)
|
||||
|
||||
When `buildDifferentialPlan` returns a non-null plan with `unchangedPaths.size > 0`:
|
||||
|
||||
- Fetches ancestor `documents` rows for the unchanged paths using a parameterised
|
||||
`IN (?, ?, …)` query (no string interpolation of path values → no SQL injection).
|
||||
- Inserts new `documents` rows for each, with new UUIDs and `version_id = targetVersionId`.
|
||||
- Fetches ancestor `snippets` rows for those document IDs; inserts clones with new IDs.
|
||||
- Fetches ancestor `snippet_embeddings` rows; inserts clones pointing to the new snippet IDs.
|
||||
- The entire operation runs inside a single `this.db.transaction(…)()` call for atomicity.
|
||||
|
||||
### Stage 1 — Partial Crawl
|
||||
|
||||
`IndexingPipeline.crawl()` accepts an optional third argument `allowedPaths?: Set<string>`.
|
||||
When provided (set to `differentialPlan.changedPaths`), the crawl result is filtered so only
|
||||
matching files are returned. This minimises GitHub API requests and local I/O.
|
||||
|
||||
---
|
||||
|
||||
## API Surface Changes
|
||||
|
||||
| Symbol | Location | Change |
|
||||
|---|---|---|
|
||||
| `buildDifferentialPlan` | `pipeline/differential-strategy.ts` | **New** — async function |
|
||||
| `DifferentialPlan` | `pipeline/differential-strategy.ts` | **New** — interface |
|
||||
| `findBestAncestorVersion` | `utils/tag-order.ts` | **New** — pure function |
|
||||
| `fetchGitHubChangedFiles` | `crawler/github-compare.ts` | **New** — async function |
|
||||
| `getChangedFilesBetweenRefs` | `utils/git.ts` | **New** — sync function (uses `execFileSync`) |
|
||||
| `ChangedFile` | `crawler/types.ts` | **New** — interface |
|
||||
| `CrawlOptions.allowedPaths` | `crawler/types.ts` | **New** — optional field |
|
||||
| `IndexingPipeline.crawl()` | `pipeline/indexing.pipeline.ts` | **Modified** — added `allowedPaths` param |
|
||||
| `IndexingPipeline.cloneFromAncestor()` | `pipeline/indexing.pipeline.ts` | **New** — private method |
|
||||
| `IndexingPipeline.run()` | `pipeline/indexing.pipeline.ts` | **Modified** — Stage 0 added |
|
||||
|
||||
---
|
||||
|
||||
## Correctness Properties
|
||||
|
||||
- **Atomicity**: `cloneFromAncestor` wraps all inserts in one SQLite transaction; a failure
|
||||
leaves the target version with no partially-cloned data.
|
||||
- **Idempotency (fallback)**: If the clone or plan step fails for any reason, the pipeline
|
||||
catches the error, logs a warning, and continues with a full crawl. No data loss occurs.
|
||||
- **No shell injection**: `getChangedFilesBetweenRefs` uses `execFileSync` with an argument
|
||||
array rather than `execSync` with a template-literal string.
|
||||
- **No SQL injection**: Path values are never interpolated into SQL strings; only `?`
|
||||
placeholders are used.
|
||||
|
||||
---
|
||||
|
||||
## Fallback Conditions
|
||||
|
||||
The differential plan returns `null` (triggering a full crawl) when:
|
||||
|
||||
- No versions for this repository have `state = 'indexed'`.
|
||||
- The best ancestor has no indexed documents.
|
||||
- All files changed between ancestor and target (`unchangedPaths.size === 0`).
|
||||
- The GitHub Compare API call or `git diff` call throws an error.
|
||||
- Any unexpected exception inside `buildDifferentialPlan`.
|
||||
446
docs/features/TRUEREF-0022.md
Normal file
446
docs/features/TRUEREF-0022.md
Normal file
@@ -0,0 +1,446 @@
|
||||
# TRUEREF-0022 — Worker-Thread Indexing, Parallel Job Execution, and Real-Time Progress Streaming
|
||||
|
||||
**Priority:** P1
|
||||
**Status:** Draft
|
||||
**Depends On:** TRUEREF-0009, TRUEREF-0014, TRUEREF-0017, TRUEREF-0021
|
||||
**Blocks:** —
|
||||
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
The indexing pipeline currently runs on the same Node.js event loop as the HTTP and MCP servers. Because `better-sqlite3` is synchronous and `parseFile` is CPU-bound, a single indexing job can starve the event loop for seconds at a time, making the web UI completely unresponsive during indexing. With hundreds of version tags queued simultaneously, the problem compounds: the UI cannot navigate, poll for progress, or serve MCP requests while work is in flight.
|
||||
|
||||
This feature fixes the root cause by moving all indexing work into a dedicated Node.js Worker Thread, enabling controlled parallel execution of multiple jobs, and replacing the polling-based progress model with a real-time push mechanism — either Server-Sent Events (SSE) or a lightweight WebSocket channel.
|
||||
|
||||
---
|
||||
|
||||
## Problem Statement
|
||||
|
||||
### 1. Event Loop Starvation
|
||||
|
||||
`IndexingPipeline.run()` spends the bulk of its time in two blocking operations:
|
||||
|
||||
- `parseFile(file, ...)` — CPU-bound text/AST parsing, fully synchronous
|
||||
- `better-sqlite3` writes — synchronous I/O by design
|
||||
|
||||
Neither yields to the event loop. A repository with 2 000 files produces ~2 000 consecutive blocking micro-tasks with no opportunity for the HTTP server to process any incoming request in between. The current mitigation (yielding every 20 files via `setImmediate`) reduces the freeze to sub-second intervals but does not eliminate it — it is a band-aid, not a structural fix.
|
||||
|
||||
### 2. Sequential-only Queue
|
||||
|
||||
`JobQueue` serialises all jobs one at a time to avoid SQLite write contention. This is correct for a single-writer model, but it means:
|
||||
|
||||
- Indexing `/my-lib/v3.0.0` blocks `/other-lib` from starting, even though they write to entirely disjoint rows.
|
||||
- With hundreds of version tags registered from Discover Tags, a user must wait for every previous tag to finish before the next one starts — typically hours for a large monorepo.
|
||||
|
||||
### 3. Polling Overhead and Lag
|
||||
|
||||
The UI currently polls `GET /api/v1/jobs?repositoryId=...` every 2 seconds. This means:
|
||||
|
||||
- Progress updates are always up to 2 seconds stale.
|
||||
- Each poll is a full DB read regardless of whether anything changed.
|
||||
- The polling interval itself adds load during the highest-contention window.
|
||||
|
||||
---
|
||||
|
||||
## Goals
|
||||
|
||||
1. Move `IndexingPipeline.run()` into a Node.js Worker Thread so the HTTP event loop is never blocked by indexing work.
|
||||
2. Support configurable parallel job execution (default: 2 concurrent workers, max: N where N is the number of CPU cores minus 1).
|
||||
3. Replace polling with Server-Sent Events (SSE) for real-time per-job progress streaming.
|
||||
4. Keep a single SQLite file as the persistence layer — no external message broker.
|
||||
5. Detailed progress: expose current stage name (crawl / diff / parse / store / embed), not just a percentage.
|
||||
6. Remain backward-compatible: the existing `GET /api/v1/jobs/{id}` REST endpoint continues to work unchanged.
|
||||
|
||||
---
|
||||
|
||||
## Non-Goals
|
||||
|
||||
- Moving to a multi-process (fork) architecture.
|
||||
- External queue systems (Redis, BullMQ, etc.).
|
||||
- Distributed or cluster execution across multiple machines.
|
||||
- Resumable indexing (pause mid-parse and continue after restart).
|
||||
- Changing the SQLite storage backend.
|
||||
|
||||
---
|
||||
|
||||
## Architecture
|
||||
|
||||
### Worker Thread Model
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ Main Thread (SvelteKit / HTTP / MCP) │
|
||||
│ │
|
||||
│ WorkerPool JobQueue (SQLite) │
|
||||
│ ┌────────────┐ ┌──────────────────────────────────────┐ │
|
||||
│ │ Worker 0 │◄────►│ indexing_jobs (queued/running/done) │ │
|
||||
│ │ Worker 1 │ └──────────────────────────────────────┘ │
|
||||
│ │ Worker N │ │
|
||||
│ └────────────┘ ProgressBroadcaster │
|
||||
│ │ ┌──────────────────────────────────────┐ │
|
||||
│ └─────────────►│ SSE channels (Map<jobId, Response>) │ │
|
||||
│ postMessage └──────────────────────────────────────┘ │
|
||||
│ { type: 'progress', jobId, stage, ... } │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
#### Worker Thread lifecycle
|
||||
|
||||
Each worker is a long-lived `node:worker_threads` `Worker` instance that:
|
||||
1. Opens its own `better-sqlite3` connection to the same database file.
|
||||
2. Listens for `{ type: 'run', jobId }` messages from the main thread.
|
||||
3. Runs `IndexingPipeline.run(job)`, emitting `postMessage` progress events at each stage boundary and every N files.
|
||||
4. Posts `{ type: 'done', jobId }` or `{ type: 'failed', jobId, error }` when finished.
|
||||
5. Is reused for subsequent jobs (no spawn-per-job overhead).
|
||||
|
||||
#### WorkerPool (main thread)
|
||||
|
||||
Manages a pool of `concurrency` workers.
|
||||
|
||||
```typescript
|
||||
interface WorkerPoolOptions {
|
||||
concurrency: number; // default: Math.max(1, os.cpus().length - 1), capped at 4
|
||||
workerScript: string; // absolute path to the compiled worker entry
|
||||
}
|
||||
|
||||
class WorkerPool {
|
||||
private workers: Worker[];
|
||||
private idle: Worker[];
|
||||
|
||||
enqueue(jobId: string): void;
|
||||
private dispatch(worker: Worker, jobId: string): void;
|
||||
private onWorkerMessage(msg: WorkerMessage): void;
|
||||
private onWorkerExit(worker: Worker, code: number): void;
|
||||
}
|
||||
```
|
||||
|
||||
Workers are kept alive across jobs. If a worker crashes (non-zero exit), the pool spawns a replacement and marks any in-flight job as `failed`.
|
||||
|
||||
#### Parallelism and write contention
|
||||
|
||||
With WAL mode enabled (already the case), SQLite supports:
|
||||
- **One concurrent writer** (the transaction lock)
|
||||
- **Many concurrent readers**
|
||||
|
||||
The `replaceSnippets` transaction for different repositories never contends — they write different rows. The `cloneFromAncestor` operation writes to the same tables but different `version_id` values, so WAL checkpoint logic keeps them non-overlapping at the page level.
|
||||
|
||||
Two jobs on the **same repository** (e.g. `/my-lib/v1.0.0` and `/my-lib/v2.0.0`) can run in parallel because:
|
||||
- Differential indexing (TRUEREF-0021) ensures `v2.0.0` reads from `v1.0.0`'s already-committed rows.
|
||||
- The write transactions for each version touch disjoint `version_id` partitions.
|
||||
|
||||
If write contention still occurs under parallel load, `busy_timeout = 5000` (already set) absorbs transient waits.
|
||||
|
||||
#### Concurrency limit per repository
|
||||
|
||||
To prevent a user from queuing 500 tags and overwhelming the worker pool, the pool enforces:
|
||||
- **Max 1 running job per repository** for the default branch (re-index).
|
||||
- **Max `concurrency` total running jobs** across all repositories.
|
||||
- Version jobs for the same repository are serialised within the pool (the queue picks the oldest queued version job for a given repo only when no other version job for that repo is running).
|
||||
|
||||
---
|
||||
|
||||
## Progress Model
|
||||
|
||||
### Stage Enumeration
|
||||
|
||||
Replace the opaque integer progress with a structured stage model:
|
||||
|
||||
```typescript
|
||||
type IndexingStage =
|
||||
| 'queued'
|
||||
| 'differential' // computing ancestor diff
|
||||
| 'crawling' // fetching files from GitHub or local FS
|
||||
| 'cloning' // cloning unchanged files from ancestor (differential only)
|
||||
| 'parsing' // parsing files into snippets
|
||||
| 'storing' // writing documents + snippets to DB
|
||||
| 'embedding' // generating vector embeddings
|
||||
| 'done'
|
||||
| 'failed';
|
||||
```
|
||||
|
||||
### Extended Job Schema
|
||||
|
||||
```sql
|
||||
ALTER TABLE indexing_jobs ADD COLUMN stage TEXT NOT NULL DEFAULT 'queued';
|
||||
ALTER TABLE indexing_jobs ADD COLUMN stage_detail TEXT; -- e.g. "42 / 200 files"
|
||||
```
|
||||
|
||||
The `progress` column (0–100) is retained for backward compatibility and overall bar rendering.
|
||||
|
||||
### Worker → Main thread progress message
|
||||
|
||||
```typescript
|
||||
interface ProgressMessage {
|
||||
type: 'progress';
|
||||
jobId: string;
|
||||
stage: IndexingStage;
|
||||
stageDetail?: string; // human-readable detail for the current stage
|
||||
progress: number; // 0–100 overall
|
||||
processedFiles: number;
|
||||
totalFiles: number;
|
||||
}
|
||||
```
|
||||
|
||||
Workers emit this message:
|
||||
- On every stage transition (crawl start, parse start, store start, embed start).
|
||||
- Every `PROGRESS_EMIT_EVERY = 10` files during the parse loop.
|
||||
- On job completion or failure.
|
||||
|
||||
The main thread receives these messages and does two things:
|
||||
1. Writes the update to `indexing_jobs` in SQLite (batched — one write per message, not per file).
|
||||
2. Pushes the payload to any open SSE channels for that jobId.
|
||||
|
||||
---
|
||||
|
||||
## Server-Sent Events API
|
||||
|
||||
### `GET /api/v1/jobs/:id/stream`
|
||||
|
||||
Opens an SSE connection for a specific job. The server:
|
||||
1. Sends the current job state as the first event immediately (no initial lag).
|
||||
2. Pushes `ProgressMessage` events as the worker emits them.
|
||||
3. Sends a final `event: done` or `event: failed` event, then closes the connection.
|
||||
4. Accepts `Last-Event-ID` header for reconnect support — replays the last known state.
|
||||
|
||||
```
|
||||
GET /api/v1/jobs/abc-123/stream HTTP/1.1
|
||||
Accept: text/event-stream
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: text/event-stream
|
||||
Cache-Control: no-cache
|
||||
X-Accel-Buffering: no
|
||||
|
||||
id: 1
|
||||
event: progress
|
||||
data: {"stage":"crawling","progress":0,"processedFiles":0,"totalFiles":0}
|
||||
|
||||
id: 2
|
||||
event: progress
|
||||
data: {"stage":"parsing","progress":12,"processedFiles":240,"totalFiles":2000}
|
||||
|
||||
id: 47
|
||||
event: done
|
||||
data: {"stage":"done","progress":100,"processedFiles":2000,"totalFiles":2000}
|
||||
```
|
||||
|
||||
The connection is automatically closed by the server after `event: done` or `event: failed`. If the client disconnects and reconnects with `Last-Event-ID: 47`, the server replays the last cached event (only the most recent event per job is cached in memory).
|
||||
|
||||
### `GET /api/v1/jobs/stream` (batch)
|
||||
|
||||
A second endpoint streams progress for all active jobs of a repository:
|
||||
|
||||
```
|
||||
GET /api/v1/jobs/stream?repositoryId=/my-lib HTTP/1.1
|
||||
Accept: text/event-stream
|
||||
```
|
||||
|
||||
Events are multiplexed in the same stream:
|
||||
|
||||
```
|
||||
event: job-progress
|
||||
data: {"jobId":"abc-123","stage":"parsing","progress":34,...}
|
||||
|
||||
event: job-progress
|
||||
data: {"jobId":"def-456","stage":"crawling","progress":5,...}
|
||||
```
|
||||
|
||||
This replaces the current single-interval `GET /api/v1/jobs?repositoryId=...` poll on the repository detail page entirely.
|
||||
|
||||
---
|
||||
|
||||
## UI Changes
|
||||
|
||||
### Repository detail page (`/repos/[id]`)
|
||||
|
||||
- Replace the `$effect` poll for version jobs with a single `EventSource` connection to `GET /api/v1/jobs/stream?repositoryId=...`.
|
||||
- Replace the inline progress bar markup with a refined component that shows stage name + file count + percentage.
|
||||
- Show a compact "N jobs in queue" badge when jobs are queued but not yet running.
|
||||
|
||||
### Admin jobs page (`/admin/jobs`)
|
||||
|
||||
- Replace the `setInterval(fetchJobs, 3000)` poll with an `EventSource` on `GET /api/v1/jobs/stream` (all jobs, no repositoryId filter).
|
||||
|
||||
### Progress component
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────────────────────────┐
|
||||
│ v2.1.0 Parsing 42 / 200 21% │
|
||||
│ ████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ │
|
||||
└──────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
Stage label transitions: `Queued → Diff → Crawling → Cloning → Parsing → Storing → Embedding → Done`
|
||||
|
||||
---
|
||||
|
||||
## Configuration
|
||||
|
||||
Expose via the settings table (key `indexing.concurrency`):
|
||||
|
||||
```typescript
|
||||
interface IndexingSettings {
|
||||
concurrency: number; // 1–max(cpus-1, 1); default 2
|
||||
}
|
||||
```
|
||||
|
||||
Surfaced in the settings UI (`/settings`) alongside the embedding provider config.
|
||||
|
||||
---
|
||||
|
||||
## Schema Migration
|
||||
|
||||
```sql
|
||||
-- Migration: add stage columns to indexing_jobs
|
||||
ALTER TABLE indexing_jobs ADD COLUMN stage TEXT NOT NULL DEFAULT 'queued';
|
||||
ALTER TABLE indexing_jobs ADD COLUMN stage_detail TEXT;
|
||||
```
|
||||
|
||||
The `progress`, `processedFiles`, and `totalFiles` columns are retained. The `status` column (`queued / running / paused / cancelled / done / failed`) is also retained. `stage` provides sub-status granularity within `running`.
|
||||
|
||||
---
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
### Worker Thread
|
||||
|
||||
- [ ] `IndexingPipeline.run()` executes entirely inside a Worker Thread — zero `parseFile` / `replaceSnippets` calls on the main thread during indexing
|
||||
- [ ] Worker crashes are detected: the pool spawns a replacement and marks the in-flight job as `failed`
|
||||
- [ ] Worker pool concurrency is configurable via settings (min 1, max `cpus - 1`, default 2)
|
||||
- [ ] Restarting the server cleans up stale `running` jobs and re-queues them (existing behaviour preserved)
|
||||
|
||||
### Parallel Execution
|
||||
|
||||
- [ ] Two jobs for two different repositories run concurrently when `concurrency ≥ 2`
|
||||
- [ ] Two version jobs for the same repository are serialised (at most one per repo at a time)
|
||||
- [ ] Main-branch re-index job and version jobs for the same repo are serialised
|
||||
- [ ] Admin jobs page shows parallel running jobs simultaneously
|
||||
|
||||
### Progress Streaming
|
||||
|
||||
- [ ] `GET /api/v1/jobs/:id/stream` returns `text/event-stream` with stage + progress events
|
||||
- [ ] `GET /api/v1/jobs/stream?repositoryId=...` multiplexes all active jobs for a repo
|
||||
- [ ] First event is sent immediately (no wait for the first stage transition)
|
||||
- [ ] SSE connection closes automatically after `done` / `failed`
|
||||
- [ ] `Last-Event-ID` reconnect replays the last cached event
|
||||
- [ ] Existing `GET /api/v1/jobs/:id` REST endpoint still works (no breaking change)
|
||||
|
||||
### Stage Detail
|
||||
|
||||
- [ ] `stage` column in `indexing_jobs` reflects the current pipeline stage
|
||||
- [ ] UI shows stage label next to the progress bar
|
||||
- [ ] Stage transitions: `queued → differential → crawling → cloning → parsing → storing → embedding → done`
|
||||
|
||||
### UI Responsiveness
|
||||
|
||||
- [ ] Navigating between pages while indexing is in progress has < 200 ms response time
|
||||
- [ ] MCP `query-docs` calls resolve correctly while indexing is running in parallel
|
||||
- [ ] No `SQLITE_BUSY` errors under concurrent indexing + read load
|
||||
|
||||
---
|
||||
|
||||
## Implementation Order
|
||||
|
||||
1. **Schema migration** — add `stage` and `stage_detail` columns (non-breaking, backward-compatible defaults)
|
||||
2. **Worker entry point** — `src/lib/server/pipeline/worker.ts` — thin wrapper that receives `run` messages and calls `IndexingPipeline.run()`
|
||||
3. **WorkerPool** — `src/lib/server/pipeline/worker-pool.ts` — pool management, message routing, crash recovery
|
||||
4. **ProgressBroadcaster** — `src/lib/server/pipeline/progress-broadcaster.ts` — in-memory SSE channel registry, last-event cache
|
||||
5. **SSE endpoints** — `src/routes/api/v1/jobs/[id]/stream/+server.ts` and `src/routes/api/v1/jobs/stream/+server.ts`
|
||||
6. **JobQueue update** — replace `processNext`'s direct `pipeline.run()` call with `workerPool.enqueue(jobId)`; enforce per-repo serialisation
|
||||
7. **Pipeline stage reporting** — add `this.reportStage(stage, detail)` calls at each stage boundary in `IndexingPipeline.run()`
|
||||
8. **UI: SSE client on repository page** — replace `$effect` poll with `EventSource`
|
||||
9. **UI: SSE client on admin/jobs page** — replace `setInterval` with `EventSource`
|
||||
10. **Settings UI** — add concurrency slider to `/settings`
|
||||
11. **Integration tests** — parallel execution, crash recovery, SSE event sequence
|
||||
|
||||
---
|
||||
|
||||
## Dedicated Embedding Worker
|
||||
|
||||
The embedding stage must **not** run inside the same Worker Thread as the crawl/parse/store pipeline. The reasons are structural:
|
||||
|
||||
### Why a dedicated embedding worker
|
||||
|
||||
| Concern | Per-parse-worker model | Dedicated embedding worker |
|
||||
|---|---|---|
|
||||
| Memory | N × ~100 MB (model weights + WASM heap) per worker | 1 × ~100 MB regardless of concurrency |
|
||||
| Model warm-up | Paid once per worker spawn; cold starts slow | Paid once at server startup |
|
||||
| Batch size | Each worker batches only its own job's snippets | All in-flight jobs queue to one worker → larger batches → higher WASM throughput |
|
||||
| Provider migration | Must update every worker | Update one file |
|
||||
| API rate limiting | N parallel streams to the same API → N×rate-limit hits | One serial stream, naturally throttled |
|
||||
|
||||
With `Xenova/all-MiniLM-L6-v2`, the WASM model and weight files occupy ~90–120 MB of heap. Running three parse workers with embedded model loading costs ~300–360 MB of resident memory that can never be freed while the server is alive. A dedicated worker keeps that cost fixed at one instance.
|
||||
|
||||
Batch efficiency matters too: `embedSnippets` already uses `BATCH_SIZE = 50`. A single embedding worker receiving snippet batches from multiple concurrently completing parse jobs can saturate its WASM batch budget (50 texts) far more consistently than individual workers whose parse jobs complete asynchronously.
|
||||
|
||||
### Architecture
|
||||
|
||||
The embedding worker is a **separate, long-lived worker thread** distinct from the parse worker pool:
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ Main Thread │
|
||||
│ │
|
||||
│ WorkerPool (parse workers, concurrency N) │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||||
│ │ Worker 0 │ │ Worker 1 │ │ Worker N │ │
|
||||
│ │ crawl │ │ crawl │ │ crawl │ │
|
||||
│ │ diff │ │ diff │ │ diff │ │
|
||||
│ │ parse │ │ parse │ │ parse │ │
|
||||
│ │ store │ │ store │ │ store │ │
|
||||
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
|
||||
│ │ notify │ notify │ notify │
|
||||
│ └──────────────┴─────────────┘ │
|
||||
│ │ (via main thread broadcast) │
|
||||
│ ┌──────▼───────┐ │
|
||||
│ │ Embed Worker │ ← single instance │
|
||||
│ │ loads model once │
|
||||
│ │ drains snippet_embeddings deficit │
|
||||
│ │ writes embeddings to DB │
|
||||
│ └──────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Communication protocol
|
||||
|
||||
Parse workers do **not** send snippet content to the embedding worker over IPC — that would serialise potentially megabytes of text per job and negate the bandwidth savings of the deficit-drain pattern.
|
||||
|
||||
Instead, the existing `findSnippetIdsMissingEmbeddings` query is the handshake:
|
||||
|
||||
1. Parse worker completes stage `storing` and posts `{ type: 'snippets-ready', repositoryId, versionId }` to the main thread.
|
||||
2. Main thread forwards this to the embedding worker.
|
||||
3. Embedding worker calls `findSnippetIdsMissingEmbeddings(repositoryId, versionId)` on its own DB connection, then runs `embedSnippets()` as it does today.
|
||||
4. Embedding worker posts `{ type: 'embed-progress', jobId, done, total }` back to the main thread at each batch boundary.
|
||||
5. Main thread routes this to the SSE broadcaster → UI updates the embedding progress slice.
|
||||
|
||||
This means:
|
||||
- The embedding worker reads snippet text from the DB itself (no IPC serialisation of content).
|
||||
- The model is loaded once, stays warm, and processes batches from all repositories in FIFO order.
|
||||
- Parse workers are never blocked waiting for embeddings — they complete their job stages and exit immediately.
|
||||
|
||||
### Embedding worker message contract
|
||||
|
||||
```typescript
|
||||
// Main → Embedding worker
|
||||
type EmbedRequest =
|
||||
| { type: 'embed'; jobId: string; repositoryId: string; versionId: string | null }
|
||||
| { type: 'shutdown' };
|
||||
|
||||
// Embedding worker → Main
|
||||
type EmbedResponse =
|
||||
| { type: 'embed-progress'; jobId: string; done: number; total: number }
|
||||
| { type: 'embed-done'; jobId: string }
|
||||
| { type: 'embed-failed'; jobId: string; error: string }
|
||||
| { type: 'ready' }; // emitted once after model warm-up completes
|
||||
```
|
||||
|
||||
The `ready` message allows the server startup sequence to defer routing any embed requests until the model is loaded, preventing a race on first-run.
|
||||
|
||||
---
|
||||
|
||||
## Open Questions
|
||||
|
||||
1. Should `cloneFromAncestor` (TRUEREF-0021) remain synchronous on the parse worker, or be split into its own `cloning` stage with explicit SSE events and a progress count? Given that cloning is a bulk DB operation rather than a per-file loop, a single stage-transition event (`stage: 'cloning'`, no per-row progress) is sufficient.
|
||||
2. Does the `busy_timeout = 5000` setting need to increase under high-concurrency parallel writes, or is 5 s sufficient? Empirically test with `concurrency = 4` + the embedding worker all writing simultaneously before increasing it.
|
||||
3. Should the embedding worker support priority queueing — e.g. embedding the most recently completed parse job first — or is strict FIFO sufficient? FIFO is simpler and correct for the current use case.
|
||||
173
src/lib/server/crawler/github-compare.test.ts
Normal file
173
src/lib/server/crawler/github-compare.test.ts
Normal file
@@ -0,0 +1,173 @@
|
||||
/**
|
||||
* Unit tests for GitHub Compare API client (TRUEREF-0021).
|
||||
*/
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { fetchGitHubChangedFiles } from './github-compare.js';
|
||||
import { GitHubApiError } from './github-tags.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function mockFetch(status: number, body: unknown): void {
|
||||
vi.spyOn(global, 'fetch').mockResolvedValueOnce(
|
||||
new Response(JSON.stringify(body), { status })
|
||||
);
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// fetchGitHubChangedFiles
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('fetchGitHubChangedFiles', () => {
|
||||
it('maps added status correctly', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'ahead',
|
||||
files: [{ filename: 'src/new.ts', status: 'added', sha: 'abc123' }]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]).toMatchObject({ path: 'src/new.ts', status: 'added', sha: 'abc123' });
|
||||
});
|
||||
|
||||
it('maps modified status correctly', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'ahead',
|
||||
files: [{ filename: 'src/index.ts', status: 'modified', sha: 'def456' }]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result[0]).toMatchObject({ path: 'src/index.ts', status: 'modified' });
|
||||
});
|
||||
|
||||
it('maps removed status correctly and omits sha', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'ahead',
|
||||
files: [{ filename: 'src/old.ts', status: 'removed', sha: '000000' }]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result[0]).toMatchObject({ path: 'src/old.ts', status: 'removed' });
|
||||
expect(result[0].sha).toBeUndefined();
|
||||
});
|
||||
|
||||
it('maps renamed status and sets previousPath', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'ahead',
|
||||
files: [
|
||||
{
|
||||
filename: 'src/renamed.ts',
|
||||
status: 'renamed',
|
||||
sha: 'ghi789',
|
||||
previous_filename: 'src/original.ts'
|
||||
}
|
||||
]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result[0]).toMatchObject({
|
||||
path: 'src/renamed.ts',
|
||||
status: 'renamed',
|
||||
previousPath: 'src/original.ts',
|
||||
sha: 'ghi789'
|
||||
});
|
||||
});
|
||||
|
||||
it('returns empty array when compare status is identical', async () => {
|
||||
mockFetch(200, { status: 'identical', files: [] });
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.0.0');
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('returns empty array when compare status is behind', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'behind',
|
||||
files: [{ filename: 'src/index.ts', status: 'modified', sha: 'abc' }]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.1.0', 'v1.0.0');
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('throws GitHubApiError on 401 unauthorized', async () => {
|
||||
mockFetch(401, { message: 'Unauthorized' });
|
||||
await expect(
|
||||
fetchGitHubChangedFiles('owner', 'private-repo', 'v1.0.0', 'v1.1.0')
|
||||
).rejects.toThrow(GitHubApiError);
|
||||
});
|
||||
|
||||
it('throws GitHubApiError on 404 not found', async () => {
|
||||
mockFetch(404, { message: 'Not Found' });
|
||||
await expect(
|
||||
fetchGitHubChangedFiles('owner', 'missing-repo', 'v1.0.0', 'v1.1.0')
|
||||
).rejects.toThrow(GitHubApiError);
|
||||
});
|
||||
|
||||
it('throws GitHubApiError on 422 unprocessable entity', async () => {
|
||||
mockFetch(422, { message: 'Unprocessable Entity' });
|
||||
await expect(
|
||||
fetchGitHubChangedFiles('owner', 'repo', 'bad-ref', 'v1.1.0')
|
||||
).rejects.toThrow(GitHubApiError);
|
||||
});
|
||||
|
||||
it('returns empty array when files property is missing', async () => {
|
||||
mockFetch(200, { status: 'ahead' });
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('returns empty array when files array is empty', async () => {
|
||||
mockFetch(200, { status: 'ahead', files: [] });
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it('maps copied status to modified', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'ahead',
|
||||
files: [{ filename: 'src/copy.ts', status: 'copied', sha: 'jkl012' }]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result[0]).toMatchObject({ path: 'src/copy.ts', status: 'modified' });
|
||||
});
|
||||
|
||||
it('maps changed status to modified', async () => {
|
||||
mockFetch(200, {
|
||||
status: 'ahead',
|
||||
files: [{ filename: 'src/changed.ts', status: 'changed', sha: 'mno345' }]
|
||||
});
|
||||
const result = await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect(result[0]).toMatchObject({ path: 'src/changed.ts', status: 'modified' });
|
||||
});
|
||||
|
||||
it('sends Authorization header when token is provided', async () => {
|
||||
const fetchSpy = vi.spyOn(global, 'fetch').mockResolvedValueOnce(
|
||||
new Response(JSON.stringify({ status: 'ahead', files: [] }), { status: 200 })
|
||||
);
|
||||
await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0', 'my-token');
|
||||
const callArgs = fetchSpy.mock.calls[0];
|
||||
const headers = (callArgs[1] as RequestInit).headers as Record<string, string>;
|
||||
expect(headers['Authorization']).toBe('Bearer my-token');
|
||||
});
|
||||
|
||||
it('does not send Authorization header when no token provided', async () => {
|
||||
const fetchSpy = vi.spyOn(global, 'fetch').mockResolvedValueOnce(
|
||||
new Response(JSON.stringify({ status: 'ahead', files: [] }), { status: 200 })
|
||||
);
|
||||
await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
const callArgs = fetchSpy.mock.calls[0];
|
||||
const headers = (callArgs[1] as RequestInit).headers as Record<string, string>;
|
||||
expect(headers['Authorization']).toBeUndefined();
|
||||
});
|
||||
|
||||
it('throws GitHubApiError with correct status code', async () => {
|
||||
mockFetch(403, { message: 'Forbidden' });
|
||||
try {
|
||||
await fetchGitHubChangedFiles('owner', 'repo', 'v1.0.0', 'v1.1.0');
|
||||
expect.fail('should have thrown');
|
||||
} catch (e) {
|
||||
expect(e).toBeInstanceOf(GitHubApiError);
|
||||
expect((e as GitHubApiError).status).toBe(403);
|
||||
}
|
||||
});
|
||||
});
|
||||
104
src/lib/server/crawler/github-compare.ts
Normal file
104
src/lib/server/crawler/github-compare.ts
Normal file
@@ -0,0 +1,104 @@
|
||||
/**
|
||||
* GitHub Compare API client for differential tag indexing (TRUEREF-0021).
|
||||
*
|
||||
* Uses GET /repos/{owner}/{repo}/compare/{base}...{head} to determine
|
||||
* which files changed between two refs without downloading full trees.
|
||||
*/
|
||||
import { GitHubApiError } from './github-tags.js';
|
||||
import type { ChangedFile } from './types.js';
|
||||
|
||||
const GITHUB_API = 'https://api.github.com';
|
||||
|
||||
interface GitHubCompareFile {
|
||||
filename: string;
|
||||
status: 'added' | 'modified' | 'removed' | 'renamed' | 'copied' | 'changed' | 'unchanged';
|
||||
sha: string;
|
||||
previous_filename?: string;
|
||||
}
|
||||
|
||||
interface GitHubCompareResponse {
|
||||
status: 'diverged' | 'ahead' | 'behind' | 'identical';
|
||||
files?: GitHubCompareFile[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch changed files between two GitHub refs using the Compare API.
|
||||
*
|
||||
* @param owner GitHub owner/org
|
||||
* @param repo GitHub repository name
|
||||
* @param base Base ref (tag, branch, or commit SHA)
|
||||
* @param head Head ref (tag, branch, or commit SHA)
|
||||
* @param token Optional PAT for private repos
|
||||
* @returns Array of ChangedFile objects; empty array when refs are identical or head is behind base
|
||||
*/
|
||||
export async function fetchGitHubChangedFiles(
|
||||
owner: string,
|
||||
repo: string,
|
||||
base: string,
|
||||
head: string,
|
||||
token?: string
|
||||
): Promise<ChangedFile[]> {
|
||||
const url = `${GITHUB_API}/repos/${owner}/${repo}/compare/${base}...${head}?per_page=300`;
|
||||
|
||||
const headers: Record<string, string> = {
|
||||
Accept: 'application/vnd.github+json',
|
||||
'X-GitHub-Api-Version': '2022-11-28',
|
||||
'User-Agent': 'TrueRef/1.0'
|
||||
};
|
||||
if (token) headers['Authorization'] = `Bearer ${token}`;
|
||||
|
||||
const response = await fetch(url, { headers });
|
||||
|
||||
if (!response.ok) {
|
||||
throw new GitHubApiError(response.status);
|
||||
}
|
||||
|
||||
const data = (await response.json()) as GitHubCompareResponse;
|
||||
|
||||
// Identical or behind means no relevant changes to index
|
||||
if (data.status === 'identical' || data.status === 'behind') {
|
||||
return [];
|
||||
}
|
||||
|
||||
if (!data.files || data.files.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return data.files.map((file): ChangedFile => {
|
||||
let status: ChangedFile['status'];
|
||||
|
||||
switch (file.status) {
|
||||
case 'added':
|
||||
status = 'added';
|
||||
break;
|
||||
case 'removed':
|
||||
status = 'removed';
|
||||
break;
|
||||
case 'renamed':
|
||||
status = 'renamed';
|
||||
break;
|
||||
case 'modified':
|
||||
case 'copied':
|
||||
case 'changed':
|
||||
case 'unchanged':
|
||||
default:
|
||||
status = 'modified';
|
||||
break;
|
||||
}
|
||||
|
||||
const result: ChangedFile = {
|
||||
path: file.filename,
|
||||
status
|
||||
};
|
||||
|
||||
if (status === 'renamed' && file.previous_filename) {
|
||||
result.previousPath = file.previous_filename;
|
||||
}
|
||||
|
||||
if (status !== 'removed') {
|
||||
result.sha = file.sha;
|
||||
}
|
||||
|
||||
return result;
|
||||
});
|
||||
}
|
||||
@@ -55,6 +55,21 @@ export interface CrawlOptions {
|
||||
config?: RepoConfig;
|
||||
/** Progress callback invoked after each file is processed */
|
||||
onProgress?: (processed: number, total: number) => void;
|
||||
/**
|
||||
* When provided, the crawler must restrict returned files to only these paths.
|
||||
* Used by the differential indexing pipeline to skip unchanged files.
|
||||
*/
|
||||
allowedPaths?: Set<string>;
|
||||
}
|
||||
|
||||
export interface ChangedFile {
|
||||
/** Path of the file in the new version (head). For renames, this is the destination path. */
|
||||
path: string;
|
||||
status: 'added' | 'modified' | 'removed' | 'renamed';
|
||||
/** Previous path, only set when status === 'renamed' */
|
||||
previousPath?: string;
|
||||
/** Blob SHA of the file content in the head ref (omitted for removed files) */
|
||||
sha?: string;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -13,6 +13,7 @@ export function getClient(): Database.Database {
|
||||
_client = new Database(env.DATABASE_URL);
|
||||
_client.pragma('journal_mode = WAL');
|
||||
_client.pragma('foreign_keys = ON');
|
||||
_client.pragma('busy_timeout = 5000');
|
||||
}
|
||||
return _client;
|
||||
}
|
||||
|
||||
@@ -15,6 +15,10 @@ const client = new Database(env.DATABASE_URL);
|
||||
client.pragma('journal_mode = WAL');
|
||||
// Enforce foreign key constraints.
|
||||
client.pragma('foreign_keys = ON');
|
||||
// Wait up to 5 s when the DB is locked instead of failing immediately.
|
||||
// Prevents SQLITE_BUSY errors when the indexing pipeline holds the write lock
|
||||
// and an HTTP request arrives simultaneously.
|
||||
client.pragma('busy_timeout = 5000');
|
||||
|
||||
export const db = drizzle(client, { schema });
|
||||
|
||||
|
||||
122
src/lib/server/pipeline/differential-strategy.ts
Normal file
122
src/lib/server/pipeline/differential-strategy.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
/**
|
||||
* Differential indexing strategy coordinator (TRUEREF-0021).
|
||||
*
|
||||
* Determines whether differential indexing can be used for a given version tag,
|
||||
* and if so, builds a plan describing which files to clone from the ancestor
|
||||
* and which files to crawl fresh.
|
||||
*/
|
||||
import type Database from 'better-sqlite3';
|
||||
import type { Repository } from '$lib/server/models/repository.js';
|
||||
import type { RepositoryVersion } from '$lib/server/models/repository-version.js';
|
||||
import { RepositoryVersionMapper } from '$lib/server/mappers/repository-version.mapper.js';
|
||||
import type { RepositoryVersionEntity } from '$lib/server/models/repository-version.js';
|
||||
import { findBestAncestorVersion } from '$lib/server/utils/tag-order.js';
|
||||
import { fetchGitHubChangedFiles } from '$lib/server/crawler/github-compare.js';
|
||||
import { getChangedFilesBetweenRefs } from '$lib/server/utils/git.js';
|
||||
import type { ChangedFile } from '$lib/server/crawler/types.js';
|
||||
|
||||
export interface DifferentialPlan {
|
||||
/** Version ID of the closest already-indexed predecessor tag */
|
||||
ancestorVersionId: string;
|
||||
/** Ancestor tag name (needed for git diff / GitHub compare calls) */
|
||||
ancestorTag: string;
|
||||
/** File paths that changed (added + modified + renamed-destination) */
|
||||
changedPaths: Set<string>;
|
||||
/** File paths that were deleted in the target vs ancestor */
|
||||
deletedPaths: Set<string>;
|
||||
/** File paths present in ancestor that are unchanged in target — must be cloned */
|
||||
unchangedPaths: Set<string>;
|
||||
}
|
||||
|
||||
export async function buildDifferentialPlan(params: {
|
||||
repo: Repository;
|
||||
targetTag: string;
|
||||
db: Database.Database;
|
||||
/** Override for testing only */
|
||||
_fetchGitHubChangedFiles?: typeof fetchGitHubChangedFiles;
|
||||
}): Promise<DifferentialPlan | null> {
|
||||
const { repo, targetTag, db } = params;
|
||||
const fetchFn = params._fetchGitHubChangedFiles ?? fetchGitHubChangedFiles;
|
||||
|
||||
try {
|
||||
// 1. Load all indexed versions for this repository
|
||||
const rows = db
|
||||
.prepare(
|
||||
`SELECT * FROM repository_versions WHERE repository_id = ? AND state = 'indexed'`
|
||||
)
|
||||
.all(repo.id) as RepositoryVersionEntity[];
|
||||
|
||||
const indexedVersions: RepositoryVersion[] = rows.map((row) =>
|
||||
RepositoryVersionMapper.fromEntity(row)
|
||||
);
|
||||
|
||||
// 2. Find the best ancestor version
|
||||
const ancestor = findBestAncestorVersion(targetTag, indexedVersions);
|
||||
if (!ancestor) return null;
|
||||
|
||||
// 3. Load ancestor's document file paths
|
||||
const docRows = db
|
||||
.prepare(`SELECT DISTINCT file_path FROM documents WHERE version_id = ?`)
|
||||
.all(ancestor.id) as Array<{ file_path: string }>;
|
||||
|
||||
const ancestorFilePaths = new Set(docRows.map((r) => r.file_path));
|
||||
if (ancestorFilePaths.size === 0) return null;
|
||||
|
||||
// 4. Fetch changed files between ancestor and target
|
||||
let changedFiles: ChangedFile[];
|
||||
|
||||
if (repo.source === 'github') {
|
||||
const url = new URL(repo.sourceUrl);
|
||||
const parts = url.pathname.split('/').filter(Boolean);
|
||||
const owner = parts[0];
|
||||
const repoName = parts[1];
|
||||
changedFiles = await fetchFn(
|
||||
owner,
|
||||
repoName,
|
||||
ancestor.tag,
|
||||
targetTag,
|
||||
repo.githubToken ?? undefined
|
||||
);
|
||||
} else {
|
||||
changedFiles = getChangedFilesBetweenRefs({
|
||||
repoPath: repo.sourceUrl,
|
||||
base: ancestor.tag,
|
||||
head: targetTag
|
||||
});
|
||||
}
|
||||
|
||||
// 5. Partition changed files into changed and deleted sets
|
||||
const changedPaths = new Set<string>();
|
||||
const deletedPaths = new Set<string>();
|
||||
|
||||
for (const file of changedFiles) {
|
||||
if (file.status === 'removed') {
|
||||
deletedPaths.add(file.path);
|
||||
} else {
|
||||
changedPaths.add(file.path);
|
||||
}
|
||||
}
|
||||
|
||||
// 6. Compute unchanged paths: ancestor paths minus changed minus deleted
|
||||
const unchangedPaths = new Set<string>();
|
||||
for (const p of ancestorFilePaths) {
|
||||
if (!changedPaths.has(p) && !deletedPaths.has(p)) {
|
||||
unchangedPaths.add(p);
|
||||
}
|
||||
}
|
||||
|
||||
// 7. Return null when there's nothing to clone (all files changed)
|
||||
if (unchangedPaths.size === 0) return null;
|
||||
|
||||
return {
|
||||
ancestorVersionId: ancestor.id,
|
||||
ancestorTag: ancestor.tag,
|
||||
changedPaths,
|
||||
deletedPaths,
|
||||
unchangedPaths
|
||||
};
|
||||
} catch {
|
||||
// Fail-safe: fall back to full crawl on any error
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import { JobQueue } from './job-queue.js';
|
||||
import { IndexingPipeline } from './indexing.pipeline.js';
|
||||
import { recoverStaleJobs } from './startup.js';
|
||||
import { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
|
||||
import * as diffStrategy from './differential-strategy.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test DB factory
|
||||
@@ -1019,3 +1020,290 @@ describe('IndexingPipeline', () => {
|
||||
expect(rules).toEqual(['v3: use the streaming API.']);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// differential indexing
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('differential indexing', () => {
|
||||
let db: Database.Database;
|
||||
|
||||
beforeEach(() => {
|
||||
db = createTestDb();
|
||||
insertRepo(db, { source: 'local', source_url: '/tmp/test-repo' });
|
||||
});
|
||||
|
||||
function insertDocument(
|
||||
localDb: Database.Database,
|
||||
overrides: Partial<Record<string, unknown>> = {}
|
||||
): string {
|
||||
const id = crypto.randomUUID();
|
||||
localDb
|
||||
.prepare(
|
||||
`INSERT INTO documents (id, repository_id, version_id, file_path, title, language, token_count, checksum, indexed_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
||||
)
|
||||
.run(
|
||||
(overrides.id as string) ?? id,
|
||||
(overrides.repository_id as string) ?? '/test/repo',
|
||||
(overrides.version_id as string | null) ?? null,
|
||||
(overrides.file_path as string) ?? 'README.md',
|
||||
null,
|
||||
'markdown',
|
||||
100,
|
||||
(overrides.checksum as string) ?? 'abc123',
|
||||
Math.floor(Date.now() / 1000)
|
||||
);
|
||||
return (overrides.id as string) ?? id;
|
||||
}
|
||||
|
||||
function insertSnippet(
|
||||
localDb: Database.Database,
|
||||
documentId: string,
|
||||
overrides: Partial<Record<string, unknown>> = {}
|
||||
): string {
|
||||
const id = crypto.randomUUID();
|
||||
localDb
|
||||
.prepare(
|
||||
`INSERT INTO snippets (id, document_id, repository_id, version_id, type, title, content, language, breadcrumb, token_count, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
||||
)
|
||||
.run(
|
||||
(overrides.id as string) ?? id,
|
||||
documentId,
|
||||
(overrides.repository_id as string) ?? '/test/repo',
|
||||
(overrides.version_id as string | null) ?? null,
|
||||
'info',
|
||||
null,
|
||||
'content',
|
||||
'markdown',
|
||||
null,
|
||||
10,
|
||||
Math.floor(Date.now() / 1000)
|
||||
);
|
||||
return (overrides.id as string) ?? id;
|
||||
}
|
||||
|
||||
type PipelineInternals = IndexingPipeline & {
|
||||
cloneFromAncestor: (
|
||||
ancestorVersionId: string,
|
||||
targetVersionId: string,
|
||||
repositoryId: string,
|
||||
unchangedPaths: Set<string>
|
||||
) => void;
|
||||
};
|
||||
|
||||
it('cloneFromAncestor inserts documents and snippets into the target version', () => {
|
||||
const ancestorVersionId = insertVersion(db, { tag: 'v1.0.0', state: 'indexed' });
|
||||
const targetVersionId = insertVersion(db, { tag: 'v1.1.0', state: 'pending' });
|
||||
|
||||
const doc1Id = insertDocument(db, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: ancestorVersionId,
|
||||
file_path: 'README.md',
|
||||
checksum: 'sha-readme'
|
||||
});
|
||||
const doc2Id = insertDocument(db, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: ancestorVersionId,
|
||||
file_path: 'src/index.ts',
|
||||
checksum: 'sha-index'
|
||||
});
|
||||
insertSnippet(db, doc1Id, { repository_id: '/test/repo', version_id: ancestorVersionId });
|
||||
insertSnippet(db, doc2Id, { repository_id: '/test/repo', version_id: ancestorVersionId });
|
||||
|
||||
const pipeline = new IndexingPipeline(
|
||||
db,
|
||||
vi.fn() as never,
|
||||
{ crawl: vi.fn() } as never,
|
||||
null
|
||||
);
|
||||
(pipeline as unknown as PipelineInternals).cloneFromAncestor(
|
||||
ancestorVersionId,
|
||||
targetVersionId,
|
||||
'/test/repo',
|
||||
new Set(['README.md', 'src/index.ts'])
|
||||
);
|
||||
|
||||
const targetDocs = db
|
||||
.prepare(`SELECT * FROM documents WHERE version_id = ?`)
|
||||
.all(targetVersionId) as { id: string; file_path: string }[];
|
||||
expect(targetDocs).toHaveLength(2);
|
||||
expect(targetDocs.map((d) => d.file_path).sort()).toEqual(
|
||||
['README.md', 'src/index.ts'].sort()
|
||||
);
|
||||
// New IDs must differ from ancestor doc IDs.
|
||||
const targetDocIds = targetDocs.map((d) => d.id);
|
||||
expect(targetDocIds).not.toContain(doc1Id);
|
||||
expect(targetDocIds).not.toContain(doc2Id);
|
||||
|
||||
const targetSnippets = db
|
||||
.prepare(`SELECT * FROM snippets WHERE version_id = ?`)
|
||||
.all(targetVersionId) as { id: string }[];
|
||||
expect(targetSnippets).toHaveLength(2);
|
||||
});
|
||||
|
||||
it('cloneFromAncestor silently skips paths absent from the ancestor', () => {
|
||||
const ancestorVersionId = insertVersion(db, { tag: 'v1.0.0', state: 'indexed' });
|
||||
const targetVersionId = insertVersion(db, { tag: 'v1.1.0', state: 'pending' });
|
||||
|
||||
insertDocument(db, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: ancestorVersionId,
|
||||
file_path: 'src/main.ts',
|
||||
checksum: 'sha-main'
|
||||
});
|
||||
|
||||
const pipeline = new IndexingPipeline(
|
||||
db,
|
||||
vi.fn() as never,
|
||||
{ crawl: vi.fn() } as never,
|
||||
null
|
||||
);
|
||||
(pipeline as unknown as PipelineInternals).cloneFromAncestor(
|
||||
ancestorVersionId,
|
||||
targetVersionId,
|
||||
'/test/repo',
|
||||
new Set(['src/main.ts', 'MISSING.md'])
|
||||
);
|
||||
|
||||
const targetDocs = db
|
||||
.prepare(`SELECT * FROM documents WHERE version_id = ?`)
|
||||
.all(targetVersionId) as { id: string; file_path: string }[];
|
||||
expect(targetDocs).toHaveLength(1);
|
||||
expect(targetDocs[0].file_path).toBe('src/main.ts');
|
||||
});
|
||||
|
||||
it('falls back to full crawl when no indexed ancestor exists', async () => {
|
||||
const targetVersionId = insertVersion(db, { tag: 'v1.0.0', state: 'pending' });
|
||||
|
||||
const files = [
|
||||
{
|
||||
path: 'README.md',
|
||||
content: '# Hello\n\nThis is documentation.',
|
||||
sha: 'sha-readme',
|
||||
language: 'markdown'
|
||||
},
|
||||
{
|
||||
path: 'src/index.ts',
|
||||
content: 'export const x = 1;',
|
||||
sha: 'sha-index',
|
||||
language: 'typescript'
|
||||
}
|
||||
];
|
||||
|
||||
const mockLocalCrawl = vi.fn().mockResolvedValue({
|
||||
files,
|
||||
totalFiles: 2,
|
||||
skippedFiles: 0,
|
||||
branch: 'main',
|
||||
commitSha: 'abc'
|
||||
});
|
||||
|
||||
const pipeline = new IndexingPipeline(
|
||||
db,
|
||||
vi.fn() as never,
|
||||
{ crawl: mockLocalCrawl } as never,
|
||||
null
|
||||
);
|
||||
|
||||
const jobId = insertJob(db, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: targetVersionId,
|
||||
status: 'queued'
|
||||
});
|
||||
const job = db.prepare(`SELECT * FROM indexing_jobs WHERE id = ?`).get(jobId) as never;
|
||||
|
||||
await pipeline.run(job);
|
||||
|
||||
const updatedJob = db
|
||||
.prepare(`SELECT status FROM indexing_jobs WHERE id = ?`)
|
||||
.get(jobId) as { status: string };
|
||||
expect(updatedJob.status).toBe('done');
|
||||
|
||||
const docs = db
|
||||
.prepare(`SELECT * FROM documents WHERE version_id = ?`)
|
||||
.all(targetVersionId) as { id: string }[];
|
||||
expect(docs.length).toBeGreaterThanOrEqual(2);
|
||||
});
|
||||
|
||||
it('cloned unchanged documents survive the diff/replace stage', async () => {
|
||||
// 1. Set up ancestor and target versions.
|
||||
const ancestorVersionId = insertVersion(db, { tag: 'v1.0.0', state: 'indexed' });
|
||||
const targetVersionId = insertVersion(db, { tag: 'v1.1.0', state: 'pending' });
|
||||
|
||||
// 2. Insert ancestor doc + snippet for unchanged.md.
|
||||
const ancestorDocId = insertDocument(db, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: ancestorVersionId,
|
||||
file_path: 'unchanged.md',
|
||||
checksum: 'sha-unchanged'
|
||||
});
|
||||
insertSnippet(db, ancestorDocId, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: ancestorVersionId
|
||||
});
|
||||
|
||||
// 3. Crawl returns ONLY changed.md (unchanged.md is absent — differential only).
|
||||
const mockLocalCrawl = vi.fn().mockResolvedValue({
|
||||
files: [
|
||||
{
|
||||
path: 'changed.md',
|
||||
content: '# Changed\n\nThis file was added.',
|
||||
sha: 'sha-changed',
|
||||
language: 'markdown'
|
||||
}
|
||||
],
|
||||
totalFiles: 1,
|
||||
skippedFiles: 0,
|
||||
branch: 'main',
|
||||
commitSha: 'abc'
|
||||
});
|
||||
|
||||
// 4. Mock buildDifferentialPlan to return a plan with the two paths.
|
||||
const mockPlan = {
|
||||
ancestorVersionId,
|
||||
ancestorTag: 'v1.0.0',
|
||||
changedPaths: new Set(['changed.md']),
|
||||
deletedPaths: new Set<string>(),
|
||||
unchangedPaths: new Set(['unchanged.md'])
|
||||
};
|
||||
const spy = vi
|
||||
.spyOn(diffStrategy, 'buildDifferentialPlan')
|
||||
.mockResolvedValueOnce(mockPlan);
|
||||
|
||||
const pipeline = new IndexingPipeline(
|
||||
db,
|
||||
vi.fn() as never,
|
||||
{ crawl: mockLocalCrawl } as never,
|
||||
null
|
||||
);
|
||||
|
||||
// 5. Run pipeline for the target version job.
|
||||
const jobId = insertJob(db, {
|
||||
repository_id: '/test/repo',
|
||||
version_id: targetVersionId,
|
||||
status: 'queued'
|
||||
});
|
||||
const job = db.prepare(`SELECT * FROM indexing_jobs WHERE id = ?`).get(jobId) as never;
|
||||
await pipeline.run(job);
|
||||
|
||||
spy.mockRestore();
|
||||
|
||||
// 6. Assert job completed and both docs exist under the target version.
|
||||
const finalJob = db
|
||||
.prepare(`SELECT status FROM indexing_jobs WHERE id = ?`)
|
||||
.get(jobId) as { status: string };
|
||||
expect(finalJob.status).toBe('done');
|
||||
|
||||
const targetDocs = db
|
||||
.prepare(`SELECT file_path FROM documents WHERE version_id = ?`)
|
||||
.all(targetVersionId) as { file_path: string }[];
|
||||
const filePaths = targetDocs.map((d) => d.file_path);
|
||||
|
||||
// unchanged.md was cloned and must NOT have been deleted by computeDiff.
|
||||
expect(filePaths).toContain('unchanged.md');
|
||||
// changed.md was crawled and indexed in this run.
|
||||
expect(filePaths).toContain('changed.md');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -26,6 +26,7 @@ import { resolveConfig, type ParsedConfig } from '$lib/server/config/config-pars
|
||||
import { parseFile } from '$lib/server/parser/index.js';
|
||||
import { computeTrustScore } from '$lib/server/search/trust-score.js';
|
||||
import { computeDiff } from './diff.js';
|
||||
import { buildDifferentialPlan, type DifferentialPlan } from './differential-strategy.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Progress calculation
|
||||
@@ -95,11 +96,44 @@ export class IndexingPipeline {
|
||||
this.updateVersion(normJob.versionId, { state: 'indexing' });
|
||||
}
|
||||
|
||||
// ---- Stage 1: Crawl -------------------------------------------------
|
||||
const versionTag = normJob.versionId
|
||||
? this.getVersionTag(normJob.versionId)
|
||||
: undefined;
|
||||
const crawlResult = await this.crawl(repo, versionTag);
|
||||
|
||||
// ---- Stage 0: Differential strategy (TRUEREF-0021) ----------------------
|
||||
// When indexing a tagged version, check if we can inherit unchanged files
|
||||
// from an already-indexed ancestor version instead of crawling everything.
|
||||
let differentialPlan: DifferentialPlan | null = null;
|
||||
if (normJob.versionId && versionTag) {
|
||||
differentialPlan = await buildDifferentialPlan({
|
||||
repo,
|
||||
targetTag: versionTag,
|
||||
db: this.db
|
||||
}).catch((err) => {
|
||||
console.warn(
|
||||
`[IndexingPipeline] Differential plan failed, falling back to full crawl: ${err instanceof Error ? err.message : String(err)}`
|
||||
);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
// If a differential plan exists, clone unchanged files from ancestor.
|
||||
if (differentialPlan && differentialPlan.unchangedPaths.size > 0) {
|
||||
this.cloneFromAncestor(
|
||||
differentialPlan.ancestorVersionId,
|
||||
normJob.versionId!,
|
||||
repo.id,
|
||||
differentialPlan.unchangedPaths
|
||||
);
|
||||
console.info(
|
||||
`[IndexingPipeline] Differential indexing: cloned ${differentialPlan.unchangedPaths.size} unchanged files from ${differentialPlan.ancestorTag}`
|
||||
);
|
||||
}
|
||||
|
||||
// ---- Stage 1: Crawl -------------------------------------------------
|
||||
// Pass changedPaths as allowlist so crawl only fetches/returns changed files.
|
||||
const crawlAllowedPaths = differentialPlan ? differentialPlan.changedPaths : undefined;
|
||||
const crawlResult = await this.crawl(repo, versionTag, crawlAllowedPaths);
|
||||
|
||||
// Resolve trueref.json / context7.json configuration.
|
||||
// Prefer the pre-parsed config carried in the CrawlResult (set by
|
||||
@@ -137,7 +171,16 @@ export class IndexingPipeline {
|
||||
// Load all existing documents for this repo so computeDiff can
|
||||
// classify every crawled file and detect deletions.
|
||||
const existingDocs = this.getExistingDocuments(repo.id, normJob.versionId);
|
||||
const diff = computeDiff(filteredFiles, existingDocs);
|
||||
|
||||
// Exclude files that were cloned from the ancestor — they are not candidates
|
||||
// for deletion or re-processing (computeDiff must not see them in existingDocs).
|
||||
const clonedPaths = differentialPlan?.unchangedPaths ?? new Set<string>();
|
||||
const existingDocsForDiff =
|
||||
clonedPaths.size > 0
|
||||
? existingDocs.filter((d) => !clonedPaths.has(d.filePath))
|
||||
: existingDocs;
|
||||
|
||||
const diff = computeDiff(filteredFiles, existingDocsForDiff);
|
||||
|
||||
// Accumulate new documents/snippets; skip unchanged files.
|
||||
const newDocuments: NewDocument[] = [];
|
||||
@@ -146,11 +189,11 @@ export class IndexingPipeline {
|
||||
|
||||
// Schedule stale documents (modified + deleted) for deletion.
|
||||
for (const file of diff.modified) {
|
||||
const existing = existingDocs.find((d) => d.filePath === file.path);
|
||||
const existing = existingDocsForDiff.find((d) => d.filePath === file.path);
|
||||
if (existing) changedDocIds.push(existing.id);
|
||||
}
|
||||
for (const filePath of diff.deleted) {
|
||||
const existing = existingDocs.find((d) => d.filePath === filePath);
|
||||
const existing = existingDocsForDiff.find((d) => d.filePath === filePath);
|
||||
if (existing) changedDocIds.push(existing.id);
|
||||
}
|
||||
|
||||
@@ -172,7 +215,19 @@ export class IndexingPipeline {
|
||||
this.updateJob(job.id, { processedFiles, progress: initialProgress });
|
||||
}
|
||||
|
||||
// Yield the event loop and flush progress every N files.
|
||||
// Lower = more responsive UI; higher = less overhead.
|
||||
const YIELD_EVERY = 20;
|
||||
|
||||
for (const [i, file] of filesToProcess.entries()) {
|
||||
// Yield the Node.js event loop periodically so the HTTP server can
|
||||
// handle incoming requests (navigation, polling) between file parses.
|
||||
// Without this, the synchronous parse + SQLite work blocks the thread
|
||||
// entirely and the UI becomes unresponsive during indexing.
|
||||
if (i > 0 && i % YIELD_EVERY === 0) {
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
}
|
||||
|
||||
const checksum = file.sha || sha256(file.content);
|
||||
|
||||
// Create new document record.
|
||||
@@ -204,16 +259,20 @@ export class IndexingPipeline {
|
||||
newDocuments.push(newDoc);
|
||||
newSnippets.push(...snippets);
|
||||
|
||||
// Count ALL files (including skipped unchanged ones) in progress.
|
||||
// Write progress to the DB only on yield boundaries or the final file.
|
||||
// Avoids a synchronous SQLite UPDATE on every single iteration.
|
||||
const totalProcessed = diff.unchanged.length + i + 1;
|
||||
const progress = calculateProgress(
|
||||
totalProcessed,
|
||||
totalFiles,
|
||||
0,
|
||||
0,
|
||||
this.embeddingService !== null
|
||||
);
|
||||
this.updateJob(job.id, { processedFiles: totalProcessed, progress });
|
||||
const isLast = i === filesToProcess.length - 1;
|
||||
if (isLast || i % YIELD_EVERY === YIELD_EVERY - 1) {
|
||||
const progress = calculateProgress(
|
||||
totalProcessed,
|
||||
totalFiles,
|
||||
0,
|
||||
0,
|
||||
this.embeddingService !== null
|
||||
);
|
||||
this.updateJob(job.id, { processedFiles: totalProcessed, progress });
|
||||
}
|
||||
}
|
||||
|
||||
// After the loop processedFiles should reflect the full count.
|
||||
@@ -316,7 +375,7 @@ export class IndexingPipeline {
|
||||
// Private — crawl
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private async crawl(repo: Repository, ref?: string): Promise<{
|
||||
private async crawl(repo: Repository, ref?: string, allowedPaths?: Set<string>): Promise<{
|
||||
files: Array<{ path: string; content: string; sha: string; size: number; language: string }>;
|
||||
totalFiles: number;
|
||||
/** Pre-parsed trueref.json / context7.json, or undefined when absent. */
|
||||
@@ -339,7 +398,12 @@ export class IndexingPipeline {
|
||||
token: repo.githubToken ?? undefined
|
||||
});
|
||||
|
||||
return { files: result.files, totalFiles: result.totalFiles };
|
||||
// Apply allowedPaths filter for differential indexing.
|
||||
const githubFinalFiles =
|
||||
allowedPaths && allowedPaths.size > 0
|
||||
? result.files.filter((f) => allowedPaths.has(f.path))
|
||||
: result.files;
|
||||
return { files: githubFinalFiles, totalFiles: result.totalFiles };
|
||||
} else {
|
||||
// Local filesystem crawl.
|
||||
const result = await this.localCrawler.crawl({
|
||||
@@ -347,7 +411,12 @@ export class IndexingPipeline {
|
||||
ref: ref ?? (repo.branch !== 'main' ? (repo.branch ?? undefined) : undefined)
|
||||
});
|
||||
|
||||
return { files: result.files, totalFiles: result.totalFiles, config: result.config };
|
||||
// Apply allowedPaths filter for differential indexing.
|
||||
const localFinalFiles =
|
||||
allowedPaths && allowedPaths.size > 0
|
||||
? result.files.filter((f) => allowedPaths.has(f.path))
|
||||
: result.files;
|
||||
return { files: localFinalFiles, totalFiles: result.totalFiles, config: result.config };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -358,6 +427,146 @@ export class IndexingPipeline {
|
||||
return row?.tag;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Private — differential clone (TRUEREF-0021)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Clone documents, snippets, and embeddings from an ancestor version into
|
||||
* the target version for all unchanged file paths.
|
||||
*
|
||||
* Runs in a single SQLite transaction for atomicity.
|
||||
*/
|
||||
private cloneFromAncestor(
|
||||
ancestorVersionId: string,
|
||||
targetVersionId: string,
|
||||
repositoryId: string,
|
||||
unchangedPaths: Set<string>
|
||||
): void {
|
||||
this.db.transaction(() => {
|
||||
const pathList = [...unchangedPaths];
|
||||
const placeholders = pathList.map(() => '?').join(',');
|
||||
const ancestorDocs = this.db
|
||||
.prepare(
|
||||
`SELECT * FROM documents WHERE version_id = ? AND file_path IN (${placeholders})`
|
||||
)
|
||||
.all(ancestorVersionId, ...pathList) as Array<{
|
||||
id: string;
|
||||
repository_id: string;
|
||||
file_path: string;
|
||||
title: string | null;
|
||||
language: string | null;
|
||||
token_count: number;
|
||||
checksum: string;
|
||||
indexed_at: number;
|
||||
}>;
|
||||
|
||||
const docIdMap = new Map<string, string>();
|
||||
const nowEpoch = Math.floor(Date.now() / 1000);
|
||||
|
||||
for (const doc of ancestorDocs) {
|
||||
const newDocId = randomUUID();
|
||||
docIdMap.set(doc.id, newDocId);
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO documents (id, repository_id, version_id, file_path, title, language, token_count, checksum, indexed_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
||||
)
|
||||
.run(
|
||||
newDocId,
|
||||
repositoryId,
|
||||
targetVersionId,
|
||||
doc.file_path,
|
||||
doc.title,
|
||||
doc.language,
|
||||
doc.token_count,
|
||||
doc.checksum,
|
||||
nowEpoch
|
||||
);
|
||||
}
|
||||
|
||||
if (docIdMap.size === 0) return;
|
||||
|
||||
const oldDocIds = [...docIdMap.keys()];
|
||||
const snippetPlaceholders = oldDocIds.map(() => '?').join(',');
|
||||
const ancestorSnippets = this.db
|
||||
.prepare(
|
||||
`SELECT * FROM snippets WHERE document_id IN (${snippetPlaceholders})`
|
||||
)
|
||||
.all(...oldDocIds) as Array<{
|
||||
id: string;
|
||||
document_id: string;
|
||||
repository_id: string;
|
||||
version_id: string | null;
|
||||
type: string;
|
||||
title: string | null;
|
||||
content: string;
|
||||
language: string | null;
|
||||
breadcrumb: string | null;
|
||||
token_count: number;
|
||||
created_at: number;
|
||||
}>;
|
||||
|
||||
const snippetIdMap = new Map<string, string>();
|
||||
for (const snippet of ancestorSnippets) {
|
||||
const newSnippetId = randomUUID();
|
||||
snippetIdMap.set(snippet.id, newSnippetId);
|
||||
const newDocId = docIdMap.get(snippet.document_id)!;
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO snippets (id, document_id, repository_id, version_id, type, title, content, language, breadcrumb, token_count, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
||||
)
|
||||
.run(
|
||||
newSnippetId,
|
||||
newDocId,
|
||||
repositoryId,
|
||||
targetVersionId,
|
||||
snippet.type,
|
||||
snippet.title,
|
||||
snippet.content,
|
||||
snippet.language,
|
||||
snippet.breadcrumb,
|
||||
snippet.token_count,
|
||||
snippet.created_at
|
||||
);
|
||||
}
|
||||
|
||||
if (snippetIdMap.size > 0) {
|
||||
const oldSnippetIds = [...snippetIdMap.keys()];
|
||||
const embPlaceholders = oldSnippetIds.map(() => '?').join(',');
|
||||
const ancestorEmbeddings = this.db
|
||||
.prepare(
|
||||
`SELECT * FROM snippet_embeddings WHERE snippet_id IN (${embPlaceholders})`
|
||||
)
|
||||
.all(...oldSnippetIds) as Array<{
|
||||
snippet_id: string;
|
||||
profile_id: string;
|
||||
model: string;
|
||||
dimensions: number;
|
||||
embedding: Buffer;
|
||||
created_at: number;
|
||||
}>;
|
||||
for (const emb of ancestorEmbeddings) {
|
||||
const newSnippetId = snippetIdMap.get(emb.snippet_id)!;
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO snippet_embeddings (snippet_id, profile_id, model, dimensions, embedding, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)`
|
||||
)
|
||||
.run(
|
||||
newSnippetId,
|
||||
emb.profile_id,
|
||||
emb.model,
|
||||
emb.dimensions,
|
||||
emb.embedding,
|
||||
emb.created_at
|
||||
);
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Private — atomic snippet replacement
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -7,9 +7,10 @@
|
||||
* - File extraction via `git archive` to temp directories
|
||||
*/
|
||||
|
||||
import { execSync } from 'node:child_process';
|
||||
import { execSync, execFileSync } from 'node:child_process';
|
||||
import { mkdirSync, rmSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import type { ChangedFile } from '../crawler/types.js';
|
||||
|
||||
export interface ResolveTagOptions {
|
||||
repoPath: string;
|
||||
@@ -158,3 +159,55 @@ export function cleanupTempExtraction(extractPath: string): void {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export interface LocalChangedFileOptions {
|
||||
repoPath: string;
|
||||
base: string;
|
||||
head: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of files that differ between two git refs (tags, branches, commits).
|
||||
*
|
||||
* Uses `git diff --name-status` which produces tab-separated lines in formats:
|
||||
* M\tpath
|
||||
* A\tpath
|
||||
* D\tpath
|
||||
* R85\told-path\tnew-path
|
||||
*
|
||||
* @returns Array of ChangedFile objects
|
||||
* @throws Error when git command fails
|
||||
*/
|
||||
export function getChangedFilesBetweenRefs(options: LocalChangedFileOptions): ChangedFile[] {
|
||||
const { repoPath, base, head } = options;
|
||||
|
||||
try {
|
||||
const output = execFileSync('git', ['-C', repoPath, 'diff', '--name-status', base, head], {
|
||||
encoding: 'utf-8',
|
||||
stdio: ['ignore', 'pipe', 'pipe']
|
||||
}).trim();
|
||||
|
||||
if (!output) return [];
|
||||
|
||||
const results: ChangedFile[] = [];
|
||||
for (const line of output.split('\n')) {
|
||||
if (!line) continue;
|
||||
const parts = line.split('\t');
|
||||
const statusCode = parts[0];
|
||||
if (statusCode === 'A') {
|
||||
results.push({ path: parts[1], status: 'added' });
|
||||
} else if (statusCode === 'M') {
|
||||
results.push({ path: parts[1], status: 'modified' });
|
||||
} else if (statusCode === 'D') {
|
||||
results.push({ path: parts[1], status: 'removed' });
|
||||
} else if (statusCode.startsWith('R')) {
|
||||
results.push({ path: parts[2], status: 'renamed', previousPath: parts[1] });
|
||||
}
|
||||
}
|
||||
return results;
|
||||
} catch (error) {
|
||||
throw new Error(
|
||||
`Failed to get changed files between '${base}' and '${head}' in ${repoPath}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
123
src/lib/server/utils/tag-order.test.ts
Normal file
123
src/lib/server/utils/tag-order.test.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
/**
|
||||
* Unit tests for tag-order utilities (TRUEREF-0021).
|
||||
*/
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { findBestAncestorVersion } from './tag-order.js';
|
||||
import { RepositoryVersion } from '$lib/server/models/repository-version.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeVersion(tag: string, state: RepositoryVersion['state'] = 'indexed'): RepositoryVersion {
|
||||
return new RepositoryVersion({
|
||||
id: `/facebook/react/${tag}`,
|
||||
repositoryId: '/facebook/react',
|
||||
tag,
|
||||
title: null,
|
||||
commitHash: null,
|
||||
state,
|
||||
totalSnippets: 0,
|
||||
indexedAt: new Date(),
|
||||
createdAt: new Date()
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// findBestAncestorVersion
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('findBestAncestorVersion', () => {
|
||||
it('returns null when candidates array is empty', () => {
|
||||
expect(findBestAncestorVersion('v2.1.0', [])).toBeNull();
|
||||
});
|
||||
|
||||
it('returns null when no candidates have state === indexed', () => {
|
||||
const candidates = [
|
||||
makeVersion('v1.0.0', 'pending'),
|
||||
makeVersion('v1.1.0', 'indexing'),
|
||||
makeVersion('v2.0.0', 'error')
|
||||
];
|
||||
expect(findBestAncestorVersion('v2.1.0', candidates)).toBeNull();
|
||||
});
|
||||
|
||||
it('returns the nearest semver predecessor from a list', () => {
|
||||
const candidates = [
|
||||
makeVersion('v1.0.0'),
|
||||
makeVersion('v1.1.0'),
|
||||
makeVersion('v2.0.0')
|
||||
];
|
||||
const result = findBestAncestorVersion('v2.1.0', candidates);
|
||||
expect(result?.tag).toBe('v2.0.0');
|
||||
});
|
||||
|
||||
it('handles v-prefix stripping correctly', () => {
|
||||
const candidates = [
|
||||
makeVersion('v1.0.0'),
|
||||
makeVersion('v1.5.0'),
|
||||
makeVersion('v2.0.0')
|
||||
];
|
||||
const result = findBestAncestorVersion('v2.0.1', candidates);
|
||||
expect(result?.tag).toBe('v2.0.0');
|
||||
});
|
||||
|
||||
it('returns null when all candidates are after current tag', () => {
|
||||
const candidates = [makeVersion('v2.0.0')];
|
||||
expect(findBestAncestorVersion('v1.0.0', candidates)).toBeNull();
|
||||
});
|
||||
|
||||
it('returns null when all candidates equal the current tag', () => {
|
||||
const candidates = [makeVersion('v1.0.0'), makeVersion('v2.0.0')];
|
||||
expect(findBestAncestorVersion('v1.0.0', candidates)).toBeNull();
|
||||
});
|
||||
|
||||
it('handles tag lists without semver format using lexicographic fallback', () => {
|
||||
const candidates = [
|
||||
makeVersion('release-alpha'),
|
||||
makeVersion('release-beta'),
|
||||
makeVersion('release-gamma')
|
||||
];
|
||||
const result = findBestAncestorVersion('release-zeta', candidates);
|
||||
expect(result).not.toBeNull();
|
||||
// Lexicographic: all are "less than" release-zeta, so the max is release-gamma
|
||||
expect(result?.tag).toBe('release-gamma');
|
||||
});
|
||||
|
||||
it('returns single candidate that is older than current tag', () => {
|
||||
const candidates = [makeVersion('v1.0.0')];
|
||||
const result = findBestAncestorVersion('v2.0.0', candidates);
|
||||
expect(result?.tag).toBe('v1.0.0');
|
||||
});
|
||||
|
||||
it('ignores non-indexed versions even when they are valid predecessors', () => {
|
||||
const candidates = [
|
||||
makeVersion('v1.0.0', 'indexed'),
|
||||
makeVersion('v1.5.0', 'pending'),
|
||||
makeVersion('v1.8.0', 'error')
|
||||
];
|
||||
const result = findBestAncestorVersion('v2.0.0', candidates);
|
||||
expect(result?.tag).toBe('v1.0.0');
|
||||
});
|
||||
|
||||
it('correctly handles pre-release versions (pre-release < release)', () => {
|
||||
const candidates = [
|
||||
makeVersion('v2.0.0-alpha'),
|
||||
makeVersion('v2.0.0-beta'),
|
||||
makeVersion('v1.9.0')
|
||||
];
|
||||
// v2.0.0 is target; pre-releases are stricter: v2.0.0-alpha < v2.0.0
|
||||
const result = findBestAncestorVersion('v2.0.0', candidates);
|
||||
expect(result?.tag).toBe('v2.0.0-beta');
|
||||
});
|
||||
|
||||
it('selects closest minor version as predecessor', () => {
|
||||
const candidates = [
|
||||
makeVersion('v1.0.0'),
|
||||
makeVersion('v1.1.0'),
|
||||
makeVersion('v1.2.0'),
|
||||
makeVersion('v1.3.0')
|
||||
];
|
||||
const result = findBestAncestorVersion('v1.4.0', candidates);
|
||||
expect(result?.tag).toBe('v1.3.0');
|
||||
});
|
||||
});
|
||||
88
src/lib/server/utils/tag-order.ts
Normal file
88
src/lib/server/utils/tag-order.ts
Normal file
@@ -0,0 +1,88 @@
|
||||
/**
|
||||
* Tag ordering and ancestor selection for differential indexing (TRUEREF-0021).
|
||||
*/
|
||||
import type { RepositoryVersion } from '$lib/server/models/repository-version.js';
|
||||
|
||||
interface ParsedVersion {
|
||||
major: number;
|
||||
minor: number;
|
||||
patch: number;
|
||||
prerelease: string[];
|
||||
}
|
||||
|
||||
function parseVersion(tag: string): ParsedVersion | null {
|
||||
const stripped = tag.startsWith('v') ? tag.slice(1) : tag;
|
||||
const dashIndex = stripped.indexOf('-');
|
||||
const versionPart = dashIndex === -1 ? stripped : stripped.slice(0, dashIndex);
|
||||
const prereleaseStr = dashIndex === -1 ? '' : stripped.slice(dashIndex + 1);
|
||||
|
||||
const segments = versionPart.split('.');
|
||||
if (segments.length < 1 || segments.some((s) => !/^\d+$/.test(s))) return null;
|
||||
|
||||
const [majorStr, minorStr = '0', patchStr = '0'] = segments;
|
||||
const major = Number(majorStr);
|
||||
const minor = Number(minorStr);
|
||||
const patch = Number(patchStr);
|
||||
|
||||
const prerelease = prereleaseStr ? prereleaseStr.split('.') : [];
|
||||
|
||||
return { major, minor, patch, prerelease };
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare two version tags. Returns negative if a < b, positive if a > b, 0 if equal.
|
||||
*/
|
||||
function compareTagVersions(tagA: string, tagB: string): number {
|
||||
const a = parseVersion(tagA);
|
||||
const b = parseVersion(tagB);
|
||||
|
||||
if (!a || !b) {
|
||||
// Fall back to lexicographic comparison when semver parsing fails
|
||||
return tagA.localeCompare(tagB);
|
||||
}
|
||||
|
||||
if (a.major !== b.major) return a.major - b.major;
|
||||
if (a.minor !== b.minor) return a.minor - b.minor;
|
||||
if (a.patch !== b.patch) return a.patch - b.patch;
|
||||
|
||||
// Pre-release versions have lower precedence than the release version
|
||||
if (a.prerelease.length === 0 && b.prerelease.length > 0) return 1;
|
||||
if (a.prerelease.length > 0 && b.prerelease.length === 0) return -1;
|
||||
|
||||
// Compare pre-release segments lexicographically
|
||||
const len = Math.max(a.prerelease.length, b.prerelease.length);
|
||||
for (let i = 0; i < len; i++) {
|
||||
const pa = a.prerelease[i] ?? '';
|
||||
const pb = b.prerelease[i] ?? '';
|
||||
if (pa !== pb) return pa.localeCompare(pb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the best ancestor version for differential indexing.
|
||||
*
|
||||
* Selects the most-recent indexed version whose tag sorts before `currentTag`
|
||||
* using semver comparison. Falls back to lexicographic comparison when semver
|
||||
* parsing fails. Falls back to creation timestamp order as last resort.
|
||||
*
|
||||
* @param currentTag The tag being indexed
|
||||
* @param candidates All versioned snapshots for this repository
|
||||
* @returns The best indexed ancestor, or null if none qualifies
|
||||
*/
|
||||
export function findBestAncestorVersion(
|
||||
currentTag: string,
|
||||
candidates: RepositoryVersion[]
|
||||
): RepositoryVersion | null {
|
||||
const indexed = candidates.filter((v) => v.state === 'indexed');
|
||||
|
||||
const predecessors = indexed.filter((v) => compareTagVersions(v.tag, currentTag) < 0);
|
||||
|
||||
if (predecessors.length === 0) return null;
|
||||
|
||||
// Return the one with the highest version (closest predecessor)
|
||||
return predecessors.reduce((best, candidate) =>
|
||||
compareTagVersions(candidate.tag, best.tag) > 0 ? candidate : best
|
||||
);
|
||||
}
|
||||
@@ -24,7 +24,7 @@ export const GET: RequestHandler = ({ url }) => {
|
||||
const status = (url.searchParams.get('status') ?? undefined) as
|
||||
| IndexingJob['status']
|
||||
| undefined;
|
||||
const limit = Math.min(parseInt(url.searchParams.get('limit') ?? '20', 10) || 20, 200);
|
||||
const limit = Math.min(parseInt(url.searchParams.get('limit') ?? '20', 10) || 20, 1000);
|
||||
|
||||
const jobs = queue.listJobs({ repositoryId, status, limit });
|
||||
const total = queue.countJobs({ repositoryId, status });
|
||||
|
||||
@@ -55,6 +55,9 @@
|
||||
// Active version indexing jobs: tag -> jobId
|
||||
let activeVersionJobs = $state<Record<string, string | undefined>>({});
|
||||
|
||||
// Job progress data fed by the single shared poller (replaces per-version <IndexingProgress>).
|
||||
let versionJobProgress = $state<Record<string, IndexingJob>>({});
|
||||
|
||||
// Remove confirm
|
||||
let removeTag = $state<string | null>(null);
|
||||
|
||||
@@ -102,6 +105,66 @@
|
||||
loadVersions();
|
||||
});
|
||||
|
||||
// Single shared poller — one interval regardless of how many tags are active.
|
||||
// This replaces the N per-version <IndexingProgress> components that each had
|
||||
// their own setInterval, which caused ERR_INSUFFICIENT_RESOURCES and UI lockup
|
||||
// when hundreds of tags were queued simultaneously.
|
||||
$effect(() => {
|
||||
const activeIds = new Set(
|
||||
Object.values(activeVersionJobs).filter((id): id is string => !!id)
|
||||
);
|
||||
if (activeIds.size === 0) {
|
||||
versionJobProgress = {};
|
||||
return;
|
||||
}
|
||||
|
||||
let stopped = false;
|
||||
|
||||
async function poll() {
|
||||
if (stopped) return;
|
||||
try {
|
||||
const res = await fetch(
|
||||
`/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000`
|
||||
);
|
||||
if (!res.ok || stopped) return;
|
||||
const d = await res.json();
|
||||
|
||||
// Build a jobId → job lookup from the response.
|
||||
const map: Record<string, IndexingJob> = {};
|
||||
for (const job of (d.jobs ?? []) as IndexingJob[]) {
|
||||
map[job.id] = job;
|
||||
}
|
||||
if (!stopped) versionJobProgress = map;
|
||||
|
||||
// Retire completed jobs and trigger a single refresh.
|
||||
let anyCompleted = false;
|
||||
const nextJobs = { ...activeVersionJobs };
|
||||
for (const [tag, jobId] of Object.entries(activeVersionJobs)) {
|
||||
if (!jobId) continue;
|
||||
const job = map[jobId];
|
||||
if (job?.status === 'done' || job?.status === 'failed') {
|
||||
delete nextJobs[tag];
|
||||
anyCompleted = true;
|
||||
}
|
||||
}
|
||||
if (anyCompleted && !stopped) {
|
||||
activeVersionJobs = nextJobs;
|
||||
void loadVersions();
|
||||
void refreshRepo();
|
||||
}
|
||||
} catch {
|
||||
// ignore transient errors
|
||||
}
|
||||
}
|
||||
|
||||
void poll();
|
||||
const interval = setInterval(poll, 2000);
|
||||
return () => {
|
||||
stopped = true;
|
||||
clearInterval(interval);
|
||||
};
|
||||
});
|
||||
|
||||
async function handleReindex() {
|
||||
errorMessage = null;
|
||||
successMessage = null;
|
||||
@@ -265,23 +328,29 @@
|
||||
errorMessage = null;
|
||||
try {
|
||||
const tags = [...selectedDiscoveredTags];
|
||||
const responses = await Promise.all(
|
||||
tags.map((tag) =>
|
||||
fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}/versions`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ tag, autoIndex: true })
|
||||
})
|
||||
)
|
||||
);
|
||||
const results = await Promise.all(responses.map((r) => (r.ok ? r.json() : null)));
|
||||
const BATCH_SIZE = 5;
|
||||
let next = { ...activeVersionJobs };
|
||||
for (let i = 0; i < tags.length; i++) {
|
||||
const result = results[i];
|
||||
if (result?.job?.id) {
|
||||
next = { ...next, [tags[i]]: result.job.id };
|
||||
|
||||
for (let i = 0; i < tags.length; i += BATCH_SIZE) {
|
||||
const batch = tags.slice(i, i + BATCH_SIZE);
|
||||
const responses = await Promise.all(
|
||||
batch.map((tag) =>
|
||||
fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}/versions`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ tag, autoIndex: true })
|
||||
})
|
||||
)
|
||||
);
|
||||
const results = await Promise.all(responses.map((r) => (r.ok ? r.json() : null)));
|
||||
for (let j = 0; j < batch.length; j++) {
|
||||
const result = results[j];
|
||||
if (result?.job?.id) {
|
||||
next = { ...next, [batch[j]]: result.job.id };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
activeVersionJobs = next;
|
||||
showDiscoverPanel = false;
|
||||
discoveredTags = [];
|
||||
@@ -547,16 +616,23 @@
|
||||
{/each}
|
||||
</div>
|
||||
{/if}
|
||||
{#if !!activeVersionJobs[version.tag]}
|
||||
<IndexingProgress
|
||||
jobId={activeVersionJobs[version.tag]!}
|
||||
oncomplete={() => {
|
||||
const { [version.tag]: _, ...rest } = activeVersionJobs;
|
||||
activeVersionJobs = rest;
|
||||
loadVersions();
|
||||
refreshRepo();
|
||||
}}
|
||||
/>
|
||||
{#if activeVersionJobs[version.tag]}
|
||||
{@const job = versionJobProgress[activeVersionJobs[version.tag]!]}
|
||||
<div class="mt-2">
|
||||
<div class="flex justify-between text-xs text-gray-500">
|
||||
<span>{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files</span>
|
||||
<span>{job?.progress ?? 0}%</span>
|
||||
</div>
|
||||
<div class="mt-1 h-1.5 w-full rounded-full bg-gray-200">
|
||||
<div
|
||||
class="h-1.5 rounded-full bg-blue-600 transition-all duration-300"
|
||||
style="width: {job?.progress ?? 0}%"
|
||||
></div>
|
||||
</div>
|
||||
{#if job?.status === 'failed'}
|
||||
<p class="mt-1 text-xs text-red-600">{job.error ?? 'Indexing failed.'}</p>
|
||||
{/if}
|
||||
</div>
|
||||
{/if}
|
||||
</div>
|
||||
{/each}
|
||||
|
||||
Reference in New Issue
Block a user