📊 Langfuse Tracing & Robust Ingestion#

TL;DR 🚀 Observe every agent turn safely without blocking execution or hitting body size limits. Langfuse traces are exported via OpenTelemetry (primary) with a REST fallback (backup). Large payloads (long prompt histories, git diffs, file reads) can aggregate to hit the self-hosted Next.js HTTP 413 “Body exceeded 4.5mb limit” block on batch ingestion. We fix this via OTel visibility buffers, payload truncation, and chunked batch flushes. 🔌


🧬 Instrumentation & Fallback Flow#

flowchart TD
  subgraph Agent["🤖 Pi Coding Agent Run"]
    P[Prompt Start] -->|Turn Start| T[Turn Span]
    T -->|Call LLM| G[Generation Observation]
    T -->|Call Tool| O[Tool Span]
    T -->|Turn End| E[Turn End]
  end

  subgraph Export["📡 Trace Export & Visibility Fallback"]
    E -->|agent_end| FLUSH["forceFlush() OTel Spans"]
    FLUSH -->|Wait up to 8s| POLL{"Trace visible in DB?<br/>(POLL_INTERVAL=500ms)"}
    
    POLL -->|🟢 Yes| DONE[OTel Succeeded]
    POLL -->|🔴 No| REST[REST Fallback Ingestion]
  end

  subgraph Fallback["✂️ REST Fallback Processing (pi-langfuse)"]
    REST -->|Tier 1| TRUNC["Truncate String values >200KB<br/>(Recursively prune deep objects)"]
    TRUNC -->|Tier 2| CHUNK["Slice batch into chunks of 15 items"]
    CHUNK -->|Tier 3| SEND["POST sequentially to /api/public/ingestion<br/>(Individual try/catches)"]
  end

  classDef main fill:#5865F2,stroke:#fff,stroke-width:2px,color:#fff;
  classDef helper fill:#2F3136,stroke:#7289DA,stroke-width:1px,color:#fff;
  classDef state fill:#43B581,stroke:#fff,stroke-width:1px,color:#fff;
  class E,FLUSH main;
  class POLL,REST,TRUNC,CHUNK,SEND helper;
  class DONE state;

🚨 The HTTP 413 Body Size Limit#

Self-hosted Langfuse servers use Next.js behind the hood, enforcing a default bodyParser: { sizeLimit: '4.5mb' } on the ingestion endpoint.

When a long-running agent session is flushed, the REST fallback bundles all observations (system prompt, conversation history, tool calls, and final answers) into a single batch JSON array. This payload easily aggregates to 5MB - 10MB, causing the server to reject the entire batch with statusCode: 413.

/*  Next.js body-parser rejection raw body */
{
  "ok": false,
  "error": {
    "reason": "non-json",
    "statusCode": 413,
    "rawBody": "Body exceeded 4.5mb limit"
  }
}

🛡️ The Three-Tier Reliability Fix#

We implement three layers of defensive ingestion inside pi-langfuse’s REST fallback:

1️⃣ OpenTelemetry Visibility Buffer#

Self-hosted Langfuse ingests traces asynchronously (Ingestion queue ➔ Worker ➔ DB). Spans are rarely queryable within ~1.5s after flush.

  • Fix: We increase OTEL_VISIBILITY_TIMEOUT_MS to 8_000ms (polling every 500ms) so the REST fallback only fires on genuine OTel exporter failures, preventing duplicate trace writes.

2️⃣ Recursive Payload Truncation (safeValue)#

No single trace input, output, or metadata object should choke the network. We recursively traverse payloads and truncate individual strings to a safe threshold.

// 🚀 why: prevents massive string values (like git diffs) from exceeding limits
function truncateString(str: string, limit = 200_000): string {
  if (str.length > limit) {
    return str.slice(0, limit) + "\n\n... [Truncated by pi-langfuse due to length limits]";
  }
  return str;
}

// 🚀 what: recursively prunes objects and limits nesting depth to avoid stack overflows
function safeValue(val: unknown, depth = 0): unknown {
  if (val === null || val === undefined) return val;
  if (typeof val === "string") return truncateString(val);
  
  if (Array.isArray(val)) {
    if (depth > 5) return "[Array truncated due to nesting depth]";
    return val.map((item) => safeValue(item, depth + 1));
  }
  if (typeof val === "object") {
    if (depth > 5) return "[Object truncated due to nesting depth]";
    const cleaned: Record<string, unknown> = {};
    for (const key of Object.keys(val)) {
      cleaned[key] = safeValue((val as any)[key], depth + 1);
    }
    return cleaned;
  }
  return val;
}

3️⃣ Chunked REST Ingestion (chunkSize = 15)#

Instead of sending the whole execution history in one request, we partition the batch.

// 🚀 how: slice the payload array and wrap each POST in an independent try/catch block
const chunkSize = 15;
for (let i = 0; i < batch.length; i += chunkSize) {
  const chunk = batch.slice(i, i + chunkSize);
  try {
    await rt.scoreClient.api.ingestion.batch({
      batch: chunk,
      metadata: {
        source: "pi-langfuse",
        fallback: "rest-ingestion",
        chunkIndex: Math.floor(i / chunkSize),
        totalChunks: Math.ceil(batch.length / chunkSize),
      },
    });
  } catch (e) {
    // 💡 non-blocking: one chunk failing doesn't abort the remaining queue flushes
    console.warn(`📊 Langfuse: Failed to ingest fallback batch chunk ${Math.floor(i / chunkSize) + 1}/${Math.ceil(batch.length / chunkSize)}`, e);
  }
}

💡 Best Practices Checklist#

  • Deferred Flush: Always defer the final Langfuse shutdown/flush using setTimeout(shutdownRuntime, 0) on agent_end. Never let telemetry block the main agent execution turn or increase user latency.
  • 🛡️ PII & Credential Masking: Never log environment arrays or raw configs. Function arguments containing secret keys must be stripped before logging inputs.
  • 🏷️ Clean Trace Naming: Choose descriptive names (code-generation, tool-execution) over generic IDs to ensure UI filtering is actually usable.
  • 🔗 Session Attribution: Bind sessionId on traces using the Discord thread ID. This groups conversational multi-turn histories in the Langfuse Sessions view.