Monitor Your AI Agent Data Pipeline with SQL


The Gap Nobody Talks About

The AI observability space has exploded. Langfuse traces LLM calls. Arize monitors model performance. Braintrust runs evaluations. These tools do exactly what they promise: they give you visibility into the model layer — what prompts went in, what responses came out, which calls were slow, which answers were wrong.

But here is what none of them monitor: the data pipeline that feeds your agents.

Before an agent decides what to say, it retrieves context from somewhere. That might be a materialized view in a streaming database, a Kafka-backed feature store, or a CDC pipeline that syncs operational data. If any part of that pipeline is unhealthy — if Kafka consumer lag is high, if CDC replication has fallen behind, if a materialized view is stuck in backfill — the agent receives stale data. It makes wrong decisions. And your LLM traces will show perfectly reasonable reasoning applied to a world that no longer exists.

LLM observability tools will not catch this. They see clean inputs and clean outputs. The corruption happened upstream, in the data layer, before the model ever saw it.

This post covers how to monitor your AI agent's data pipeline using SQL queries against RisingWave's system tables. We will be honest about what SQL can tell you and where you need Grafana to go deeper.

What Can Go Wrong in the Data Layer

Before looking at monitoring queries, it helps to understand the failure modes:

CDC lag: Your CDC connector has fallen behind the source database. New rows are being written to PostgreSQL or MySQL faster than they are being replicated to your streaming database. Materialized views built on CDC sources will be stale by approximately the lag duration.

Kafka consumer lag: RisingWave is reading from a Kafka topic but cannot keep up with the producer's write rate. Similar effect as CDC lag — materialized views trail behind real time.

Schema drift: The source database added or changed a column and the change was not reflected in the CDC source definition. The pipeline may error silently or drop the new column entirely.

Backfill delay: A new materialized view is still catching up with historical data. Queries against it will return partial results during backfill, with no obvious warning to the agent.

Job failure: A streaming job crashed, silently stopped processing, or is being recreated after a node failure. Materialized views stop updating.

Each of these failure modes produces agents that make decisions on stale or incomplete data. None of them are visible in LLM traces.

SQL Monitoring Queries

Check streaming job status

The rw_streaming_jobs table shows every active streaming job in the system with its current status and parallelism:

-- Check the status of all streaming jobs
SELECT
    name,
    status,
    parallelism,
    job_id
FROM rw_streaming_jobs
ORDER BY name;

Expected output for a healthy system:

            name             | status  | parallelism | job_id
-----------------------------+---------+-------------+--------
 user_context                | CREATED | ADAPTIVE    |    142
 product_context             | CREATED | ADAPTIVE    |    158
 active_promotions           | CREATED | ADAPTIVE    |    165
 recent_order_status         | CREATED | ADAPTIVE    |    171
(4 rows)

Any job not in CREATED status is worth investigating. A job in CREATING state is still backfilling historical data. If a job does not appear at all, it may have failed silently during a cluster restart.
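This check is easy to script. Here is a minimal Python sketch of the decision logic, assuming the rows were fetched with the query above through any PostgreSQL client (the fetch itself is omitted; `expected` is a hypothetical allow-list of job names you know should exist):

```python
# Flag streaming jobs worth investigating, given rows from:
#   SELECT name, status, parallelism, job_id FROM rw_streaming_jobs
def unhealthy_jobs(rows, expected=None):
    """Return (job_name, reason) pairs for jobs that are missing
    or not in CREATED status."""
    seen = {name: status for name, status, _parallelism, _job_id in rows}
    problems = [(name, f"status is {status}")
                for name, status in seen.items() if status != "CREATED"]
    for name in sorted((expected or set()) - seen.keys()):
        problems.append((name, "job not found; may have failed silently"))
    return problems
```

A job stuck in CREATING and a job that vanished after a restart both show up:

```python
rows = [("user_context", "CREATED", "ADAPTIVE", 142),
        ("product_context", "CREATING", "ADAPTIVE", 158)]
unhealthy_jobs(rows, expected={"user_context", "product_context",
                               "recent_order_status"})
# → [("product_context", "status is CREATING"),
#    ("recent_order_status", "job not found; may have failed silently")]
```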

Check Kafka consumer lag

If your pipeline ingests from Kafka, rw_kafka_job_lag is the most important table to watch:

-- Check Kafka consumer lag per source
SELECT *
FROM rw_kafka_job_lag;

A typical output:

 source_id |   source_name    |       topic       | partition | lag | offset
-----------+------------------+-------------------+-----------+-----+--------
       103 | inventory_events | inventory.updates |         0 |   0 |  48291
       103 | inventory_events | inventory.updates |         1 |   0 |  47834
       103 | inventory_events | inventory.updates |         2 |  47 |  46102
(3 rows)

The lag column shows how many messages behind the consumer is on each partition. A lag of 0 means the consumer is caught up. A lag of 47 on partition 2 means 47 messages have been produced but not yet consumed.

In isolation, 47 messages might be acceptable or might indicate a problem — it depends on your producer's message rate. If the partition produces 10 messages per second, a lag of 47 means about 5 seconds of staleness. If it produces 1 message per second, that is 47 seconds.
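That back-of-the-envelope conversion is worth encoding wherever you evaluate lag numbers. A small Python helper (the producer rate is something you must measure yourself; it is not exposed by rw_kafka_job_lag):

```python
def staleness_seconds(lag_messages: int, produce_rate_per_sec: float) -> float:
    """Approximate how stale downstream views are, given consumer lag.

    Assumes a steady producer rate; real traffic is bursty, so treat
    the result as an order-of-magnitude estimate, not a guarantee.
    """
    if produce_rate_per_sec <= 0:
        raise ValueError("producer rate must be positive")
    return lag_messages / produce_rate_per_sec

staleness_seconds(47, 10)  # → 4.7 seconds of staleness
staleness_seconds(47, 1)   # → 47.0 seconds
```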

For a cleaner view of which sources have significant lag, filter and aggregate:

-- Sources with non-zero lag, ordered by total lag
SELECT
    source_name,
    SUM(lag)            AS total_lag,
    MAX(lag)            AS max_partition_lag,
    COUNT(*)            AS partition_count
FROM rw_kafka_job_lag
GROUP BY source_name
HAVING SUM(lag) > 0
ORDER BY total_lag DESC;

Check Kafka source metrics

For throughput and error information, rw_kafka_source_metrics provides per-source metrics:

-- Kafka source throughput and error metrics
SELECT *
FROM rw_kafka_source_metrics;

This table surfaces metrics like messages consumed per second and error counts per source. If error counts are increasing, your Kafka pipeline has a connectivity or deserialization problem. Errors here mean events are not reaching your materialized views at all.
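Because these counters are cumulative, "increasing" is only visible across successive polls. A sketch of that comparison in Python, assuming you extract (source_name, error_count) pairs from rw_kafka_source_metrics on a schedule (the exact column names are placeholders; check the table's schema in your version):

```python
def error_increases(previous: dict, current: dict) -> dict:
    """Per-source error-count growth between two polls.

    previous/current -- {source_name: cumulative_error_count} snapshots;
    sources absent from the previous snapshot are treated as starting at 0.
    """
    return {
        name: count - previous.get(name, 0)
        for name, count in current.items()
        if count - previous.get(name, 0) > 0
    }

# Only sources whose error count grew since the last poll are returned:
error_increases({"inventory_events": 5}, {"inventory_events": 9, "orders": 0})
# → {"inventory_events": 4}
```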

Check CDC replication progress

For CDC-sourced pipelines, rw_cdc_progress tracks how far along replication has gotten:

-- Check CDC replication progress per source
SELECT *
FROM rw_cdc_progress;

This table shows the current replication position (log sequence number or binlog position) per CDC source. If the position has not advanced in several minutes, CDC replication has stalled. This can happen due to network issues, source database overload, or replication slot exhaustion on the PostgreSQL side.
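Detecting "the position has not advanced" is the same cross-sample problem: record each poll with a timestamp and flag sources whose position is unchanged for too long. A sketch that treats the replication position as an opaque string, since its format differs between PostgreSQL (LSN) and MySQL (binlog position):

```python
import time

class CdcStallDetector:
    """Flag CDC sources whose replication position stops advancing."""

    def __init__(self, stall_after_seconds: float = 300.0):
        self.stall_after = stall_after_seconds
        self._last_advance = {}  # source -> (position, time it last moved)

    def observe(self, source: str, position: str, now: float = None) -> bool:
        """Record one poll of rw_cdc_progress; True means 'looks stalled'."""
        now = time.time() if now is None else now
        prev = self._last_advance.get(source)
        if prev is None or prev[0] != position:
            self._last_advance[source] = (position, now)  # position moved
            return False
        return (now - prev[1]) >= self.stall_after

detector = CdcStallDetector(stall_after_seconds=300)
detector.observe("orders_cdc", "0/1A2B3C", now=0)    # → False (first sample)
detector.observe("orders_cdc", "0/1A2B3C", now=200)  # → False (under threshold)
detector.observe("orders_cdc", "0/1A2B3C", now=320)  # → True (stalled 320 s)
```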

Check backfill progress for new materialized views

When you create a new materialized view on top of existing data, RisingWave must first backfill the view with historical records before it begins serving current data. During this period, queries to the view return incomplete results.

-- Check backfill progress for materialized views still catching up
SELECT *
FROM rw_fragment_backfill_progress;

If this table returns rows, one or more views are still backfilling. The output shows which view, which fragment, and how much of the historical data has been processed. An agent should not query a view that is still in backfill without understanding it may receive partial results.
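An agent's retrieval layer can enforce that rule mechanically. A minimal guard in Python, assuming you collect the set of view names currently listed in rw_fragment_backfill_progress (which column carries the view name is an assumption here; check the table's schema):

```python
def guard_view(view_name: str, backfilling_views: set) -> str:
    """Return the view name if it is safe to query, or raise.

    backfilling_views -- names collected from rw_fragment_backfill_progress
    """
    if view_name in backfilling_views:
        raise RuntimeError(
            f"{view_name} is still backfilling; results would be partial"
        )
    return view_name

guard_view("user_context", backfilling_views={"new_promo_view"})
# → "user_context" (safe to query)
```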

Building a Monitoring Materialized View

Rather than querying system tables manually, you can create a monitoring view that flags unhealthy conditions automatically. This view itself stays continuously updated:

-- Monitoring view: flag Kafka sources with significant lag
CREATE MATERIALIZED VIEW kafka_lag_alerts AS
SELECT
    source_name,
    SUM(lag)            AS total_lag,
    MAX(lag)            AS max_partition_lag,
    CASE
        WHEN SUM(lag) > 10000 THEN 'critical'
        WHEN SUM(lag) > 1000  THEN 'warning'
        ELSE 'ok'
    END                 AS lag_status
FROM rw_kafka_job_lag
GROUP BY source_name;

Now query it from any monitoring agent or alerting system:

-- Show only sources in warning or critical state
SELECT source_name, total_lag, max_partition_lag, lag_status
FROM kafka_lag_alerts
WHERE lag_status != 'ok'
ORDER BY total_lag DESC;
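A scheduled monitoring agent can run that query with any PostgreSQL client, since RisingWave speaks the PostgreSQL wire protocol, and forward the results. A sketch where the database fetch is shown only in comments (the driver choice, connection string, and send_to_pager hook are all assumptions):

```python
ALERT_QUERY = """
    SELECT source_name, total_lag, max_partition_lag, lag_status
    FROM kafka_lag_alerts
    WHERE lag_status != 'ok'
    ORDER BY total_lag DESC
"""

def format_alerts(rows):
    """Render (source_name, total_lag, max_partition_lag, lag_status)
    rows as one pager/chat line per unhealthy source."""
    return [
        f"[{status.upper()}] {source}: total lag {total} "
        f"(worst partition: {worst})"
        for source, total, worst, status in rows
    ]

# In the agent itself, something like:
#   import psycopg  # any PostgreSQL driver works
#   with psycopg.connect("host=localhost port=4566 dbname=dev user=root") as conn:
#       for line in format_alerts(conn.execute(ALERT_QUERY).fetchall()):
#           send_to_pager(line)  # send_to_pager: your notification hook
```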

You can extend this pattern to combine multiple health signals:

-- Unified pipeline health check
CREATE MATERIALIZED VIEW pipeline_health AS
SELECT
    j.name                          AS job_name,
    j.status                        AS job_status,
    COALESCE(kl.total_lag, 0)       AS kafka_lag,
    CASE
        WHEN j.status != 'CREATED'        THEN 'job_unhealthy'
        WHEN COALESCE(kl.total_lag, 0) > 10000 THEN 'lag_critical'
        WHEN COALESCE(kl.total_lag, 0) > 1000  THEN 'lag_warning'
        ELSE 'healthy'
    END                             AS health_status
FROM rw_streaming_jobs j
LEFT JOIN (
    SELECT source_name, SUM(lag) AS total_lag
    FROM rw_kafka_job_lag
    GROUP BY source_name
) kl ON j.name = kl.source_name;

This view gives you a single table to query for an overview of the entire pipeline's health, continuously updated as conditions change.

What Requires Grafana

SQL gives you a lot: job status, Kafka lag per partition, CDC replication position, and backfill progress. But some observability signals are not available as SQL queries. Be honest with yourself about what falls into this category.

Backpressure between operators: When one stage of a streaming pipeline produces data faster than the downstream stage can consume it, backpressure builds up. This internal pressure propagates upstream and slows the entire pipeline. Backpressure is visible in the Grafana dashboard that ships with RisingWave (or the meta-node dashboard at port 5691), but it is not exposed in SQL system tables.

Barrier latency trends: RisingWave uses barriers to maintain consistency across operators. Barrier latency — how long it takes a barrier to flow through the entire pipeline — is a critical freshness metric. You can sample recent barrier events from rw_event_logs, but the trending data and per-fragment breakdown require the Grafana dashboard.

Per-actor CPU and memory usage: Individual parallel workers (actors) may be overloaded even when the overall job looks healthy. Identifying hot actors requires the Grafana metrics view.

If you are running RisingWave in production to feed agent pipelines, set up the Grafana dashboard early. SQL monitoring covers the job-level view; Grafana covers the internal view. You need both.

How Lag Causes Agents to Make Wrong Decisions

Here is a concrete example of how pipeline lag translates into agent failure.

A fraud detection agent evaluates transactions in real time. It queries a materialized view called user_risk_context that summarizes each user's recent transaction patterns — frequency, typical amounts, typical geographies. The source is a Kafka topic that receives all transaction events from the payments system.

On a Tuesday morning, the payments system processes a batch of refunds, generating 50,000 events in 90 seconds. The Kafka producer writes them all. RisingWave's consumer starts processing but falls behind, accumulating 30 seconds of lag on the high-volume partitions.

During those 30 seconds, new transactions continue arriving. The fraud agent evaluates them using user_risk_context — but that view is still processing the refund batch. Users whose risk profiles changed because of the refunds (their recent transaction count went up, their typical geography shifted) are being evaluated on their pre-refund profiles.

A user whose 20 refund events have already reached the raw transaction feed, but whose user_risk_context profile has not yet absorbed them, gets flagged as anomalous on their next normal purchase: the agent sees 21 transactions today measured against a profile that still describes the quiet pre-refund baseline, and reads routine refund activity as a fraud spike.

The LLM trace looks fine. The agent reasoned correctly. The user_risk_context data was 30 seconds stale at a moment when 30 seconds mattered.

This is why monitoring Kafka lag is not optional for agent pipelines. The query to catch this early:

-- Alert if any Kafka source has lag exceeding 5000 messages
SELECT
    source_name,
    SUM(lag) AS total_lag
FROM rw_kafka_job_lag
GROUP BY source_name
HAVING SUM(lag) > 5000;

If this returns rows, your agents may be making decisions on stale context right now. Investigate immediately.

FAQ

What is AI agent data pipeline monitoring?

AI agent data pipeline monitoring tracks the health of the systems that supply data to AI agents — CDC connectors, Kafka sources, materialized views, and streaming jobs. It is distinct from LLM observability, which tracks model inputs and outputs. Data pipeline monitoring catches problems like Kafka consumer lag, CDC replication delays, and stale materialized views before they cause agents to make wrong decisions.

Which RisingWave system tables are most useful for monitoring?

The most useful tables are: rw_streaming_jobs (status and parallelism of all streaming jobs), rw_kafka_job_lag (consumer lag per Kafka source), rw_kafka_source_metrics (throughput and error rates), rw_cdc_progress (CDC replication progress per source), and rw_fragment_backfill_progress (backfill status for newly created materialized views).

Can I monitor everything with SQL alone, or do I need Grafana?

SQL covers a lot: job status, Kafka lag, CDC progress, and backfill state. But some metrics — backpressure between operators and barrier latency trends — are only available in the Grafana dashboard that ships with RisingWave, or via the meta-node dashboard. For production monitoring, use both: SQL for job-level health checks and Grafana for deep internal metrics.

How does Kafka lag affect AI agent accuracy?

Kafka lag means RisingWave is behind in processing events from a source topic. Materialized views built on that source will reflect a stale state by approximately the lag amount. If an agent queries those views while lag is high, it reasons about outdated data — which can cause wrong recommendations, stale order statuses, or incorrect risk scores.

How do I build a monitoring materialized view that alerts on lag?

Create a materialized view that reads from rw_kafka_job_lag with a threshold check and a CASE expression for severity levels. Query this view from an alerting system or a scheduled monitoring agent. The view updates continuously as lag changes, so alerts always reflect current state.

Conclusion

LLM observability tools track what the model does. They cannot tell you why the model had bad inputs. That explanation lives in the data pipeline — in Kafka lag, CDC delay, stale backfills, and failed streaming jobs.

The gap between LLM observability and data pipeline observability is where a lot of production AI failures originate. An agent that gives a wrong answer because the model reasoned incorrectly shows up in your traces. An agent that gives a wrong answer because it queried a view that was 45 seconds behind does not.

Closing this gap requires monitoring the data layer with the same rigor you apply to the model layer. RisingWave's system tables give you job status, Kafka lag, CDC progress, and backfill state — all queryable with plain SQL through any PostgreSQL client. For the internal metrics that SQL does not surface, the Grafana dashboard fills the gap.

Key takeaways:

  • Monitor rw_streaming_jobs to catch failed or stalled streaming jobs before agents query stale views.
  • Watch rw_kafka_job_lag — it is the clearest leading indicator of data staleness in your agent pipelines.
  • Check rw_cdc_progress when using CDC sources; a stalled position means your operational data is no longer flowing.
  • Query rw_fragment_backfill_progress before routing agents to newly created views.
  • Use Grafana for backpressure and barrier latency — be honest that SQL alone does not cover everything.
  • Build a monitoring materialized view that consolidates health signals so you have one place to check pipeline status.

Ready to try this yourself? Get started at RisingWave Cloud — no credit card required.

Join our Slack community to ask questions and connect with other developers building real-time AI data infrastructure.
