📊 Langfuse Tracing & Robust Ingestion
TL;DR 🚀 Observe every agent turn without blocking execution or tripping body-size limits. Langfuse traces are exported via OpenTelemetry first, with REST ingestion as a fallback. Large payloads (long prompt histories, git diffs, file reads) can add up past the 4.5 MB request limit of a self-hosted Langfuse's Next.js ingestion endpoint. When that happens, the endpoint rejects the batch with HTTP 413 "Body exceeded 4.5mb limit". We address this in three ways: a longer OTel visibility window so the fallback fires less often, per-string payload truncation, and chunked batch POSTs. 🔌
🧬 Instrumentation & Fallback Flow
```mermaid
flowchart TD
    subgraph Agent["🤖 Pi Coding Agent Run"]
        P[Prompt Start] -->|Turn Start| T[Turn Span]
        T -->|Call LLM| G[Generation Observation]
        T -->|Call Tool| O[Tool Span]
        T -->|Turn End| E[Turn End]
    end
    subgraph Export["📡 Trace Export & Visibility Fallback"]
        E -->|agent_end| FLUSH["forceFlush() OTel Spans"]
        FLUSH -->|Wait up to 8s| POLL{"Trace visible in DB?<br/>(POLL_INTERVAL=500ms)"}
        POLL -->|🟢 Yes| DONE[OTel Succeeded]
        POLL -->|🔴 No| REST[REST Fallback Ingestion]
    end
    subgraph Fallback["✂️ REST Fallback Processing (pi-langfuse)"]
        REST -->|Step 1| TRUNC["Truncate strings > 200,000 chars<br/>(Recursively prune deep objects)"]
        TRUNC -->|Step 2| CHUNK["Slice batch into chunks of 15 items"]
        CHUNK -->|Step 3| SEND["POST sequentially to /api/public/ingestion<br/>(Individual try/catches)"]
    end
    classDef main fill:#5865F2,stroke:#fff,stroke-width:2px,color:#fff;
    classDef helper fill:#2F3136,stroke:#7289DA,stroke-width:1px,color:#fff;
    classDef state fill:#43B581,stroke:#fff,stroke-width:1px,color:#fff;
    class E,FLUSH main;
    class POLL,REST,TRUNC,CHUNK,SEND helper;
    class DONE state;
```

🚨 The HTTP 413 Body Size Limit
Self-hosted Langfuse servers run Next.js under the hood. Out of the box, the ingestion endpoint enforces a body-parser limit of `bodyParser: { sizeLimit: '4.5mb' }`.
A long-running agent session can produce a very large flush. The REST fallback bundles every observation (system prompt, conversation history, tool calls, and final answers) into a single batch JSON array. That payload easily reaches 5 to 10 MB, so the server rejects the entire batch with `statusCode: 413` and none of the session's observations are ingested.
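```jsonc
/* ❌ Error returned when the Next.js body parser rejects the request.
   The 413 response is plain text, so it is reported as "non-json" with the raw body attached. */
{
  "ok": false,
  "error": {
    "reason": "non-json",
    "statusCode": 413,
    "rawBody": "Body exceeded 4.5mb limit"
  }
}
```

🛡️ The Three-Tier Reliability Fix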
We add three layers of defensive ingestion. The first keeps pi-langfuse's REST fallback from firing unnecessarily, and the other two run inside the fallback itself:
1️⃣ OpenTelemetry Visibility Buffer
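Self-hosted Langfuse ingests traces asynchronously (ingestion queue ➔ worker ➔ database), so spans are rarely queryable within ~1.5 s of a flush. A shorter visibility check therefore mistakes normal ingestion lag for an export failure. It then triggers the REST fallback, which writes the trace a second time.
- Fix: We raise `OTEL_VISIBILITY_TIMEOUT_MS` to `8_000` ms (polling every `500` ms). The REST fallback then fires only on genuine OTel exporter failures, which avoids duplicate trace writes.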
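The poll-then-fallback decision can be sketched as follows. This is a minimal sketch under stated assumptions, not pi-langfuse's actual code. `waitForTraceVisibility`, `flushWithFallback`, and the injected callbacks are hypothetical names. `forceFlush` would typically wrap the OpenTelemetry tracer provider's `forceFlush()`. `isTraceVisible` could, for example, look the trace up via `GET /api/public/traces/{traceId}` and return `true` once the lookup succeeds.

```typescript
// Hypothetical defaults mirroring the values described above (8 s window, 500 ms polls).
const OTEL_VISIBILITY_TIMEOUT_MS = 8_000;
const POLL_INTERVAL_MS = 500;

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Poll until the trace is queryable or the deadline passes.
// Resolves to true if the trace became visible in time.
async function waitForTraceVisibility(
  isVisible: () => Promise<boolean>,
  timeoutMs = OTEL_VISIBILITY_TIMEOUT_MS,
  pollIntervalMs = POLL_INTERVAL_MS,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    try {
      if (await isVisible()) return true;
    } catch {
      // A failed lookup (e.g. the worker has not written the trace yet) counts as "not visible yet".
    }
    // Stop once the next poll would land past the deadline.
    if (Date.now() + pollIntervalMs > deadline) return false;
    await sleep(pollIntervalMs);
  }
}

// Flush OTel first; run the REST fallback only if the trace never shows up.
async function flushWithFallback(opts: {
  forceFlush: () => Promise<void>;
  isTraceVisible: () => Promise<boolean>;
  restFallback: () => Promise<void>;
  timeoutMs?: number; // undefined falls back to the defaults above
  pollIntervalMs?: number;
}): Promise<"otel" | "rest"> {
  await opts.forceFlush();
  const visible = await waitForTraceVisibility(
    opts.isTraceVisible,
    opts.timeoutMs,
    opts.pollIntervalMs,
  );
  if (visible) return "otel";
  await opts.restFallback();
  return "rest";
}
```

Injecting the lookup as a callback keeps the timing logic independent of any particular Langfuse client version and easy to exercise with short timeouts.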
2️⃣ Recursive Payload Truncation (safeValue)
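No single trace input, output, or metadata field should be able to blow up a request. We recursively traverse each payload and truncate every string longer than 200,000 characters. Anything nested more than five levels below the root is replaced with a placeholder.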
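One caveat on the threshold: `truncateString` in the code that follows measures `String#length`, which counts UTF-16 code units, but the 4.5 MB limit is counted in bytes. A 200,000-unit string of CJK text is about 600 KB as UTF-8, and `slice` can cut an emoji's surrogate pair in half. If a byte budget is preferred, a minimal sketch could look like this. `utf8Char`, `utf8ByteLength`, and `truncateUtf8` are hypothetical helpers, not part of pi-langfuse. The count also ignores the few extra bytes JSON escaping adds for quotes, backslashes, and control characters.

```typescript
// Hypothetical helper: UTF-8 size of the character starting at index i,
// and how many UTF-16 code units it spans (2 for a surrogate pair).
function utf8Char(str: string, i: number): { bytes: number; units: number } {
  const c = str.charCodeAt(i);
  if (c < 0x80) return { bytes: 1, units: 1 };
  if (c < 0x800) return { bytes: 2, units: 1 };
  const isHigh = c >= 0xd800 && c <= 0xdbff;
  const next = i + 1 < str.length ? str.charCodeAt(i + 1) : 0;
  if (isHigh && next >= 0xdc00 && next <= 0xdfff) return { bytes: 4, units: 2 };
  // Other BMP characters take 3 bytes; a lone surrogate is counted as 3 (U+FFFD in TextEncoder).
  return { bytes: 3, units: 1 };
}

function utf8ByteLength(str: string): number {
  let total = 0;
  for (let i = 0; i < str.length; ) {
    const { bytes, units } = utf8Char(str, i);
    total += bytes;
    i += units;
  }
  return total;
}

// Cut so that the kept prefix plus the marker fits in maxBytes of UTF-8,
// never splitting a surrogate pair.
function truncateUtf8(
  str: string,
  maxBytes = 200_000,
  marker = "\n\n... [Truncated by pi-langfuse due to length limits]",
): string {
  if (utf8ByteLength(str) <= maxBytes) return str;
  const budget = Math.max(0, maxBytes - utf8ByteLength(marker));
  let used = 0;
  let end = 0;
  while (end < str.length) {
    const { bytes, units } = utf8Char(str, end);
    if (used + bytes > budget) break;
    used += bytes;
    end += units;
  }
  return str.slice(0, end) + marker;
}
```

Walking char codes avoids allocating a multi-megabyte encoded buffer just to find the cut point.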
```typescript
// 🚀 why: prevents massive string values (like git diffs) from exceeding limits
// note: `limit` counts UTF-16 code units (String#length), not bytes
function truncateString(str: string, limit = 200_000): string {
  if (str.length > limit) {
    return str.slice(0, limit) + "\n\n... [Truncated by pi-langfuse due to length limits]";
  }
  return str;
}

// 🚀 what: recursively prunes objects and limits nesting depth to avoid stack overflows
// (the depth cap also stops runaway recursion on circular references)
function safeValue(val: unknown, depth = 0): unknown {
  if (val === null || val === undefined) return val;
  if (typeof val === "string") return truncateString(val);
  // JSON.stringify throws on bigint, so serialize it as a string
  if (typeof val === "bigint") return val.toString();
  // Dates have no own keys and would collapse to {}; toJSON() matches JSON.stringify
  if (val instanceof Date) return val.toJSON();
  if (Array.isArray(val)) {
    if (depth > 5) return "[Array truncated due to nesting depth]";
    return val.map((item) => safeValue(item, depth + 1));
  }
  if (typeof val === "object") {
    if (depth > 5) return "[Object truncated due to nesting depth]";
    const cleaned: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(val)) {
      cleaned[key] = safeValue(inner, depth + 1);
    }
    return cleaned;
  }
  return val;
}
```

3️⃣ Chunked REST Ingestion (chunkSize = 15)
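Instead of sending the whole execution history in one request, we partition the batch into chunks of 15 events and POST them sequentially. A single oversized or failing chunk then no longer takes the rest of the session down with it.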
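Counting items bounds a request's size only loosely. With the 200,000-character cap, one observation whose input and output both sit at the cap already carries 400,000 characters. Fifteen such events therefore come to at least 6 MB, well past the 4.5 MB limit. A possible complement is to close a chunk as soon as either the item count or a serialized-byte budget would be exceeded. The sketch below is an assumption, not pi-langfuse code: `chunkByBudget` is hypothetical, and its 3.5 MB default budget is an arbitrary margin under the limit for the request envelope.

```typescript
// Hypothetical sketch: group events into chunks capped by count AND by serialized size.
function chunkByBudget<T>(
  items: T[],
  maxItems = 15,
  maxBytes = 3_500_000, // assumption: headroom under the 4.5mb limit for the envelope and metadata
): T[][] {
  const encoder = new TextEncoder();
  const chunks: T[][] = [];
  let current: T[] = [];
  let currentBytes = 2; // the surrounding "[" and "]"
  for (const item of items) {
    const size = encoder.encode(JSON.stringify(item)).length + 1; // +1 for the separating comma
    const full =
      current.length >= maxItems || (current.length > 0 && currentBytes + size > maxBytes);
    if (full) {
      chunks.push(current);
      current = [];
      currentBytes = 2;
    }
    // A single item larger than maxBytes still gets its own chunk; upstream truncation should prevent that.
    current.push(item);
    currentBytes += size;
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}
```

Each resulting chunk can take the place of `batch.slice(i, i + chunkSize)` in this section's ingestion loop, with `chunkIndex` becoming the chunk's position in the returned array.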
```typescript
// 🚀 how: slice the payload array and wrap each POST in an independent try/catch block
const chunkSize = 15;
const totalChunks = Math.ceil(batch.length / chunkSize);
for (let i = 0; i < batch.length; i += chunkSize) {
  const chunk = batch.slice(i, i + chunkSize);
  const chunkIndex = i / chunkSize; // exact, since i advances in steps of chunkSize
  try {
    const res = await rt.scoreClient.api.ingestion.batch({
      batch: chunk,
      metadata: {
        source: "pi-langfuse",
        fallback: "rest-ingestion",
        chunkIndex,
        totalChunks,
      },
    });
    // 💡 the endpoint answers 207 Multi-Status: per-event rejections arrive in `errors`, not as a throw
    if (res.errors.length > 0) {
      console.warn(`📊 Langfuse: ${res.errors.length} event(s) rejected in fallback chunk ${chunkIndex + 1}/${totalChunks}`, res.errors);
    }
  } catch (e) {
    // 💡 non-blocking: one chunk failing doesn't abort the remaining queue flushes
    console.warn(`📊 Langfuse: Failed to ingest fallback batch chunk ${chunkIndex + 1}/${totalChunks}`, e);
  }
}
```

💡 Best Practices Checklist
- ⏳ Deferred Flush: Always defer the final Langfuse shutdown/flush with `setTimeout(shutdownRuntime, 0)` on `agent_end`. Never let telemetry block the agent's main execution turn or add user-facing latency.
- 🛡️ PII & Credential Masking: Never log environment variables or raw configs. Strip secret keys from function arguments before logging them as trace inputs.
- 🏷️ Clean Trace Naming: Choose descriptive names (`code-generation`, `tool-execution`) over generic IDs so that filtering in the UI is actually usable.
- 🔗 Session Attribution: Set `sessionId` on traces to the Discord thread ID. This groups multi-turn conversation histories in the Langfuse Sessions view.
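For the credential-masking rule, one option is to scrub tool arguments before they are attached to a span or observation. This is a minimal sketch that assumes secrets can be recognized by key name. `redactSecrets` and `SECRET_KEY_PATTERN` are hypothetical, and the pattern deliberately avoids a bare `token` so LLM parameters such as `maxTokens` are not masked.

```typescript
// Hypothetical denylist of secret-looking key names (no bare "token", so maxTokens survives).
const SECRET_KEY_PATTERN =
  /(api[_-]?key|secret|password|passwd|authorization|cookie|access[_-]?token|refresh[_-]?token)/i;

// Return a copy of `value` with secret-looking fields masked; the input is not mutated.
function redactSecrets(value: unknown, depth = 0): unknown {
  if (value === null || typeof value !== "object") return value;
  // Fail closed: drop structures too deep to inspect rather than risk leaking a nested secret.
  if (depth > 5) return "[REDACTED: nesting too deep]";
  if (Array.isArray(value)) return value.map((item) => redactSecrets(item, depth + 1));
  const out: Record<string, unknown> = {};
  for (const [key, inner] of Object.entries(value)) {
    out[key] = SECRET_KEY_PATTERN.test(key) ? "[REDACTED]" : redactSecrets(inner, depth + 1);
  }
  return out;
}
```

A key-name denylist cannot catch secrets embedded in free text, such as a token inside a shell command string. It complements keeping raw configs and environments out of traces; it does not replace that rule.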