Enrichment Pipeline
The Enrichment Pipeline is a background worker system that automatically enhances stored memories with extracted entities, relationships, summaries, and pattern associations. This page documents the queue-based architecture, processing stages, entity extraction techniques, and relationship creation mechanisms.
For information about embedding generation (which runs in parallel), see Embedding Generation. For details about consolidation cycles (decay, creative, cluster, forget), see the Consolidation Engine page. For the broader context of all background processing systems, see Background Processing.
System Architecture
The enrichment pipeline operates as an independent background thread that consumes jobs from a thread-safe queue. It processes memories asynchronously after storage, ensuring API write operations remain non-blocking.
Processing Flow
sequenceDiagram
participant API as Flask API
participant Queue as enrichment_queue
participant Worker as Enrichment Worker
participant Graph as FalkorDB
participant Vector as Qdrant
API->>Queue: enqueue(EnrichmentJob)
Note over Queue: Add to enrichment_pending
Worker->>Queue: dequeue (blocking)
Note over Worker: Move to enrichment_inflight
Worker->>Graph: MATCH memory by ID
Worker->>Worker: extract_entities(content)
Worker->>Worker: generate_summary(content)
Worker->>Graph: MATCH recent memories<br/>CREATE PRECEDED_BY edges
Worker->>Vector: search(embedding, limit=5, threshold=0.8)
Vector-->>Worker: similar_memories
Worker->>Graph: CREATE SIMILAR_TO edges<br/>SET strength = cosine_score
Worker->>Graph: MATCH memories by type<br/>Detect pattern keywords
Worker->>Graph: MERGE Pattern node<br/>CREATE EXEMPLIFIES edges
Worker->>Graph: SET enriched=true<br/>SET enriched_at=timestamp<br/>SET metadata.enrichment
Note over Worker: Remove from enrichment_inflight
Worker->>Queue: record_success
Data Structures
EnrichmentJob
Jobs enqueued for processing contain the memory ID, a retry attempt counter, and a forced flag for admin-triggered reprocessing.
Fields:
- memory_id — UUID of the memory to enrich
- attempt — Retry counter (0-indexed), incremented on each failure
- forced — When true (admin trigger), skips the already-enriched check and reprocesses
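These fields can be sketched as a small dataclass; the field semantics come from this page, while the class shape itself is an illustrative assumption:

```python
from dataclasses import dataclass

@dataclass
class EnrichmentJob:
    """Sketch of a queued enrichment job (field names from this page)."""
    memory_id: str        # UUID of the memory to enrich
    attempt: int = 0      # retry counter (0-indexed), incremented on each failure
    forced: bool = False  # admin trigger: skip the already-enriched check

# A fresh job starts at attempt 0; a retry re-enqueues with attempt + 1.
job = EnrichmentJob(memory_id="00000000-0000-0000-0000-000000000000")
retry = EnrichmentJob(memory_id=job.memory_id, attempt=job.attempt + 1)
```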
Job Lifecycle
Job Creation
Enrichment jobs are enqueued in three scenarios:
| Trigger | Endpoint | Behavior |
|---|---|---|
| Memory creation | POST /memory | Automatic enqueue after graph write |
| Memory update | PATCH /memory/:id | Re-enqueue if content/tags changed |
| Admin reprocessing | POST /enrichment/reprocess | Forced reprocessing with forced=true |
Retry Logic
Jobs that fail during processing are retried up to ENRICHMENT_MAX_ATTEMPTS times with escalating backoff:
| Attempt | Backoff | Behavior |
|---|---|---|
| 1 | 0s | Immediate first attempt |
| 2 | ENRICHMENT_FAILURE_BACKOFF_SECONDS | Default 5 seconds |
| 3 | ENRICHMENT_FAILURE_BACKOFF_SECONDS * 2 | 10 seconds |
| Final | — | Record failure in enrichment_stats |
stateDiagram-v2
[*] --> Queued
Queued --> Processing: dequeue
Processing --> Success: enrichment complete
Processing --> Retry: error (attempt < max)
Processing --> Failed: error (attempt >= max)
Retry --> Queued: re-enqueue with attempt++
Success --> [*]
Failed --> [*]: record_failure
Entity Extraction
Extraction Methods
The pipeline uses a two-tier approach: spaCy NLP when available, with regex fallbacks for specific patterns.
Tier 1: spaCy NLP
- Runs the en_core_web_sm model (configurable via ENRICHMENT_SPACY_MODEL)
- Model is loaded once and cached via LRU cache, eliminating the repeated 5-10 second load time
- Named Entity Recognition (NER) extracts PERSON, ORG, PRODUCT, WORK_OF_ART, EVENT, GPE, and LOC labels
Tier 2: Regex Fallbacks
- Active when spaCy is unavailable, or as a supplement for patterns spaCy misses
- Patterns like "met with X" and "talked to X" for people
- Backtick-delimited terms for project names
- "using X" and "deploy X" patterns for tools
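The fallback tier can be sketched as a table of per-type patterns. The expressions below are illustrative stand-ins, not the pipeline's actual regexes:

```python
import re

# Illustrative fallback patterns keyed by entity type; the real expressions
# used by the pipeline are not shown on this page.
FALLBACK_PATTERNS = {
    "people": [r"met with ([A-Z][a-z]+(?: [A-Z][a-z]+)?)",
               r"talked to ([A-Z][a-z]+(?: [A-Z][a-z]+)?)"],
    "tools": [r"using ([A-Z][A-Za-z]+)",
              r"deploy ([A-Z][A-Za-z]+)"],
    "projects": [r"`([a-z][\w-]+)`"],  # backtick-delimited terms
}

def regex_entities(content: str) -> dict:
    """Collect regex matches per entity type."""
    found = {etype: [] for etype in FALLBACK_PATTERNS}
    for etype, patterns in FALLBACK_PATTERNS.items():
        for pattern in patterns:
            found[etype].extend(re.findall(pattern, content))
    return found

# Extracts "Sarah" (people), "Docker" (tools), and "automem" (projects).
result = regex_entities("We met with Sarah and will deploy Docker for `automem`.")
```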
Entity Types
Five entity categories are extracted and stored in metadata.entities:
| Type | spaCy Labels | Regex Patterns | Examples |
|---|---|---|---|
| people | PERSON | met with X, talked to X | "Sarah", "John Smith" |
| organizations | ORG | — | "Google", "NASA" |
| tools | PRODUCT, WORK_OF_ART | using X, deploy X | "PostgreSQL", "Docker" |
| concepts | EVENT, GPE, LOC | — | "Machine Learning", "New York" |
| projects | — | backticks, project called "X" | "automem", "Project Phoenix" |
Entity Validation
The _is_valid_entity function filters noise using multiple heuristics.
Rejection criteria:
- Length < 3 characters
- Matches SEARCH_STOPWORDS, ENTITY_STOPWORDS, or ENTITY_BLOCKLIST
- No alphabetic characters
- Starts lowercase (unless allow_lower=true)
- Starts with markdown/code symbols (-, *, #, `, {, etc.)
- Ends with class name suffixes (Adapter, Handler, Manager, Service, etc.)
- Boolean/null literals (true, false, null, undefined)
- Environment variable pattern (UPPER_CASE with underscores)
- Exceeds the max_words limit (if specified)
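Taken together, the criteria above can be sketched as a single predicate. This is an illustrative re-creation with stand-in stopword and suffix sets, not the actual _is_valid_entity source:

```python
import re
from typing import Optional

ENTITY_STOPWORDS = {"the", "this", "that", "here"}  # illustrative subset
CLASS_SUFFIXES = ("Adapter", "Handler", "Manager", "Service")
LITERALS = {"true", "false", "null", "undefined"}

def is_valid_entity(text: str, allow_lower: bool = False,
                    max_words: Optional[int] = None) -> bool:
    """Illustrative re-creation of the rejection heuristics listed above."""
    if len(text) < 3:
        return False                                  # too short
    if text.lower() in ENTITY_STOPWORDS | LITERALS:
        return False                                  # stopword or literal
    if not any(c.isalpha() for c in text):
        return False                                  # no alphabetic characters
    if text[0].islower() and not allow_lower:
        return False                                  # starts lowercase
    if text[0] in "-*#`{":
        return False                                  # markdown/code symbol
    if text.endswith(CLASS_SUFFIXES):
        return False                                  # class-name suffix
    if re.fullmatch(r"[A-Z][A-Z0-9]*(?:_[A-Z0-9]+)+", text):
        return False                                  # env-var pattern
    if max_words is not None and len(text.split()) > max_words:
        return False                                  # too many words
    return True
```

For example, "PaymentHandler" and "MAX_ATTEMPTS" are rejected as code artifacts, while "Sarah" passes.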
Auto-Tagging
Extracted entities generate structured tags in the format entity:<type>:<slug>:
- "John Smith" → entity:people:john-smith
- "PostgreSQL" → entity:tools:postgresql
- "Project Phoenix" → entity:projects:project-phoenix
Slugification uses _slugify to convert entities to URL-safe lowercase identifiers.
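A minimal sketch of the tag construction, assuming a simple lowercase-and-hyphenate slugifier (the real _slugify may differ):

```python
import re

def _slugify(text: str) -> str:
    """Lowercase and collapse non-alphanumeric runs to hyphens (illustrative)."""
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")

def entity_tag(entity_type: str, name: str) -> str:
    """Build the entity:<type>:<slug> tag described above."""
    return f"entity:{entity_type}:{_slugify(name)}"

print(entity_tag("people", "John Smith"))         # entity:people:john-smith
print(entity_tag("projects", "Project Phoenix"))  # entity:projects:project-phoenix
```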
Summary Generation
The generate_summary function creates lightweight gist representations for quick scanning, controlled by the ENRICHMENT_ENABLE_SUMMARIES configuration flag.
Algorithm
flowchart LR
Content["Memory Content"]
Split["Split into sentences<br/>regex: (?<=[.!?])\\s+"]
First["Take first sentence"]
Truncate["Truncate to max_length<br/>Default: 240 chars"]
Trim["Trim at last word boundary<br/>rsplit(' ', 1)"]
Summary["Summary"]
Content --> Split
Split --> First
First --> Truncate
Truncate --> Trim
Trim --> Summary
Example:
Input: "The team decided to migrate the database to PostgreSQL. This was due to performance concerns. Migration planned for Q2."
Output: "The team decided to migrate the database to PostgreSQL."
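The flow above can be reproduced in a few lines; this is a sketch of the documented steps, with max_length standing in for the 240-character default:

```python
import re

def generate_summary(content: str, max_length: int = 240) -> str:
    """First-sentence gist: split, take first, truncate, trim at word boundary."""
    sentences = re.split(r"(?<=[.!?])\s+", content.strip())
    summary = sentences[0] if sentences else ""
    if len(summary) > max_length:
        # Cut to max_length, then drop the possibly-truncated trailing word.
        summary = summary[:max_length].rsplit(" ", 1)[0]
    return summary

text = ("The team decided to migrate the database to PostgreSQL. "
        "This was due to performance concerns. Migration planned for Q2.")
print(generate_summary(text))  # The team decided to migrate the database to PostgreSQL.
```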
Relationship Creation
The enrichment pipeline automatically creates three types of relationships to build the knowledge graph.
Temporal Links (PRECEDED_BY)
Connects memories to recently created or updated memories within a time window.
- Queries FalkorDB for memories created/updated within the configured time window
- Creates PRECEDED_BY edges from the new memory to those recent memories
- Establishes chronological chains in the graph
Semantic Links (SIMILAR_TO)
Creates bidirectional edges between semantically similar memories using Qdrant vector search.
Semantic Linking Flow:
1. Query Qdrant for the memory's embedding (if available)
2. Search for similar vectors above the similarity threshold
3. Create SIMILAR_TO edges in FalkorDB with strength = cosine_score
4. Links are bidirectional (both directions created)
Configuration:
- ENRICHMENT_SIMILARITY_LIMIT — Maximum neighbors to link (default: 5)
- ENRICHMENT_SIMILARITY_THRESHOLD — Minimum cosine similarity (default: 0.8)
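The neighbor-selection step can be sketched without a live Qdrant instance. Here, (memory_id, cosine_score) pairs stand in for search hits, and the limit and threshold mirror the defaults above:

```python
SIMILARITY_LIMIT = 5        # ENRICHMENT_SIMILARITY_LIMIT default
SIMILARITY_THRESHOLD = 0.8  # ENRICHMENT_SIMILARITY_THRESHOLD default

def select_semantic_neighbors(results, limit=SIMILARITY_LIMIT,
                              threshold=SIMILARITY_THRESHOLD):
    """Keep the top-scoring hits above the cosine threshold.

    `results` stands in for Qdrant search hits as (memory_id, cosine_score)
    pairs. Each kept pair would become a bidirectional SIMILAR_TO edge with
    strength = cosine_score.
    """
    above = [(mid, score) for mid, score in results if score >= threshold]
    above.sort(key=lambda pair: pair[1], reverse=True)
    return above[:limit]

hits = [("m1", 0.93), ("m2", 0.81), ("m3", 0.74), ("m4", 0.88)]
print(select_semantic_neighbors(hits))  # m1, m4, m2 survive; m3 is below 0.8
```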
Pattern Detection (EXEMPLIFIES)
Discovers recurring themes by analyzing memories of the same type and linking them to shared Pattern nodes.
Pattern Detection Algorithm:
1. Load recent memories of the same type
2. Extract key term frequencies (TF-IDF-style counting)
3. Identify terms appearing in multiple memories above the threshold
4. MERGE a Pattern node identified by slugified theme name
5. Create an EXEMPLIFIES edge from memory to pattern node
Pattern node properties:
- pattern_id — Unique identifier (slugified theme)
- occurrences — Counter incremented each time the pattern is detected
- first_seen — Timestamp of first occurrence
- last_reinforced — Timestamp of most recent occurrence
EXEMPLIFIES relationship properties:
- pattern_type — Memory type that exemplifies the pattern
- confidence — Detection confidence score
- key_terms — Array of keywords that define the pattern
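The term-counting step of the algorithm can be sketched as document-frequency counting. Tokenization and the recurrence threshold here are illustrative choices, not the pipeline's actual values:

```python
from collections import Counter
import re

def detect_pattern_terms(memories, min_memories=2):
    """Count key terms across memories of one type; keep terms that recur.

    Illustrative sketch of the TF-IDF-style step: tally how many memories
    contain each term and keep terms seen in at least `min_memories`.
    """
    doc_freq = Counter()
    for content in memories:
        # One vote per memory: count distinct terms of 4+ letters.
        terms = set(re.findall(r"[a-z]{4,}", content.lower()))
        doc_freq.update(terms)
    return {term for term, count in doc_freq.items() if count >= min_memories}

memories = [
    "Deployed the staging cluster after the migration",
    "Migration of the staging database finished",
]
print(detect_pattern_terms(memories))  # terms shared by both memories
```

Each surviving term set would then name a Pattern node (via slugification) and seed the key_terms property on the EXEMPLIFIES edge.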
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| ENRICHMENT_ENABLE_SUMMARIES | true | Enable automatic summary generation |
| ENRICHMENT_MAX_ATTEMPTS | 3 | Maximum retry attempts before giving up |
| ENRICHMENT_IDLE_SLEEP_SECONDS | 2 | Worker sleep duration when queue is empty |
| ENRICHMENT_FAILURE_BACKOFF_SECONDS | 5 | Backoff delay between retry attempts |
| ENRICHMENT_SIMILARITY_LIMIT | 5 | Maximum semantic neighbors to link |
| ENRICHMENT_SIMILARITY_THRESHOLD | 0.8 | Minimum cosine similarity for SIMILAR_TO edges |
| ENRICHMENT_SPACY_MODEL | en_core_web_sm | spaCy model for NER (requires pip install spacy) |
Monitoring and Observability
Status Endpoint
The GET /enrichment/status endpoint exposes real-time worker metrics:
Metrics:
- queue_depth — Jobs waiting in enrichment_pending
- inflight_count — Jobs currently being processed in enrichment_inflight
- processed_total — Lifetime total of processed jobs
- successes / failures — Success and failure counters
- last_success_id / last_success_at — Most recent successful enrichment
- last_error / last_error_at — Most recent error details
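For reference, a payload built from these metric names might look like the following; the shape is assembled from the list above and the values are placeholders, not real output:

```python
# Illustrative shape of a GET /enrichment/status response body.
status = {
    "queue_depth": 0,         # jobs waiting in enrichment_pending
    "inflight_count": 0,      # jobs currently in enrichment_inflight
    "processed_total": 0,     # lifetime total of processed jobs
    "successes": 0,
    "failures": 0,
    "last_success_id": None,  # most recent successful enrichment
    "last_success_at": None,
    "last_error": None,       # most recent error details
    "last_error_at": None,
}
```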
Tracking Sets
Thread-safe sets prevent duplicate processing:
- enrichment_pending — Set of memory IDs awaiting enrichment (not yet dequeued)
- enrichment_inflight — Set of memory IDs currently being processed
When a job is dequeued, the ID is moved from pending to inflight. When processing completes (success or final failure), it is removed from inflight. New enqueues check both sets to prevent duplicates.
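This bookkeeping can be sketched as a queue wrapper with lock-protected sets; an illustration of the described behavior, not the actual implementation:

```python
import threading
from queue import Queue

class EnrichmentQueue:
    """Queue with duplicate suppression via pending/inflight sets."""

    def __init__(self):
        self._queue = Queue()
        self._lock = threading.Lock()
        self.pending = set()
        self.inflight = set()

    def enqueue(self, memory_id: str) -> bool:
        with self._lock:
            if memory_id in self.pending or memory_id in self.inflight:
                return False            # already queued or being processed
            self.pending.add(memory_id)
        self._queue.put(memory_id)
        return True

    def dequeue(self) -> str:
        memory_id = self._queue.get()   # blocking
        with self._lock:
            self.pending.discard(memory_id)   # move pending -> inflight
            self.inflight.add(memory_id)
        return memory_id

    def done(self, memory_id: str) -> None:
        with self._lock:
            self.inflight.discard(memory_id)  # success or final failure
```

Because enqueue checks both sets under the lock, a memory updated while its enrichment is still in flight is not queued twice.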
Error Handling
Failure Scenarios
| Scenario | Behavior | Recovery |
|---|---|---|
| spaCy model unavailable | Log warning, use regex fallbacks | Graceful degradation |
| FalkorDB write failure | Retry with exponential backoff | Max 3 attempts |
| Qdrant unavailable | Skip semantic linking, log warning | Continue enrichment |
| Invalid memory ID | Log error, record failure | No retry |
| Extraction exception | Catch, log, retry | Max 3 attempts |
Retry Mechanics
Jobs increment their attempt counter on each retry. When attempt >= ENRICHMENT_MAX_ATTEMPTS, the job is marked as failed and removed from the queue.
The backoff formula is base_backoff * attempt seconds, where attempt is the job's 0-indexed counter. With the default 5-second base:
- Attempt 1 → immediate
- Attempt 2 → 5 seconds
- Attempt 3 → 10 seconds
- After 3 failures → discard, record in enrichment_stats
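This schedule can be expressed directly as a sketch consistent with the formula above, where attempt is the job's 0-indexed counter:

```python
ENRICHMENT_MAX_ATTEMPTS = 3
ENRICHMENT_FAILURE_BACKOFF_SECONDS = 5

def backoff_seconds(attempt: int) -> int:
    """Delay before running the given 0-indexed attempt."""
    return ENRICHMENT_FAILURE_BACKOFF_SECONDS * attempt

def should_retry(attempt: int) -> bool:
    """True if a failure on this 0-indexed attempt leaves retries remaining."""
    return attempt + 1 < ENRICHMENT_MAX_ATTEMPTS

# Delays for the three attempts: immediate, 5s, 10s.
print([backoff_seconds(a) for a in range(ENRICHMENT_MAX_ATTEMPTS)])  # [0, 5, 10]
```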
Performance Characteristics
Throughput
- Entity extraction: ~50-100ms per memory (spaCy), ~5-10ms (regex only)
- Summary generation: <1ms (single sentence extraction)
- Temporal linking: ~10-50ms (depends on time window size)
- Semantic linking: ~20-100ms (Qdrant query for 5 neighbors)
- Pattern detection: ~50-200ms (depends on memory type frequency)
Total enrichment time per memory: ~150-500ms
Concurrency
- Single worker thread processes jobs sequentially
- Non-blocking API writes (enrichment happens asynchronously)
- Lock-protected tracking sets prevent race conditions
- Queue-based architecture allows future multi-worker scaling
Admin Operations
Force Reprocessing
The POST /enrichment/reprocess endpoint (requires X-Admin-Token) allows forced re-enrichment of existing memories:
Parameters:
- memory_ids — Array of memory UUIDs to reprocess
- clear_existing (optional) — Remove existing entity tags and relationships before re-enrichment
Integration with Other Systems
Relationship to Embedding Worker
Enrichment and embedding generation are independent workers:
| System | Trigger | Dependency |
|---|---|---|
| Embedding Worker | Memory created | None (immediate queue) |
| Enrichment Worker | Memory created | Uses Qdrant for similarity search (uses existing embeddings if available) |
Both can proceed in parallel. Enrichment can create SIMILAR_TO edges even if embedding generation is still in progress, using whatever embeddings are already in Qdrant.
For details on embedding generation, see Embedding Generation.
Relationship to Consolidation
Enrichment occurs immediately after memory creation, while consolidation runs on scheduled intervals:
- Enrichment: Per-memory processing (immediate, within seconds)
- Consolidation: Cross-memory analysis (daily/weekly/monthly intervals)
Enrichment creates the foundation (entities, initial relationships) that consolidation builds upon (decay scores, creative associations, clustering).
For details on consolidation cycles, see the Consolidation Engine page.