Background Processing

AutoMem implements four background processing systems that operate independently of the main Flask API request/response cycle. These systems handle computationally expensive operations without blocking client requests.

Each system has distinct triggers, execution models, and responsibilities:

| System | Trigger | Execution Model | Primary Purpose |
|---|---|---|---|
| Enrichment Pipeline | Event-driven | Queue + worker thread | Enhance memories with entities, tags, relationships |
| Embedding Worker | Event-driven | Queue + batch accumulator | Generate and store vector embeddings |
| Consolidation Engine | Time-based | Scheduled intervals | Apply decay, discover associations, cluster, forget |
| Sync Worker | Time-based | Interval polling | Detect and repair FalkorDB/Qdrant drift |

```mermaid
graph TB
    subgraph write_ops["Write Operations"]
        PostMem["POST /memory<br/>api/memory.py"]
        PatchMem["PATCH /memory/:id<br/>api/memory.py"]
    end

    subgraph queues["Queue Management"]
        EnrichQueue["enrichment_queue<br/>ServiceState"]
        EmbedQueue["embedding_queue<br/>ServiceState"]

        EnrichPending["enrichment_pending<br/>Set[str]"]
        EnrichInflight["enrichment_inflight<br/>Set[str]"]
        EmbedPending["embedding_pending<br/>Set[str]"]
        EmbedInflight["embedding_inflight<br/>Set[str]"]
    end

    subgraph workers["Worker Threads"]
        direction TB

        EnrichWorker["EnrichmentWorker<br/>enrichment/runtime_worker.py"]
        EmbedWorker["EmbeddingWorker<br/>embedding/runtime_pipeline.py"]
        ConsolWorker["ConsolidationScheduler<br/>consolidation.py"]
        SyncWorker["SyncWorker<br/>sync/runtime_worker.py"]
    end

    subgraph config["Configuration"]
        EnrichConfig["ENRICHMENT_IDLE_SLEEP_SECONDS=2<br/>ENRICHMENT_MAX_ATTEMPTS=3<br/>ENRICHMENT_SIMILARITY_LIMIT=5"]
        EmbedConfig["EMBEDDING_BATCH_SIZE=20<br/>EMBEDDING_BATCH_TIMEOUT_SECONDS=2.0"]
        ConsolConfig["CONSOLIDATION_TICK_SECONDS<br/>DECAY/CREATIVE/CLUSTER/FORGET<br/>_INTERVAL_SECONDS"]
        SyncConfig["SYNC_CHECK_INTERVAL_SECONDS<br/>SYNC_AUTO_REPAIR=true"]
    end

    PostMem --> EnrichQueue
    PostMem --> EmbedQueue
    PatchMem --> EnrichQueue
    PatchMem -.->|"if content changed"| EmbedQueue

    EnrichQueue --> EnrichPending
    EmbedQueue --> EmbedPending

    EnrichWorker --> EnrichInflight
    EnrichWorker --> EnrichPending
    EmbedWorker --> EmbedInflight
    EmbedWorker --> EmbedPending

    EnrichConfig -.-> EnrichWorker
    EmbedConfig -.-> EmbedWorker
    ConsolConfig -.-> ConsolWorker
    SyncConfig -.-> SyncWorker
```

```mermaid
graph TB
    subgraph "HTTP Request Thread"
        PostMemory["POST /memory<br/>Route handler"]
        PatchMemory["PATCH /memory/:id<br/>Route handler"]
    end

    subgraph "ServiceState Queues"
        EnrichQ["enrichment_queue: Queue<br/>Thread-safe FIFO"]
        EmbedQ["embedding_queue: Queue<br/>Thread-safe FIFO"]

        EnrichPending["enrichment_pending: Set<br/>Dedup tracking"]
        EnrichInflight["enrichment_inflight: Set<br/>Processing tracking"]

        EmbedPending["embedding_pending: Set<br/>Dedup tracking"]
        EmbedInflight["embedding_inflight: Set<br/>Processing tracking"]
    end

    subgraph "Background Worker Threads"
        EnrichWorker["enrichment_worker()<br/>Entity extraction thread"]
        EmbedWorker["embedding_worker()<br/>Batch embedding thread"]
    end

    PostMemory -->|"queue.put(memory_id)"| EnrichQ
    PostMemory -->|"queue.put(memory_id)"| EmbedQ
    PatchMemory -->|"queue.put(memory_id)"| EnrichQ
    PatchMemory -.->|"if content changed"| EmbedQ

    EnrichQ --> EnrichPending
    EmbedQ --> EmbedPending

    EnrichPending -.->|"if not in pending/inflight"| EnrichWorker
    EmbedPending -.->|"if not in pending/inflight"| EmbedWorker

    EnrichWorker -->|"Processing"| EnrichInflight
    EmbedWorker -->|"Processing"| EmbedInflight

    EnrichInflight -.->|"Complete"| EnrichPending
    EmbedInflight -.->|"Complete"| EmbedPending
```

Key Insights:

  • POST /memory writes immediately to FalkorDB, then enqueues background jobs
  • Enrichment and embedding workers run continuously in separate threads
  • Consolidation scheduler runs periodic checks via a custom thread-based scheduler
  • All systems can operate independently; failures are isolated
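The queue-plus-dedup-set pattern in the diagrams above can be sketched as follows. This is a minimal illustration, not AutoMem's actual `ServiceState` (the real class holds more fields and guards its sets with a lock for strict thread safety):

```python
import queue

class ServiceState:
    """Minimal sketch of the queue + dedup-set pattern for enrichment."""

    def __init__(self) -> None:
        self.enrichment_queue: queue.Queue = queue.Queue()  # thread-safe FIFO
        self.enrichment_pending: set[str] = set()    # queued, not yet picked up
        self.enrichment_inflight: set[str] = set()   # currently being processed

    def enqueue_enrichment(self, memory_id: str) -> bool:
        # Skip IDs already queued or in progress, so a burst of PATCH
        # requests for the same memory does not duplicate work.
        if memory_id in self.enrichment_pending or memory_id in self.enrichment_inflight:
            return False
        self.enrichment_pending.add(memory_id)
        self.enrichment_queue.put(memory_id)
        return True

state = ServiceState()
state.enqueue_enrichment("mem-1")  # queued
state.enqueue_enrichment("mem-1")  # deduplicated, returns False
```

The same pattern applies to the embedding queue with its own pending/inflight sets.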

All background workers run in daemon threads started during Flask application initialization:

| Component | Started At | Daemon | Lifecycle |
|---|---|---|---|
| enrichment_worker | automem/enrichment/runtime_queue_bindings.py | Yes | Runs until app shutdown |
| embedding_worker | automem/embedding/runtime_bindings.py | Yes | Runs until app shutdown |
| Consolidation scheduler | automem/consolidation/runtime_bindings.py | Yes | Custom thread-based scheduler |

Thread Safety:

  • enrichment_queue and embedding_queue use Python’s thread-safe Queue class
  • Each worker polls its queue in an infinite loop with timeout-based blocking
  • FalkorDB and Qdrant clients are thread-safe for read/write operations
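The timeout-based polling loop can be sketched as follows. This is a simplified stand-in for the real worker functions; the `stop` event and parameter names are illustrative, with `idle_sleep` playing the role of `ENRICHMENT_IDLE_SLEEP_SECONDS`:

```python
import queue
import threading

def worker_loop(q: queue.Queue, process, stop: threading.Event,
                idle_sleep: float = 2.0) -> None:
    """Poll a thread-safe queue forever, blocking with a timeout so the
    loop periodically wakes up even when the queue is empty."""
    while not stop.is_set():
        try:
            memory_id = q.get(timeout=idle_sleep)
        except queue.Empty:
            continue  # idle: loop back and re-check the stop flag
        try:
            process(memory_id)
        finally:
            q.task_done()  # lets q.join() track completion
```

Blocking with a timeout (rather than a busy loop or an indefinite block) keeps CPU usage near zero while idle and still lets the loop notice shutdown signals promptly.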

Startup is orchestrated by automem/runtime_wiring.py:

  1. init_falkordb() — Establish FalkorDB connection
  2. init_qdrant() — Establish optional Qdrant connection
  3. init_openai() — Initialize OpenAI client for classification
  4. init_embedding_provider() — Select and initialize embedding provider
  5. init_enrichment_pipeline() — Start enrichment worker thread
  6. init_embedding_pipeline() — Start embedding worker thread
  7. init_consolidation_scheduler() — Start consolidation scheduler
  8. init_sync_worker() — Start sync worker thread

All background threads are daemon threads, meaning they terminate automatically when the main Flask process exits. No explicit cleanup is required.
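A sketch of the daemon-thread startup pattern (`start_daemon` is an illustrative helper, not AutoMem's actual API):

```python
import threading

def start_daemon(name: str, target, *args) -> threading.Thread:
    # daemon=True: the thread is terminated abruptly when the main
    # process exits, so no explicit cleanup runs and queued work is
    # not flushed on shutdown.
    thread = threading.Thread(target=target, args=args, name=name, daemon=True)
    thread.start()
    return thread
```

Each of the four subsystems is started this way during application initialization.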

Implications:

  • In-flight enrichment/embedding jobs may be lost on shutdown
  • Consolidation tasks may be interrupted mid-run
  • Queue contents are not persisted between restarts
  • Queued work is not resumed after a restart; memories are re-enqueued on next access or via an explicit re-enrichment trigger

| Variable | Default | Purpose |
|---|---|---|
| ENRICHMENT_MAX_ATTEMPTS | 3 | Max retries per memory |
| ENRICHMENT_SIMILARITY_LIMIT | 5 | Max similar neighbors to link |
| ENRICHMENT_SIMILARITY_THRESHOLD | 0.8 | Min cosine similarity for SIMILAR_TO edge |
| ENRICHMENT_IDLE_SLEEP_SECONDS | 2.0 | Queue poll timeout |
| ENRICHMENT_FAILURE_BACKOFF_SECONDS | 5.0 | Base backoff on failure |
| ENRICHMENT_ENABLE_SUMMARIES | true | Generate content summaries |
| ENRICHMENT_SPACY_MODEL | en_core_web_sm | spaCy model for NER |
| Variable | Default | Purpose |
|---|---|---|
| EMBEDDING_BATCH_SIZE | 20 | Items per batch |
| EMBEDDING_BATCH_TIMEOUT_SECONDS | 2.0 | Max wait before processing a partial batch |
| Variable | Default | Purpose |
|---|---|---|
| CONSOLIDATION_TICK_SECONDS | 60 | Scheduler check interval |
| CONSOLIDATION_DECAY_INTERVAL_SECONDS | 86400 | Decay task frequency (1 day) |
| CONSOLIDATION_CREATIVE_INTERVAL_SECONDS | 604800 | Creative task frequency (1 week) |
| CONSOLIDATION_CLUSTER_INTERVAL_SECONDS | 2592000 | Cluster task frequency (30 days) |
| CONSOLIDATION_FORGET_INTERVAL_SECONDS | 0 | Forget task frequency (0 = disabled) |
| CONSOLIDATION_DECAY_IMPORTANCE_THRESHOLD | 0.3 | Min importance for decay (optional filter) |
| CONSOLIDATION_HISTORY_LIMIT | 20 | Max consolidation run history entries |
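The tick logic implied by these settings can be sketched as follows (the function and dictionary names are hypothetical, not taken from `consolidation.py`): every `CONSOLIDATION_TICK_SECONDS` the scheduler checks which tasks are due, and an interval of 0 disables a task, as with the forget task's default.

```python
def due_tasks(last_runs: dict, intervals: dict, now: float) -> list:
    """Return the tasks whose per-task interval has elapsed since
    their last run; an interval <= 0 disables the task."""
    due = []
    for task, interval in intervals.items():
        if interval <= 0:
            continue  # disabled (e.g. forget task by default)
        if now - last_runs.get(task, 0.0) >= interval:
            due.append(task)
    return due

intervals = {"decay": 86400, "creative": 604800, "forget": 0}
```

This is why `CONSOLIDATION_TICK_SECONDS` only bounds scheduling granularity: a task runs at most once per tick, at roughly its configured interval.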

The /health endpoint exposes real-time statistics for all background systems:

| Metric | Source | Interpretation |
|---|---|---|
| enrichment.queue_depth | ServiceState.enrichment_queue | Items waiting in queue |
| enrichment.pending | ServiceState.enrichment_pending | Memories not yet enriched in graph |
| enrichment.inflight | ServiceState.enrichment_inflight | Currently processing |
| enrichment.processed | EnrichmentStats.successes | Total completed |
| enrichment.failed | EnrichmentStats.failures | Total failed |
| embedding.queue_depth | ServiceState.embedding_queue | Embeddings queued for generation |
| consolidation.last_runs | ConsolidationScheduler | Last execution timestamps |
| consolidation.next_runs | ConsolidationScheduler.get_next_runs() | Time until next run |

Advanced monitoring available via admin token:

| Endpoint | Method | Purpose |
|---|---|---|
| /enrichment/status | GET | Detailed enrichment stats + sample pending IDs |
| /enrichment/reprocess | POST | Re-enqueue specific memory IDs |
| /consolidate | POST | Manually trigger consolidation tasks |
| /consolidate/status | GET | Consolidation history and next run times |

Failed enrichment jobs are automatically retried with flat backoff:

  • Max attempts: ENRICHMENT_MAX_ATTEMPTS (default: 3)
  • Backoff: ENRICHMENT_FAILURE_BACKOFF_SECONDS (default: 5.0) — sleeps this flat duration on each failure
  • Failed jobs logged and removed from queue after max attempts
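The flat-backoff retry described above can be sketched as follows (a simplified stand-in for the real worker's retry handling; the function names are illustrative):

```python
import time

def enrich_with_retry(memory_id: str, enrich, max_attempts: int = 3,
                      backoff_seconds: float = 5.0, sleep=time.sleep):
    """Retry a failing enrichment up to max_attempts times, sleeping a
    fixed backoff_seconds between attempts (flat, not exponential)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return enrich(memory_id)
        except Exception:
            if attempt == max_attempts:
                raise  # real worker logs the failure and drops the job
            sleep(backoff_seconds)  # ENRICHMENT_FAILURE_BACKOFF_SECONDS
```

Because the backoff is flat rather than exponential, a memory that fails all attempts occupies the worker for at most `(max_attempts - 1) * backoff_seconds` of sleep time.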

Embedding failures are not retried at the batch level. If a batch fails:

  1. The error is logged
  2. The affected memories remain without embeddings in Qdrant
  3. Each memory's embedding_status metadata in the graph stays in the queued state
  4. Re-embedding can be triggered via /admin/reembed

Consolidation tasks catch exceptions and continue:

  • Failed tasks are logged with error details
  • Next scheduled run proceeds normally
  • Individual task failures don’t affect other tasks
  • History includes error details for debugging

| System | CPU Usage | Memory Usage | I/O Pattern |
|---|---|---|---|
| Enrichment | Moderate (spaCy NER) | Low per job | Burst writes to graph |
| Embedding | Low (network-bound) | Low (batch accumulator) | Batched API calls |
| Consolidation | High (graph traversal) | Moderate (in-memory cache) | Large read + write operations |
| Sync Worker | Low (periodic scans) | Low | Periodic graph + vector queries |

Embedding Batching (40-50% cost reduction):

  • Accumulates up to 20 memories before calling provider API
  • Single API call generates embeddings for entire batch
  • Timeout ensures responsiveness (max 2s delay)
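The batch accumulator can be sketched as follows. This is an illustrative simplification of the pattern, with `batch_size` and `timeout_seconds` standing in for `EMBEDDING_BATCH_SIZE` and `EMBEDDING_BATCH_TIMEOUT_SECONDS`:

```python
import queue
import time

def accumulate_batch(q: queue.Queue, batch_size: int = 20,
                     timeout_seconds: float = 2.0) -> list:
    """Gather up to batch_size items from the queue, but flush a partial
    batch once timeout_seconds elapses so latency stays bounded."""
    batch = []
    deadline = time.monotonic() + timeout_seconds
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout: send whatever accumulated so far
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break  # queue drained before the batch filled
    return batch
```

The worker then makes a single provider API call for the whole batch, which is where the cost reduction comes from.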

Relationship Count Caching (80% consolidation speedup):

  • LRU cache with 10,000 entry capacity
  • Hourly cache invalidation via timestamp key
  • Dramatically reduces graph queries during decay cycles (consolidation.py:152-176)
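The timestamp-keyed LRU pattern can be sketched as follows. The graph-query function and counter here are hypothetical stand-ins; the point is that folding the current hour into the cache key expires entries on the hour with no explicit invalidation code:

```python
import time
from functools import lru_cache

# Hypothetical stand-in for the real graph query; the counter makes
# the caching effect observable.
GRAPH_QUERIES = {"count": 0}

def query_relationship_count(memory_id: str) -> int:
    GRAPH_QUERIES["count"] += 1
    return 7  # placeholder result

@lru_cache(maxsize=10_000)
def _cached_count(memory_id: str, hour_bucket: int) -> int:
    return query_relationship_count(memory_id)

def relationship_count(memory_id: str) -> int:
    # A new hour produces a new hour_bucket, so stale cache entries
    # simply stop being hit and eventually fall out of the LRU.
    return _cached_count(memory_id, int(time.time() // 3600))
```

During a decay cycle, repeated lookups for the same memory hit the cache instead of the graph, which is the source of the speedup cited above.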

The POST /memory endpoint response includes indicators of queued background work:

```json
{
  "memory_id": "uuid",
  "message": "Memory stored successfully",
  "enrichment_queued": true,
  "embedding_queued": true
}
```

Recall queries benefit from completed background processing:

  • Vector search uses embeddings generated by the embedding worker
  • Relationship traversal uses edges created by the enrichment worker
  • Ranking uses the relevance_score field updated by the consolidation engine

AutoMem’s background processing architecture achieves:

  1. Non-blocking API responses — Write to graph first, enhance later
  2. Cost efficiency — Batched embedding generation reduces API calls by 40-50%
  3. Automatic maintenance — Scheduled consolidation keeps graph healthy over time
  4. Fault tolerance — Independent systems with retry logic and error isolation
  5. Observability — Rich metrics via /health and admin endpoints
