How to Build a Real-Time SIEM Data Pipeline
A real-time SIEM data pipeline ingests security logs from Kafka, normalizes them into a unified schema using materialized views, and applies continuous SQL-based detection rules that fire alerts within seconds. RisingWave, a PostgreSQL-compatible streaming database built in Rust, lets security teams implement the entire pipeline in standard SQL without Java, Scala, or custom checkpoint configuration.
What Makes a SIEM Pipeline "Real-Time"?
Traditional Security Information and Event Management (SIEM) platforms ingest logs in micro-batches, often introducing detection latency measured in minutes. A real-time SIEM pipeline closes this gap by treating the log stream as a continuous, queryable data structure rather than a sequence of files to be periodically scanned.
Three properties define a genuinely real-time pipeline:
- Sub-second ingestion from distributed sources into a durable message bus (Kafka or Kinesis)
- Continuous normalization that maps heterogeneous vendor schemas to a common event format as logs arrive
- Incremental detection where aggregation windows and correlation rules are maintained in memory and updated on each event, not recomputed from scratch on a timer
RisingWave satisfies all three requirements. Its sources connect directly to Kafka, Pulsar, and Kinesis. Materialized views are continuously maintained as events arrive, with no polling interval. And because RisingWave is PostgreSQL-compatible, detection logic is plain SQL that any analyst can read and modify.
How Do You Model Security Log Sources in RisingWave?
The first step is creating source objects that map Kafka topic schemas to typed RisingWave tables. Security environments typically produce several distinct log classes: network flow logs, authentication events, endpoint process execution records, and cloud audit trails.
-- Network flow logs from a firewall Kafka topic
CREATE SOURCE firewall_logs (
event_time TIMESTAMPTZ,
src_ip VARCHAR,
dst_ip VARCHAR,
src_port INT,
dst_port INT,
protocol VARCHAR,
bytes_transferred BIGINT,
action VARCHAR,
rule_id VARCHAR
)
WITH (
connector = 'kafka',
topic = 'firewall.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Authentication events from an identity provider
CREATE SOURCE auth_events (
event_time TIMESTAMPTZ,
user_id VARCHAR,
source_ip VARCHAR,
country_code VARCHAR,
result VARCHAR, -- 'success' | 'failure' | 'mfa_challenge'
app_name VARCHAR,
session_id VARCHAR
)
WITH (
connector = 'kafka',
topic = 'auth.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
-- Endpoint process execution telemetry
CREATE SOURCE endpoint_events (
event_time TIMESTAMPTZ,
host_id VARCHAR,
process_name VARCHAR,
parent_pid INT,
child_pid INT,
command_line VARCHAR,
user_name VARCHAR,
sha256_hash VARCHAR
)
WITH (
connector = 'kafka',
topic = 'endpoint.telemetry',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
Each CREATE SOURCE statement registers a continuously flowing input stream. RisingWave begins consuming from Kafka immediately and makes the data available to downstream materialized views.
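Before building the normalization layer, it can help to confirm that events are actually flowing. A minimal sanity-check view (assuming the three sources above exist) counts events per source:

```sql
-- Sanity check: per-source event counts, maintained continuously
CREATE MATERIALIZED VIEW ingest_counts AS
SELECT 'firewall' AS source_name, COUNT(*) AS event_count FROM firewall_logs
UNION ALL
SELECT 'auth', COUNT(*) FROM auth_events
UNION ALL
SELECT 'endpoint', COUNT(*) FROM endpoint_events;

-- Query from any PostgreSQL client:
-- SELECT * FROM ingest_counts;
```

The counts should climb as Kafka traffic arrives; a row stuck at zero usually points to a wrong topic name or broker address.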
How Do You Normalize Logs Into a Common Event Schema?
Raw logs from different vendors use incompatible field names, timestamp formats, and severity conventions. Before writing detection rules, you need a normalization layer that maps every source to a canonical event structure inspired by standards like the Elastic Common Schema (ECS) or OCSF.
-- Unified normalized event view across all sources
CREATE MATERIALIZED VIEW normalized_events AS
-- Network events
SELECT
event_time,
'network' AS event_category,
'firewall' AS event_source,
rule_id AS event_id,
src_ip AS actor_ip,
NULL AS actor_user,
dst_ip AS target_resource,
CAST(dst_port AS VARCHAR) AS target_detail,
action AS outcome,
bytes_transferred AS magnitude,
NULL AS raw_command
FROM firewall_logs
UNION ALL
-- Authentication events
SELECT
event_time,
'authentication' AS event_category,
'identity_provider' AS event_source,
session_id AS event_id,
source_ip AS actor_ip,
user_id AS actor_user,
app_name AS target_resource,
country_code AS target_detail,
result AS outcome,
NULL AS magnitude,
NULL AS raw_command
FROM auth_events
UNION ALL
-- Endpoint process events
SELECT
event_time,
'process_execution' AS event_category,
'edr_agent' AS event_source,
sha256_hash AS event_id,
NULL AS actor_ip,
user_name AS actor_user,
host_id AS target_resource,
process_name AS target_detail,
'executed' AS outcome,
NULL AS magnitude,
command_line AS raw_command
FROM endpoint_events;
Because this is a materialized view, RisingWave continuously updates it as new events arrive from any of the three sources. No scheduler, no cron job, and no ETL orchestration tool is required.
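The view is also directly queryable. From psql, or any PostgreSQL client connected to RisingWave's frontend (port 4566 by default), an analyst can inspect the latest normalized events ad hoc:

```sql
-- Ad-hoc inspection of the live normalized stream
SELECT event_time, event_category, actor_ip, actor_user, outcome
FROM normalized_events
ORDER BY event_time DESC
LIMIT 20;
```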
You can layer additional enrichment on top, such as IP geolocation lookups or asset classification, by joining the normalized view against reference tables:
-- Enrichment view: attach asset tier and geo region
CREATE MATERIALIZED VIEW enriched_events AS
SELECT
ne.*,
COALESCE(at.asset_tier, 'unknown') AS asset_tier,
COALESCE(gi.region, 'unknown') AS geo_region
FROM normalized_events ne
LEFT JOIN asset_taxonomy at ON ne.target_resource = at.host_id
LEFT JOIN geo_ip_reference gi
-- inet_aton is MySQL-only; in PostgreSQL-compatible SQL, convert the
-- dotted quad to an integer with split_part before the range check
ON (split_part(ne.actor_ip, '.', 1)::BIGINT * 16777216
  + split_part(ne.actor_ip, '.', 2)::BIGINT * 65536
  + split_part(ne.actor_ip, '.', 3)::BIGINT * 256
  + split_part(ne.actor_ip, '.', 4)::BIGINT)
BETWEEN gi.ip_range_start AND gi.ip_range_end;
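The join above assumes two reference tables, asset_taxonomy and geo_ip_reference, that are not defined elsewhere in this article. One plausible shape for them, populated via INSERT statements or a CDC source, is:

```sql
-- Hypothetical reference tables assumed by enriched_events
CREATE TABLE asset_taxonomy (
    host_id VARCHAR PRIMARY KEY,
    asset_tier VARCHAR           -- e.g. 'tier_1', 'tier_2'
);

CREATE TABLE geo_ip_reference (
    ip_range_start BIGINT,       -- inclusive lower bound, IPv4 as integer
    ip_range_end BIGINT,         -- inclusive upper bound
    region VARCHAR
);
```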
How Do You Write Continuous Detection Rules With SQL?
Detection rules in a RisingWave SIEM pipeline are materialized views over the enriched event stream. Common patterns include threshold-based rules (brute-force login detection), behavioral rules (impossible travel), and correlation rules (lateral movement chains).
-- Rule 1: Brute-force login detection
-- Fire when a single IP has more than 10 auth failures in a 5-minute window
CREATE MATERIALIZED VIEW alert_brute_force AS
SELECT
window_start,
window_end,
actor_ip,
actor_user,
COUNT(*) AS failure_count,
'brute_force' AS rule_name,
'high' AS severity
FROM TUMBLE(
enriched_events,
event_time,
INTERVAL '5 minutes'
)
WHERE event_category = 'authentication'
AND outcome = 'failure'
GROUP BY window_start, window_end, actor_ip, actor_user
HAVING COUNT(*) > 10;
-- Rule 2: Impossible travel detection
-- Fire when the same user authenticates from two different countries
-- within a 30-minute window
CREATE MATERIALIZED VIEW alert_impossible_travel AS
SELECT
a.actor_user,
a.actor_ip AS ip_first,
b.actor_ip AS ip_second,
a.geo_region AS region_first,
b.geo_region AS region_second,
a.event_time AS time_first,
b.event_time AS time_second,
'impossible_travel' AS rule_name,
'critical' AS severity
FROM enriched_events a
JOIN enriched_events b
ON a.actor_user = b.actor_user
AND a.event_category = 'authentication'
AND b.event_category = 'authentication'
AND a.outcome = 'success'
AND b.outcome = 'success'
AND a.geo_region <> b.geo_region
AND b.event_time > a.event_time
AND b.event_time < a.event_time + INTERVAL '30 minutes';
-- Rule 3: Suspicious process execution on high-value assets
-- PowerShell with encoded commands on tier-1 servers
CREATE MATERIALIZED VIEW alert_suspicious_process AS
SELECT
event_time,
target_resource AS host_id,
actor_user,
raw_command,
asset_tier,
'encoded_powershell' AS rule_name,
'high' AS severity
FROM enriched_events
WHERE event_category = 'process_execution'
AND asset_tier = 'tier_1'
AND target_detail ILIKE '%powershell%'
AND raw_command ILIKE '%-EncodedCommand%';
-- Aggregated alert summary: roll-up across all rule views
CREATE MATERIALIZED VIEW alert_summary AS
SELECT window_start, window_end, rule_name, severity, COUNT(*) AS alert_count
FROM TUMBLE(
(
-- alert_brute_force has no event_time column; use the window end as the alert time
SELECT window_end AS event_time, rule_name, severity FROM alert_brute_force
UNION ALL
SELECT time_second AS event_time, rule_name, severity FROM alert_impossible_travel
UNION ALL
SELECT event_time, rule_name, severity FROM alert_suspicious_process
),
event_time,
INTERVAL '1 minute'
)
GROUP BY window_start, window_end, rule_name, severity;
RisingWave maintains the state for each windowed aggregation incrementally. As each new event is processed, only the affected window buckets are updated. This is fundamentally different from executing a batch query on a schedule, because no event is ever reprocessed and latency scales with ingestion lag rather than query complexity.
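Because alert views are ordinary relations, analysts can also hunt across them interactively with batch queries. For example, pulling the last hour of brute-force alerts, worst offenders first:

```sql
-- Ad-hoc hunt: recent brute-force alerts ranked by failure volume
SELECT actor_ip, actor_user, failure_count, window_start
FROM alert_brute_force
WHERE window_start > NOW() - INTERVAL '1 hour'
ORDER BY failure_count DESC;
```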
How Do You Route Alerts to Downstream Systems?
A detection pipeline is only valuable if alerts reach the teams and tools that act on them. RisingWave supports sinking materialized view output directly to Kafka, enabling downstream consumers such as ticketing systems, SOAR platforms, and dashboards to subscribe to the alert stream.
-- Sink high-severity alerts to a Kafka topic for the SOAR platform
CREATE SINK soar_alerts_sink
FROM (
SELECT
window_end AS alert_fired_at, -- derive the timestamp from the window; NOW() cannot appear in a streaming SELECT list
rule_name,
severity,
actor_ip,
actor_user,
NULL AS target_resource, -- alert_brute_force exposes no target column
failure_count AS signal_value,
window_start,
window_end
FROM alert_brute_force
WHERE severity IN ('high', 'critical')
UNION ALL
SELECT
time_second AS alert_fired_at, -- timestamp of the second login
rule_name,
severity,
ip_first AS actor_ip,
actor_user,
NULL AS target_resource,
NULL AS signal_value,
time_first AS window_start,
time_second AS window_end
FROM alert_impossible_travel
WHERE severity = 'critical'
)
WITH (
connector = 'kafka',
topic = 'siem.alerts.high',
properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
-- Sink all alerts to an audit log topic for long-term retention
CREATE SINK alert_audit_sink
FROM alert_summary
WITH (
connector = 'kafka',
topic = 'siem.alerts.audit',
properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;
Any SOAR platform, PagerDuty integration, or Elasticsearch ingest pipeline can subscribe to the siem.alerts.high topic and act on alerts within milliseconds of detection.
How Does the RisingWave Pipeline Compare to a Flink and Elasticsearch Stack?
The traditional open-source approach for real-time SIEM pipelines combines Apache Flink for stream processing with Elasticsearch for storage and search. This combination works, but it introduces significant operational overhead that RisingWave eliminates.
| Dimension | Flink + Elasticsearch | RisingWave |
|---|---|---|
| Programming model | Java or Scala DataStream API, or Flink SQL | Standard SQL (PostgreSQL-compatible) |
| State management | Requires RocksDB state backend, manual checkpoint intervals, savepoint coordination | Automatic, no configuration required |
| Detection latency | Sub-second possible but requires careful operator chaining and checkpoint tuning | Sub-second by default; materialized views update on each event |
| Schema flexibility | Schema changes require job restart and state migration | ALTER SOURCE adds columns online; detection views can be recreated without a restart |
| Queryability | Requires separate Elasticsearch query layer; no SQL joins across streams | Any PostgreSQL client can query live materialized views directly |
| Operational footprint | Flink cluster (JobManager, TaskManagers) plus Elasticsearch cluster | Single RisingWave cluster handles ingestion, processing, and serving |
| Learning curve | Java/Scala expertise for DataStream API; SQL dialect differs from PostgreSQL | Security analysts with SQL knowledge can write rules immediately |
| Checkpoint configuration | Must tune checkpoint intervals, state TTL, and parallelism to avoid data loss | No checkpoints to configure; RisingWave manages durability internally |
The operational gap is most visible when teams need to update detection rules. In a Flink pipeline, modifying an aggregation means updating a Java or Scala job, recompiling, deploying a new JAR, and potentially restoring from a savepoint if the state schema changed. In RisingWave, updating a detection rule is a DROP and CREATE pair executed from any SQL client in seconds.
Flink's DataStream API does offer lower-level flexibility for highly custom stateful logic, and its mature ecosystem includes connectors for many data sources. But for the normalization, windowing, and threshold-based patterns that cover the majority of SIEM detection use cases, RisingWave's SQL model is substantially more productive.
For teams already heavily invested in Elasticsearch, RisingWave and Elasticsearch are not mutually exclusive. You can sink normalized events from RisingWave into Elasticsearch for historical full-text search while using RisingWave materialized views for real-time detection, getting the benefits of both without running a Flink cluster.
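As a sketch of that hybrid setup (parameter names may vary by RisingWave version, so check the connector documentation), sinking normalized events into an Elasticsearch index looks like:

```sql
-- Sketch: forward normalized events to Elasticsearch for full-text search
CREATE SINK es_events_sink
FROM normalized_events
WITH (
    connector = 'elasticsearch',
    url = 'http://elasticsearch:9200',
    index = 'siem-events'
);
```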
What Does the Complete Pipeline Architecture Look Like?
Putting it all together, a production-grade real-time SIEM pipeline with RisingWave has these layers:
- Collection: Log shippers (Fluent Bit, Vector, Filebeat) forward events from endpoints, network devices, and cloud services to Kafka topics segmented by source type.
- Ingestion: RisingWave CREATE SOURCE statements connect to each Kafka topic and make the raw stream available as a queryable relation.
- Normalization: A normalized_events materialized view unifies all sources into the common schema.
- Enrichment: An enriched_events view joins the normalized stream against reference data for asset classification and IP geolocation.
- Detection: Individual alert materialized views implement detection rules using SQL window functions, joins, and aggregations.
- Delivery: CREATE SINK statements route alerts to downstream Kafka topics consumed by SOAR platforms, ticketing systems, and notification channels.
- Querying: Security analysts connect via psql or any PostgreSQL-compatible BI tool to query live alert views, run ad-hoc threat hunting queries, and inspect the raw event stream.
The entire pipeline can be defined in a single SQL migration file, version-controlled alongside application code, and reviewed by any engineer familiar with SQL.
Frequently Asked Questions
What log sources does RisingWave support natively?
RisingWave natively ingests from Kafka, Apache Pulsar, Amazon Kinesis, PostgreSQL CDC, and MySQL CDC. For sources that do not have a native connector, route logs through Kafka first using a log shipper or Kafka Connect.
Can RisingWave replace our existing SIEM platform entirely?
RisingWave handles the real-time detection and normalization layers. Features like case management, threat intelligence enrichment feeds, and compliance reporting that traditional SIEM vendors bundle are better handled by purpose-built tools sitting downstream of the RisingWave alert sinks.
How does RisingWave handle schema evolution when log formats change?
Because RisingWave sources and materialized views are defined in SQL, schema changes follow standard SQL ALTER semantics. You can add nullable columns to sources without restarting the pipeline, and you can recreate detection views with updated logic while the rest of the pipeline continues running.
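For example, when the identity provider starts emitting a device identifier (a hypothetical field used here for illustration), the running source can be extended in place, provided the new JSON field is absent or null in older events:

```sql
-- Add a nullable column to a live source; existing views are unaffected
ALTER SOURCE auth_events ADD COLUMN device_id VARCHAR;
```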
Is RisingWave suitable for high-volume environments?
RisingWave is built in Rust for high throughput and low latency. It supports horizontal scaling by increasing parallelism and adding compute nodes. Benchmarks show it processing millions of events per second with sub-second end-to-end latency on commodity hardware.
What happens to alerts when RisingWave restarts?
RisingWave persists materialized view state durably to object storage (S3-compatible). On restart, it recovers state from the last consistent checkpoint and resumes consuming from the Kafka offset where it left off, ensuring no events are missed or double-counted.
Ready to build a real-time SIEM pipeline for your security team? Explore how RisingWave powers threat detection use cases at risingwave.com/cybersecurity-threat-detection/.