Initial commit: trueref v0.1.0-SNAPSHOT
Some checks failed
Build and publish Docker image / Build and push (push) Failing after 1m27s
Some checks failed
Build and publish Docker image / Build and push (push) Failing after 1m27s
Java 21 / Spring Boot 3.5.3 multi-module Maven project. Hybrid BM25+HNSW search with RRF, cross-encoder reranker, ONNX Runtime 1.22.0 (CPU + CUDA 12 GPU variants).
This commit is contained in:
28
trueref-application/pom.xml
Normal file
28
trueref-application/pom.xml
Normal file
@@ -0,0 +1,28 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.trueref</groupId>
|
||||
<artifactId>trueref-parent</artifactId>
|
||||
<version>0.1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>trueref-application</artifactId>
|
||||
<name>trueref-application</name>
|
||||
<description>Use-case implementations. Depends only on the domain.</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.trueref</groupId>
|
||||
<artifactId>trueref-domain</artifactId>
|
||||
</dependency>
|
||||
<!-- SLF4J only; orchestration may use virtual threads via JDK -->
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,89 @@
|
||||
package com.trueref.application.catalog;
|
||||
|
||||
import com.trueref.domain.error.RepositoryAlreadyRegistered;
|
||||
import com.trueref.domain.error.RepositoryNotFound;
|
||||
import com.trueref.domain.model.Repository;
|
||||
import com.trueref.domain.model.RepositoryId;
|
||||
import com.trueref.domain.model.TagPattern;
|
||||
import com.trueref.domain.model.Version;
|
||||
import com.trueref.domain.port.in.QueryCatalog;
|
||||
import com.trueref.domain.port.in.RegisterRepository;
|
||||
import com.trueref.domain.port.out.RepositoryStore;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** Implements {@link RegisterRepository} and {@link QueryCatalog}. */
|
||||
public final class CatalogService implements RegisterRepository, QueryCatalog {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CatalogService.class);
|
||||
|
||||
private static final List<TagPattern> DEFAULT_RULES = List.of(
|
||||
new TagPattern.Exact(),
|
||||
new TagPattern.VPrefix(),
|
||||
new TagPattern.ReleasePrefix(),
|
||||
new TagPattern.SemverFuzzy());
|
||||
|
||||
private final RepositoryStore store;
|
||||
private final Path trueRefHome;
|
||||
|
||||
public CatalogService(RepositoryStore store, Path trueRefHome) {
|
||||
this.store = store;
|
||||
this.trueRefHome = trueRefHome;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Repository register(Command cmd) {
|
||||
store.findByName(cmd.name()).ifPresent(r -> {
|
||||
throw new RepositoryAlreadyRegistered(cmd.name());
|
||||
});
|
||||
boolean managed = cmd.remoteUrl() != null && cmd.localPath() == null;
|
||||
String localPath = cmd.localPath() != null
|
||||
? cmd.localPath()
|
||||
: trueRefHome.resolve("repos").resolve(cmd.name().replace('/', '_')).toString();
|
||||
Instant now = Instant.now();
|
||||
Repository repo = new Repository(
|
||||
RepositoryId.random(),
|
||||
cmd.name(),
|
||||
cmd.remoteUrl(),
|
||||
localPath,
|
||||
managed,
|
||||
cmd.ignoreGlobs(),
|
||||
cmd.maxFileSizeBytes() == null ? 1_048_576L : cmd.maxFileSizeBytes(),
|
||||
cmd.pollInterval() == null ? Duration.ofHours(1) : cmd.pollInterval(),
|
||||
cmd.tagCap() == null ? 100 : cmd.tagCap(),
|
||||
cmd.versionMappingRules().isEmpty() ? DEFAULT_RULES : cmd.versionMappingRules(),
|
||||
now,
|
||||
now);
|
||||
Repository saved = store.save(repo);
|
||||
log.info("registered repository name={} id={} managed={} localPath={}",
|
||||
saved.name(), saved.id(), managed, localPath);
|
||||
return saved;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregister(RepositoryId id) {
|
||||
Repository existing = store.findById(id).orElseThrow(() -> new RepositoryNotFound(id.toString()));
|
||||
store.delete(id);
|
||||
log.info("unregistered repository name={} id={}", existing.name(), id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Repository> listRepositories() {
|
||||
return store.findAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Repository> findRepository(RepositoryId id) {
|
||||
return store.findById(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Version> listVersions(RepositoryId repoId) {
|
||||
return store.findVersionsByRepo(repoId);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package com.trueref.application.ingest;
|
||||
|
||||
import com.trueref.domain.error.RepositoryNotFound;
|
||||
import com.trueref.domain.model.Repository;
|
||||
import com.trueref.domain.model.Version;
|
||||
import com.trueref.domain.model.VersionId;
|
||||
import com.trueref.domain.model.VersionStatus;
|
||||
import com.trueref.domain.port.in.DiscoverVersions;
|
||||
import com.trueref.domain.port.out.GitClient;
|
||||
import com.trueref.domain.port.out.RepositoryStore;
|
||||
import com.trueref.domain.model.RepositoryId;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/** Fetches tags (git fetch + tag list) and persists new/updated {@link Version}s. */
|
||||
public final class DiscoveryService implements DiscoverVersions {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DiscoveryService.class);
|
||||
|
||||
private final RepositoryStore store;
|
||||
private final GitClient git;
|
||||
|
||||
public DiscoveryService(RepositoryStore store, GitClient git) {
|
||||
this.store = store;
|
||||
this.git = git;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Version> discover(RepositoryId repoId) {
|
||||
Repository repo = store.findById(repoId).orElseThrow(() -> new RepositoryNotFound(repoId.toString()));
|
||||
Path path = Path.of(repo.localPath());
|
||||
|
||||
// clone if managed and not present
|
||||
if (repo.managedClone() && !Files.exists(path.resolve(".git"))) {
|
||||
log.info("cloning for discovery: {}", repo.name());
|
||||
git.cloneRepo(repo.remoteUrl(), path);
|
||||
} else {
|
||||
try { git.fetch(path); } catch (Exception e) {
|
||||
log.warn("fetch failed for {}: {}", repo.name(), e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
List<GitClient.TagInfo> tags = git.listTags(path);
|
||||
// apply tag cap: keep top-N by epoch DESC (already sorted)
|
||||
List<GitClient.TagInfo> capped = tags.stream().limit(Math.max(1, repo.tagCap())).toList();
|
||||
|
||||
for (GitClient.TagInfo t : capped) {
|
||||
Optional<Version> existing = store.findVersionByTag(repoId, t.name());
|
||||
if (existing.isPresent()) {
|
||||
// refresh commit sha only if changed
|
||||
if (!existing.get().commitSha().equalsIgnoreCase(t.commitSha())) {
|
||||
Version updated = new Version(
|
||||
existing.get().id(),
|
||||
existing.get().repoId(),
|
||||
t.name(),
|
||||
t.commitSha(),
|
||||
VersionStatus.DISCOVERED, // needs re-index
|
||||
existing.get().indexedAt(),
|
||||
existing.get().chunkCount(),
|
||||
null);
|
||||
store.saveVersion(updated);
|
||||
log.info("tag {} changed commit; marked DISCOVERED", t.name());
|
||||
}
|
||||
} else {
|
||||
Version v = new Version(
|
||||
VersionId.random(),
|
||||
repoId,
|
||||
t.name(),
|
||||
t.commitSha(),
|
||||
VersionStatus.DISCOVERED,
|
||||
null,
|
||||
0,
|
||||
null);
|
||||
store.saveVersion(v);
|
||||
log.info("discovered new tag {}", t.name());
|
||||
}
|
||||
}
|
||||
return store.findVersionsByRepo(repoId).stream()
|
||||
.sorted(Comparator.comparing(Version::tag))
|
||||
.toList();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,604 @@
|
||||
package com.trueref.application.ingest;
|
||||
|
||||
import com.trueref.domain.model.Chunk;
|
||||
import com.trueref.domain.model.ChunkId;
|
||||
import com.trueref.domain.model.ChunkVersion;
|
||||
import com.trueref.domain.model.Embedding;
|
||||
import com.trueref.domain.model.IngestionJob;
|
||||
import com.trueref.domain.model.JobId;
|
||||
import com.trueref.domain.model.JobLogEvent;
|
||||
import com.trueref.domain.model.JobStage;
|
||||
import com.trueref.domain.model.JobStatus;
|
||||
import com.trueref.domain.model.JobType;
|
||||
import com.trueref.domain.model.Repository;
|
||||
import com.trueref.domain.model.RepositoryId;
|
||||
import com.trueref.domain.model.Version;
|
||||
import com.trueref.domain.model.VersionId;
|
||||
import com.trueref.domain.model.VersionStatus;
|
||||
import com.trueref.domain.port.in.IndexVersion;
|
||||
import com.trueref.domain.port.out.ChunkStore;
|
||||
import com.trueref.domain.port.out.CodeParser;
|
||||
import com.trueref.domain.port.out.EmbeddingCache;
|
||||
import com.trueref.domain.port.out.EmbeddingService;
|
||||
import com.trueref.domain.port.out.GitClient;
|
||||
import com.trueref.domain.port.out.JobEventBus;
|
||||
import com.trueref.domain.port.out.JobStore;
|
||||
import com.trueref.domain.port.out.RepositoryStore;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.FileSystems;
|
||||
import java.nio.file.FileVisitResult;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.PathMatcher;
|
||||
import java.nio.file.SimpleFileVisitor;
|
||||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.security.MessageDigest;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
/**
|
||||
* Orchestrates the full ingestion pipeline for one (repo, version): clone/fetch → checkout →
|
||||
* discover files → diff-vs-parent → parse → chunk → dedupe by hash → embed (with cache) → index
|
||||
* into Lucene → commit.
|
||||
*
|
||||
* <p>The pipeline is split into two concurrent stages:
|
||||
* <ol>
|
||||
* <li><b>Parse phase</b> (virtual threads, up to {@code maxParseJobs} in parallel):
|
||||
* FETCH/CLONE → CHECKOUT → DISCOVER_FILES → DIFF_FILES → PARSE.
|
||||
* I/O-bound; no GPU use; worktree is removed immediately after parse.
|
||||
* </li>
|
||||
* <li><b>Embed phase</b> (single dedicated platform thread):
|
||||
* EMBED → INDEX → COMMIT. GPU-bound; serialises ONNX inference to prevent CUDA
|
||||
* context races. Runs on a platform thread for a stable OS thread identity.
|
||||
* </li>
|
||||
* </ol>
|
||||
* Completed parse batches are handed off via a bounded {@link BlockingQueue}: if the embed
|
||||
* worker is busy, parse workers block before queuing, naturally capping in-memory pressure.
|
||||
* One orchestrator instance is shared across all jobs.
|
||||
*/
|
||||
public final class IngestionOrchestrator implements IndexVersion {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(IngestionOrchestrator.class);
|
||||
|
||||
// Built-in ignore globs (applied in addition to .gitignore + per-repo globs).
|
||||
private static final List<String> BUILTIN_IGNORES = List.of(
|
||||
"**/.git/**",
|
||||
"**/node_modules/**",
|
||||
"**/target/**",
|
||||
"**/build/**",
|
||||
"**/dist/**",
|
||||
"**/out/**",
|
||||
"**/.idea/**",
|
||||
"**/.vscode/**",
|
||||
"**/__pycache__/**",
|
||||
"**/*.png", "**/*.jpg", "**/*.jpeg", "**/*.gif", "**/*.webp", "**/*.ico",
|
||||
"**/*.pdf", "**/*.zip", "**/*.tar", "**/*.gz", "**/*.jar", "**/*.class",
|
||||
"**/*.so", "**/*.dll", "**/*.dylib", "**/*.exe", "**/*.bin");
|
||||
|
||||
private final RepositoryStore repoStore;
|
||||
private final JobStore jobStore;
|
||||
private final ChunkStore chunkStore;
|
||||
private final EmbeddingService embeddings;
|
||||
private final EmbeddingCache embeddingCache;
|
||||
private final GitClient git;
|
||||
private final CodeParser parser;
|
||||
private final JobEventBus bus;
|
||||
|
||||
private final ExecutorService parseExecutor;
|
||||
private final Semaphore parseConcurrencyLimit;
|
||||
private final BlockingQueue<ParsedBatch> embedQueue;
|
||||
private final Thread embedWorker;
|
||||
private volatile boolean shuttingDown = false;
|
||||
private final Map<JobId, Boolean> running = new ConcurrentHashMap<>();
|
||||
|
||||
public IngestionOrchestrator(
|
||||
RepositoryStore repoStore,
|
||||
JobStore jobStore,
|
||||
ChunkStore chunkStore,
|
||||
EmbeddingService embeddings,
|
||||
EmbeddingCache embeddingCache,
|
||||
GitClient git,
|
||||
CodeParser parser,
|
||||
JobEventBus bus,
|
||||
int maxParseJobs,
|
||||
int embedQueueCapacity) {
|
||||
this.repoStore = repoStore;
|
||||
this.jobStore = jobStore;
|
||||
this.chunkStore = chunkStore;
|
||||
this.embeddings = embeddings;
|
||||
this.embeddingCache = embeddingCache;
|
||||
this.git = git;
|
||||
this.parser = parser;
|
||||
this.bus = bus;
|
||||
this.parseExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
// Fair semaphore caps parallel parse jobs (I/O + CPU heavy, no GPU).
|
||||
this.parseConcurrencyLimit = new Semaphore(Math.max(1, maxParseJobs), true);
|
||||
// Bounded queue between parse workers and the embed worker.
|
||||
// Backpressure: parse workers block here when the embed worker is saturated,
|
||||
// preventing unbounded in-memory accumulation of parsed chunks.
|
||||
this.embedQueue = new LinkedBlockingQueue<>(Math.max(1, embedQueueCapacity));
|
||||
// Single platform thread for GPU inference. Platform (not virtual) gives a
|
||||
// stable OS thread identity for CUDA — the synchronized(session) in OnnxEmbeddingService
|
||||
// already pins virtual threads, but a dedicated platform thread removes all doubt.
|
||||
this.embedWorker = Thread.ofPlatform()
|
||||
.name("embed-worker")
|
||||
.daemon(false)
|
||||
.start(this::drainEmbedQueue);
|
||||
log.info("IngestionOrchestrator ready: maxParseJobs={} embedQueueCapacity={}",
|
||||
Math.max(1, maxParseJobs), Math.max(1, embedQueueCapacity));
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobId enqueue(RepositoryId repoId, VersionId versionId, boolean force) {
|
||||
Repository repo = repoStore.findById(repoId).orElseThrow();
|
||||
Version ver = repoStore.findVersion(versionId).orElseThrow();
|
||||
if (!force && ver.status() == VersionStatus.INDEXED) {
|
||||
log.info("version already indexed and not forcing; skipping repo={} tag={}", repo.name(), ver.tag());
|
||||
JobId id = JobId.random();
|
||||
IngestionJob skipped = new IngestionJob(
|
||||
id,
|
||||
repoId,
|
||||
versionId,
|
||||
JobType.INDEX_VERSION,
|
||||
JobStatus.SUCCEEDED,
|
||||
Instant.now(),
|
||||
Instant.now(),
|
||||
List.of());
|
||||
jobStore.save(skipped);
|
||||
bus.publishJob(skipped);
|
||||
return id;
|
||||
}
|
||||
|
||||
JobId jobId = JobId.random();
|
||||
IngestionJob job = new IngestionJob(
|
||||
jobId,
|
||||
repoId,
|
||||
versionId,
|
||||
JobType.INDEX_VERSION,
|
||||
JobStatus.QUEUED,
|
||||
null,
|
||||
null,
|
||||
List.of());
|
||||
jobStore.save(job);
|
||||
bus.publishJob(job);
|
||||
running.put(jobId, Boolean.TRUE);
|
||||
parseExecutor.submit(() -> runParsePhase(jobId, repo, ver));
|
||||
return jobId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Carry struct that transfers parse-phase output to the embed worker.
|
||||
* The git worktree has already been removed before this batch enters the queue;
|
||||
* only in-memory chunk data travels across the thread boundary.
|
||||
*/
|
||||
private record ParsedBatch(
|
||||
JobId jobId, Repository repo, Version ver, List<ParsedPiece> pieces) {}
|
||||
|
||||
/**
|
||||
* Parse phase — runs on a virtual thread, up to {@code maxParseJobs} in parallel.
|
||||
* Stages: FETCH/CLONE → CHECKOUT → DISCOVER_FILES → DIFF_FILES → PARSE.
|
||||
* On completion, removes the worktree, releases the parse slot so the next job can
|
||||
* start immediately, then blocks on {@link #embedQueue} until the embed worker has
|
||||
* room (natural backpressure).
|
||||
*/
|
||||
private void runParsePhase(JobId jobId, Repository repo, Version ver) {
|
||||
MDC.put("jobId", jobId.toString());
|
||||
MDC.put("repo", repo.name());
|
||||
MDC.put("tag", ver.tag());
|
||||
try {
|
||||
parseConcurrencyLimit.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("job {} interrupted while waiting for parse slot — failing", jobId);
|
||||
repoStore.updateVersionStatus(ver.id(), VersionStatus.FAILED, "interrupted");
|
||||
transitionJob(jobId, JobStatus.FAILED, null, Instant.now());
|
||||
running.remove(jobId);
|
||||
MDC.clear();
|
||||
return;
|
||||
}
|
||||
boolean slotReleased = false;
|
||||
Path worktree = null;
|
||||
Path repoPath = Path.of(repo.localPath());
|
||||
try {
|
||||
transitionJob(jobId, JobStatus.RUNNING, Instant.now(), null);
|
||||
repoStore.updateVersionStatus(ver.id(), VersionStatus.INDEXING, null);
|
||||
|
||||
// STAGE: FETCH (or CLONE if managed and absent)
|
||||
if (repo.managedClone() && !Files.exists(repoPath.resolve(".git"))) {
|
||||
stage(jobId, JobStage.StageName.CLONE, () -> {
|
||||
logEvent(jobId, JobLogEvent.Level.INFO, JobStage.StageName.CLONE,
|
||||
"cloning " + repo.remoteUrl() + " → " + repoPath);
|
||||
git.cloneRepo(repo.remoteUrl(), repoPath);
|
||||
return 1L;
|
||||
});
|
||||
} else {
|
||||
stage(jobId, JobStage.StageName.FETCH, () -> {
|
||||
git.fetch(repoPath);
|
||||
return 1L;
|
||||
});
|
||||
}
|
||||
|
||||
// STAGE: CHECKOUT
|
||||
final Path wt = stageReturning(jobId, JobStage.StageName.CHECKOUT, () -> {
|
||||
Path w = git.checkoutWorktree(repoPath, ver.tag());
|
||||
logEvent(jobId, JobLogEvent.Level.INFO, JobStage.StageName.CHECKOUT,
|
||||
"checked out at " + w);
|
||||
return w;
|
||||
});
|
||||
worktree = wt;
|
||||
|
||||
// STAGE: DISCOVER_FILES
|
||||
List<Path> files = stageReturning(jobId, JobStage.StageName.DISCOVER_FILES, () ->
|
||||
discoverFiles(wt, repo));
|
||||
logEvent(jobId, JobLogEvent.Level.INFO, JobStage.StageName.DISCOVER_FILES,
|
||||
"found " + files.size() + " indexable files");
|
||||
|
||||
// STAGE: DIFF_FILES (select subset)
|
||||
String baseRef = pickParentIndexedTag(repo, ver);
|
||||
final List<Path> selectedFiles;
|
||||
if (baseRef != null) {
|
||||
Set<String> changedRel = stageReturning(jobId, JobStage.StageName.DIFF_FILES, () -> {
|
||||
List<GitClient.DiffEntry> diff = git.diff(repoPath, baseRef, ver.tag());
|
||||
Set<String> s = new HashSet<>();
|
||||
for (GitClient.DiffEntry e : diff) {
|
||||
if (e.change() != GitClient.DiffEntry.ChangeType.DELETED) s.add(e.path());
|
||||
}
|
||||
return s;
|
||||
});
|
||||
selectedFiles = files.stream()
|
||||
.filter(f -> changedRel.contains(wt.relativize(f).toString().replace('\\', '/')))
|
||||
.toList();
|
||||
logEvent(jobId, JobLogEvent.Level.INFO, JobStage.StageName.DIFF_FILES,
|
||||
"diff vs " + baseRef + " selects " + selectedFiles.size() + "/" + files.size());
|
||||
} else {
|
||||
selectedFiles = files;
|
||||
}
|
||||
|
||||
// STAGE: PARSE + CHUNK + HASH (combined)
|
||||
List<ParsedPiece> pieces = stageReturning(jobId, JobStage.StageName.PARSE, () ->
|
||||
parseAll(selectedFiles, wt));
|
||||
|
||||
// Worktree no longer needed — free disk space before blocking on embed queue.
|
||||
removeWorktreeQuietly(jobId, repoPath, wt);
|
||||
worktree = null;
|
||||
|
||||
// Release parse slot before blocking so the next job can start parsing
|
||||
// while this batch waits for the embed worker (maximises CPU/GPU overlap).
|
||||
parseConcurrencyLimit.release();
|
||||
slotReleased = true;
|
||||
|
||||
// Hand off to embed worker — blocks if the queue is at capacity.
|
||||
embedQueue.put(new ParsedBatch(jobId, repo, ver, pieces));
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("job {} interrupted during parse — failing", jobId);
|
||||
repoStore.updateVersionStatus(ver.id(), VersionStatus.FAILED, "interrupted");
|
||||
transitionJob(jobId, JobStatus.FAILED, null, Instant.now());
|
||||
running.remove(jobId);
|
||||
} catch (Exception e) {
|
||||
log.error("parse phase failed for job {}", jobId, e);
|
||||
logEvent(jobId, JobLogEvent.Level.ERROR, null, "parse phase failed: " + e.getMessage());
|
||||
repoStore.updateVersionStatus(ver.id(), VersionStatus.FAILED, e.getMessage());
|
||||
transitionJob(jobId, JobStatus.FAILED, null, Instant.now());
|
||||
running.remove(jobId);
|
||||
} finally {
|
||||
if (worktree != null) removeWorktreeQuietly(jobId, repoPath, worktree);
|
||||
if (!slotReleased) parseConcurrencyLimit.release();
|
||||
MDC.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Embed worker — runs on a single dedicated platform thread.
|
||||
* Drains {@link #embedQueue} until {@link #shutdown()} signals stop.
|
||||
* Stages per batch: EMBED → INDEX → COMMIT → mark version indexed → transition SUCCEEDED.
|
||||
*/
|
||||
private void drainEmbedQueue() {
|
||||
log.info("embed worker started ({})", Thread.currentThread().getName());
|
||||
while (!shuttingDown || !embedQueue.isEmpty()) {
|
||||
ParsedBatch batch;
|
||||
try {
|
||||
batch = embedQueue.poll(500, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
if (batch == null) continue;
|
||||
runEmbedPhase(batch);
|
||||
}
|
||||
log.info("embed worker stopped");
|
||||
}
|
||||
|
||||
private void runEmbedPhase(ParsedBatch batch) {
|
||||
MDC.put("jobId", batch.jobId().toString());
|
||||
MDC.put("repo", batch.repo().name());
|
||||
MDC.put("tag", batch.ver().tag());
|
||||
try {
|
||||
// STAGE: EMBED
|
||||
List<Chunk> chunks = stageReturning(batch.jobId(), JobStage.StageName.EMBED, () ->
|
||||
embedAll(batch.jobId(), batch.pieces()));
|
||||
|
||||
// STAGE: INDEX
|
||||
stage(batch.jobId(), JobStage.StageName.INDEX, () -> {
|
||||
chunkStore.unlinkVersion(batch.ver().id());
|
||||
List<ChunkVersion> links = buildLinks(batch.ver().id(), batch.pieces());
|
||||
chunkStore.linkChunks(links);
|
||||
return (long) links.size();
|
||||
});
|
||||
|
||||
// STAGE: COMMIT
|
||||
stage(batch.jobId(), JobStage.StageName.COMMIT, () -> {
|
||||
chunkStore.commit();
|
||||
return 1L;
|
||||
});
|
||||
|
||||
repoStore.markVersionIndexed(batch.ver().id(), batch.pieces().size());
|
||||
transitionJob(batch.jobId(), JobStatus.SUCCEEDED, null, Instant.now());
|
||||
logEvent(batch.jobId(), JobLogEvent.Level.INFO, null,
|
||||
"indexed " + chunks.size() + " chunks across " + batch.pieces().size() + " pieces");
|
||||
} catch (Exception e) {
|
||||
log.error("embed phase failed for job {}", batch.jobId(), e);
|
||||
logEvent(batch.jobId(), JobLogEvent.Level.ERROR, null,
|
||||
"embed phase failed: " + e.getMessage());
|
||||
repoStore.updateVersionStatus(batch.ver().id(), VersionStatus.FAILED, e.getMessage());
|
||||
transitionJob(batch.jobId(), JobStatus.FAILED, null, Instant.now());
|
||||
} finally {
|
||||
running.remove(batch.jobId());
|
||||
MDC.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private void removeWorktreeQuietly(JobId jobId, Path repoPath, Path worktree) {
|
||||
try {
|
||||
git.removeWorktree(repoPath, worktree);
|
||||
} catch (Exception e) {
|
||||
logEvent(jobId, JobLogEvent.Level.WARN, null, "worktree cleanup failed: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Orderly shutdown: stops the parse executor, signals the embed worker to stop after
|
||||
* finishing its current batch, then fails any batches still in the queue.
|
||||
* Called by Spring via {@code @Bean(destroyMethod = "shutdown")} in ApplicationBeans.
|
||||
*/
|
||||
void shutdown() {
|
||||
log.info("IngestionOrchestrator shutting down — stopping embed worker");
|
||||
shuttingDown = true;
|
||||
parseExecutor.shutdownNow();
|
||||
embedWorker.interrupt();
|
||||
try {
|
||||
embedWorker.join(10_000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
// Fail any batches that parsed OK but never got to embed (restart mid-queue).
|
||||
ParsedBatch orphan;
|
||||
while ((orphan = embedQueue.poll()) != null) {
|
||||
log.warn("failing orphaned embed batch for job {} (shutdown)", orphan.jobId());
|
||||
repoStore.updateVersionStatus(orphan.ver().id(), VersionStatus.FAILED, "application shutdown");
|
||||
transitionJob(orphan.jobId(), JobStatus.FAILED, null, Instant.now());
|
||||
running.remove(orphan.jobId());
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Stage helpers */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
private interface StageBody {
|
||||
long execute() throws Exception;
|
||||
}
|
||||
|
||||
private interface StageBodyReturning<T> {
|
||||
T execute() throws Exception;
|
||||
}
|
||||
|
||||
private void stage(JobId id, JobStage.StageName name, StageBody body) {
|
||||
stageReturning(id, name, () -> {
|
||||
long n = body.execute();
|
||||
return n;
|
||||
});
|
||||
}
|
||||
|
||||
private <T> T stageReturning(JobId id, JobStage.StageName name, StageBodyReturning<T> body) {
|
||||
Instant start = Instant.now();
|
||||
JobStage running = new JobStage(
|
||||
id, name, JobStage.StageStatus.RUNNING, start, null, 0, 0, 0, null);
|
||||
jobStore.upsertStage(running);
|
||||
publishJob(id);
|
||||
try {
|
||||
T out = body.execute();
|
||||
long items = (out instanceof Long l) ? l : (out instanceof List<?> l ? l.size() : 1);
|
||||
JobStage done = new JobStage(
|
||||
id, name, JobStage.StageStatus.SUCCEEDED, start, Instant.now(), items, items, 0, null);
|
||||
jobStore.upsertStage(done);
|
||||
publishJob(id);
|
||||
return out;
|
||||
} catch (Exception e) {
|
||||
JobStage failed = new JobStage(
|
||||
id, name, JobStage.StageStatus.FAILED, start, Instant.now(), 0, 0, 0, e.getMessage());
|
||||
jobStore.upsertStage(failed);
|
||||
publishJob(id);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void transitionJob(JobId id, JobStatus s, Instant startedAt, Instant finishedAt) {
|
||||
jobStore.updateStatus(id, s, startedAt, finishedAt);
|
||||
publishJob(id);
|
||||
}
|
||||
|
||||
private void publishJob(JobId id) {
|
||||
jobStore.findById(id).ifPresent(bus::publishJob);
|
||||
}
|
||||
|
||||
private void logEvent(JobId id, JobLogEvent.Level level, JobStage.StageName stage, String msg) {
|
||||
bus.publishLog(new JobLogEvent(id, Instant.now(), level, stage, msg));
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Pipeline steps */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
private List<Path> discoverFiles(Path root, Repository repo) throws IOException {
|
||||
List<PathMatcher> matchers = new ArrayList<>();
|
||||
for (String g : BUILTIN_IGNORES) matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + g));
|
||||
for (String g : repo.ignoreGlobs()) matchers.add(FileSystems.getDefault().getPathMatcher("glob:" + g));
|
||||
long maxBytes = repo.maxFileSizeBytes();
|
||||
List<Path> out = new ArrayList<>();
|
||||
Files.walkFileTree(root, new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
|
||||
Path rel = root.relativize(file);
|
||||
for (PathMatcher m : matchers) {
|
||||
if (m.matches(rel) || m.matches(file.getFileName())) return FileVisitResult.CONTINUE;
|
||||
}
|
||||
if (attrs.size() > maxBytes) return FileVisitResult.CONTINUE;
|
||||
out.add(file);
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
|
||||
if (dir.equals(root)) return FileVisitResult.CONTINUE;
|
||||
Path rel = root.relativize(dir);
|
||||
for (PathMatcher m : matchers) {
|
||||
if (m.matches(rel)) return FileVisitResult.SKIP_SUBTREE;
|
||||
}
|
||||
return FileVisitResult.CONTINUE;
|
||||
}
|
||||
});
|
||||
return out;
|
||||
}
|
||||
|
||||
private record ParsedPiece(
|
||||
String contentHash,
|
||||
String content,
|
||||
String language,
|
||||
String symbol,
|
||||
int tokenCount,
|
||||
String filePath,
|
||||
int startLine,
|
||||
int endLine) {}
|
||||
|
||||
private List<ParsedPiece> parseAll(List<Path> files, Path root) {
|
||||
List<ParsedPiece> out = new ArrayList<>();
|
||||
MessageDigest sha;
|
||||
try {
|
||||
sha = MessageDigest.getInstance("SHA-256");
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
for (Path f : files) {
|
||||
String rel = root.relativize(f).toString().replace('\\', '/');
|
||||
try {
|
||||
List<CodeParser.ParsedChunk> parsed = parser.parse(f, rel);
|
||||
for (var pc : parsed) {
|
||||
String hash = bytesToHex(sha.digest(pc.content().getBytes(StandardCharsets.UTF_8)));
|
||||
sha.reset();
|
||||
int tokens = Math.max(1, pc.content().length() / 4); // heuristic; refined if needed
|
||||
out.add(new ParsedPiece(
|
||||
hash, pc.content(), pc.language(), pc.symbol(), tokens, rel, pc.startLine(), pc.endLine()));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("parse failed for {}: {}", rel, e.toString());
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private List<Chunk> embedAll(JobId jobId, List<ParsedPiece> pieces) {
|
||||
// Dedupe by hash across this batch AND against existing chunks in the store/cache.
|
||||
Map<String, Chunk> resolved = new HashMap<>();
|
||||
List<ParsedPiece> toEmbed = new ArrayList<>();
|
||||
for (var p : pieces) {
|
||||
if (resolved.containsKey(p.contentHash())) continue;
|
||||
var existing = chunkStore.findByContentHash(p.contentHash());
|
||||
if (existing.isPresent()) {
|
||||
resolved.put(p.contentHash(), existing.get());
|
||||
continue;
|
||||
}
|
||||
// cache?
|
||||
var cached = embeddingCache.get(p.contentHash());
|
||||
if (cached.isPresent()) {
|
||||
Chunk c = upsert(p, cached.get());
|
||||
resolved.put(p.contentHash(), c);
|
||||
continue;
|
||||
}
|
||||
toEmbed.add(p);
|
||||
}
|
||||
|
||||
if (!toEmbed.isEmpty()) {
|
||||
List<String> texts = toEmbed.stream().map(ParsedPiece::content).toList();
|
||||
List<float[]> vecs = embeddings.embed(texts);
|
||||
for (int i = 0; i < toEmbed.size(); i++) {
|
||||
var p = toEmbed.get(i);
|
||||
float[] v = vecs.get(i);
|
||||
embeddingCache.put(p.contentHash(), v);
|
||||
Chunk c = upsert(p, v);
|
||||
resolved.put(p.contentHash(), c);
|
||||
}
|
||||
logEvent(jobId, JobLogEvent.Level.INFO, JobStage.StageName.EMBED,
|
||||
"embedded " + toEmbed.size() + " new chunks (cache/dedupe hits = " + (pieces.size() - toEmbed.size()) + ")");
|
||||
}
|
||||
return new ArrayList<>(resolved.values());
|
||||
}
|
||||
|
||||
private Chunk upsert(ParsedPiece p, float[] vector) {
|
||||
Chunk c = new Chunk(ChunkId.random(), p.contentHash(), p.content(), p.language(), p.symbol(), p.tokenCount());
|
||||
return chunkStore.upsertChunk(c, new Embedding(c.id(), vector));
|
||||
}
|
||||
|
||||
private List<ChunkVersion> buildLinks(VersionId versionId, List<ParsedPiece> pieces) {
|
||||
// Piece → ChunkId requires knowing the chunk id assigned on upsert.
|
||||
// We re-resolve via findByContentHash — cheap because it's a Term query.
|
||||
List<ChunkVersion> links = new ArrayList<>(pieces.size());
|
||||
Map<String, ChunkId> hashToId = new HashMap<>();
|
||||
for (var p : pieces) {
|
||||
ChunkId id = hashToId.computeIfAbsent(p.contentHash(), h ->
|
||||
chunkStore.findByContentHash(h).orElseThrow().id());
|
||||
links.add(new ChunkVersion(id, versionId, p.filePath(), p.startLine(), p.endLine()));
|
||||
}
|
||||
return links;
|
||||
}
|
||||
|
||||
private String pickParentIndexedTag(Repository repo, Version ver) {
|
||||
// Most recent previously-indexed version for this repo that isn't this one.
|
||||
List<Version> indexed = repoStore.findVersionsByStatus(repo.id(), VersionStatus.INDEXED);
|
||||
return indexed.stream()
|
||||
.filter(v -> !v.id().equals(ver.id()))
|
||||
.max((a, b) -> a.tag().compareTo(b.tag()))
|
||||
.map(Version::tag)
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
private static String bytesToHex(byte[] bytes) {
|
||||
char[] hex = "0123456789abcdef".toCharArray();
|
||||
char[] out = new char[bytes.length * 2];
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
int v = bytes[i] & 0xff;
|
||||
out[i * 2] = hex[v >>> 4];
|
||||
out[i * 2 + 1] = hex[v & 0x0f];
|
||||
}
|
||||
return new String(out);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package com.trueref.application.observability;
|
||||
|
||||
import com.trueref.domain.model.IngestionJob;
|
||||
import com.trueref.domain.model.JobId;
|
||||
import com.trueref.domain.model.JobLogEvent;
|
||||
import com.trueref.domain.port.out.JobEventBus;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* In-process publish/subscribe implementation of {@link JobEventBus}. Listeners receive events on
|
||||
* the publisher's thread; consumers should defer expensive work (e.g. SSE writes) to a virtual
|
||||
* thread to keep the publisher fast.
|
||||
*/
|
||||
public final class InMemoryJobEventBus implements JobEventBus {
|
||||
|
||||
private final CopyOnWriteArrayList<Consumer<IngestionJob>> jobListeners = new CopyOnWriteArrayList<>();
|
||||
private final Map<JobId, CopyOnWriteArrayList<Consumer<JobLogEvent>>> logListeners = new ConcurrentHashMap<>();
|
||||
private final CopyOnWriteArrayList<Consumer<JobLogEvent>> globalLogListeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Override
|
||||
public void publishJob(IngestionJob job) {
|
||||
for (Consumer<IngestionJob> l : jobListeners) {
|
||||
try {
|
||||
l.accept(job);
|
||||
} catch (Exception ignored) {
|
||||
// listener failures must not break publishing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishLog(JobLogEvent event) {
|
||||
var perJob = logListeners.get(event.jobId());
|
||||
if (perJob != null) {
|
||||
for (Consumer<JobLogEvent> l : perJob) {
|
||||
try {
|
||||
l.accept(event);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Consumer<JobLogEvent> l : globalLogListeners) {
|
||||
try {
|
||||
l.accept(event);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseable subscribeJobs(Consumer<IngestionJob> listener) {
|
||||
jobListeners.add(listener);
|
||||
return () -> jobListeners.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseable subscribeLogs(JobId jobId, Consumer<JobLogEvent> listener) {
|
||||
var list = logListeners.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>());
|
||||
list.add(listener);
|
||||
return () -> list.remove(listener);
|
||||
}
|
||||
|
||||
/** Subscribe to ALL log events regardless of job (used by the dashboard). */
|
||||
public AutoCloseable subscribeAllLogs(Consumer<JobLogEvent> listener) {
|
||||
globalLogListeners.add(listener);
|
||||
return () -> globalLogListeners.remove(listener);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.trueref.application.observability;
|
||||
|
||||
import com.trueref.domain.model.IngestionJob;
|
||||
import com.trueref.domain.model.JobId;
|
||||
import com.trueref.domain.model.JobLogEvent;
|
||||
import com.trueref.domain.model.JobStatus;
|
||||
import com.trueref.domain.model.RepositoryId;
|
||||
import com.trueref.domain.model.VersionId;
|
||||
import com.trueref.domain.port.in.ObserveJobs;
|
||||
import com.trueref.domain.port.out.JobEventBus;
|
||||
import com.trueref.domain.port.out.JobStore;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
|
||||
public final class JobObservationService implements ObserveJobs {
|
||||
|
||||
private final JobStore jobs;
|
||||
private final JobEventBus bus;
|
||||
|
||||
public JobObservationService(JobStore jobs, JobEventBus bus) {
|
||||
this.jobs = jobs;
|
||||
this.bus = bus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<IngestionJob> findJob(JobId id) {
|
||||
return jobs.findById(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IngestionJob> listJobs(
|
||||
@Nullable RepositoryId repoId, @Nullable VersionId versionId, @Nullable JobStatus status, int limit) {
|
||||
return jobs.find(repoId, versionId, status, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseable subscribeJobs(Consumer<IngestionJob> listener) {
|
||||
return bus.subscribeJobs(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseable subscribeLogs(JobId jobId, Consumer<JobLogEvent> listener) {
|
||||
return bus.subscribeLogs(jobId, listener);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
/** In-process implementations of cross-cutting application services. */
|
||||
@org.jspecify.annotations.NullMarked
|
||||
package com.trueref.application.observability;
|
||||
@@ -0,0 +1,3 @@
|
||||
/** Application services: use-case implementations. */
|
||||
@org.jspecify.annotations.NullMarked
|
||||
package com.trueref.application;
|
||||
@@ -0,0 +1,161 @@
|
||||
package com.trueref.application.resolve;
|
||||
|
||||
import com.trueref.domain.model.Repository;
|
||||
import com.trueref.domain.model.RepositoryId;
|
||||
import com.trueref.domain.model.TagPattern;
|
||||
import com.trueref.domain.model.Version;
|
||||
import com.trueref.domain.model.VersionStatus;
|
||||
import com.trueref.domain.port.in.IndexVersion;
|
||||
import com.trueref.domain.port.in.ResolveLibraryId;
|
||||
import com.trueref.domain.port.out.RepositoryStore;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Fuzzy library-name matching + version→tag mapping. Mirrors Context7's {@code resolve-library-id}
|
||||
* semantics. When {@code version} is provided and maps to a known-but-not-yet-indexed tag, triggers
|
||||
* an async index job (fire-and-forget).
|
||||
*/
|
||||
public final class LibraryResolver implements ResolveLibraryId {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(LibraryResolver.class);
|
||||
|
||||
private static final Pattern SEMVER = Pattern.compile("^v?(\\d+)(?:\\.(\\d+))?(?:\\.(\\d+))?.*$");
|
||||
|
||||
private final RepositoryStore store;
|
||||
private final IndexVersion indexer;
|
||||
|
||||
/**
 * @param store source of repositories and their versions
 * @param indexer use case invoked to enqueue on-demand indexing
 */
public LibraryResolver(RepositoryStore store, IndexVersion indexer) {
    this.indexer = indexer;
    this.store = store;
}
|
||||
|
||||
@Override
|
||||
public Result resolve(Query q) {
|
||||
String needle = q.libraryName().toLowerCase();
|
||||
List<Repository> all = store.findAll();
|
||||
List<Match> matches = new ArrayList<>();
|
||||
for (Repository r : all) {
|
||||
double score = nameScore(r.name().toLowerCase(), needle);
|
||||
if (score <= 0) continue;
|
||||
List<Version> versions = store.findVersionsByRepo(r.id());
|
||||
|
||||
// If a version was requested, map it to a tag and ensure indexing.
|
||||
if (q.version() != null && !q.version().isBlank()) {
|
||||
Optional<Version> target = mapVersion(r, versions, q.version());
|
||||
target.ifPresent(v -> ensureIndexed(r.id(), v));
|
||||
}
|
||||
|
||||
int snippetCount = versions.stream()
|
||||
.filter(v -> v.status() == VersionStatus.INDEXED)
|
||||
.mapToInt(Version::chunkCount)
|
||||
.sum();
|
||||
List<VersionRef> refs = versions.stream()
|
||||
.sorted(Comparator.comparing(Version::tag).reversed())
|
||||
.map(v -> new VersionRef(v.id(), v.tag(), v.status()))
|
||||
.toList();
|
||||
String libraryId = "/" + r.name();
|
||||
matches.add(new Match(r.id(), libraryId, r.name(), null, snippetCount, refs, score));
|
||||
}
|
||||
matches.sort(Comparator.comparingDouble(Match::score).reversed());
|
||||
return new Result(matches);
|
||||
}
|
||||
|
||||
/** Fuzzy name scoring: exact 1.0, prefix 0.9, contains 0.7, token overlap otherwise. */
|
||||
private double nameScore(String haystack, String needle) {
|
||||
if (haystack.equals(needle)) return 1.0;
|
||||
if (haystack.endsWith("/" + needle) || haystack.startsWith(needle + "/")) return 0.95;
|
||||
if (haystack.contains(needle)) return 0.8;
|
||||
// token overlap
|
||||
String[] hTok = haystack.split("[^a-z0-9]+");
|
||||
String[] nTok = needle.split("[^a-z0-9]+");
|
||||
int hit = 0;
|
||||
for (String nt : nTok) {
|
||||
if (nt.isBlank()) continue;
|
||||
for (String ht : hTok) {
|
||||
if (ht.equals(nt)) { hit++; break; }
|
||||
}
|
||||
}
|
||||
if (hit == 0) return 0.0;
|
||||
return 0.3 + 0.4 * ((double) hit / Math.max(1, nTok.length));
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps a version string to the closest matching tag using the repo's configured mapping rules.
|
||||
* Rules are tried in order.
|
||||
*/
|
||||
public Optional<Version> mapVersion(Repository repo, List<Version> versions, String requested) {
|
||||
for (TagPattern rule : repo.versionMappingRules()) {
|
||||
String candidate = switch (rule) {
|
||||
case TagPattern.Exact e -> requested;
|
||||
case TagPattern.VPrefix v -> "v" + stripV(requested);
|
||||
case TagPattern.ReleasePrefix r -> "release-" + stripV(requested);
|
||||
case TagPattern.Custom c -> c.template()
|
||||
.replace("{version}", requested)
|
||||
.replace("{semver}", stripV(requested));
|
||||
case TagPattern.SemverFuzzy s -> null; // handled below
|
||||
};
|
||||
if (candidate == null) continue;
|
||||
Optional<Version> exact = versions.stream()
|
||||
.filter(v -> v.tag().equalsIgnoreCase(candidate))
|
||||
.findFirst();
|
||||
if (exact.isPresent()) return exact;
|
||||
}
|
||||
// Semver fuzzy: pick tag with closest semver distance
|
||||
return semverClosest(versions, requested);
|
||||
}
|
||||
|
||||
private Optional<Version> semverClosest(List<Version> versions, String requested) {
|
||||
int[] r = parseSemver(requested);
|
||||
if (r == null) return Optional.empty();
|
||||
return versions.stream()
|
||||
.map(v -> new Object[] {v, parseSemver(v.tag())})
|
||||
.filter(t -> t[1] != null)
|
||||
.min(Comparator.comparingLong(t -> semverDist((int[]) t[1], r)))
|
||||
.map(t -> (Version) t[0]);
|
||||
}
|
||||
|
||||
private static @Nullable int[] parseSemver(String s) {
|
||||
Matcher m = SEMVER.matcher(s);
|
||||
if (!m.matches()) return null;
|
||||
return new int[] {
|
||||
parseIntOrZero(m.group(1)),
|
||||
parseIntOrZero(m.group(2)),
|
||||
parseIntOrZero(m.group(3))
|
||||
};
|
||||
}
|
||||
|
||||
private static int parseIntOrZero(String s) {
|
||||
if (s == null || s.isEmpty()) return 0;
|
||||
try { return Integer.parseInt(s); } catch (NumberFormatException e) { return 0; }
|
||||
}
|
||||
|
||||
private static long semverDist(int[] a, int[] b) {
|
||||
long d = 0;
|
||||
d += Math.abs(a[0] - b[0]) * 1_000_000L;
|
||||
d += Math.abs(a[1] - b[1]) * 1_000L;
|
||||
d += Math.abs(a[2] - b[2]);
|
||||
return d;
|
||||
}
|
||||
|
||||
private static String stripV(String s) {
|
||||
return s.startsWith("v") || s.startsWith("V") ? s.substring(1) : s;
|
||||
}
|
||||
|
||||
private void ensureIndexed(RepositoryId repoId, Version v) {
|
||||
if (v.status() == VersionStatus.INDEXED || v.status() == VersionStatus.INDEXING) return;
|
||||
try {
|
||||
log.info("on-demand indexing: repo={} tag={}", repoId, v.tag());
|
||||
indexer.enqueue(repoId, v.id(), false);
|
||||
} catch (Exception e) {
|
||||
log.warn("on-demand indexing enqueue failed: {}", e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,238 @@
|
||||
package com.trueref.application.search;
|
||||
|
||||
import com.trueref.domain.error.InvalidSearchRequest;
|
||||
import com.trueref.domain.model.ChunkId;
|
||||
import com.trueref.domain.model.Repository;
|
||||
import com.trueref.domain.model.SearchHit;
|
||||
import com.trueref.domain.model.SearchScope;
|
||||
import com.trueref.domain.model.Version;
|
||||
import com.trueref.domain.port.in.SearchLibraryDocs;
|
||||
import com.trueref.domain.port.out.ChunkStore;
|
||||
import com.trueref.domain.port.out.EmbeddingService;
|
||||
import com.trueref.domain.port.out.RepositoryStore;
|
||||
import com.trueref.domain.port.out.RerankerService;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Hybrid search: BM25 + dense kNN fused by Reciprocal Rank Fusion (RRF), then reranked by a
|
||||
* cross-encoder, then packed to a token budget.
|
||||
*/
|
||||
public final class HybridSearchService implements SearchLibraryDocs {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(HybridSearchService.class);
|
||||
|
||||
/**
|
||||
* Matches identifier-like words of at least six characters that start with a lowercase ASCII
* letter, as candidates for Phaser API method/class names. The pattern itself does not require
* an uppercase letter; augmentWithCamelIdents discards matches that are entirely lowercase.
* Examples: setCollideWorldBounds, createBitmapMask, addOverlap.
|
||||
*/
|
||||
private static final Pattern CAMEL_IDENT = Pattern.compile(
|
||||
"\\b([a-z][a-zA-Z0-9]{5,})(?=\\b)");
|
||||
|
||||
private final ChunkStore chunks;
|
||||
private final EmbeddingService embedder;
|
||||
private final RerankerService reranker;
|
||||
private final RepositoryStore repos;
|
||||
private final int rrfK;
|
||||
private final int rerankTopK;
|
||||
private final int finalTopK;
|
||||
|
||||
/**
 * @param chunks store queried for BM25 and dense candidates
 * @param embedder embeds the query text for the dense search
 * @param reranker reranks the fused candidates
 * @param repos used to look up repository names and version tags for hits
 * @param rrfK constant added to the rank in the RRF denominator
 * @param rerankTopK number of candidates requested from each retriever and kept after fusion
 * @param finalTopK maximum number of hits returned when the query does not set its own limit
 */
public HybridSearchService(
        ChunkStore chunks,
        EmbeddingService embedder,
        RerankerService reranker,
        RepositoryStore repos,
        int rrfK,
        int rerankTopK,
        int finalTopK) {
    // collaborators
    this.chunks = chunks;
    this.repos = repos;
    this.embedder = embedder;
    this.reranker = reranker;
    // tuning knobs
    this.rrfK = rrfK;
    this.rerankTopK = rerankTopK;
    this.finalTopK = finalTopK;
}
|
||||
|
||||
@Override
|
||||
public Result search(Query q) {
|
||||
if (q.text() == null || q.text().isBlank()) {
|
||||
throw new InvalidSearchRequest("query text must not be blank");
|
||||
}
|
||||
if (q.scope().refs().isEmpty()) {
|
||||
throw new InvalidSearchRequest("search scope must not be empty");
|
||||
}
|
||||
|
||||
String text = rewrite(q.text(), q.topic());
|
||||
// Augment BM25 query with camelCase identifiers found in the text so that the exact
|
||||
// method-name chunk scores higher in BM25 even when it competes with generic mentions.
|
||||
String bm25Text = augmentWithCamelIdents(text);
|
||||
|
||||
List<SearchHit> bm25 = chunks.bm25Search(bm25Text, q.scope(), rerankTopK);
|
||||
float[] vec = embedder.embed(List.of(text)).get(0);
|
||||
List<SearchHit> dense = chunks.denseSearch(vec, q.scope(), rerankTopK);
|
||||
|
||||
List<SearchHit> fused = rrf(bm25, dense);
|
||||
if (fused.size() > rerankTopK) fused = fused.subList(0, rerankTopK);
|
||||
|
||||
// Demote changelog / synthetic-skill / docs paths before the reranker sees them so that
|
||||
// authoritative source-code chunks aren't squeezed out by historical migration notes.
|
||||
List<SearchHit> biased = applyFilePathBias(fused);
|
||||
|
||||
// Enrich with repo name + tag (ChunkStore leaves these empty).
|
||||
List<SearchHit> enriched = enrich(biased);
|
||||
|
||||
List<SearchHit> reranked = reranker.rerank(text, enriched);
|
||||
|
||||
List<SearchHit> packed = packByTokenBudget(reranked, q.tokensBudget(), q.maxHits() > 0 ? q.maxHits() : finalTopK);
|
||||
int totalTokens = packed.stream().mapToInt(h -> estimateTokens(h.content())).sum();
|
||||
return new Result(packed, totalTokens);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
private String rewrite(String text, String topic) {
|
||||
String base = text.trim();
|
||||
if (topic != null && !topic.isBlank()) {
|
||||
return base + " " + topic.trim();
|
||||
}
|
||||
return base;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of {@code text} with each camelCase identifier repeated at the end (once).
|
||||
* This lifts their BM25 term-frequency contribution without altering the semantic meaning
|
||||
* used for the dense embedding query.
|
||||
*
|
||||
* <p>Example: "how to use setCollideWorldBounds" →
|
||||
* "how to use setCollideWorldBounds setCollideWorldBounds"
|
||||
*/
|
||||
private static String augmentWithCamelIdents(String text) {
|
||||
Matcher m = CAMEL_IDENT.matcher(text);
|
||||
StringBuilder extra = new StringBuilder();
|
||||
while (m.find()) {
|
||||
String ident = m.group(1);
|
||||
// Only repeat identifiers that contain at least one uppercase letter
|
||||
// (filters out short common words like "should", "create").
|
||||
if (!ident.equals(ident.toLowerCase())) {
|
||||
extra.append(' ').append(ident);
|
||||
}
|
||||
}
|
||||
return extra.isEmpty() ? text : text + extra;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies a path-based multiplier to RRF scores before handing candidates to the reranker.
|
||||
* Changelogs and synthetic skill docs are semantically relevant but tend to outrank the
|
||||
* authoritative source-code chunks when the query mentions API migration or breaking changes.
|
||||
* Demoting them here keeps them retrievable while giving source files priority.
|
||||
*
|
||||
* <p>Multipliers (tuned against the phaser_rag_eval suite):
|
||||
* <ul>
|
||||
* <li>{@code changelog/} → ×0.50 — migration notes, not current API reference
|
||||
* <li>{@code skills/} / {@code SKILL.md} → ×0.60 — synthetic summaries, not authoritative
|
||||
* <li>{@code docs/} → ×0.75 — curated docs; useful but prefer source JSDoc
|
||||
* <li>everything else (source, tests, configs) → ×1.0
|
||||
* </ul>
|
||||
*/
|
||||
private static List<SearchHit> applyFilePathBias(List<SearchHit> hits) {
|
||||
boolean anyChanged = false;
|
||||
List<SearchHit> out = new ArrayList<>(hits.size());
|
||||
for (SearchHit h : hits) {
|
||||
double mult = filePathMultiplier(h.filePath());
|
||||
if (mult == 1.0) {
|
||||
out.add(h);
|
||||
} else {
|
||||
out.add(new SearchHit(
|
||||
h.chunkId(), h.repoId(), h.versionId(), h.repoName(), h.tag(),
|
||||
h.filePath(), h.startLine(), h.endLine(), h.language(), h.symbol(),
|
||||
h.content(), h.score() * mult));
|
||||
anyChanged = true;
|
||||
}
|
||||
}
|
||||
if (!anyChanged) return hits;
|
||||
out.sort(Comparator.comparingDouble(SearchHit::score).reversed());
|
||||
return out;
|
||||
}
|
||||
|
||||
private static double filePathMultiplier(String filePath) {
|
||||
if (filePath == null || filePath.isEmpty()) return 1.0;
|
||||
String lp = filePath.toLowerCase();
|
||||
if (lp.startsWith("changelog/") || lp.contains("/changelog/")) return 0.50;
|
||||
if (lp.contains("/skills/") || lp.endsWith("skill.md")) return 0.60;
|
||||
if (lp.startsWith("docs/") || lp.contains("/docs/")) return 0.75;
|
||||
return 1.0;
|
||||
}
|
||||
|
||||
private List<SearchHit> rrf(List<SearchHit> a, List<SearchHit> b) {
|
||||
Map<ChunkId, Double> scores = new HashMap<>();
|
||||
Map<ChunkId, SearchHit> firstSeen = new HashMap<>();
|
||||
addRankContribution(a, scores, firstSeen);
|
||||
addRankContribution(b, scores, firstSeen);
|
||||
return scores.entrySet().stream()
|
||||
.sorted(Map.Entry.<ChunkId, Double>comparingByValue().reversed())
|
||||
.map(e -> {
|
||||
SearchHit h = firstSeen.get(e.getKey());
|
||||
return new SearchHit(
|
||||
h.chunkId(), h.repoId(), h.versionId(), h.repoName(), h.tag(),
|
||||
h.filePath(), h.startLine(), h.endLine(), h.language(), h.symbol(),
|
||||
h.content(), e.getValue());
|
||||
})
|
||||
.toList();
|
||||
}
|
||||
|
||||
private void addRankContribution(List<SearchHit> hits, Map<ChunkId, Double> scores, Map<ChunkId, SearchHit> seen) {
|
||||
for (int rank = 0; rank < hits.size(); rank++) {
|
||||
SearchHit h = hits.get(rank);
|
||||
scores.merge(h.chunkId(), 1.0 / (rrfK + rank + 1.0), Double::sum);
|
||||
seen.putIfAbsent(h.chunkId(), h);
|
||||
}
|
||||
}
|
||||
|
||||
private List<SearchHit> enrich(List<SearchHit> hits) {
|
||||
Map<String, String> repoNameByRepoId = new HashMap<>();
|
||||
Map<String, String> tagByVersionId = new HashMap<>();
|
||||
List<SearchHit> out = new ArrayList<>(hits.size());
|
||||
for (SearchHit h : hits) {
|
||||
String repoName = repoNameByRepoId.computeIfAbsent(
|
||||
h.repoId().toString(),
|
||||
k -> repos.findById(h.repoId()).map(Repository::name).orElse("?"));
|
||||
String tag = tagByVersionId.computeIfAbsent(
|
||||
h.versionId().toString(),
|
||||
k -> repos.findVersion(h.versionId()).map(Version::tag).orElse("?"));
|
||||
out.add(new SearchHit(
|
||||
h.chunkId(), h.repoId(), h.versionId(),
|
||||
repoName, tag,
|
||||
h.filePath(), h.startLine(), h.endLine(), h.language(), h.symbol(),
|
||||
h.content(), h.score()));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private List<SearchHit> packByTokenBudget(List<SearchHit> ranked, int tokenBudget, int maxHits) {
|
||||
List<SearchHit> out = new ArrayList<>();
|
||||
int used = 0;
|
||||
for (SearchHit h : ranked) {
|
||||
if (out.size() >= maxHits) break;
|
||||
int t = estimateTokens(h.content());
|
||||
if (used + t > tokenBudget && !out.isEmpty()) break;
|
||||
out.add(h);
|
||||
used += t;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/** 4 chars ≈ 1 token — same rule of thumb Context7 uses for packing. */
|
||||
private static int estimateTokens(String s) {
|
||||
return Math.max(1, s.length() / 4);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user