Ingestion Pipeline

The ingestion pipeline is the heart of Beever Atlas — it transforms raw messages from Slack, Discord, and Microsoft Teams into structured knowledge that populates both memory systems. Every message passes through six stages before it becomes part of your team's knowledge base.

Pipeline Overview

Sync → Extract → Validate → Store → Cluster → Wiki Generate

Stage 1: Sync

Purpose: Fetch messages from chat platforms in a unified format.

Multi-Platform Adapters normalize messages from different platforms:

  • Slack: slack-sdk (Python) — fetches history, threads, reactions, files
  • Discord: discord.py — fetches messages, attachments, embeds
  • Microsoft Teams: Microsoft Graph API — fetches messages, replies

All platforms produce a NormalizedMessage with:

  • Content (text, markdown)
  • Author (name, ID, role)
  • Channel (ID, name)
  • Timestamp
  • Thread context
  • Attachments (images, PDFs, files)
  • Reactions

Media Processing:

  • Images sent to Gemini Vision for content extraction
  • PDFs processed with pypdf (large PDFs chunked into virtual messages)
  • Other files indexed by filename and metadata

Stage 2: Extract

Purpose: Use LLMs to extract structured knowledge from messages.

This stage runs in parallel for efficiency:

Fact Extraction (FactExtractorAgent)

Extracts atomic facts from message + media context:

What it extracts:

  • Factual statements from conversation
  • Context from attached images/PDFs
  • Action items and decisions
  • Technical details and rationale

Quality Gate:

  • Minimum quality score: 0.5
  • Maximum 2 facts per message (prevents over-extraction)
  • Rejects vague patterns ("the user", "the process", "it was")
  • Requires minimum 40 characters

Example:

Message: "We're switching to JWT for auth. Alice will implement it next sprint."
→ Facts:
  1. "Team decided to switch from OAuth to JWT for authentication"
  2. "Alice is assigned to implement JWT authentication next sprint"

Entity Extraction (EntityExtractorAgent)

Extracts entities and relationships:

Core Entity Types (prefer these):

  • Person: Individual (name, role, team)
  • Decision: Concrete choice (summary, status, rationale, date)
  • Project: Initiative (name, status, description)
  • Technology: Tool/framework (name, category)

Extension Types (create as needed):

  • Team, Meeting, Artifact, Constraint, Budget, Deadline, etc.

Relationships (descriptive verb phrases):

  • Person DECIDED Decision
  • Person WORKS_ON Project
  • Decision AFFECTS Project
  • Decision BLOCKED_BY Constraint
  • Any relationship that captures meaning

Entity Quality Gate:

  • Minimum confidence: 0.6
  • Higher thresholds for high-stakes relationships (DECIDED: 0.7, SUPERSEDES: 0.7)
  • Filters hypotheticals ("maybe", "might", "could")
  • Rejects sarcasm and joking

Alias Mapping: Maps all name variants to canonical form:

  • "Alice", "@alice", "alice.chen" → "Alice Chen"
  • "Atlas", "beever-atlas", "the atlas project" → canonical project name
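A minimal sketch of alias resolution: a lookup table maps each known variant (lowercased, with any leading `@` stripped) to its canonical form. The table contents here are illustrative:

```python
# Illustrative alias table; the real mapping is built during extraction.
ALIASES = {
    "alice": "Alice Chen",
    "alice.chen": "Alice Chen",
    "atlas": "Beever Atlas",
    "beever-atlas": "Beever Atlas",
    "the atlas project": "Beever Atlas",
}

def canonicalize(name: str) -> str:
    key = name.lstrip("@").strip().lower()
    # Unknown names pass through unchanged rather than being dropped.
    return ALIASES.get(key, name)
```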

Stage 3: Validate

Purpose: Quality gates prevent low-quality data from entering your knowledge base.

Memory Quality Gate

Filters facts before embedding:

Criterion                              Impact
Length < 40 characters                 -0.3
Vague patterns ("the user")            -0.2 each
Starts with "It/This/That"             -0.15
Has capitalized words (except first)   +0.1

Threshold: Score ≥ 0.5 to pass
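The scoring table above can be sketched as a function. The starting score of 1.0 is an assumption (the doc only specifies the adjustments and the 0.5 threshold):

```python
def quality_score(fact: str) -> float:
    """Score a fact per the memory quality gate (base of 1.0 assumed)."""
    score = 1.0
    words = fact.split()
    first = words[0] if words else ""
    if len(fact) < 40:
        score -= 0.3
    # Each occurrence of a vague pattern costs 0.2.
    for pattern in ("the user", "the process", "it was"):
        score -= 0.2 * fact.lower().count(pattern)
    if first in ("It", "This", "That"):
        score -= 0.15
    # Capitalized words beyond the first suggest concrete named entities.
    if any(w[0].isupper() for w in words[1:] if w[0].isalpha()):
        score += 0.1
    return score

def passes_gate(fact: str) -> bool:
    return quality_score(fact) >= 0.5
```

Under this sketch, "Alice is assigned to implement JWT authentication next sprint" passes, while "It was done" fails on length, vagueness, and its opening pronoun.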

Entity Quality Gate

Per-relationship confidence thresholds:

Relationship Type   Min Confidence
DECIDED             0.7
SUPERSEDES          0.7
OWNS, LEADS         0.6
BLOCKED_BY          0.6
DEPENDS_ON          0.5
WORKS_ON            0.4
MEMBER_OF           0.4
USES                0.4
MENTIONS            0.3

Hypothetical detection: Threshold raised to 0.8 if message contains "maybe", "might", "could", "should we"
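The per-relationship thresholds and the hypothetical-language escalation combine into a single check, sketched below. The default of 0.6 for relationship types not in the table is an assumption:

```python
THRESHOLDS = {
    "DECIDED": 0.7, "SUPERSEDES": 0.7,
    "OWNS": 0.6, "LEADS": 0.6, "BLOCKED_BY": 0.6,
    "DEPENDS_ON": 0.5,
    "WORKS_ON": 0.4, "MEMBER_OF": 0.4, "USES": 0.4,
    "MENTIONS": 0.3,
}
HYPOTHETICAL_MARKERS = ("maybe", "might", "could", "should we")

def accept_relationship(rel_type: str, confidence: float, message: str) -> bool:
    threshold = THRESHOLDS.get(rel_type, 0.6)  # assumed default for unlisted types
    # Hypothetical language raises the bar to 0.8.
    if any(m in message.lower() for m in HYPOTHETICAL_MARKERS):
        threshold = max(threshold, 0.8)
    return confidence >= threshold
```

So a DECIDED relationship at confidence 0.75 passes for "We're switching to JWT" but fails for "Maybe we switch to JWT?".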

Cross-Batch Validator

Runs across message batches to:

  • Resolve entity aliases (merge "Alice" and "@alice")
  • Validate relationship consistency
  • Detect and merge duplicate entities

Stage 4: Store

Purpose: Write to Weaviate and Neo4j using the outbox pattern for safety.

Outbox Pattern

Ensures cross-store write safety:

Phase 1: Write Intent

  • Create single MongoDB document with:
    • Facts, entities, embeddings
    • Status: {weaviate: pending, neo4j: pending, state: pending}
    • Deterministic UUID for idempotency

Phase 2: Fan Out

  • Weaviate: Upsert atomic fact (idempotent via UUID)
    • Mark intent.status.weaviate = "done"
  • Neo4j: MERGE entities + relationships (idempotent via MERGE)
    • Mark intent.status.neo4j = "done" (skip if Neo4j unavailable)
  • MongoDB: Update sync state, mark intent complete

Background Reconciler:

  • Runs every 15 minutes
  • Retries incomplete writes
  • Max 5 retry attempts per intent
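The intent document and its deterministic UUID can be sketched as follows. The field names and the `uuid5` namespace are illustrative assumptions — the key property is that the same message always yields the same intent ID, which is what makes the fan-out writes idempotent:

```python
import uuid

NAMESPACE = uuid.NAMESPACE_URL  # any fixed namespace gives determinism

def make_intent(message_id: str, facts: list[str], entities: list[str]) -> dict:
    return {
        # Same message -> same _id, so re-running the writer is a no-op upsert.
        "_id": str(uuid.uuid5(NAMESPACE, f"intent:{message_id}")),
        "facts": facts,
        "entities": entities,
        "status": {"weaviate": "pending", "neo4j": "pending", "state": "pending"},
        "attempts": 0,  # reconciler gives up after 5
    }

a = make_intent("msg-1", ["Team decided to switch to JWT"], [])
b = make_intent("msg-1", ["Team decided to switch to JWT"], [])
```

The reconciler can then query for intents whose status still contains `pending` and replay only the missing writes.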

What Gets Stored

Weaviate (Semantic Memory):

  • Atomic facts with full metadata
  • Embeddings (text, image, doc vectors)
  • Quality scores and temporal validity
  • Citations to source messages
  • Links to graph entities (graph_entity_ids)

Neo4j (Graph Memory):

  • Entities with flexible properties
  • Relationships with temporal tracking
  • Event nodes linking to Weaviate facts
  • Bidirectional edges for traversal

Stage 5: Cluster

Purpose: Group related facts into topics for hierarchical organization.

Clustering Process

Incremental Assignment (after each sync):

  1. Get existing clusters for channel
  2. For each new fact:
    • Calculate similarity to each cluster
    • If score > 0.6, add to cluster
    • Otherwise, create new cluster seed

Cluster Promotion:

  • Seed clusters become full clusters when 3+ members accumulate
  • Prevents noise from single-fact clusters
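Incremental assignment and seed promotion can be sketched together. This is a simplified model — it represents each cluster by a fixed centroid vector (a real implementation would update centroids as members arrive) and assumes non-zero embedding vectors:

```python
import math

SIMILARITY_THRESHOLD = 0.6   # add to existing cluster above this
PROMOTION_MIN_MEMBERS = 3    # seed becomes a full cluster at 3+ members

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))

def assign(fact_vec: list[float], clusters: list[dict]) -> dict:
    best = max(clusters, key=lambda c: cosine(fact_vec, c["centroid"]), default=None)
    if best and cosine(fact_vec, best["centroid"]) > SIMILARITY_THRESHOLD:
        best["members"] += 1
    else:
        # No cluster is similar enough: start a new seed.
        best = {"centroid": fact_vec, "members": 1, "seed": True}
        clusters.append(best)
    if best["members"] >= PROMOTION_MIN_MEMBERS:
        best["seed"] = False  # promote seed to full cluster
    return best
```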

Cluster Health (Daily Full Rebuild)

Runs at 2 AM UTC daily:

Condition                                   Action
Cluster > 100 members                       Split via k-means into 2-3 sub-clusters
Two clusters have summary cosine > 0.85     Merge into single cluster
Cluster coherence score < 0.4               Re-cluster members from scratch
Cluster has 0 members                       Delete cluster

Tier Building

Creates the 3-tier hierarchy:

  • Tier 0: Channel summary from all clusters
  • Tier 1: Topic cluster summaries
  • Tier 2: Atomic facts (linked via member_ids)

Stage 6: Wiki Generate

Purpose: Generate structured wiki pages from clusters and entities.

Triggers

After Sync (incremental):

  • Runs automatically when channel sync completes
  • Only rebuilds touched clusters

Daily Rebuild (full):

  • Re-evaluates all clusters for coherence
  • Splits/merges as needed
  • Rebuilds entire wiki

On-Demand (manual):

  • Admin trigger via API
  • Forces full rebuild

Wiki Structure

Channel Wiki
├── Overview (from Tier 0 summary)
├── Topics (from Tier 1 clusters)
│   ├── Authentication
│   │   ├── Summary
│   │   ├── Key Facts
│   │   ├── Related Decisions
│   │   └── People Involved
│   └── Deployment
│       ├── Summary
│       └── Key Facts
└── Entities (from Neo4j)
    ├── People
    │   └── Alice Chen
    │       ├── Role
    │       ├── Projects
    │       └── Decisions
    └── Decisions
        └── JWT Authentication
            ├── Summary
            ├── Status
            ├── Rationale
            └── Related Entities

Wiki Dirty Flag

Ensures wiki reflects latest changes:

  • Set to dirty after: sync, consolidation, entity changes, contradiction resolution
  • Checked on each wiki read
  • Regenerated if dirty, served from cache if clean
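The read path can be sketched as a small cache guarded by the flag. Names are illustrative, and the string returned here stands in for actual page generation:

```python
class WikiPage:
    def __init__(self):
        self.dirty = True      # fresh pages start dirty
        self.cached = None
        self.rebuilds = 0

    def mark_dirty(self):
        # Called after sync, consolidation, entity changes,
        # or contradiction resolution.
        self.dirty = True

    def read(self) -> str:
        if self.dirty:
            self.rebuilds += 1
            self.cached = f"wiki v{self.rebuilds}"  # stands in for regeneration
            self.dirty = False
        return self.cached

page = WikiPage()
```

Repeated reads serve the cached page; only a `mark_dirty` in between triggers another rebuild.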

Quality Configuration

From .env.example:

# Fact Extraction Quality
QUALITY_THRESHOLD=0.5              # Minimum fact quality score
MAX_FACTS_PER_MESSAGE=2            # Prevent over-extraction

# Entity Extraction Quality
ENTITY_THRESHOLD=0.6               # Minimum entity confidence

# Relationship Quality (higher for important relationships)
DECIDED_THRESHOLD=0.7
BLOCKED_BY_THRESHOLD=0.6
SUPERSEDES_THRESHOLD=0.7

# Clustering
CLUSTER_SIMILARITY_THRESHOLD=0.6   # Add to existing cluster
CLUSTER_PROMOTION_MIN_MEMBERS=3    # Promote seed to cluster
CLUSTER_SPLIT_SIZE=100             # Split large clusters
CLUSTER_MERGE_SIMILARITY=0.85      # Merge similar clusters

Resilience Features

Graceful Degradation

Each stage is independently skippable:

  • Neo4j down: Skip entity extraction, store facts only
  • Jina down: Queue embeddings, store text-only
  • Gemini down: Queue messages for later extraction
  • Weaviate down: Pause ingestion, queue in MongoDB
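The degradation rules above amount to a per-dependency fallback map. This sketch assumes some health-check mechanism reports which services are down; the action names are illustrative:

```python
# Fallback per dependency, mirroring the list above (names assumed).
FALLBACKS = {
    "neo4j": "store_facts_only",
    "jina": "queue_embeddings",
    "gemini": "queue_for_extraction",
    "weaviate": "pause_and_queue_in_mongodb",
}

def degrade(down_services: set[str]) -> list[str]:
    """Return the fallback actions for whichever dependencies are down."""
    return [FALLBACKS[s] for s in sorted(down_services & FALLBACKS.keys())]
```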

Dead Letter Queue

Failed extractions are queued for retry:

  • LLM timeout → retry with smaller batch
  • Parse error → retry with simplified prompt
  • Max 3 retry attempts before permanent failure
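The retry policy can be sketched as a dispatcher over a dead-letter entry. The error classification strings and the entry shape are assumptions:

```python
MAX_ATTEMPTS = 3  # permanent failure after this many retries

def next_action(entry: dict) -> str:
    """Decide what to do with a failed extraction pulled from the queue."""
    if entry["attempts"] >= MAX_ATTEMPTS:
        return "permanent_failure"
    if entry["error"] == "llm_timeout":
        return "retry_smaller_batch"
    if entry["error"] == "parse_error":
        return "retry_simplified_prompt"
    return "retry"  # generic fallback for unclassified errors
```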

Backfill Queues

When optional stages fail, work is queued for backfill:

  • Entities not extracted → backfill when Neo4j recovers
  • Embeddings not generated → backfill when Jina recovers
  • Wiki not updated → rebuild on next scheduled run

Performance

Stage      Cost             Latency                Parallelizable
Sync       $0               Varies (platform API)  Yes (per channel)
Extract    ~$0.002/msg      ~1s                    Yes (facts + entities)
Validate   $0               <10ms                  Yes
Store      $0               ~100ms                 No (outbox pattern)
Cluster    ~$0.001/cluster  ~2s                    Yes (per cluster)
Wiki       ~$0.01/page      ~5s                    Yes (per page)

Total ingestion cost: ~$0.003 per message

Typical sync (1000 messages): ~$3, completes in ~5 minutes
