
Background Processing

AutoMem implements four background processing systems that operate independently of the main Flask API request/response cycle. These systems handle computationally expensive operations without blocking client requests.

Each system has distinct triggers, execution models, and responsibilities:

| System | Trigger | Execution Model | Primary Purpose |
|---|---|---|---|
| Enrichment Pipeline | Event-driven | Queue + worker thread | Enhance memories with entities, tags, relationships |
| Embedding Worker | Event-driven | Queue + batch accumulator | Generate and store vector embeddings |
| Consolidation Engine | Time-based | Scheduled intervals | Apply decay, discover associations, cluster, forget |
| Sync Worker | Time-based | Interval polling | Detect and repair FalkorDB/Qdrant drift |

```mermaid
graph TB
    subgraph write_ops["Write Operations"]
        PostMem["POST /memory<br/>app.py:~467"]
        PatchMem["PATCH /memory/:id<br/>app.py:~902"]
    end

    subgraph queues["Queue Management"]
        EnrichQueue["enrichment_queue<br/>ServiceState"]
        EmbedQueue["embedding_queue<br/>ServiceState"]

        EnrichPending["enrichment_pending<br/>Set[str]"]
        EnrichInflight["enrichment_inflight<br/>Set[str]"]
        EmbedPending["embedding_pending<br/>Set[str]"]
        EmbedInflight["embedding_inflight<br/>Set[str]"]
    end

    subgraph workers["Worker Threads"]
        direction TB

        EnrichWorker["EnrichmentWorker<br/>_run_enrichment_worker()<br/>app.py:~1200-1400"]
        EmbedWorker["EmbeddingWorker<br/>_run_embedding_worker()<br/>app.py:~1400-1600"]
        ConsolWorker["ConsolidationScheduler<br/>consolidation.py"]
        SyncWorker["SyncWorker<br/>_run_sync_worker()<br/>app.py:~1600-1800"]
    end

    subgraph config["Configuration"]
        EnrichConfig["ENRICHMENT_IDLE_SLEEP_SECONDS=2<br/>ENRICHMENT_MAX_ATTEMPTS=3<br/>ENRICHMENT_SIMILARITY_LIMIT=5"]
        EmbedConfig["EMBEDDING_BATCH_SIZE=20<br/>EMBEDDING_BATCH_TIMEOUT_SECONDS=2.0"]
        ConsolConfig["CONSOLIDATION_TICK_SECONDS<br/>DECAY/CREATIVE/CLUSTER/FORGET<br/>_INTERVAL_SECONDS"]
        SyncConfig["SYNC_CHECK_INTERVAL_SECONDS<br/>SYNC_AUTO_REPAIR=true"]
    end

    PostMem --> EnrichQueue
    PostMem --> EmbedQueue
    PatchMem --> EnrichQueue
    PatchMem -.->|"if content changed"| EmbedQueue

    EnrichQueue --> EnrichPending
    EmbedQueue --> EmbedPending

    EnrichWorker --> EnrichInflight
    EnrichWorker --> EnrichPending
    EmbedWorker --> EmbedInflight
    EmbedWorker --> EmbedPending

    EnrichConfig -.-> EnrichWorker
    EmbedConfig -.-> EmbedWorker
    ConsolConfig -.-> ConsolWorker
    SyncConfig -.-> SyncWorker
```

```mermaid
graph TB
    subgraph "HTTP Request Thread"
        PostMemory["POST /memory<br/>Route handler"]
        PatchMemory["PATCH /memory/:id<br/>Route handler"]
    end

    subgraph "ServiceState Queues"
        EnrichQ["enrichment_queue: Queue<br/>Thread-safe FIFO"]
        EmbedQ["embedding_queue: Queue<br/>Thread-safe FIFO"]

        EnrichPending["enrichment_pending: Set<br/>Dedup tracking"]
        EnrichInflight["enrichment_inflight: Set<br/>Processing tracking"]

        EmbedPending["embedding_pending: Set<br/>Dedup tracking"]
        EmbedInflight["embedding_inflight: Set<br/>Processing tracking"]
    end

    subgraph "Background Worker Threads"
        EnrichWorker["enrichment_worker()<br/>Entity extraction thread"]
        EmbedWorker["embedding_worker()<br/>Batch embedding thread"]
    end

    PostMemory -->|"queue.put(memory_id)"| EnrichQ
    PostMemory -->|"queue.put(memory_id)"| EmbedQ
    PatchMemory -->|"queue.put(memory_id)"| EnrichQ
    PatchMemory -.->|"if content changed"| EmbedQ

    EnrichQ --> EnrichPending
    EmbedQ --> EmbedPending

    EnrichPending -.->|"if not in pending/inflight"| EnrichWorker
    EmbedPending -.->|"if not in pending/inflight"| EmbedWorker

    EnrichWorker -->|"Processing"| EnrichInflight
    EmbedWorker -->|"Processing"| EmbedInflight

    EnrichInflight -.->|"Complete"| EnrichPending
    EmbedInflight -.->|"Complete"| EmbedPending
```

Key Insights:

  • POST /memory writes immediately to FalkorDB, then enqueues background jobs
  • Enrichment and embedding workers run continuously in separate threads
  • Consolidation scheduler runs periodic checks via Flask-APScheduler
  • All systems can operate independently; failures are isolated
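The write-then-enqueue pattern can be sketched as follows. This is illustrative, not AutoMem's actual code: `graph_store` stands in for the FalkorDB write, and the queue names mirror the `ServiceState` fields described above.

```python
import queue
import uuid

# Stand-ins for the ServiceState queues described above.
enrichment_queue: queue.Queue = queue.Queue()
embedding_queue: queue.Queue = queue.Queue()

def store_memory(content: str, graph_store: dict) -> dict:
    """Write synchronously to the graph, then hand off enhancement work."""
    memory_id = str(uuid.uuid4())
    graph_store[memory_id] = {"content": content}  # immediate durable write
    enrichment_queue.put(memory_id)                # entities/tags/links later
    embedding_queue.put(memory_id)                 # vector generation later
    return {
        "memory_id": memory_id,
        "message": "Memory stored successfully",
        "enrichment_queued": True,
        "embedding_queued": True,
    }
```

The request handler returns as soon as the graph write and two `put()` calls complete; all expensive work happens on the worker threads.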

All background workers run in daemon threads started during Flask application initialization:

| Component | Started At | Daemon | Lifecycle |
|---|---|---|---|
| enrichment_worker | app.py:2566-2568 | Yes | Runs until app shutdown |
| embedding_worker | app.py:2570-2572 | Yes | Runs until app shutdown |
| Consolidation scheduler | app.py:2586-2610 | Via APScheduler | Managed by scheduler |

Thread Safety:

  • enrichment_queue and embedding_queue use Python’s thread-safe Queue class
  • Each worker polls its queue in an infinite loop with timeout-based blocking
  • FalkorDB and Qdrant clients are thread-safe for read/write operations
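A minimal sketch of such a worker loop, using the pending/inflight pattern described above. The 2.0-second `get()` timeout mirrors the documented `ENRICHMENT_IDLE_SLEEP_SECONDS` default; the variable names are stand-ins, not AutoMem's actual identifiers.

```python
import queue
import threading

work_queue: queue.Queue = queue.Queue()
pending: set = set()
inflight: set = set()
state_lock = threading.Lock()
stop = threading.Event()

def worker(process) -> None:
    """Poll the queue forever, skipping IDs that are already in flight."""
    while not stop.is_set():
        try:
            memory_id = work_queue.get(timeout=2.0)  # timeout-based blocking
        except queue.Empty:
            continue                                 # idle; re-check stop flag
        with state_lock:
            if memory_id in inflight:                # dedup against double work
                continue
            pending.discard(memory_id)
            inflight.add(memory_id)
        try:
            process(memory_id)
        finally:
            with state_lock:
                inflight.discard(memory_id)
```

Blocking with a timeout (rather than `get()` with no timeout) lets the loop periodically re-check its stop condition without busy-waiting.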

Startup sequence during application initialization (in source order):

  1. Database clients initialized: app.py:2495-2565
  2. Enrichment worker started: app.py:2566-2568
  3. Embedding worker started: app.py:2570-2572
  4. Consolidator created: app.py:2574-2584
  5. Scheduler initialized and started: app.py:2586-2610

All background threads are daemon threads, meaning they terminate automatically when the main Flask process exits. No explicit cleanup is required.

Implications:

  • In-flight enrichment/embedding jobs may be lost on shutdown
  • Consolidation tasks may be interrupted mid-run
  • Queue contents are not persisted between restarts
  • Restarting the service will reprocess queued items from scratch (new memories will be re-enqueued on next access or re-enrichment trigger)
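The daemon-thread startup can be sketched as below. This is hypothetical scaffolding: the real entry points are `_run_enrichment_worker()` and `_run_embedding_worker()` in app.py, and the dict-of-callables interface is illustrative only.

```python
import threading

def start_background_workers(targets: dict) -> list:
    """Start one daemon thread per worker entry point and return them."""
    threads = []
    for name in ("run_enrichment", "run_embedding"):
        t = threading.Thread(
            target=targets[name],
            name=name.replace("_", "-"),
            daemon=True,   # dies with the main process; no join at shutdown,
        )                  # which is why queued work is not persisted
        t.start()
        threads.append(t)
    return threads
```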

Enrichment:

| Variable | Default | Purpose |
|---|---|---|
| ENRICHMENT_MAX_ATTEMPTS | 3 | Max retries per memory |
| ENRICHMENT_SIMILARITY_LIMIT | 5 | Max similar neighbors to link |
| ENRICHMENT_SIMILARITY_THRESHOLD | 0.8 | Min cosine similarity for SIMILAR_TO edge |
| ENRICHMENT_IDLE_SLEEP_SECONDS | 2.0 | Queue poll timeout |
| ENRICHMENT_FAILURE_BACKOFF_SECONDS | 5.0 | Base backoff on failure |
| ENRICHMENT_ENABLE_SUMMARIES | true | Generate content summaries |
| ENRICHMENT_SPACY_MODEL | en_core_web_sm | spaCy model for NER |

Embedding:

| Variable | Default | Purpose |
|---|---|---|
| EMBEDDING_BATCH_SIZE | 20 | Items per batch |
| EMBEDDING_BATCH_TIMEOUT_SECONDS | 2.0 | Max wait before processing partial batch |

Consolidation:

| Variable | Default | Purpose |
|---|---|---|
| CONSOLIDATION_TICK_SECONDS | 60 | Scheduler check interval |
| CONSOLIDATION_DECAY_INTERVAL_SECONDS | 3600 | Decay task frequency (1 hour) |
| CONSOLIDATION_CREATIVE_INTERVAL_SECONDS | 3600 | Creative task frequency (1 hour) |
| CONSOLIDATION_CLUSTER_INTERVAL_SECONDS | 21600 | Cluster task frequency (6 hours) |
| CONSOLIDATION_FORGET_INTERVAL_SECONDS | 86400 | Forget task frequency (1 day) |
| CONSOLIDATION_DECAY_IMPORTANCE_THRESHOLD | 0.3 | Min importance for decay (optional filter) |
| CONSOLIDATION_HISTORY_LIMIT | 20 | Max consolidation run history |
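A sketch of how such defaults are typically wired from the environment, using the variable names from the tables above (the `env_float` helper is illustrative, not AutoMem's code):

```python
import os

def env_float(name: str, default: float) -> float:
    """Read a float from the environment, falling back to the default."""
    raw = os.getenv(name)
    return float(raw) if raw else default

ENRICHMENT_MAX_ATTEMPTS = int(os.getenv("ENRICHMENT_MAX_ATTEMPTS", "3"))
ENRICHMENT_SIMILARITY_THRESHOLD = env_float("ENRICHMENT_SIMILARITY_THRESHOLD", 0.8)
EMBEDDING_BATCH_SIZE = int(os.getenv("EMBEDDING_BATCH_SIZE", "20"))
EMBEDDING_BATCH_TIMEOUT_SECONDS = env_float("EMBEDDING_BATCH_TIMEOUT_SECONDS", 2.0)
CONSOLIDATION_TICK_SECONDS = int(os.getenv("CONSOLIDATION_TICK_SECONDS", "60"))
SYNC_AUTO_REPAIR = os.getenv("SYNC_AUTO_REPAIR", "true").lower() == "true"
```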

The /health endpoint exposes real-time statistics for all background systems:

| Metric | Source | Interpretation |
|---|---|---|
| enrichment.queue_depth | app.py:2255 | Items waiting in queue |
| enrichment.pending | app.py:2252 | Memories not yet enriched in graph |
| enrichment.inflight | app.py:2253 | Currently processing |
| enrichment.processed | EnrichmentStats.successes | Total completed |
| enrichment.failed | EnrichmentStats.failures | Total failed |
| embedding.queue_depth | app.py:2267 | Embeddings queued for generation |
| consolidation.last_runs | ConsolidationScheduler | Last execution timestamps |
| consolidation.next_runs | ConsolidationScheduler.get_next_runs() | Time until next run |

Advanced monitoring available via admin token:

| Endpoint | Method | Purpose |
|---|---|---|
| /enrichment/status | GET | Detailed enrichment stats + sample pending IDs |
| /enrichment/reprocess | POST | Re-enqueue specific memory IDs |
| /consolidate | POST | Manually trigger consolidation tasks |
| /consolidate/status | GET | Consolidation history and next run times |

Failed enrichment jobs are automatically retried with exponential backoff:

  • Max attempts: ENRICHMENT_MAX_ATTEMPTS (default: 3)
  • Base backoff: ENRICHMENT_FAILURE_BACKOFF_SECONDS (default: 5.0)
  • Backoff formula: base_backoff * (5 ** attempt)
  • Failed jobs logged and removed from queue after max attempts
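The retry delays these defaults produce can be computed directly (a sketch; the actual scheduling lives in the enrichment worker):

```python
def backoff_seconds(attempt: int, base: float = 5.0) -> float:
    """Exponential backoff: base_backoff * (5 ** attempt)."""
    return base * (5 ** attempt)

# attempts 0, 1, 2 -> 5.0s, 25.0s, 125.0s before the job is dropped
schedule = [backoff_seconds(a) for a in range(3)]
```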

Embedding failures are currently not retried at the batch level. If a batch fails:

  1. Error is logged
  2. Memories remain without embeddings in Qdrant
  3. The embedding_status metadata in the graph remains in its queued state
  4. Re-embedding can be triggered via /admin/reembed

Consolidation tasks catch exceptions and continue:

  • Failed tasks are logged with error details
  • Next scheduled run proceeds normally
  • Individual task failures don’t affect other tasks
  • History includes error details for debugging

| System | CPU Usage | Memory Usage | I/O Pattern |
|---|---|---|---|
| Enrichment | Moderate (spaCy NER) | Low per job | Burst writes to graph |
| Embedding | Low (network-bound) | Low (batch accumulator) | Batched API calls |
| Consolidation | High (graph traversal) | Moderate (in-memory cache) | Large read + write operations |
| Sync Worker | Low (periodic scans) | Low | Periodic graph + vector queries |

Embedding Batching (40-50% cost reduction):

  • Accumulates up to 20 memories before calling provider API
  • Single API call generates embeddings for entire batch
  • Timeout ensures responsiveness (max 2s delay)
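A batch accumulator with these semantics can be sketched as below: collect up to `batch_size` queued IDs, but never wait longer than `timeout` seconds before processing a partial batch. The defaults mirror EMBEDDING_BATCH_SIZE and EMBEDDING_BATCH_TIMEOUT_SECONDS; the function itself is illustrative, not AutoMem's actual implementation.

```python
import queue
import time

def drain_batch(q: queue.Queue, batch_size: int = 20,
                timeout: float = 2.0) -> list:
    """Collect up to batch_size items, or whatever arrives before the deadline."""
    deadline = time.monotonic() + timeout
    batch = []
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break                          # deadline: process the partial batch
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break                          # nothing more arrived in time
    return batch                           # one provider API call per batch
```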

Relationship Count Caching (80% consolidation speedup):

  • LRU cache with 10,000 entry capacity
  • Hourly cache invalidation via timestamp key
  • Dramatically reduces graph queries during decay cycles (consolidation.py:152-176)
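The hourly-invalidated cache pattern can be sketched as follows. `count_relationships` here is a stand-in for the real graph query in consolidation.py; only the caching pattern is illustrated.

```python
import time
from functools import lru_cache

query_calls = {"count": 0}

def count_relationships(memory_id: str) -> int:
    """Stand-in for the expensive FalkorDB relationship-count query."""
    query_calls["count"] += 1
    return 7

@lru_cache(maxsize=10_000)                 # 10,000-entry LRU capacity
def _cached_count(memory_id: str, hour_bucket: int) -> int:
    return count_relationships(memory_id)

def relationship_count(memory_id: str) -> int:
    # Folding the current hour into the cache key invalidates every entry
    # when the hour rolls over: a new bucket value forces fresh misses.
    return _cached_count(memory_id, int(time.time() // 3600))
```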

The POST /memory endpoint response includes indicators of queued background work:

```json
{
  "memory_id": "uuid",
  "message": "Memory stored successfully",
  "enrichment_queued": true,
  "embedding_queued": true
}
```

Recall queries benefit from completed background processing:

  • Vector search uses embeddings generated by the embedding worker
  • Relationship traversal uses edges created by the enrichment worker
  • Relevance scores use relevance_score updated by the consolidation engine

AutoMem’s background processing architecture achieves:

  1. Non-blocking API responses — Write to graph first, enhance later
  2. Cost efficiency — Batched embedding generation reduces API calls by 40-50%
  3. Automatic maintenance — Scheduled consolidation keeps graph healthy over time
  4. Fault tolerance — Independent systems with retry logic and error isolation
  5. Observability — Rich metrics via /health and admin endpoints
