LLM Observability Pipeline with Streaming SQL

Your LLM costs tripled last month and you have no idea why. A model started returning truncated responses two days ago but no alert fired. A specific user is hammering your API with 10,000-token prompts and hitting your rate limits, degrading service for everyone else. Sound familiar?

Teams ship LLM-powered features fast, but observability is usually an afterthought. Most setups log API responses to a file or a slow OLAP database, then query them hours later. By then, cost spikes have already compounded, degraded model behavior has already reached users, and the root cause is buried in millions of log rows.

The fix is not better dashboards. The fix is a real-time LLM observability pipeline that processes API logs the moment they arrive, maintains always-current materialized views for token usage, latency percentiles, error rates, and model quality signals, and makes that data queryable in milliseconds.

In this article, you will build exactly that using RisingWave as the streaming SQL engine. You will ingest LLM API response logs from Kafka, create materialized views for every major observability dimension, and see how to wire the results to a live dashboard. All SQL is verified against RisingWave 2.8.0.

Why Batch Logging Is Not Enough for LLM Observability

A typical LLM observability setup looks like this: application servers write API response logs to an S3 bucket or a Kafka topic, a nightly ETL job loads them into a data warehouse, and analysts query the warehouse the next morning. This works for business reporting, but it fails completely for operational monitoring.

Consider what happens in the hours before the ETL job runs:

  • Cost spikes go undetected. A bug in a prompt template adds 2,000 tokens to every request. By the time your daily cost report runs, you have wasted thousands of dollars.
  • Model degradation is invisible. A provider quietly changes model behavior. Your truncation rate climbs from 2% to 18%. Users start noticing shorter, worse responses. No alert fires.
  • Quota exhaustion surprises you. One user starts running automated benchmarks against your API. Rate limit errors spike. Other users experience degraded service. You find out from a Slack message, not a dashboard.
  • Latency regressions compound. A provider infrastructure issue pushes P99 latency from 2 seconds to 12 seconds. Users abandon requests. Your retry logic amplifies the load. The batch job shows the problem at 9 AM tomorrow.

Streaming SQL changes the model. Instead of querying cold logs, you define materialized views that compute exactly the metrics you need, incrementally, as each log event arrives. When costs spike, the llmobs_cost_per_user view updates within seconds. When error rates climb, the llmobs_error_rates view reflects it immediately.

Architecture: Kafka to RisingWave to Dashboards

The pipeline has three layers:

LLM API (OpenAI / Anthropic / etc.)
       |
       v
Application Server (logs API responses)
       |
       v
Kafka Topic: llm-api-logs
       |
       v
RisingWave (streaming SQL, materialized views)
       |
    +--+--+
    |     |
    v     v
Grafana  Kafka Sink (alerts, downstream consumers)

Each LLM API call generates one log event containing: request ID, user ID, model name, token counts, latency, status, error code, finish reason, and cost. These events flow into Kafka and then into RisingWave via a Kafka source.
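As a concrete reference, here is a minimal Python sketch of what one such log event might look like when serialized for the Kafka topic. The field names match the source schema defined below; the shape of the `response` dict assumes an OpenAI-style payload, and the Kafka produce call itself is omitted because the client library varies by stack.

```python
import json
from datetime import datetime, timezone

def build_log_event(response: dict, user_id: str, latency_ms: int) -> str:
    """Serialize one LLM API call into the JSON shape the Kafka source expects.

    `response` is assumed to carry OpenAI-style `usage` and `choices` fields;
    adapt the key names to whatever your provider actually returns.
    """
    usage = response.get("usage", {})
    event = {
        "request_id": response.get("id"),
        "user_id": user_id,
        "model": response.get("model"),
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "total_tokens": usage.get("total_tokens", 0),
        "latency_ms": latency_ms,
        "status": "success",
        "error_code": None,
        "finish_reason": response.get("choices", [{}])[0].get("finish_reason"),
        "temperature": 0.7,   # echo whatever you sent in the request
        "cost_usd": 0.0,      # fill in from your pricing logic
        "logged_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(event)
```

Publish the returned string to the llm-api-logs topic with your Kafka client of choice.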

RisingWave maintains six materialized views, one per observability dimension:

  1. Token usage per user per model (rolling 1-hour window)
  2. Latency percentiles per model (rolling 5-minute window)
  3. Error rates and error type breakdown per model
  4. Cost per model with per-token pricing
  5. Cost per user for budget enforcement
  6. Model quality drift via finish-reason distribution

Let us build each one.

Step 1: Create the Kafka Source

In production, your application server publishes one JSON event per LLM API call to a Kafka topic. RisingWave ingests it with CREATE SOURCE:

CREATE SOURCE llmobs_api_logs_src (
    request_id        VARCHAR,
    user_id           VARCHAR,
    model             VARCHAR,
    prompt_tokens     INT,
    completion_tokens INT,
    total_tokens      INT,
    latency_ms        INT,
    status            VARCHAR,        -- 'success' or 'error'
    error_code        VARCHAR,        -- 'rate_limit_error', 'timeout', 'context_overflow', NULL
    finish_reason     VARCHAR,        -- 'stop', 'length', 'content_filter', NULL
    temperature       FLOAT,
    cost_usd          FLOAT,
    logged_at         TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'llm-api-logs',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

For development and testing without Kafka, replace CREATE SOURCE with CREATE TABLE and insert data directly. All the materialized view SQL below works identically either way. Here is the table-based equivalent used in the examples that follow:

CREATE TABLE llmobs_api_logs (
    request_id        VARCHAR,
    user_id           VARCHAR,
    model             VARCHAR,
    prompt_tokens     INT,
    completion_tokens INT,
    total_tokens      INT,
    latency_ms        INT,
    status            VARCHAR,
    error_code        VARCHAR,
    finish_reason     VARCHAR,
    temperature       FLOAT,
    cost_usd          FLOAT,
    logged_at         TIMESTAMPTZ
);

Step 2: Track Token Usage Per User and Model

Token usage is your primary cost driver and your first quota signal. You want to know, for each user and model, how many tokens are being consumed per hour: how many are prompt tokens (which you control through system prompt design), how many are completion tokens (which reflect model verbosity), and what it all costs.

CREATE MATERIALIZED VIEW llmobs_token_usage_per_user AS
SELECT
    user_id,
    model,
    COUNT(*)                             AS request_count,
    SUM(prompt_tokens)                   AS total_prompt_tokens,
    SUM(completion_tokens)               AS total_completion_tokens,
    SUM(total_tokens)                    AS total_tokens,
    ROUND(AVG(total_tokens)::NUMERIC, 1) AS avg_tokens_per_request,
    ROUND(SUM(cost_usd)::NUMERIC, 4)     AS total_cost_usd,
    window_start,
    window_end
FROM TUMBLE(llmobs_api_logs, logged_at, INTERVAL '1 hour')
GROUP BY user_id, model, window_start, window_end;

This uses a tumbling window to bucket events into non-overlapping 1-hour intervals. RisingWave maintains this view incrementally: as each new log event arrives, only the affected bucket is updated, not the entire table.
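To make the tumbling-window semantics concrete, here is an illustrative Python equivalent of how an event gets assigned to a non-overlapping 1-hour bucket. This is a sketch of the concept only, not RisingWave's implementation.

```python
from datetime import datetime, timedelta, timezone

def tumble_bucket(ts: datetime, width: timedelta) -> tuple[datetime, datetime]:
    """Return the (window_start, window_end) bucket containing `ts`."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) // width          # whole windows elapsed since epoch
    start = epoch + offset * width
    return start, start + width

# An event logged at 14:37 falls into the [14:00, 15:00) bucket
start, end = tumble_bucket(
    datetime(2025, 6, 1, 14, 37, tzinfo=timezone.utc), timedelta(hours=1)
)
```

Every event with the same bucket contributes to the same `window_start`/`window_end` group in the view.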

Querying the view after inserting sample data:

SELECT user_id, model, request_count,
       total_prompt_tokens, total_completion_tokens,
       total_tokens, avg_tokens_per_request, total_cost_usd
FROM llmobs_token_usage_per_user
ORDER BY total_cost_usd DESC;
  user_id   |     model     | request_count | total_prompt_tokens | total_completion_tokens | total_tokens | avg_tokens_per_request | total_cost_usd
------------+---------------+---------------+---------------------+-------------------------+--------------+------------------------+----------------
 user_alice | gpt-4o        |             4 |                1920 |                     480 |         2400 |                    600 |         0.0660
 user_carol | gpt-4o        |             2 |                1350 |                     337 |         1687 |                  843.5 |         0.0330
 user_bob   | gpt-3.5-turbo |             4 |                1050 |                     265 |         1315 |                  328.8 |         0.0020
 user_carol | gpt-4o-mini   |             2 |                 900 |                     225 |         1125 |                  562.5 |         0.0000
 user_dave  | gpt-4o-mini   |             2 |                 380 |                      95 |          475 |                  237.5 |         0.0000
(5 rows)

user_alice spent $0.066 in one hour using gpt-4o with an average of 600 tokens per request. user_carol is averaging 843 tokens per request against gpt-4o, a signal worth investigating: long prompts often mean bloated system prompts or unbounded context accumulation.

Step 3: Compute Latency Percentiles Per Model

Average latency lies. A model that responds in 400 ms on average might have a P99 of 12 seconds, meaning 1 in 100 users waits 30 times longer than everyone else. Percentile monitoring exposes what averages hide.
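The gap is easy to demonstrate with a quick Python sketch on a synthetic latency sample (illustrative numbers only):

```python
import statistics

# 99 fast responses and one 12-second straggler
latencies = [400] * 99 + [12000]

avg = statistics.mean(latencies)                    # 516 ms: looks healthy
p99 = statistics.quantiles(latencies, n=100)[98]    # 99th-percentile cut point

print(f"avg = {avg} ms, p99 = {p99} ms")
# The average barely moves while the tail is roughly 23x worse
```

The same skew hides inside any AVG(latency_ms) column, which is why the view below computes explicit percentiles.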

CREATE MATERIALIZED VIEW llmobs_latency_percentiles AS
SELECT
    model,
    COUNT(*)                                                     AS request_count,
    MIN(latency_ms)                                              AS p0_ms,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY latency_ms)    AS p50_ms,
    PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY latency_ms)    AS p90_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY latency_ms)    AS p95_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY latency_ms)    AS p99_ms,
    MAX(latency_ms)                                              AS p100_ms,
    window_start,
    window_end
FROM TUMBLE(llmobs_api_logs, logged_at, INTERVAL '5 minutes')
WHERE status = 'success'
GROUP BY model, window_start, window_end;

The 5-minute tumbling window gives you fine-grained latency snapshots. Filtering to status = 'success' keeps error timeouts from inflating the percentiles, which is important because a 30-second timeout error is not the same signal as a slow successful response.

Querying the view:

SELECT model, request_count, p0_ms, p50_ms, p90_ms, p95_ms, p99_ms, p100_ms
FROM llmobs_latency_percentiles
ORDER BY p95_ms DESC;
     model     | request_count | p0_ms | p50_ms | p90_ms | p95_ms | p99_ms | p100_ms
---------------+---------------+-------+--------+--------+--------+--------+---------
 gpt-4o        |             3 |   720 |   1850 |   2050 |   2075 |   2095 |    2100
 gpt-4o        |             1 |   850 |    850 |    850 |    850 |    850 |     850
 gpt-4o-mini   |             2 |   330 |    375 |    411 |  415.5 |  419.1 |     420
 gpt-4o-mini   |             1 |   380 |    380 |    380 |    380 |    380 |     380
 gpt-3.5-turbo |             2 |   290 |    305 |    317 |  318.5 |  319.7 |     320
 gpt-3.5-turbo |             1 |   310 |    310 |    310 |    310 |    310 |     310
(6 rows)

gpt-4o shows a P95 of 2075 ms. gpt-3.5-turbo and gpt-4o-mini are both under 350 ms P95. If your SLA requires responses under 2 seconds, this view tells you immediately when gpt-4o starts pushing past that limit.

Step 4: Monitor Error Rates and Error Types

Error rate monitoring tells you two things: whether your integration is healthy and whether your users are hitting provider limits. Breaking errors down by type adds the operational context you need to act quickly.

CREATE MATERIALIZED VIEW llmobs_error_rates AS
SELECT
    model,
    COUNT(*)                                                          AS total_requests,
    COUNT(*) FILTER (WHERE status = 'error')                          AS error_count,
    COUNT(*) FILTER (WHERE status = 'success')                        AS success_count,
    ROUND(
        (COUNT(*) FILTER (WHERE status = 'error') * 100.0 / COUNT(*))::NUMERIC,
        2
    )                                                                 AS error_rate_pct,
    COUNT(*) FILTER (WHERE error_code = 'rate_limit_error')           AS rate_limit_errors,
    COUNT(*) FILTER (WHERE error_code = 'timeout')                    AS timeout_errors,
    COUNT(*) FILTER (WHERE error_code = 'context_overflow')           AS context_overflow_errors,
    window_start,
    window_end
FROM TUMBLE(llmobs_api_logs, logged_at, INTERVAL '5 minutes')
GROUP BY model, window_start, window_end;

Querying the view:

SELECT model, total_requests, error_count, success_count,
       error_rate_pct, rate_limit_errors, timeout_errors, context_overflow_errors
FROM llmobs_error_rates
ORDER BY error_rate_pct DESC;
     model     | total_requests | error_count | success_count | error_rate_pct | rate_limit_errors | timeout_errors | context_overflow_errors
---------------+----------------+-------------+---------------+----------------+-------------------+----------------+-------------------------
 gpt-4o        |              4 |           1 |             3 |          25.00 |                 1 |              0 |                       0
 gpt-3.5-turbo |              4 |           1 |             3 |          25.00 |                 0 |              1 |                       0
 gpt-4o-mini   |              4 |           0 |             4 |           0.00 |                 0 |              0 |                       0
(3 rows)

A 25% error rate on gpt-4o with one rate_limit_error tells you that one or more users are hitting per-minute token limits. A timeout error on gpt-3.5-turbo often means a network path issue or a provider-side problem. A context_overflow error means a request exceeded the model's context window, which points to a bug in your prompt construction logic.

Each error type maps to a different remediation: rate limiting on your side, a retry with backoff, or a context truncation strategy.

Step 5: Cost Monitoring with Per-Token Breakdown

Cost visibility is the most requested LLM observability feature. This materialized view tracks actual spend per model per hour, with a per-1,000-token cost that lets you compare model efficiency directly.

CREATE MATERIALIZED VIEW llmobs_cost_per_model AS
SELECT
    model,
    COUNT(*)                                                          AS request_count,
    SUM(total_tokens)                                                 AS total_tokens,
    ROUND(SUM(cost_usd)::NUMERIC, 4)                                  AS total_cost_usd,
    ROUND(AVG(cost_usd)::NUMERIC, 6)                                  AS avg_cost_per_request,
    ROUND(
        (SUM(cost_usd) * 1000 / NULLIF(SUM(total_tokens), 0))::NUMERIC,
        6
    )                                                                 AS cost_per_1k_tokens,
    window_start,
    window_end
FROM TUMBLE(llmobs_api_logs, logged_at, INTERVAL '1 hour')
WHERE status = 'success'
GROUP BY model, window_start, window_end;

Querying the view:

SELECT model, request_count, total_tokens, total_cost_usd,
       avg_cost_per_request, cost_per_1k_tokens
FROM llmobs_cost_per_model
ORDER BY total_cost_usd DESC;
     model     | request_count | total_tokens | total_cost_usd | avg_cost_per_request | cost_per_1k_tokens
---------------+---------------+--------------+----------------+----------------------+--------------------
 gpt-4o        |             4 |         3365 |         0.0990 |             0.024750 |           0.029421
 gpt-3.5-turbo |             3 |         1125 |         0.0020 |             0.000667 |           0.001778
 gpt-4o-mini   |             4 |         1600 |         0.0000 |             0.000000 |           0.000000
(3 rows)

gpt-4o is costing $0.029 per 1,000 tokens versus $0.0018 for gpt-3.5-turbo. If a feature can use the cheaper model without degrading quality, the cost difference is 16x. This view makes that trade-off visible continuously, not in a monthly invoice.

For per-user budget enforcement, a companion view tracks individual spend:

CREATE MATERIALIZED VIEW llmobs_cost_per_user AS
SELECT
    user_id,
    COUNT(*)                             AS request_count,
    SUM(total_tokens)                    AS total_tokens,
    ROUND(SUM(cost_usd)::NUMERIC, 4)     AS total_cost_usd,
    ROUND(AVG(cost_usd)::NUMERIC, 6)     AS avg_cost_per_request,
    window_start,
    window_end
FROM TUMBLE(llmobs_api_logs, logged_at, INTERVAL '1 hour')
WHERE status = 'success'
GROUP BY user_id, window_start, window_end;

Querying the view:

SELECT user_id, request_count, total_tokens, total_cost_usd, avg_cost_per_request
FROM llmobs_cost_per_user
ORDER BY total_cost_usd DESC;
  user_id   | request_count | total_tokens | total_cost_usd | avg_cost_per_request
------------+---------------+--------------+----------------+----------------------
 user_alice |             3 |         2240 |         0.0660 |             0.022000
 user_carol |             3 |         2250 |         0.0330 |             0.011000
 user_bob   |             3 |         1125 |         0.0020 |             0.000667
 user_dave  |             2 |          475 |         0.0000 |             0.000000
(4 rows)

user_alice has spent $0.066 in one hour. If your per-user hourly budget is $0.10, you have time to send a soft warning before hitting the limit. If your budget is $0.05, the limit has already been exceeded and you can use this view to trigger a rate limiter in real time.

Step 6: Detect Model Quality Drift

Token usage, latency, and error rates tell you whether your pipeline is functioning. Model quality drift tells you whether your model is still delivering good outputs. The most reliable quality signal available in raw API logs is finish_reason.

A response with finish_reason = 'stop' means the model reached a natural conclusion. A response with finish_reason = 'length' means the model ran out of token budget and was truncated mid-output. A rising truncation rate almost always means your context is growing unbounded, your max token limit is too tight for the use case, or a provider has quietly changed how a model generates text.
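The calculation the view performs can be sketched in a few lines of Python, along with a simple drift check comparing the current window against a baseline. The 5-point threshold is illustrative; tune it to your workload.

```python
def truncation_rate(finish_reasons: list[str]) -> float:
    """Percentage of responses that were cut off at the token limit."""
    if not finish_reasons:
        return 0.0
    truncated = sum(1 for r in finish_reasons if r == "length")
    return 100.0 * truncated / len(finish_reasons)

def drifted(current_pct: float, baseline_pct: float, max_delta: float = 5.0) -> bool:
    """Flag a window whose truncation rate rose more than `max_delta` points."""
    return current_pct - baseline_pct > max_delta

rate = truncation_rate(["stop", "stop", "stop", "length"])  # 25.0
```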

CREATE MATERIALIZED VIEW llmobs_model_quality_drift AS
SELECT
    model,
    COUNT(*)                                                              AS total_requests,
    COUNT(*) FILTER (WHERE finish_reason = 'stop')                        AS natural_stops,
    COUNT(*) FILTER (WHERE finish_reason = 'length')                      AS truncated,
    COUNT(*) FILTER (WHERE finish_reason = 'content_filter')              AS filtered,
    ROUND(
        (COUNT(*) FILTER (WHERE finish_reason = 'stop') * 100.0
         / NULLIF(COUNT(*) FILTER (WHERE status = 'success'), 0))::NUMERIC,
        2
    )                                                                     AS completion_rate_pct,
    ROUND(
        (COUNT(*) FILTER (WHERE finish_reason = 'length') * 100.0
         / NULLIF(COUNT(*) FILTER (WHERE status = 'success'), 0))::NUMERIC,
        2
    )                                                                     AS truncation_rate_pct,
    ROUND(
        AVG(completion_tokens) FILTER (WHERE status = 'success')::NUMERIC,
        1
    )                                                                     AS avg_completion_tokens,
    window_start,
    window_end
FROM TUMBLE(llmobs_api_logs, logged_at, INTERVAL '1 hour')
GROUP BY model, window_start, window_end;

Querying the view:

SELECT model, total_requests, natural_stops, truncated, filtered,
       completion_rate_pct, truncation_rate_pct, avg_completion_tokens
FROM llmobs_model_quality_drift
ORDER BY truncation_rate_pct DESC;
     model     | total_requests | natural_stops | truncated | filtered | completion_rate_pct | truncation_rate_pct | avg_completion_tokens
---------------+----------------+---------------+-----------+----------+---------------------+---------------------+-----------------------
 gpt-4o        |              6 |             3 |         1 |        0 |               75.00 |               25.00 |                 168.3
 gpt-3.5-turbo |              4 |             3 |         0 |        0 |              100.00 |                0.00 |                  75.0
 gpt-4o-mini   |              4 |             4 |         0 |        0 |              100.00 |                0.00 |                  80.0
(3 rows)

gpt-4o has a 25% truncation rate. One in four successful requests was cut off before the model finished its response. gpt-3.5-turbo and gpt-4o-mini have 0% truncation. This is a strong signal to investigate the token budget configuration for gpt-4o workloads, or to look at which specific requests are being truncated using the llmobs_api_logs source directly.

Step 7: Sink to Dashboards and Alerts

RisingWave can sink materialized view data to Grafana (via PostgreSQL data source), Kafka topics, or any BI tool that supports PostgreSQL. Because RisingWave is wire-compatible with PostgreSQL, connecting Grafana requires no special plugin: use the standard PostgreSQL data source, point it at localhost:4566, and query your materialized views directly.

For real-time cost alerts, sink the llmobs_cost_per_user view to a Kafka topic:

CREATE SINK llmobs_cost_alerts_sink
FROM llmobs_cost_per_user
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'llm-cost-alerts',
    type = 'upsert',
    primary_key = 'user_id,window_start'
)
FORMAT UPSERT ENCODE JSON;

A downstream consumer reads from llm-cost-alerts and sends a Slack notification or calls a rate-limiting API when any user's total_cost_usd exceeds your threshold. The materialized view updates the moment a new log event arrives, so the consumer reacts within seconds, not minutes.

For a deeper dive on routing streaming alerts to notification channels, see Building a Real-Time Alerting System on Streaming Data.

Comparing Batch vs. Streaming LLM Observability

 Dimension                   | Batch Logging (Warehouse)  | Streaming SQL (RisingWave)
-----------------------------+----------------------------+----------------------------
 Data freshness              | Hours to days              | Seconds
 Cost alert latency          | Next ETL run               | Under 10 seconds
 Latency percentile accuracy | Approximate (sampled)      | Exact (all events)
 Error rate detection        | Hourly or daily            | Sub-minute
 Truncation rate drift       | Visible next day           | Visible as it happens
 Per-user budget enforcement | Post-hoc billing           | Real-time throttling
 Query complexity            | Full SQL on cold storage   | SQL on pre-computed views
 Infrastructure              | ETL + warehouse + BI tool  | RisingWave + Grafana

The streaming approach adds marginal operational overhead (one additional service: RisingWave), but it fundamentally changes what you can do with the data. Budget enforcement becomes possible. Model quality drift becomes catchable. Cost spikes become stoppable.

What Is an LLM Observability Pipeline?

An LLM observability pipeline is a data infrastructure system that collects, processes, and analyzes telemetry from large language model API calls in real time. It captures raw signal from each API response, including token counts, latency, error codes, and output quality indicators like finish_reason, and transforms that raw signal into operational metrics that support cost management, performance monitoring, and model quality tracking. A streaming SQL implementation uses a system like RisingWave to process log events continuously and maintain always-current metric views, rather than processing logs in scheduled batch jobs that introduce hours of latency between an event happening and a team knowing about it.

How Do You Track LLM Costs in Real Time?

To track LLM costs in real time, capture the cost_usd field from each API response (or compute it from token counts using published pricing tables) and ingest those events into a streaming database. Create a materialized view that aggregates cost by user, model, and time window using a tumbling window function. Because the materialized view updates incrementally with each new event, querying it always returns the current hourly spend for each user and model, with no need to run a batch job. You can set a Kafka sink on the view to trigger downstream alerts the moment any user or model crosses a cost threshold.
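If your provider does not return cost directly, it can be derived from token counts before the event is published. A hedged sketch follows; the model name and prices below are placeholders, not current list prices, so always read real rates from your provider's published pricing table.

```python
# Placeholder per-million-token prices; substitute your provider's real rates.
PRICING_PER_1M = {
    "example-model": {"prompt": 2.50, "completion": 10.00},
}

def cost_usd(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Compute request cost from token counts and a per-million-token price table."""
    p = PRICING_PER_1M[model]
    return (prompt_tokens * p["prompt"]
            + completion_tokens * p["completion"]) / 1_000_000
```

The result goes into the cost_usd field of each log event, so every downstream view aggregates it for free.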

What Is Model Quality Drift in LLM Monitoring?

Model quality drift refers to a change in the distribution of output quality signals from an LLM over time. Unlike traditional ML models where drift is typically measured against labeled ground truth, LLM quality drift is often detected through proxy signals available in raw API logs: finish_reason distribution (a rising length rate signals truncation), completion_tokens average (a sudden drop means shorter outputs), and error rate by type. Provider model updates, changes to context window behavior, or bugs in your prompt construction can all cause drift. Streaming SQL materialized views are particularly effective for detecting drift because they compute these distributions continuously, making it possible to see a shift in the truncation_rate_pct within minutes of it starting rather than discovering it in a weekly report.

Can RisingWave Connect Directly to Grafana for LLM Dashboards?

Yes. RisingWave is wire-compatible with PostgreSQL, which means Grafana's standard PostgreSQL data source connects to it without any modification or special plugin. Configure the Grafana PostgreSQL data source to point at your RisingWave instance (default port 4566), and then write Grafana panel queries against your materialized views directly. Because the materialized views are continuously updated, Grafana auto-refresh panels will always show current data. For detailed setup instructions, see the RisingWave documentation on monitoring and observability.

Conclusion

LLM observability is not a reporting problem. It is a real-time operational problem. The gap between a cost spike happening and you knowing about it should be measured in seconds, not hours.

The core takeaways from this pipeline:

  • Ingest LLM API logs from Kafka into RisingWave using CREATE SOURCE and a straightforward schema that captures token counts, latency, status, and cost fields.
  • Token usage materialized views give you per-user, per-model consumption in rolling windows, making budget enforcement actionable rather than retrospective.
  • Latency percentile views using PERCENTILE_CONT expose P95 and P99 behavior that averages hide, letting you catch provider regressions before users escalate.
  • Error rate views with per-error-type breakdowns differentiate rate limit exhaustion, timeouts, and context overflows, each requiring a different response.
  • Cost views per model and per user make the economics of your LLM usage visible in real time, enabling routing decisions and per-user throttling.
  • Model quality drift views track finish_reason distributions to catch truncation rate increases, output length shifts, and content filter spikes as they develop.

The full pipeline runs in standard SQL against RisingWave, connects to Grafana through the PostgreSQL wire protocol, and routes alerts downstream via Kafka sinks. You can have it running in under an hour.

For the next step, explore how to use incremental materialized views in RisingWave for other real-time analytics use cases, or read about building streaming SQL pipelines on Kafka without managing a separate stream processing cluster.


Ready to try this yourself? Get started with RisingWave in 5 minutes with the quickstart guide.

Join our Slack community to ask questions and connect with other stream processing developers.
