
SSE Streaming Protocol

Overview

The Ask API uses Server-Sent Events (SSE) for streaming responses. This enables real-time delivery of generated tokens and citations as they become available.

Connection Lifecycle

1. Establish Connection

Send a POST request to /api/ask with your question in a JSON body. The server responds with Content-Type: text/event-stream and keeps the connection open.

2. Receive Events

The server sends each event as a data: line carrying a JSON object, followed by a blank line:

data: {"type":"token","content":"Hello","message_id":"msg_123"}

data: {"type":"citation","citation":{...}}

data: {"type":"done","message_id":"msg_123",...}

3. Connection Termination

The connection closes when:

  • The server sends a done event
  • An error occurs
  • The client disconnects
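
The lifecycle above can be sketched as a small parser. This is a minimal illustration, not the reference client: it assumes events arrive as data: lines separated by blank lines (as in the sample stream), and the function name iter_events is hypothetical. It stops reading once a done or error event arrives, mirroring the termination rules listed above.

```python
import json
from typing import Iterable, Iterator


def iter_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded events from SSE lines, stopping after done or error."""
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            # The SSE format drops one optional space after the colon.
            value = line[5:]
            data_parts.append(value[1:] if value.startswith(" ") else value)
        elif line == "" and data_parts:
            # A blank line ends the event; multiple data lines join with "\n".
            event = json.loads("\n".join(data_parts))
            data_parts = []
            yield event
            if event.get("type") in ("done", "error"):
                return
```

Feeding it the decoded lines of a response body (for example from requests' iter_lines with decode_unicode=True) yields one dict per event.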

Event Types

token

Delivers a chunk of generated text.

{
  "type": "token",
  "content": "Hello",
  "message_id": "msg_123",
  "is_complete": false
}

Fields:

  • content: Text fragment (may be partial word)
  • message_id: Unique message identifier
  • is_complete: Whether this completes the current token
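
Because content may be a partial word, clients typically concatenate fragments in arrival order. The sketch below groups fragments by message_id; the helper name assemble_messages is hypothetical, and it assumes events have already been decoded into dicts.

```python
from collections import defaultdict


def assemble_messages(events):
    """Concatenate token content in arrival order, keyed by message_id."""
    parts = defaultdict(list)
    for event in events:
        if event.get("type") == "token":
            parts[event["message_id"]].append(event["content"])
    return {message_id: "".join(chunks) for message_id, chunks in parts.items()}
```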

citation

Provides a source reference for information in the response.

{
  "type": "citation",
  "citation": {
    "id": "[1]",
    "author": "Jane Smith",
    "channel": "#engineering",
    "timestamp": "2026-04-13T10:30:00Z",
    "text_excerpt": "We decided to use JWT tokens for authentication...",
    "permalink": "https://slack.com/archives/C12345/p1234567890123456",
    "source_message_id": "1234567890.123456",
    "media_type": "link" | "pdf" | "image" | null,
    "media_name": "design-doc.pdf" | null
  }
}

Fields:

  • id: Citation reference like [1], [2]
  • author: Message author
  • channel: Channel name
  • timestamp: Message timestamp
  • text_excerpt: First 100 characters of source
  • permalink: URL to original message
  • source_message_id: Platform message ID
  • media_type: Type of media (if any)
  • media_name: Name of media file (if any)
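
As an illustration of how these fields might be used, the sketch below renders one citation as a footnote line. The function name format_footnote and the output layout are assumptions chosen for the example; only the field names come from the schema above.

```python
def format_footnote(citation: dict) -> str:
    """Render a citation payload as a single footnote line."""
    line = (
        f'{citation["id"]} {citation["author"]} in {citation["channel"]} '
        f'({citation["timestamp"]}): {citation["permalink"]}'
    )
    # media_type and media_name may be null, so only append them when set.
    if citation.get("media_name"):
        line += f' [{citation.get("media_type")}: {citation["media_name"]}]'
    return line
```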

done

Signals completion of the response.

{
  "type": "done",
  "message_id": "msg_123",
  "session_id": "sess_456",
  "tokens_used": 1234,
  "citations_count": 3,
  "duration_ms": 2500
}

Fields:

  • message_id: Final message ID
  • session_id: Session ID for follow-ups
  • tokens_used: Total tokens consumed
  • citations_count: Number of citations
  • duration_ms: Response generation time
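
One possible use of the done payload is a sanity check that the client saw every citation the server reports. The sketch below is an assumption about how a client might do that; summarize_done is a hypothetical helper, not part of the API.

```python
def summarize_done(done: dict, received_citations: int) -> dict:
    """Summarize a done event and compare its citation count to what arrived."""
    return {
        "session_id": done["session_id"],
        "seconds": done["duration_ms"] / 1000,
        # False suggests a citation event was dropped or not parsed.
        "citations_match": done["citations_count"] == received_citations,
    }
```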

error

Indicates an error occurred.

{
  "type": "error",
  "error": "Error description",
  "code": "RATE_LIMIT_EXCEEDED" | "INVALID_REQUEST" | "INTERNAL_ERROR"
}
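
Clients often convert error events into exceptions so callers can branch on the code. The sketch below shows one way to do that; the AskStreamError class and raise_for_error helper are hypothetical names. Optional fields such as retry_after are carried over only when the event includes them.

```python
class AskStreamError(Exception):
    """An error event received on the stream (hypothetical client-side type)."""

    def __init__(self, code, message, retry_after=None):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.retry_after = retry_after


def raise_for_error(event: dict) -> None:
    """Raise AskStreamError if the event is an error event; otherwise do nothing."""
    if event.get("type") == "error":
        raise AskStreamError(
            event.get("code"), event.get("error"), event.get("retry_after")
        )
```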

Client Implementation

JavaScript (Browser)

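// Streams an answer from the Ask API. Resolves with the full text, the
// citations, and the final done event.
async function askQuestion(question) {
  const response = await fetch('http://localhost:8000/api/ask', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ question })
  });

  // A non-2xx response (for example 401 or 429) has no event stream to read.
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let fullResponse = '';
  const citations = [];

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Keep any trailing partial line in the buffer until the next chunk.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;

      let event;
      try {
        event = JSON.parse(line.slice(6));
      } catch (e) {
        console.error('Parse error:', e);
        continue;
      }

      switch (event.type) {
        case 'token':
          fullResponse += event.content;
          onText(event.content);
          break;
        case 'citation':
          citations.push(event.citation);
          onCitation(event.citation);
          break;
        case 'done': {
          const result = { response: fullResponse, citations, metrics: event };
          onComplete(result);
          return result;
        }
        case 'error': {
          onError(event);
          // Reject with code and retry_after so callers can decide to retry.
          const err = new Error(event.error);
          err.code = event.code;
          err.retry_after = event.retry_after;
          throw err;
        }
      }
    }
  }

  // The stream closed without a done or error event.
  throw new Error('Stream ended before a done event');
}

// Assumes the page contains an element with id="answer" (hypothetical).
function onText(text) {
  document.getElementById('answer').textContent += text;
}

function onCitation(citation) {
  console.log(`[${citation.id}] ${citation.author} - ${citation.timestamp}`);
}

function onComplete(result) {
  console.log('Complete:', result.metrics);
}

function onError(event) {
  console.error(`[${event.code}] ${event.error}`);
}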

JavaScript (Node.js with EventSource)

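const EventSource = require('eventsource');

// EventSource can only issue GET requests, so this variant assumes the server
// also accepts the question as a query parameter. The lifecycle above
// documents POST only; if GET is not supported, use the fetch-based client.
function askQuestion(question) {
  const eventSource = new EventSource(
    `http://localhost:8000/api/ask?question=${encodeURIComponent(question)}`
  );

  // The stream uses plain data: lines with no event: field, so every payload
  // arrives as a default "message" event. Dispatch on the JSON type field.
  eventSource.onmessage = (e) => {
    const event = JSON.parse(e.data);
    switch (event.type) {
      case 'token':
        process.stdout.write(event.content);
        break;
      case 'citation':
        console.log(`\n[Citation] ${event.citation.id}: ${event.citation.permalink}`);
        break;
      case 'done':
        console.log('\n[Done]', event);
        eventSource.close();
        break;
      case 'error':
        console.error('\n[Error]', event.code, event.error);
        eventSource.close();
        break;
    }
  };

  // Transport-level failure. Close explicitly so EventSource does not
  // reconnect automatically and resubmit the question.
  eventSource.onerror = (e) => {
    console.error('[Connection error]', e);
    eventSource.close();
  };

  return eventSource;
}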

Python

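import json
import requests
import sseclient  # SSEClient(response).events() is the sseclient-py package API

def ask_question(question: str):
    response = requests.post(
        'http://localhost:8000/api/ask',
        json={'question': question},
        stream=True
    )
    # Surface HTTP errors such as 401 or 429 before reading the stream.
    response.raise_for_status()
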
    client = sseclient.SSEClient(response)
    full_response = ""
    citations = []
    
    for event in client.events():
        data = json.loads(event.data)
        
        if data['type'] == 'token':
            full_response += data['content']
            print(data['content'], end='', flush=True)
            
        elif data['type'] == 'citation':
            citations.append(data['citation'])
            print(f"\n[{data['citation']['id']}] {data['citation']['permalink']}")
            
        elif data['type'] == 'done':
            print(f"\n\nTokens: {data['tokens_used']}, Citations: {data['citations_count']}")
            break
            
        elif data['type'] == 'error':
            print(f"\nError: {data['error']}")
            break
    
    return full_response, citations

curl

# Requires bash (for [[ ]]) and jq.
curl -N http://localhost:8000/api/ask \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the architecture?"}' \
  | while IFS= read -r line; do
      if [[ $line == data:* ]]; then
        # -j prints each token's raw text without appending a newline.
        printf '%s\n' "${line#data: }" | jq -j 'select(.type == "token") | .content'
      fi
    done

Error Handling

Connection Errors

  • Timeout: Increase client timeout or reduce query complexity
  • 401 Unauthorized: Check authentication credentials
  • 429 Rate Limit: Implement exponential backoff
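
The exponential backoff recommended for 429 responses can be sketched as a delay generator. This is one common variant (full jitter), not a schedule prescribed by the API; the function name and the base, cap, and max_retries defaults are assumptions chosen for illustration.

```python
import random


def backoff_delays(max_retries=5, base=1.0, cap=30.0, rng=random.random):
    """Yield one delay in seconds per retry attempt.

    The window doubles each attempt up to `cap`; the delay is a random
    fraction of the window so that many clients do not retry in lockstep.
    """
    for attempt in range(max_retries):
        window = min(cap, base * 2 ** attempt)
        yield window * rng()
```

A caller would sleep for each yielded delay before retrying the request, and give up once the generator is exhausted.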

Stream Errors

{
  "type": "error",
  "error": "Rate limit exceeded",
  "code": "RATE_LIMIT_EXCEEDED",
  "retry_after": 60
}

Retry Logic

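// Assumes askQuestion rejects with an error that carries the code and
// retry_after fields from the stream error event.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function askWithRetry(question, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await askQuestion(question);
    } catch (error) {
      if (error.code === 'RATE_LIMIT_EXCEEDED' && i < maxRetries - 1) {
        // Fall back to exponential backoff if retry_after is absent.
        const waitSeconds = error.retry_after ?? 2 ** i;
        await sleep(waitSeconds * 1000);
        continue;
      }
      throw error;
    }
  }
}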

Best Practices

  1. Buffer Tokens: Collect tokens before rendering to avoid partial words
  2. Handle Citations: Display citations as footnotes or inline references
  3. Track Sessions: Use session IDs for follow-up questions
  4. Error Recovery: Implement reconnection logic for network failures
  5. Rate Limiting: Respect rate limits and implement backoff
  6. Close Connections: Always close connections when done
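
The first practice, buffering tokens to avoid partial words, might look like the sketch below. It holds back text after the last whitespace until more arrives. The WordBuffer class is a hypothetical helper, and splitting on spaces and newlines is an assumption that suits whitespace-delimited languages only.

```python
class WordBuffer:
    """Hold back a trailing partial word until whitespace or flush."""

    def __init__(self):
        self._pending = ""

    def feed(self, fragment: str) -> str:
        """Add a fragment; return text up to and including the last whitespace."""
        self._pending += fragment
        cut = max(self._pending.rfind(" "), self._pending.rfind("\n"))
        if cut == -1:
            return ""
        ready = self._pending[: cut + 1]
        self._pending = self._pending[cut + 1 :]
        return ready

    def flush(self) -> str:
        """Return whatever is left; call this when the done event arrives."""
        ready, self._pending = self._pending, ""
        return ready
```

Call feed with each token's content and render whatever it returns, then call flush once on done.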

Session Management

The session_id from the done event enables conversational follow-ups:

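// Assumes a variant of askQuestion that resolves with the done event and
// takes an optional session ID. How the session ID is sent in the request
// is not specified on this page.

// First question
const { session_id } = await askQuestion("What is JWT?");

// Follow-up (uses context from first question)
await askQuestion("How do we refresh tokens?", session_id);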

Sessions retain conversation context until they have been inactive for 30 minutes.
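
A client can track the inactivity window locally to decide whether a stored session_id is still worth sending. The sketch below is an assumption about client-side bookkeeping only: SessionTracker is a hypothetical helper, the server remains the authority on expiry, and the sketch assumes the timer restarts each time a done event is recorded.

```python
import time

SESSION_IDLE_LIMIT_SECONDS = 30 * 60  # 30 minutes, per the text above


class SessionTracker:
    """Remember the latest session_id and drop it once it has likely expired."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._session_id = None
        self._last_used = None

    def remember(self, session_id):
        """Record the session_id from a done event and restart the idle timer."""
        self._session_id = session_id
        self._last_used = self._clock()

    def current(self):
        """Return the session_id if it is probably still live, else None."""
        if self._session_id is None:
            return None
        if self._clock() - self._last_used >= SESSION_IDLE_LIMIT_SECONDS:
            self._session_id = None
            return None
        return self._session_id
```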
