
Enrichment Pipeline

The Enrichment Pipeline is a background worker system that automatically enhances stored memories with extracted entities, relationships, summaries, and pattern associations. This page documents the queue-based architecture, processing stages, entity extraction techniques, and relationship creation mechanisms.

For information about embedding generation (which runs in parallel), see Embedding Generation. For details about consolidation cycles (decay, creative, cluster, forget), see the Consolidation Engine page. For the broader context of all background processing systems, see Background Processing.


The enrichment pipeline operates as an independent background thread that consumes jobs from a thread-safe queue. It processes memories asynchronously after storage, ensuring API write operations remain non-blocking.

sequenceDiagram
    participant API as Flask API
    participant Queue as enrichment_queue
    participant Worker as Enrichment Worker
    participant Graph as FalkorDB
    participant Vector as Qdrant

    API->>Queue: enqueue(EnrichmentJob)
    Note over Queue: Add to enrichment_pending

    Worker->>Queue: dequeue (blocking)
    Note over Worker: Move to enrichment_inflight

    Worker->>Graph: MATCH memory by ID
    Worker->>Worker: extract_entities(content)
    Worker->>Worker: generate_summary(content)

    Worker->>Graph: MATCH recent memories<br/>CREATE PRECEDED_BY edges

    Worker->>Vector: search(embedding, limit=5, threshold=0.8)
    Vector-->>Worker: similar_memories
    Worker->>Graph: CREATE SIMILAR_TO edges<br/>SET strength = cosine_score

    Worker->>Graph: MATCH memories by type<br/>Detect pattern keywords
    Worker->>Graph: MERGE Pattern node<br/>CREATE EXEMPLIFIES edges

    Worker->>Graph: SET enriched=true<br/>SET enriched_at=timestamp<br/>SET metadata.enrichment

    Note over Worker: Remove from enrichment_inflight
    Worker->>Queue: record_success

Jobs enqueued for processing contain the memory ID, retry attempt counter, and a forced flag for admin-triggered reprocessing.

Fields:

  • memory_id — UUID of the memory to enrich
  • attempt — Retry counter (0-indexed), incremented on each failure
  • forced — When true (admin trigger), skips the already-enriched check and reprocesses

(app.py:1086-1090)
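As a minimal sketch, the job payload might be modeled like this; the field names come from the list above, while the dataclass shape and example values are illustrative:

```python
from dataclasses import dataclass

@dataclass
class EnrichmentJob:
    """Illustrative job payload mirroring the fields documented above."""
    memory_id: str          # UUID of the memory to enrich
    attempt: int = 0        # 0-indexed retry counter
    forced: bool = False    # admin trigger: skip the already-enriched check

# A retried job carries an incremented attempt counter.
retry = EnrichmentJob(memory_id="2b1f6c1e-0000-4000-8000-000000000000", attempt=1)
```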


Enrichment jobs are enqueued in three scenarios:

| Trigger | Endpoint | Behavior |
| --- | --- | --- |
| Memory creation | POST /memory | Automatic enqueue after graph write |
| Memory update | PATCH /memory/:id | Re-enqueue if content/tags changed |
| Admin reprocessing | POST /enrichment/reprocess | Forced reprocessing with forced=true |

Jobs that fail during processing are retried up to ENRICHMENT_MAX_ATTEMPTS times with exponential backoff:

| Attempt | Backoff | Behavior |
| --- | --- | --- |
| 1 | 0s | Immediate first attempt |
| 2 | ENRICHMENT_FAILURE_BACKOFF_SECONDS | Default 5 seconds |
| 3 | ENRICHMENT_FAILURE_BACKOFF_SECONDS * 2 | 10 seconds |
| Final | — | Record failure in enrichment_stats |
stateDiagram-v2
    [*] --> Queued
    Queued --> Processing: dequeue
    Processing --> Success: enrichment complete
    Processing --> Retry: error (attempt < max)
    Processing --> Failed: error (attempt >= max)

    Retry --> Queued: re-enqueue with attempt++
    Success --> [*]
    Failed --> [*]: record_failure
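The retry state machine above can be sketched as a small worker loop; `run_worker` and the dict-shaped job are hypothetical stand-ins for the real worker code:

```python
import queue
import time

def run_worker(jobs, process, max_attempts=3, backoff_base=5.0, sleep=time.sleep):
    """Drain the queue, retrying failed jobs with a growing backoff (sketch)."""
    stats = {"successes": 0, "failures": 0}
    while True:
        try:
            job = jobs.get_nowait()
        except queue.Empty:
            return stats
        try:
            process(job)
            stats["successes"] += 1          # record_success
        except Exception:
            job["attempt"] += 1
            if job["attempt"] >= max_attempts:
                stats["failures"] += 1       # record_failure, job discarded
            else:
                sleep(backoff_base * job["attempt"])  # 5s, then 10s by default
                jobs.put(job)                # Retry -> Queued
```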

The pipeline uses a two-tier approach: spaCy NLP when available, with regex fallbacks for specific patterns.

Tier 1: spaCy NLP

  • Runs en_core_web_sm model (configurable via ENRICHMENT_SPACY_MODEL)
  • Model is loaded once and cached via LRU cache — eliminates repeated 5-10 second load time
  • Named Entity Recognition (NER) extracts PERSON, ORG, PRODUCT, WORK_OF_ART, EVENT, GPE, LOC labels

Tier 2: Regex Fallbacks

  • Used when spaCy is unavailable, and as a supplement for patterns spaCy misses
  • Patterns like "met with X", "talked to X" for people
  • Backtick-delimited terms for project names
  • "using X", "deploy X" patterns for tools
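A toy version of the fallback tier; the patterns below are illustrative (the real ones live in app.py):

```python
import re

# Illustrative fallback patterns for the three cases described above.
PEOPLE_RE = re.compile(r"\b(?:met with|talked to)\s+([A-Z][a-z]+(?:\s[A-Z][a-z]+)?)")
TOOLS_RE = re.compile(r"\b(?:using|deploy(?:ing)?)\s+([A-Z][A-Za-z0-9]+)")
PROJECTS_RE = re.compile(r"`([^`]+)`")  # backtick-delimited project names

def regex_entities(text):
    """Return entity candidates grouped by category."""
    return {
        "people": PEOPLE_RE.findall(text),
        "tools": TOOLS_RE.findall(text),
        "projects": PROJECTS_RE.findall(text),
    }
```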

Five entity categories are extracted and stored in metadata.entities:

| Type | spaCy Labels | Regex Patterns | Examples |
| --- | --- | --- | --- |
| people | PERSON | met with X, talked to X | "Sarah", "John Smith" |
| organizations | ORG | — | "Google", "NASA" |
| tools | PRODUCT, WORK_OF_ART | using X, deploy X | "PostgreSQL", "Docker" |
| concepts | EVENT, GPE, LOC | — | "Machine Learning", "New York" |
| projects | — | backticks, project called "X" | "automem", "Project Phoenix" |

The _is_valid_entity function filters noise using multiple heuristics.

Rejection criteria:

  • Length < 3 characters
  • Matches SEARCH_STOPWORDS, ENTITY_STOPWORDS, or ENTITY_BLOCKLIST
  • No alphabetic characters
  • Starts lowercase (unless allow_lower=true)
  • Starts with markdown/code symbols (-, *, #, `, {, etc.)
  • Ends with class name suffixes (Adapter, Handler, Manager, Service, etc.)
  • Boolean/null literals (true, false, null, undefined)
  • Environment variable pattern (UPPER_CASE with underscores)
  • Exceeds max_words limit (if specified)
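A simplified sketch of those heuristics; the stopword set here is a tiny stand-in for the real lists:

```python
import re

ENTITY_STOPWORDS = {"the", "this", "that"}  # stand-in; real lists are larger

def is_valid_entity(candidate, allow_lower=False, max_words=None):
    """Simplified version of the rejection heuristics listed above."""
    text = candidate.strip()
    if len(text) < 3:
        return False
    if text.lower() in ENTITY_STOPWORDS:
        return False
    if not any(c.isalpha() for c in text):
        return False
    if text[0].islower() and not allow_lower:
        return False
    if text[0] in "-*#`{[<":
        return False
    if text.endswith(("Adapter", "Handler", "Manager", "Service")):
        return False
    if text.lower() in {"true", "false", "null", "undefined"}:
        return False
    if re.fullmatch(r"[A-Z0-9]+(?:_[A-Z0-9]+)+", text):  # env-var pattern
        return False
    if max_words is not None and len(text.split()) > max_words:
        return False
    return True
```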

Extracted entities generate structured tags in the format entity:<type>:<slug>:

"John Smith" → entity:people:john-smith
"PostgreSQL" → entity:tools:postgresql
"Project Phoenix" → entity:projects:project-phoenix

Slugification uses _slugify to convert entities to URL-safe lowercase identifiers.
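A sketch of the tag construction; this slugify is a stand-in for the real _slugify:

```python
import re

def slugify(text):
    """Lowercase and collapse non-alphanumeric runs into hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")

def entity_tag(entity_type, name):
    """Build a structured tag of the form entity:<type>:<slug>."""
    return f"entity:{entity_type}:{slugify(name)}"
```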


The generate_summary function creates lightweight gist representations for quick scanning, controlled by the ENRICHMENT_ENABLE_SUMMARIES configuration flag.

flowchart LR
    Content["Memory Content"]
    Split["Split into sentences<br/>regex: (?<=[.!?])\\s+"]
    First["Take first sentence"]
    Truncate["Truncate to max_length<br/>Default: 240 chars"]
    Trim["Trim at last word boundary<br/>rsplit(' ', 1)"]
    Summary["Summary"]

    Content --> Split
    Split --> First
    First --> Truncate
    Truncate --> Trim
    Trim --> Summary

Example:

Input: "The team decided to migrate the database to PostgreSQL. This was due to performance concerns. Migration planned for Q2."

Output: "The team decided to migrate the database to PostgreSQL."
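The flow above can be approximated in a few lines; the function name and the exact trimming behavior are assumptions based on the flowchart:

```python
import re

def generate_summary(content, max_length=240):
    """First-sentence gist: split on sentence boundaries, truncate at a word."""
    sentences = re.split(r"(?<=[.!?])\s+", content.strip())
    summary = sentences[0]
    if len(summary) > max_length:
        summary = summary[:max_length].rsplit(" ", 1)[0]  # trim at word boundary
    return summary
```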


The enrichment pipeline automatically creates three types of relationships to build the knowledge graph.

Connects memories to recently created or updated memories within a time window.

  • Queries FalkorDB for memories created/updated within the configured time window
  • Creates PRECEDED_BY edges from the new memory to those recent memories
  • Establishes chronological chains in the graph
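In Cypher terms, the step might look like the following; the Memory label and property names are illustrative, not copied from app.py:

```python
# Hypothetical Cypher for the temporal-linking step described above.
TEMPORAL_LINK_QUERY = """
MATCH (new:Memory {id: $memory_id})
MATCH (recent:Memory)
WHERE recent.id <> $memory_id
  AND coalesce(recent.updated_at, recent.created_at) >= $window_start
CREATE (new)-[:PRECEDED_BY]->(recent)
"""
```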

Creates bidirectional edges between semantically similar memories using Qdrant vector search.

Semantic Linking Flow:

  1. Query Qdrant for the memory’s embedding (if available)
  2. Search for similar vectors above the similarity threshold
  3. Create SIMILAR_TO edges in FalkorDB with strength = cosine_score
  4. Links are bidirectional (both directions created)

Configuration:

  • ENRICHMENT_SIMILARITY_LIMIT — Maximum neighbors to link (default: 5)
  • ENRICHMENT_SIMILARITY_THRESHOLD — Minimum cosine similarity (default: 0.8)
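Decoupled from any particular client, the linking step can be sketched as follows; the `search` callable stands in for the Qdrant query, and all names are hypothetical:

```python
def semantic_links(memory_id, embedding, search, limit=5, threshold=0.8):
    """Turn vector-search hits into SIMILAR_TO edge specs (sketch).

    `search` abstracts the vector query; it yields (neighbor_id, cosine_score)
    pairs already filtered by the similarity threshold.
    """
    edges = []
    for neighbor_id, score in search(embedding, limit=limit, threshold=threshold):
        if neighbor_id == memory_id:
            continue  # skip the memory's own vector
        # Bidirectional: one edge each way, strength = cosine score
        edges.append((memory_id, neighbor_id, score))
        edges.append((neighbor_id, memory_id, score))
    return edges
```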

Discovers recurring themes by analyzing memories of the same type and linking them to shared Pattern nodes.

Pattern Detection Algorithm:

  1. Load recent memories of the same type
  2. Extract key term frequencies (TF-IDF-style counting)
  3. Identify terms appearing in multiple memories above threshold
  4. MERGE a Pattern node identified by slugified theme name
  5. Create EXEMPLIFIES edge from memory to pattern node

Pattern node properties:

  • pattern_id — Unique identifier (slugified theme)
  • occurrences — Counter incremented each time the pattern is detected
  • first_seen — Timestamp of first occurrence
  • last_reinforced — Timestamp of most recent occurrence

EXEMPLIFIES relationship properties:

  • pattern_type — Memory type that exemplifies the pattern
  • confidence — Detection confidence score
  • key_terms — Array of keywords that define the pattern
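The term-counting at the heart of steps 2-3 can be sketched with a document-frequency counter; this is a toy version, not the production algorithm:

```python
from collections import Counter
import re

def detect_patterns(memories, min_count=2):
    """Count significant terms that recur across multiple memories (sketch)."""
    doc_freq = Counter()
    for text in memories:
        # One vote per memory per term, regardless of repeats within the text
        terms = set(re.findall(r"[a-z]{4,}", text.lower()))
        doc_freq.update(terms)
    return {term: n for term, n in doc_freq.items() if n >= min_count}
```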

| Variable | Default | Description |
| --- | --- | --- |
| ENRICHMENT_ENABLE_SUMMARIES | true | Enable automatic summary generation |
| ENRICHMENT_MAX_ATTEMPTS | 3 | Maximum retry attempts before giving up |
| ENRICHMENT_IDLE_SLEEP_SECONDS | 2 | Worker sleep duration when queue is empty |
| ENRICHMENT_FAILURE_BACKOFF_SECONDS | 5 | Backoff delay between retry attempts |
| ENRICHMENT_SIMILARITY_LIMIT | 5 | Maximum semantic neighbors to link |
| ENRICHMENT_SIMILARITY_THRESHOLD | 0.8 | Minimum cosine similarity for SIMILAR_TO edges |
| ENRICHMENT_SPACY_MODEL | en_core_web_sm | spaCy model for NER (requires pip install spacy) |

The GET /enrichment/status endpoint exposes real-time worker metrics:

Metrics:

  • queue_depth — Jobs waiting in enrichment_pending
  • inflight_count — Jobs currently being processed in enrichment_inflight
  • processed_total — Lifetime total of processed jobs
  • successes / failures — Success/failure counters
  • last_success_id / last_success_at — Most recent successful enrichment
  • last_error / last_error_at — Most recent error details

Thread-safe sets prevent duplicate processing:

  • enrichment_pending — Set of memory IDs awaiting enrichment (not yet dequeued)
  • enrichment_inflight — Set of memory IDs currently being processed

When a job is dequeued, the ID is moved from pending to inflight. When processing completes (success or final failure), it is removed from inflight. New enqueues check both sets to prevent duplicates.
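That bookkeeping can be sketched as a small lock-protected tracker; the class and method names are hypothetical:

```python
import threading

class EnrichmentTracker:
    """Pending/inflight dedup bookkeeping as described above (sketch)."""
    def __init__(self):
        self._lock = threading.Lock()
        self.pending = set()
        self.inflight = set()

    def enqueue(self, memory_id):
        with self._lock:
            if memory_id in self.pending or memory_id in self.inflight:
                return False  # duplicate: already queued or being processed
            self.pending.add(memory_id)
            return True

    def start(self, memory_id):
        with self._lock:
            self.pending.discard(memory_id)  # pending -> inflight on dequeue
            self.inflight.add(memory_id)

    def finish(self, memory_id):
        with self._lock:
            self.inflight.discard(memory_id)  # success or final failure
```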


| Scenario | Behavior | Recovery |
| --- | --- | --- |
| spaCy model unavailable | Log warning, use regex fallbacks | Graceful degradation |
| FalkorDB write failure | Retry with exponential backoff | Max 3 attempts |
| Qdrant unavailable | Skip semantic linking, log warning | Continue enrichment |
| Invalid memory ID | Log error, record failure | No retry |
| Extraction exception | Catch, log, retry | Max 3 attempts |

Jobs increment their attempt counter on each retry. When attempt >= ENRICHMENT_MAX_ATTEMPTS, the job is marked as failed and removed from the queue.

The backoff delay is base_backoff * attempt seconds, using the 0-indexed attempt counter. With the default 5-second base:

  • Attempt 1 → immediate
  • Attempt 2 → 5 seconds
  • Attempt 3 → 10 seconds
  • After 3 failures → discard, record in enrichment_stats
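Numerically, with the 0-indexed attempt counter:

```python
def backoff_seconds(attempt, base=5.0):
    """base_backoff * attempt, with a 0-indexed attempt counter (sketch)."""
    return base * attempt
```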

  • Entity extraction: ~50-100ms per memory (spaCy), ~5-10ms (regex only)
  • Summary generation: <1ms (single sentence extraction)
  • Temporal linking: ~10-50ms (depends on time window size)
  • Semantic linking: ~20-100ms (Qdrant query for 5 neighbors)
  • Pattern detection: ~50-200ms (depends on memory type frequency)

Total enrichment time per memory: ~150-500ms

  • Single worker thread processes jobs sequentially
  • Non-blocking API writes (enrichment happens asynchronously)
  • Lock-protected tracking sets prevent race conditions
  • Queue-based architecture allows future multi-worker scaling

The POST /enrichment/reprocess endpoint (requires X-Admin-Token) allows forced re-enrichment of existing memories:

Parameters:

  • memory_ids — Array of memory UUIDs to reprocess
  • clear_existing (optional) — Remove existing entity tags and relationships before re-enrichment

Enrichment and embedding generation are independent workers:

| System | Trigger | Dependency |
| --- | --- | --- |
| Embedding Worker | Memory created | None (immediate queue) |
| Enrichment Worker | Memory created | Uses Qdrant for similarity search (uses existing embeddings if available) |

Both can proceed in parallel. Enrichment can create SIMILAR_TO edges even if embedding generation is still in progress, using whatever embeddings are already in Qdrant.

For details on embedding generation, see Embedding Generation.

Enrichment occurs immediately after memory creation, while consolidation runs on scheduled intervals:

  • Enrichment: Per-memory processing (immediate, within seconds)
  • Consolidation: Cross-memory analysis (daily/weekly/monthly intervals)

Enrichment creates the foundation (entities, initial relationships) that consolidation builds upon (decay scores, creative associations, clustering).

For details on consolidation cycles, see the Consolidation Engine page.