Log Analysis Pipeline: From ELK Stack to Streaming SQL

Your ELK stack can tell you what happened. It cannot tell you what is happening right now.

Elasticsearch, Logstash, and Kibana form the industry-standard log analysis pipeline for a reason: flexible full-text search, rich visualizations, and a mature plugin ecosystem. But when production incidents demand real-time answers -- a surge in 500 errors, a spike in p99 latency, an endpoint silently returning wrong status codes -- the batch-oriented ingestion model of ELK introduces delays that cost you time you do not have.

This article shows how to build a log analysis pipeline using streaming SQL with RisingWave that continuously computes error rates, latency percentiles, and anomaly alerts as log events arrive. You will see working SQL, real query output, and a clear picture of where streaming SQL complements or replaces ELK depending on your workload.

The ELK Pipeline Model and Where It Falls Short

The classic ELK pipeline follows three stages. Logstash (or Beats) ships log lines to Elasticsearch, where they are indexed. Kibana queries Elasticsearch for dashboards and alerts. Each stage introduces latency.

Indexing lag is the first constraint. Elasticsearch indexes documents asynchronously. Under heavy ingest load, the time between a log event being emitted and it appearing in search results can stretch from seconds to minutes. For a dashboard refreshing every 30 seconds, this is invisible. For an on-call engineer watching a spike develop, those minutes matter.

Aggregation cost is the second constraint. Kibana dashboards compute aggregations (error counts, percentile latencies, request rates) at query time by scanning Elasticsearch indices. Every dashboard refresh triggers a full aggregation scan over the relevant time window. As your log volume grows, these scans grow proportionally, increasing both latency and cluster load.

Schema rigidity is the third constraint. Elasticsearch mappings define how fields are indexed. Changing a log schema -- adding a new field, changing a type -- requires either a mapping migration or accepting that old and new documents use different field structures. This creates friction when services evolve.

Streaming SQL addresses all three. Instead of indexing documents and querying them later, you define the aggregations upfront as materialized views. The system maintains the results incrementally as each log event arrives. Querying a materialized view returns the pre-computed result instantly, with no aggregation scan overhead. Schema changes are handled by updating the SQL definition.
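
As a first taste of what that looks like -- the table and column names here are illustrative, and the full pipeline is built step by step below -- a single statement is enough to keep a per-service error count continuously up to date:

CREATE MATERIALIZED VIEW service_error_counts AS
SELECT
    service,
    COUNT(*) FILTER (WHERE status_code >= 500) AS error_count
FROM http_logs
GROUP BY service;

Every new row arriving in http_logs adjusts the matching service's error_count in place; no scheduler, cron job, or refresh command is involved.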

This does not mean you should delete your ELK cluster. Elasticsearch remains the right tool for ad-hoc full-text log search and forensic investigation. The goal here is to show where streaming SQL fills the real-time gap that ELK's architecture cannot.

Architecture Comparison

Here is how the two approaches differ at the architecture level:

ELK pipeline:
  Log Emitter --> Logstash/Filebeat --> Elasticsearch --> Kibana (aggregates at query time)

Streaming SQL pipeline:
  Log Emitter --> Kafka --> RisingWave --> Materialized Views --> Always-fresh results

The key difference is when aggregation happens. ELK aggregates at read time. RisingWave aggregates at write time, incrementally, maintaining results continuously. For a dashboard showing "error rate per endpoint for the last 5 minutes," RisingWave has the answer computed and waiting. ELK computes it fresh on every dashboard refresh.

In production, a hybrid works well: ship logs to both Kafka and Elasticsearch. RisingWave reads from Kafka for real-time metrics and alerting. Kibana reads from Elasticsearch for forensic search and incident investigation.

graph LR
    A[Services] -->|structured logs| B[Kafka]
    B -->|stream ingestion| C[RisingWave]
    B -->|bulk indexing| D[Elasticsearch]
    C -->|materialized views| E[Error Rate / Latency / Alerts]
    D -->|Kibana| F[Full-text Search / Investigation]
    E -->|sink| G[PagerDuty / Grafana / Slack]

Setting Up the Log Event Stream

In production, you would ingest from a Kafka topic where your log shippers (Fluentd, Logstash, Vector) publish structured JSON log events. For this tutorial, we use a RisingWave table to simulate the stream. The schema maps to common HTTP access log fields that any application or API gateway emits.

CREATE TABLE logs_http_events (
    event_id     BIGINT,
    event_time   TIMESTAMPTZ,
    service      VARCHAR,
    endpoint     VARCHAR,
    method       VARCHAR,
    status_code  INT,
    latency_ms   INT,
    host         VARCHAR,
    region       VARCHAR
);

In a production deployment, you would replace this with a Kafka source pointing to the topic where your log pipeline publishes events:

CREATE SOURCE logs_http_events (
    event_id     BIGINT,
    event_time   TIMESTAMPTZ,
    service      VARCHAR,
    endpoint     VARCHAR,
    method       VARCHAR,
    status_code  INT,
    latency_ms   INT,
    host         VARCHAR,
    region       VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'application.access.logs',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Now let's load a sample dataset representing 9 seconds of traffic across two services, including a healthy auth service and a degraded API gateway with a failing /v1/orders endpoint:

INSERT INTO logs_http_events VALUES
(1,  '2026-04-01 10:00:01+00', 'api-gateway',  '/v1/users',     'GET',    200, 45,  'api.example.com',  'us-east-1'),
(2,  '2026-04-01 10:00:01+00', 'api-gateway',  '/v1/orders',    'POST',   201, 120, 'api.example.com',  'us-east-1'),
(3,  '2026-04-01 10:00:02+00', 'api-gateway',  '/v1/users',     'GET',    200, 52,  'api.example.com',  'us-east-1'),
(4,  '2026-04-01 10:00:02+00', 'auth-service', '/auth/token',   'POST',   200, 89,  'auth.example.com', 'us-east-1'),
(5,  '2026-04-01 10:00:02+00', 'api-gateway',  '/v1/products',  'GET',    200, 34,  'api.example.com',  'us-west-2'),
(6,  '2026-04-01 10:00:03+00', 'api-gateway',  '/v1/orders',    'GET',    500, 2100,'api.example.com',  'us-east-1'),
(7,  '2026-04-01 10:00:03+00', 'auth-service', '/auth/token',   'POST',   401, 30,  'auth.example.com', 'us-east-1'),
(8,  '2026-04-01 10:00:03+00', 'api-gateway',  '/v1/users',     'GET',    200, 48,  'api.example.com',  'us-east-1'),
(9,  '2026-04-01 10:00:04+00', 'api-gateway',  '/v1/orders',    'GET',    500, 3200,'api.example.com',  'us-east-1'),
(10, '2026-04-01 10:00:04+00', 'api-gateway',  '/v1/products',  'GET',    200, 40,  'api.example.com',  'us-west-2'),
(11, '2026-04-01 10:00:04+00', 'auth-service', '/auth/token',   'POST',   200, 91,  'auth.example.com', 'us-east-1'),
(12, '2026-04-01 10:00:05+00', 'api-gateway',  '/v1/users',     'DELETE', 403, 20,  'api.example.com',  'us-east-1'),
(13, '2026-04-01 10:00:05+00', 'api-gateway',  '/v1/orders',    'GET',    500, 2800,'api.example.com',  'us-east-1'),
(14, '2026-04-01 10:00:05+00', 'api-gateway',  '/v1/checkout',  'POST',   200, 220, 'api.example.com',  'us-east-1'),
(15, '2026-04-01 10:00:06+00', 'api-gateway',  '/v1/users',     'GET',    200, 44,  'api.example.com',  'eu-west-1'),
(16, '2026-04-01 10:00:06+00', 'api-gateway',  '/v1/orders',    'GET',    500, 4100,'api.example.com',  'us-east-1'),
(17, '2026-04-01 10:00:06+00', 'auth-service', '/auth/refresh', 'POST',   200, 55,  'auth.example.com', 'us-east-1'),
(18, '2026-04-01 10:00:07+00', 'api-gateway',  '/v1/products',  'GET',    404, 18,  'api.example.com',  'us-west-2'),
(19, '2026-04-01 10:00:07+00', 'api-gateway',  '/v1/checkout',  'POST',   200, 185, 'api.example.com',  'us-east-1'),
(20, '2026-04-01 10:00:07+00', 'api-gateway',  '/v1/orders',    'GET',    200, 78,  'api.example.com',  'us-east-1'),
(21, '2026-04-01 10:00:08+00', 'api-gateway',  '/v1/users',     'GET',    200, 51,  'api.example.com',  'us-east-1'),
(22, '2026-04-01 10:00:08+00', 'auth-service', '/auth/token',   'POST',   200, 83,  'auth.example.com', 'eu-west-1'),
(23, '2026-04-01 10:00:08+00', 'api-gateway',  '/v1/orders',    'POST',   201, 130, 'api.example.com',  'us-east-1'),
(24, '2026-04-01 10:00:09+00', 'api-gateway',  '/v1/products',  'GET',    200, 36,  'api.example.com',  'us-west-2'),
(25, '2026-04-01 10:00:09+00', 'api-gateway',  '/v1/users',     'GET',    200, 49,  'api.example.com',  'us-east-1');

This dataset captures a realistic incident pattern: the /v1/orders endpoint is returning 500 errors with latencies 30-50x above baseline, while the auth service and other endpoints remain healthy. Your job is to detect this before a customer notices.

Computing Request Volume and Error Counts

The first materialized view every log analysis pipeline needs is a request volume breakdown: how many requests per endpoint, how many succeeded, how many failed. In ELK, this is a Kibana aggregation that runs at dashboard refresh time. In RisingWave, it is maintained continuously.

CREATE MATERIALIZED VIEW logs_request_volume AS
SELECT
    service,
    endpoint,
    COUNT(*)                                             AS total_requests,
    COUNT(*) FILTER (WHERE status_code >= 500)          AS error_5xx,
    COUNT(*) FILTER (WHERE status_code >= 400
                      AND status_code < 500)            AS error_4xx,
    COUNT(*) FILTER (WHERE status_code < 400)           AS success_count
FROM logs_http_events
GROUP BY service, endpoint;

Query it now:

SELECT * FROM logs_request_volume ORDER BY total_requests DESC;
   service    |   endpoint    | total_requests | error_5xx | error_4xx | success_count
--------------+---------------+----------------+-----------+-----------+---------------
 api-gateway  | /v1/users     |              7 |         0 |         1 |             6
 api-gateway  | /v1/orders    |              7 |         4 |         0 |             3
 auth-service | /auth/token   |              4 |         0 |         1 |             3
 api-gateway  | /v1/products  |              4 |         0 |         1 |             3
 api-gateway  | /v1/checkout  |              2 |         0 |         0 |             2
 auth-service | /auth/refresh |              1 |         0 |         0 |             1

The problem is immediately visible: /v1/orders has 4 server errors (5xx) out of 7 requests, while every other endpoint shows zero 5xx errors. If you insert a new row into logs_http_events, this view updates within milliseconds -- no refresh, no polling, no scheduled job.

Tracking Error Rates with Materialized Views

Raw error counts tell you something went wrong. Error rate as a percentage tells you how bad it is. The logs_error_rate view adds the percentage calculation and surfaces latency extremes alongside it:

CREATE MATERIALIZED VIEW logs_error_rate AS
SELECT
    service,
    endpoint,
    COUNT(*)                                              AS total_requests,
    COUNT(*) FILTER (WHERE status_code >= 500)           AS server_errors,
    ROUND(
        COUNT(*) FILTER (WHERE status_code >= 500) * 100.0
        / NULLIF(COUNT(*), 0),
        2
    )                                                     AS error_rate_pct,
    MAX(latency_ms)                                       AS max_latency_ms,
    MIN(latency_ms)                                       AS min_latency_ms,
    ROUND(AVG(latency_ms), 1)                             AS avg_latency_ms
FROM logs_http_events
GROUP BY service, endpoint;

SELECT * FROM logs_error_rate ORDER BY error_rate_pct DESC NULLS LAST;
   service    |   endpoint    | total_requests | server_errors | error_rate_pct | max_latency_ms | min_latency_ms | avg_latency_ms
--------------+---------------+----------------+---------------+----------------+----------------+----------------+----------------
 api-gateway  | /v1/orders    |              7 |             4 |          57.14 |           4100 |             78 |         1789.7
 api-gateway  | /v1/products  |              4 |             0 |              0 |             40 |             18 |             32
 api-gateway  | /v1/checkout  |              2 |             0 |              0 |            220 |            185 |          202.5
 api-gateway  | /v1/users     |              7 |             0 |              0 |             52 |             20 |           44.1
 auth-service | /auth/token   |              4 |             0 |              0 |             91 |             30 |           73.3
 auth-service | /auth/refresh |              1 |             0 |              0 |             55 |             55 |             55

The /v1/orders endpoint stands out on both dimensions: 57.14% error rate and an average latency of 1789.7 ms. The max of 4100 ms confirms the endpoint is timing out at the database layer or a downstream dependency rather than failing fast. This combination -- a high error rate plus high latency on the errors themselves -- is the signature of a backend dependency failure.

Notice NULLIF(COUNT(*), 0) in the denominator. This prevents division-by-zero if an endpoint has no requests yet, which happens when a new service starts up and the materialized view initializes before any traffic arrives.
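
The views so far aggregate over the entire stream, which is fine for a short demo but not what you want on a live dashboard. For the "error rate per endpoint for the last 5 minutes" question mentioned earlier, the same calculation can be scoped to tumbling windows. The sketch below assumes RisingWave's TUMBLE time-window function over event_time; the view name is illustrative:

CREATE MATERIALIZED VIEW logs_error_rate_5m AS
SELECT
    window_start,
    service,
    endpoint,
    COUNT(*)                                             AS total_requests,
    COUNT(*) FILTER (WHERE status_code >= 500)           AS server_errors,
    ROUND(
        COUNT(*) FILTER (WHERE status_code >= 500) * 100.0
        / NULLIF(COUNT(*), 0),
        2
    )                                                    AS error_rate_pct
FROM TUMBLE(logs_http_events, event_time, INTERVAL '5 minutes')
GROUP BY window_start, service, endpoint;

Each row now describes one endpoint in one five-minute window, so an old incident ages out of the newest windows instead of weighing on an all-time error rate forever.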

Computing p99 Latency with Percentile Functions

Average latency hides outliers. A /v1/orders endpoint with an average of 100 ms could have 95% of requests completing in 20 ms and 5% timing out at 2000 ms -- users hitting those slow requests have a completely different experience than the average suggests. p99 latency captures what your slowest 1% of users actually experience.

CREATE MATERIALIZED VIEW logs_p99_latency AS
SELECT
    service,
    endpoint,
    COUNT(*)                                                 AS total_requests,
    ROUND(AVG(latency_ms), 1)                                AS avg_latency_ms,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY latency_ms) AS p50_latency_ms,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY latency_ms) AS p95_latency_ms,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99_latency_ms,
    MAX(latency_ms)                                          AS max_latency_ms
FROM logs_http_events
GROUP BY service, endpoint;

SELECT * FROM logs_p99_latency ORDER BY p99_latency_ms DESC;
   service    |   endpoint    | total_requests | avg_latency_ms | p50_latency_ms |   p95_latency_ms   |   p99_latency_ms   | max_latency_ms
--------------+---------------+----------------+----------------+----------------+--------------------+--------------------+----------------
 api-gateway  | /v1/orders    |              7 |         1789.7 |           2100 | 3829.9999999999995 |               4046 |           4100
 api-gateway  | /v1/checkout  |              2 |          202.5 |          202.5 |             218.25 |             219.65 |            220
 auth-service | /auth/token   |              4 |           73.3 |             86 |               90.7 |              90.94 |             91
 auth-service | /auth/refresh |              1 |           55.0 |             55 |                 55 |                 55 |             55
 api-gateway  | /v1/users     |              7 |           44.1 |             48 |               51.7 |              51.94 |             52
 api-gateway  | /v1/products  |              4 |           32.0 |             35 |               39.4 |              39.88 |             40

The /v1/orders endpoint confirms the problem: p50 at 2100 ms means more than half of all requests are slow, and p99 at 4046 ms means the very slowest requests are hitting your configured timeout ceiling. Every other endpoint has p99 under 220 ms. This is not a tail latency problem on /v1/orders -- it is a total failure of the endpoint.

PERCENTILE_CONT computes continuous (interpolated) percentiles. For SRE alerting this is usually the right choice: when a group has fewer data points than the percentile resolution requires, it interpolates between the two nearest observations rather than snapping to an existing one, so small samples still produce a meaningful estimate.
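
To see where the 4046 ms p99 for /v1/orders comes from, you can work the interpolation by hand. The seven latencies sort to 78, 120, 130, 2100, 2800, 3200, 4100 ms, and the p99 position is 0.99 * (7 - 1) = 5.94, which falls between the sixth value (3200) and the seventh (4100):

  p99 = 3200 + 0.94 * (4100 - 3200) = 3200 + 846 = 4046 ms

The same arithmetic at 0.95 gives position 5.7 and the 3830 ms p95 in the output above (printed as 3829.9999999999995 thanks to floating-point rounding).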

Anomaly Detection with Cascading Materialized Views

The real power of streaming SQL for log analysis comes from building alert logic directly on top of other materialized views. You can layer views on top of views, each adding a computation step, without reprocessing the underlying data. RisingWave calls this cascading materialized views.

Here, logs_anomaly_alerts reads from logs_error_rate (which itself reads from logs_http_events) and applies threshold-based severity classification:

CREATE MATERIALIZED VIEW logs_anomaly_alerts AS
SELECT
    service,
    endpoint,
    total_requests,
    server_errors,
    error_rate_pct,
    max_latency_ms,
    avg_latency_ms,
    CASE
        WHEN error_rate_pct >= 50 THEN 'critical'
        WHEN error_rate_pct >= 20 THEN 'warning'
        WHEN avg_latency_ms >= 1000 THEN 'warning'
        ELSE 'healthy'
    END AS alert_level
FROM logs_error_rate
WHERE error_rate_pct >= 20 OR avg_latency_ms >= 1000;

SELECT * FROM logs_anomaly_alerts ORDER BY error_rate_pct DESC;
   service   |  endpoint  | total_requests | server_errors | error_rate_pct | max_latency_ms | avg_latency_ms | alert_level
-------------+------------+----------------+---------------+----------------+----------------+----------------+-------------
 api-gateway | /v1/orders |              7 |             4 |          57.14 |           4100 |         1789.7 | critical

One row. One endpoint. critical. That is the alert you need at 2 AM without any manual investigation or dashboard parsing.

Notice that this view filters out healthy endpoints entirely -- it only includes rows where something is wrong. When you sink this view to Kafka or PagerDuty, you get alerts, not reports. An empty result set means nothing is firing. A new row appearing means a new problem just crossed the threshold.
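
You can watch that happen. The two events below are hypothetical -- they continue the id sequence of the sample data and simulate the /v1/checkout endpoint starting to fail -- and running them will shift the counts shown in the earlier outputs:

INSERT INTO logs_http_events VALUES
(26, '2026-04-01 10:00:10+00', 'api-gateway', '/v1/checkout', 'POST', 500, 2900, 'api.example.com', 'us-east-1'),
(27, '2026-04-01 10:00:10+00', 'api-gateway', '/v1/checkout', 'POST', 500, 3100, 'api.example.com', 'us-east-1');

SELECT * FROM logs_anomaly_alerts ORDER BY error_rate_pct DESC;

Within moments the query returns a second critical row: /v1/checkout now has 2 server errors out of 4 requests, a 50% error rate, and the change propagated through logs_error_rate into logs_anomaly_alerts with no refresh step or scheduled job.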

Why Cascading Views Matter

In ELK, achieving this kind of multi-step computation requires a pipeline of transforms: a Logstash filter to parse raw logs, an Elasticsearch transform to pre-aggregate, and a Kibana alert rule on top. Each layer is a separate system with its own configuration format and operational overhead.

In RisingWave, all three layers are SQL:

  1. logs_http_events -- raw stream (the input)
  2. logs_error_rate -- first-order aggregation (error rate per endpoint)
  3. logs_anomaly_alerts -- second-order classification (apply thresholds from logs_error_rate)

RisingWave maintains the dataflow graph automatically. When a new log event arrives, it propagates through the graph: updating logs_error_rate, which triggers an update to logs_anomaly_alerts if the affected row's metrics crossed a threshold. The entire chain executes in the same processing pipeline, with no polling between layers.
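
Extending the cascade is just another CREATE statement. As an illustration (the view name is hypothetical, not part of the pipeline built above), a third-order rollup can summarize how many endpoints per service are currently firing, reading only from logs_anomaly_alerts:

CREATE MATERIALIZED VIEW logs_service_alert_summary AS
SELECT
    service,
    COUNT(*)                                         AS endpoints_alerting,
    COUNT(*) FILTER (WHERE alert_level = 'critical') AS critical_endpoints,
    MAX(error_rate_pct)                              AS worst_error_rate_pct
FROM logs_anomaly_alerts
GROUP BY service;

Because it reads from a view that has already filtered out healthy endpoints, this rollup stays tiny no matter how much raw log volume flows underneath it.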

Routing Alerts to External Systems

Detecting an anomaly inside RisingWave is only useful if the right people learn about it. RisingWave sinks push materialized view changes to external systems continuously. Here is how to route critical alerts to a Kafka topic that feeds your PagerDuty or Slack integration:

CREATE SINK logs_alerts_sink FROM logs_anomaly_alerts
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'observability.alerts.critical',
    type = 'upsert',
    primary_key = 'service,endpoint'
);

Every time logs_anomaly_alerts adds a new row or updates an existing one (because an endpoint's error rate changed), the change is published to the Kafka topic. Your alert consumer sees INSERT for a new alert and DELETE when the endpoint recovers. This gives you both firing and resolution events with no extra logic.

For teams that want to push directly to a PostgreSQL-compatible monitoring database or ClickHouse, RisingWave supports those as sink targets too. See the sink connector documentation for the full list.
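
As a sketch of that option -- the JDBC URL, table name, and credentials below are placeholders, and the exact WITH parameters may vary by RisingWave version, so check the sink documentation before copying this -- a PostgreSQL sink for the request volume view looks roughly like:

CREATE SINK logs_volume_pg_sink FROM logs_request_volume
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://metrics-db:5432/observability?user=rw&password=secret',
    table.name = 'logs_request_volume',
    type = 'upsert',
    primary_key = 'service,endpoint'
);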

Comparing ELK and Streaming SQL for Log Analysis

Both approaches solve log analysis, but they optimize for different access patterns. Here is a direct comparison:

 Capability                        | ELK Stack                  | RisingWave Streaming SQL
-----------------------------------+----------------------------+------------------------------------
 Full-text log search              | Excellent                  | Not designed for it
 Ad-hoc queries over history       | Strong                     | Requires re-materialization
 Real-time error rate computation  | Slow (scan at query time)  | Instant (pre-computed)
 p99 latency dashboards            | Slow at scale              | Always-fresh, zero query cost
 Threshold alerting latency        | Seconds to minutes         | Sub-second
 Schema flexibility                | Requires mapping changes   | ALTER TABLE + view rebuild
 Operational complexity            | High (cluster management)  | Low (serverless option available)
 SQL familiarity                   | Requires KQL/Lucene        | Standard SQL

ELK wins on full-text search and forensic investigation. Streaming SQL wins on real-time metrics and alerting. A production-grade observability stack uses both.

Production Architecture: From Log Emitter to Alert

Here is how the full pipeline looks in production, integrating your existing ELK setup with a streaming SQL layer for real-time metrics:

graph TB
    A[Application Services] -->|structured JSON logs| B[Kafka / Kinesis]
    B -->|Logstash| C[Elasticsearch]
    B -->|RisingWave Kafka source| D[RisingWave]
    C --> E[Kibana dashboards]
    C --> F[Forensic investigation]
    D --> G[logs_request_volume view]
    D --> H[logs_error_rate view]
    D --> I[logs_p99_latency view]
    H --> J[logs_anomaly_alerts view]
    J -->|Kafka sink| K[PagerDuty / Slack]
    G -->|PostgreSQL sink| L[Grafana]

The log events flow to Kafka once. Logstash reads from Kafka and indexes into Elasticsearch for full-text search and long-term storage. RisingWave also reads from the same Kafka topic and maintains the streaming aggregations. You do not change your existing ELK pipeline -- you add the streaming SQL layer alongside it.

This approach lets you migrate gradually. Start by moving your most latency-sensitive alert rules to RisingWave. Observe the difference in alert firing speed during the next incident. Expand from there.

FAQ

How does streaming SQL handle schema changes in log formats?

In RisingWave, changing the schema of a source or table requires altering the object and potentially rebuilding downstream materialized views. For minor additions (new fields), you can use ALTER TABLE to add a column with a default value, and existing materialized views that do not reference the new field continue working without modification. For breaking changes (renamed fields, type changes), you rebuild the affected views. This is comparable to a Logstash pipeline restart plus an Elasticsearch re-index, but scoped precisely to what changed. One practical pattern is to make your source schema wide (accept all fields from the log line) and filter to specific fields inside the materialized views. That way, adding a field to the log format requires no changes to the streaming pipeline at all.
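
A minimal sketch of the additive case, assuming the log format gains a trace_id field (the column and view names are illustrative):

ALTER TABLE logs_http_events ADD COLUMN trace_id VARCHAR;

-- Existing views keep running untouched; only a new view that needs the field references it.
CREATE MATERIALIZED VIEW logs_slow_traces AS
SELECT trace_id, service, endpoint, latency_ms
FROM logs_http_events
WHERE latency_ms >= 1000;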

Can RisingWave ingest logs directly from Filebeat or Fluentd without Kafka?

RisingWave natively ingests from Apache Kafka, AWS Kinesis, Pulsar, and several other message brokers. Filebeat and Fluentd do not have native RisingWave connectors, but both support Kafka as an output target. The recommended pattern is: Filebeat/Fluentd --> Kafka --> RisingWave. This also gives you durability -- if RisingWave is temporarily unavailable, Kafka retains the log events and RisingWave catches up from the last committed offset when it resumes.

What happens to materialized view results if RisingWave restarts?

RisingWave stores materialized view state persistently, not in memory. On restart, it recovers the view state from its storage backend (S3-compatible object storage or local disk) and resumes consuming from the last committed Kafka offset. You do not lose historical aggregations on restart. This is a fundamental difference from in-memory stream processors: the computed error rates, latency percentiles, and alert states survive process restarts without reprocessing the full history.

Is streaming SQL better than Kibana alerting for production use?

Kibana alerting works by periodically re-running an aggregation query against Elasticsearch on a schedule (typically every 1-5 minutes). This means your alert detection latency is at least as long as the schedule interval, plus indexing lag. Streaming SQL alert detection latency is the time between a log event arriving in Kafka and the downstream materialized view updating -- typically under one second. For SRE on-call workflows, the difference between a one-minute alert and a sub-second alert is the difference between catching an incident at 5% error rate versus 40%. Streaming SQL does not replace Kibana alerting for complex threshold configurations or multi-condition rules -- it is strictly faster for the real-time detection case.

Conclusion

ELK gives you full-text search and rich investigation tools. It does not give you instant error rate computation or sub-second alerting without significant over-provisioning of Elasticsearch capacity. Streaming SQL closes that gap.

In this tutorial, you built a complete log analysis pipeline with four materialized views covering:

  • Request volume and error counts per endpoint, updated with every incoming log event
  • Error rate percentage with a division-safe calculation that handles new or inactive endpoints
  • p99, p95, and p50 latency using PERCENTILE_CONT, computed continuously without full dataset scans
  • Anomaly detection using cascading materialized views and threshold-based severity classification

Every SQL statement in the core walkthrough was verified against RisingWave 2.8.0. The dataset deliberately simulates a realistic incident pattern -- a single endpoint failing at a 57% error rate with latencies 30-50x above baseline -- to show how quickly each view surfaces the problem.

The next step is connecting your existing Kafka log topic to a RisingWave source and translating your highest-priority Kibana alert rules into materialized views. Your on-call engineer will notice the difference on the next incident.


Ready to build your own real-time log analysis pipeline? Try RisingWave Cloud free -- no credit card required. Sign up here.

Join our Slack community to ask questions and connect with other stream processing developers.
