
SSE Streaming Protocol

Overview

The Ask API uses Server-Sent Events (SSE) for streaming responses. This enables real-time delivery of generated tokens and citations as they become available.

Connection Lifecycle

1. Establish Connection

Send a POST request to /api/ask with a JSON body containing your question. The server responds with the text/event-stream content type and keeps the connection open.

2. Receive Events

The server sends events as they occur. Each event is a single data: line carrying a JSON object whose type field identifies the event:

data: {"type":"token","content":"Hello","message_id":"msg_123"}

data: {"type":"citation","citation":{...}}

data: {"type":"done","message_id":"msg_123",...}
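The wire format above can be decoded in a few lines. The following is a minimal sketch, assuming each event is a single data: line containing one JSON object, as in the samples; parse_sse_lines is a hypothetical helper name, not part of the API.

```python
import json
from typing import Iterable, Iterator


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded event per `data:` line, skipping everything else."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # blank separators, comments, and other SSE fields
        payload = line[len("data:"):].strip()
        yield json.loads(payload)


# Sample input modeled on the events shown above
sample = [
    'data: {"type":"token","content":"Hello","message_id":"msg_123"}',
    "",
    'data: {"type":"done","message_id":"msg_123"}',
]
events = list(parse_sse_lines(sample))
print([event["type"] for event in events])
```

Because the helper accepts any iterable of lines, it can be fed from a file, a socket wrapper, or a streaming HTTP response.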

3. Connection Termination

The connection closes when:

  • The server sends a done event
  • An error occurs
  • The client disconnects

Event Types

token

Delivers a chunk of generated text.

{
  "type": "token",
  "content": "Hello",
  "message_id": "msg_123",
  "is_complete": false
}

Fields:

  • content: Text fragment (may be partial word)
  • message_id: Unique message identifier
  • is_complete: Whether this completes the current token

citation

Provides a source reference for information in the response.

{
  "type": "citation",
  "citation": {
    "id": "[1]",
    "author": "Jane Smith",
    "channel": "#engineering",
    "timestamp": "2026-04-13T10:30:00Z",
    "text_excerpt": "We decided to use JWT tokens for authentication...",
    "permalink": "https://slack.com/archives/C12345/p1234567890123456",
    "source_message_id": "1234567890.123456",
    "media_type": "link" | "pdf" | "image" | null,
    "media_name": "design-doc.pdf" | null
  }
}

Fields:

  • id: Citation reference like [1], [2]
  • author: Message author
  • channel: Channel name
  • timestamp: Message timestamp
  • text_excerpt: First 100 characters of source
  • permalink: URL to original message
  • source_message_id: Platform message ID
  • media_type: Type of media (if any)
  • media_name: Name of media file (if any)
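One way to work with citation payloads on the client is to map them onto a small typed record. This is a sketch only: the Citation class and format_footnote helper are hypothetical names, and the field types are inferred from the example payload above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Citation:
    id: str
    author: str
    channel: str
    timestamp: str
    text_excerpt: str
    permalink: str
    source_message_id: str
    media_type: Optional[str] = None  # "link", "pdf", "image", or None
    media_name: Optional[str] = None


def format_footnote(citation: Citation) -> str:
    """Render a citation as a one-line footnote."""
    note = f"{citation.id} {citation.author} in {citation.channel}: {citation.permalink}"
    if citation.media_name:
        note += f" (attachment: {citation.media_name})"
    return note


# Values copied from the example citation event
payload = {
    "id": "[1]",
    "author": "Jane Smith",
    "channel": "#engineering",
    "timestamp": "2026-04-13T10:30:00Z",
    "text_excerpt": "We decided to use JWT tokens for authentication...",
    "permalink": "https://slack.com/archives/C12345/p1234567890123456",
    "source_message_id": "1234567890.123456",
    "media_type": "pdf",
    "media_name": "design-doc.pdf",
}
citation = Citation(**payload)
print(format_footnote(citation))
```

Note that Citation(**payload) raises a TypeError if the server ever adds a field the class does not declare, so a production client may prefer to filter keys first.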

done

Signals completion of the response.

{
  "type": "done",
  "message_id": "msg_123",
  "session_id": "sess_456",
  "tokens_used": 1234,
  "citations_count": 3,
  "duration_ms": 2500
}

Fields:

  • message_id: Final message ID
  • session_id: Session ID for follow-ups
  • tokens_used: Total tokens consumed
  • citations_count: Number of citations
  • duration_ms: Response generation time

error

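Indicates that an error occurred while generating the response.

{
  "type": "error",
  "error": "Error description",
  "code": "RATE_LIMIT_EXCEEDED" | "INVALID_REQUEST" | "INTERNAL_ERROR"
}

Fields:

  • error: Human-readable error description
  • code: Machine-readable error code
  • retry_after: Wait time before retrying, included with rate-limit errors (see Stream Errors)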
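Taken together, the four event types can be folded into a single result. The sketch below assumes the events have already been decoded into dictionaries; collect_events and AskError are hypothetical names, and treating a stream that ends without a done event as an error is a client-side choice rather than documented behavior.

```python
from typing import Iterable, Optional


class AskError(Exception):
    """Raised when the stream delivers an error event or ends early."""

    def __init__(self, message: str, code: Optional[str] = None):
        super().__init__(message)
        self.code = code


def collect_events(events: Iterable[dict]) -> dict:
    """Accumulate tokens and citations until a done or error event arrives."""
    text_parts = []
    citations = []
    for event in events:
        kind = event.get("type")
        if kind == "token":
            text_parts.append(event["content"])
        elif kind == "citation":
            citations.append(event["citation"])
        elif kind == "done":
            return {
                "response": "".join(text_parts),
                "citations": citations,
                "metrics": event,
            }
        elif kind == "error":
            raise AskError(event["error"], event.get("code"))
    raise AskError("Stream ended before a done event")


result = collect_events([
    {"type": "token", "content": "Hel", "message_id": "msg_123"},
    {"type": "token", "content": "lo", "message_id": "msg_123"},
    {"type": "citation", "citation": {"id": "[1]"}},
    {"type": "done", "message_id": "msg_123", "citations_count": 1},
])
print(result["response"])
```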

Client Implementation

JavaScript (Browser)

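// sessionId is optional. The session_id request field name is an assumption;
// check the Ask API reference for the actual parameter.
async function askQuestion(question, sessionId) {
  const response = await fetch('http://localhost:8000/api/ask', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(sessionId ? { question, session_id: sessionId } : { question })
  });

  // Surface HTTP-level failures (401, 429, ...) before reading the stream
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let fullResponse = '';
  const citations = [];

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Decode incrementally and keep the trailing partial line in the buffer
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;

      let event;
      try {
        event = JSON.parse(line.slice(6));
      } catch (e) {
        console.error('Parse error:', e);
        continue;
      }

      switch (event.type) {
        case 'token':
          fullResponse += event.content;
          onText(event.content);
          break;
        case 'citation':
          citations.push(event.citation);
          onCitation(event.citation);
          break;
        case 'done': {
          const result = {
            response: fullResponse,
            citations,
            metrics: event,
            session_id: event.session_id
          };
          onComplete(result);
          return result;
        }
        case 'error':
          onError(event);
          // Throw so callers can inspect code and retry_after
          throw Object.assign(new Error(event.error), {
            code: event.code,
            retry_after: event.retry_after
          });
      }
    }
  }
}

function onText(text) {
  // Append streamed text to the page; 'answer' is a placeholder element ID
  document.getElementById('answer').textContent += text;
}

function onCitation(citation) {
  console.log(`\n${citation.id} ${citation.author} - ${citation.timestamp}`);
}

function onComplete(result) {
  console.log('\n\nComplete:', result.metrics);
}

function onError(event) {
  console.error(`Error [${event.code}]: ${event.error}`);
}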

JavaScript (Node.js with EventSource)

const EventSource = require('eventsource');

// EventSource always issues GET requests, so this variant assumes the server
// also accepts the question as a query parameter.
function askQuestion(question) {
  const eventSource = new EventSource(
    `http://localhost:8000/api/ask?question=${encodeURIComponent(question)}`
  );

  // Events arrive as unnamed data: messages, so dispatch on the JSON type field
  eventSource.onmessage = (e) => {
    const event = JSON.parse(e.data);

    switch (event.type) {
      case 'token':
        process.stdout.write(event.content);
        break;
      case 'citation':
        console.log(`\n[Citation] ${event.citation.id}: ${event.citation.permalink}`);
        break;
      case 'done':
        console.log('\n[Done]', event);
        eventSource.close();
        break;
      case 'error':
        console.error(`\n[Error] ${event.code}: ${event.error}`);
        eventSource.close();
        break;
    }
  };

  // Transport-level failures (connection refused, dropped stream)
  eventSource.onerror = (e) => {
    console.error('[Connection error]', e);
    eventSource.close();
  };

  return eventSource;
}

Python

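import json
import requests
import sseclient  # provided by the sseclient-py package

def ask_question(question: str):
    response = requests.post(
        'http://localhost:8000/api/ask',
        json={'question': question},
        stream=True
    )
    # Fail fast on HTTP errors such as 401 or 429 before reading the stream
    response.raise_for_status()

    client = sseclient.SSEClient(response)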
    full_response = ""
    citations = []
    
    for event in client.events():
        data = json.loads(event.data)
        
        if data['type'] == 'token':
            full_response += data['content']
            print(data['content'], end='', flush=True)
            
        elif data['type'] == 'citation':
            citations.append(data['citation'])
            print(f"\n[{data['citation']['id']}] {data['citation']['permalink']}")
            
        elif data['type'] == 'done':
            print(f"\n\nTokens: {data['tokens_used']}, Citations: {data['citations_count']}")
            break
            
        elif data['type'] == 'error':
            print(f"\nError: {data['error']}")
            break
    
    return full_response, citations

curl

# Requires bash and jq; -j prints each token's content without a trailing newline
curl -N http://localhost:8000/api/ask \
  -H "Content-Type: application/json" \
  -d '{"question": "What is the architecture?"}' \
  | while IFS= read -r line; do
      if [[ $line == data:* ]]; then
        printf '%s\n' "${line#data: }" | jq -j 'select(.type == "token") | .content'
      fi
    done

Error Handling

Connection Errors

  • Timeout: Increase client timeout or reduce query complexity
  • 401 Unauthorized: Check authentication credentials
  • 429 Rate Limit: Implement exponential backoff
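Exponential backoff for 429 responses can be sketched as follows. The base delay, the cap, and the use of full jitter are illustrative assumptions, not values documented by the API; backoff_delay is a hypothetical helper.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Return a delay in seconds for a zero-based attempt, with full jitter."""
    # The ceiling doubles on each attempt until it reaches the cap
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: pick uniformly in [0, ceiling] so clients do not retry in lockstep
    return random.uniform(0, ceiling)


for attempt in range(4):
    ceiling = min(60.0, 1.0 * 2 ** attempt)
    print(f"attempt {attempt}: ceiling {ceiling:.0f}s, chosen {backoff_delay(attempt):.2f}s")
```

When the server supplies a retry_after value, it is usually better to honor that value and fall back to a computed delay only when it is absent.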

Stream Errors

{
  "type": "error",
  "error": "Rate limit exceeded",
  "code": "RATE_LIMIT_EXCEEDED",
  "retry_after": 60
}

Retry Logic

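const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Assumes askQuestion rejects with an error that carries the stream error's
// code and retry_after fields.
async function askWithRetry(question, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await askQuestion(question);
    } catch (error) {
      if (error.code === 'RATE_LIMIT_EXCEEDED' && i < maxRetries - 1) {
        // Prefer the server's hint; otherwise back off exponentially
        const delaySeconds = error.retry_after ?? 2 ** i;
        await sleep(delaySeconds * 1000);
        continue;
      }
      throw error;
    }
  }
}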

Best Practices

  1. Buffer Tokens: Collect tokens before rendering to avoid partial words
  2. Handle Citations: Display citations as footnotes or inline references
  3. Track Sessions: Use session IDs for follow-up questions
  4. Error Recovery: Implement reconnection logic for network failures
  5. Rate Limiting: Respect rate limits and implement backoff
  6. Close Connections: Always close connections when done
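The first practice, buffering tokens so that only whole words are rendered, might look like this. It is a minimal sketch that treats spaces and newlines as word boundaries; WordBuffer is a hypothetical name.

```python
class WordBuffer:
    """Hold back a trailing partial word until whitespace or flush arrives."""

    def __init__(self) -> None:
        self._pending = ""

    def feed(self, fragment: str) -> str:
        """Add a fragment and return the text that is safe to render now."""
        self._pending += fragment
        # Everything up to and including the last space or newline is complete
        cut = max(self._pending.rfind(" "), self._pending.rfind("\n")) + 1
        ready, self._pending = self._pending[:cut], self._pending[cut:]
        return ready

    def flush(self) -> str:
        """Return whatever remains, for example when the done event arrives."""
        ready, self._pending = self._pending, ""
        return ready


buffer = WordBuffer()
rendered = [buffer.feed(part) for part in ["Hel", "lo wor", "ld"]]
rendered.append(buffer.flush())
print(rendered)
```

Calling flush when the done event arrives ensures the final word is not lost.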

Session Management

The session_id from the done event enables conversational follow-ups:

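// Assumes askQuestion resolves with the done event's session_id and forwards
// an optional session ID passed as its second argument.

// First question
const { session_id } = await askQuestion("What is JWT?");

// Follow-up (uses context from first question)
await askQuestion("How do we refresh tokens?", session_id);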

Sessions retain conversation context until 30 minutes pass without activity.
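A client can track the inactivity window locally to decide whether a stored session_id is still worth sending. The sketch below assumes the 30-minute window restarts each time the session is used, which the text implies but does not state outright; SessionTracker is a hypothetical helper, and the server remains the authority on expiry.

```python
import time
from typing import Optional

SESSION_TTL_SECONDS = 30 * 60  # 30 minutes of inactivity, per the text above


class SessionTracker:
    def __init__(self, ttl: float = SESSION_TTL_SECONDS) -> None:
        self._ttl = ttl
        self._session_id: Optional[str] = None
        self._last_used = 0.0

    def remember(self, session_id: str, now: Optional[float] = None) -> None:
        """Store the session_id from a done event and restart the timer."""
        self._session_id = session_id
        self._last_used = time.monotonic() if now is None else now

    def current(self, now: Optional[float] = None) -> Optional[str]:
        """Return the session_id if it is likely still active, else None."""
        now = time.monotonic() if now is None else now
        if self._session_id is None or now - self._last_used >= self._ttl:
            return None
        return self._session_id


# Explicit timestamps (in seconds) make the behavior easy to follow
tracker = SessionTracker()
tracker.remember("sess_456", now=0.0)
print(tracker.current(now=10 * 60))  # ten minutes later: still inside the window
print(tracker.current(now=31 * 60))  # thirty-one minutes later: treated as expired
```

If current returns None, the client would simply start a new conversation without a session ID.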
