How to Build a Real-Time SIEM Data Pipeline

A real-time SIEM data pipeline ingests security logs from Kafka, normalizes them into a unified schema using materialized views, and applies continuous SQL-based detection rules that fire alerts within seconds. RisingWave, a PostgreSQL-compatible streaming database built in Rust, lets security teams implement the entire pipeline in standard SQL without Java, Scala, or custom checkpoint configuration.

What Makes a SIEM Pipeline "Real-Time"?

Traditional Security Information and Event Management (SIEM) platforms ingest logs in micro-batches, often introducing detection latency measured in minutes. A real-time SIEM pipeline closes this gap by treating the log stream as a continuous, queryable data structure rather than a sequence of files to be periodically scanned.

Three properties define a genuinely real-time pipeline:

  • Sub-second ingestion from distributed sources into a durable message bus (Kafka or Kinesis)
  • Continuous normalization that maps heterogeneous vendor schemas to a common event format as logs arrive
  • Incremental detection where aggregation windows and correlation rules are maintained in memory and updated on each event, not recomputed from scratch on a timer

RisingWave satisfies all three requirements. Its sources connect directly to Kafka, Pulsar, and Kinesis. Materialized views are continuously maintained as events arrive, with no polling interval. And because RisingWave is PostgreSQL-compatible, detection logic is plain SQL that any analyst can read and modify.

How Do You Model Security Log Sources in RisingWave?

The first step is creating source objects that map Kafka topic schemas to typed RisingWave tables. Security environments typically produce several distinct log classes: network flow logs, authentication events, endpoint process execution records, and cloud audit trails.

-- Network flow logs from a firewall Kafka topic
CREATE SOURCE firewall_logs (
    event_time        TIMESTAMPTZ,
    src_ip            VARCHAR,
    dst_ip            VARCHAR,
    src_port          INT,
    dst_port          INT,
    protocol          VARCHAR,
    bytes_transferred BIGINT,
    action            VARCHAR,
    rule_id           VARCHAR
)
WITH (
    connector = 'kafka',
    topic     = 'firewall.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Authentication events from an identity provider
CREATE SOURCE auth_events (
    event_time   TIMESTAMPTZ,
    user_id      VARCHAR,
    source_ip    VARCHAR,
    country_code VARCHAR,
    result       VARCHAR,   -- 'success' | 'failure' | 'mfa_challenge'
    app_name     VARCHAR,
    session_id   VARCHAR
)
WITH (
    connector = 'kafka',
    topic     = 'auth.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Endpoint process execution telemetry
CREATE SOURCE endpoint_events (
    event_time   TIMESTAMPTZ,
    host_id      VARCHAR,
    process_name VARCHAR,
    parent_pid   INT,
    child_pid    INT,
    command_line VARCHAR,
    user_name    VARCHAR,
    sha256_hash  VARCHAR
)
WITH (
    connector = 'kafka',
    topic     = 'endpoint.telemetry',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Each CREATE SOURCE statement registers a continuously flowing input stream. RisingWave begins consuming from Kafka immediately and makes the data available to downstream materialized views.
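One caveat worth knowing: a plain CREATE SOURCE does not persist events, so the raw stream cannot be batch-queried directly; only downstream materialized views can read it. If analysts also need ad-hoc SELECTs over the raw events, a CREATE TABLE with the same connector settings persists them. A sketch for the auth stream (the table name is illustrative):

```sql
-- Variant: persist raw auth events so they can be SELECTed directly.
-- Same connector options as the auth_events source above.
CREATE TABLE auth_events_raw (
    event_time   TIMESTAMPTZ,
    user_id      VARCHAR,
    source_ip    VARCHAR,
    country_code VARCHAR,
    result       VARCHAR,
    app_name     VARCHAR,
    session_id   VARCHAR
)
WITH (
    connector = 'kafka',
    topic     = 'auth.events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;
```

The trade-off is storage: a table keeps every ingested row, while a source only feeds downstream views.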

How Do You Normalize Logs Into a Common Event Schema?

Raw logs from different vendors use incompatible field names, timestamp formats, and severity conventions. Before writing detection rules, you need a normalization layer that maps every source to a canonical event structure inspired by standards like the Elastic Common Schema (ECS) or OCSF.

-- Unified normalized event view across all sources
CREATE MATERIALIZED VIEW normalized_events AS
    -- Network events
    SELECT
        event_time,
        'network'             AS event_category,
        'firewall'            AS event_source,
        rule_id               AS event_id,
        src_ip                AS actor_ip,
        NULL                  AS actor_user,
        dst_ip                AS target_resource,
        CAST(dst_port AS VARCHAR) AS target_detail,
        action                AS outcome,
        bytes_transferred     AS magnitude,
        NULL                  AS raw_command
    FROM firewall_logs

    UNION ALL

    -- Authentication events
    SELECT
        event_time,
        'authentication'      AS event_category,
        'identity_provider'   AS event_source,
        session_id            AS event_id,
        source_ip             AS actor_ip,
        user_id               AS actor_user,
        app_name              AS target_resource,
        country_code          AS target_detail,
        result                AS outcome,
        NULL                  AS magnitude,
        NULL                  AS raw_command
    FROM auth_events

    UNION ALL

    -- Endpoint process events
    SELECT
        event_time,
        'process_execution'   AS event_category,
        'edr_agent'           AS event_source,
        sha256_hash           AS event_id,
        NULL                  AS actor_ip,
        user_name             AS actor_user,
        host_id               AS target_resource,
        process_name          AS target_detail,
        'executed'            AS outcome,
        NULL                  AS magnitude,
        command_line          AS raw_command
    FROM endpoint_events;

Because this is a materialized view, RisingWave continuously updates it as new events arrive from any of the three sources. No scheduler, no cron job, and no ETL orchestration tool is required.
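Because the view is also queryable, a quick distribution check from psql or any PostgreSQL client confirms that all three sources are landing in the unified schema; the query below is illustrative:

```sql
-- Ad-hoc check: event volume per category over the unified view
SELECT event_category, event_source, COUNT(*) AS event_count
FROM normalized_events
GROUP BY event_category, event_source
ORDER BY event_count DESC;
```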

You can layer additional enrichment on top, such as IP geolocation lookups or asset classification, by joining the normalized view against reference tables:

-- Enrichment view: attach asset tier and geo region
CREATE MATERIALIZED VIEW enriched_events AS
SELECT
    ne.*,
    COALESCE(at.asset_tier, 'unknown')   AS asset_tier,
    COALESCE(gi.region, 'unknown')       AS geo_region
FROM normalized_events ne
LEFT JOIN asset_taxonomy  at ON ne.target_resource = at.host_id
-- ip_to_int is a placeholder: RisingWave has no built-in inet_aton, so
-- convert the dotted-quad IP to a comparable integer with a UDF, or
-- precompute the numeric form upstream before ingestion
LEFT JOIN geo_ip_reference gi ON ip_to_int(ne.actor_ip)
                                  BETWEEN gi.ip_range_start AND gi.ip_range_end;

How Do You Write Continuous Detection Rules With SQL?

Detection rules in a RisingWave SIEM pipeline are materialized views over the enriched event stream. Common patterns include threshold-based rules (brute-force login detection), behavioral rules (impossible travel), and correlation rules (lateral movement chains).

-- Rule 1: Brute-force login detection
-- Fire when a single IP has more than 10 auth failures in a 5-minute window
CREATE MATERIALIZED VIEW alert_brute_force AS
SELECT
    window_start,
    window_end,
    actor_ip,
    actor_user,
    COUNT(*)            AS failure_count,
    'brute_force'       AS rule_name,
    'high'              AS severity
FROM TUMBLE(
    enriched_events,
    event_time,
    INTERVAL '5 minutes'
)
WHERE event_category = 'authentication'
  AND outcome        = 'failure'
GROUP BY window_start, window_end, actor_ip, actor_user
HAVING COUNT(*) > 10;

-- Rule 2: Impossible travel detection
-- Fire when the same user authenticates from two different countries
-- within a 30-minute window
CREATE MATERIALIZED VIEW alert_impossible_travel AS
SELECT
    a.actor_user,
    a.actor_ip          AS ip_first,
    b.actor_ip          AS ip_second,
    a.geo_region        AS region_first,
    b.geo_region        AS region_second,
    a.event_time        AS time_first,
    b.event_time        AS time_second,
    'impossible_travel' AS rule_name,
    'critical'          AS severity
FROM enriched_events a
JOIN enriched_events b
  ON  a.actor_user   = b.actor_user
  AND a.event_category = 'authentication'
  AND b.event_category = 'authentication'
  AND a.outcome      = 'success'
  AND b.outcome      = 'success'
  AND a.geo_region  <> b.geo_region
  AND b.event_time   > a.event_time
  AND b.event_time   < a.event_time + INTERVAL '30 minutes';

-- Rule 3: Suspicious process execution on high-value assets
-- PowerShell with encoded commands on tier-1 servers
CREATE MATERIALIZED VIEW alert_suspicious_process AS
SELECT
    event_time,
    target_resource     AS host_id,
    actor_user,
    raw_command,
    asset_tier,
    'encoded_powershell' AS rule_name,
    'high'              AS severity
FROM enriched_events
WHERE event_category = 'process_execution'
  AND asset_tier     = 'tier_1'
  AND target_detail  ILIKE '%powershell%'
  AND raw_command    ILIKE '%-EncodedCommand%';

-- Aggregated alert summary: roll-up across all rule views.
-- TUMBLE takes a named relation, so the union of alert streams is
-- materialized first and then windowed. alert_brute_force has no
-- event_time column, so its window_end stands in for the event time.
CREATE MATERIALIZED VIEW all_alerts AS
    SELECT window_end AS event_time, rule_name, severity FROM alert_brute_force
    UNION ALL
    SELECT time_second AS event_time, rule_name, severity FROM alert_impossible_travel
    UNION ALL
    SELECT event_time, rule_name, severity FROM alert_suspicious_process;

CREATE MATERIALIZED VIEW alert_summary AS
SELECT window_start, window_end, rule_name, severity, COUNT(*) AS alert_count
FROM TUMBLE(all_alerts, event_time, INTERVAL '1 minute')
GROUP BY window_start, window_end, rule_name, severity;

RisingWave maintains the state for each windowed aggregation incrementally. As each new event is processed, only the affected window buckets are updated. This is fundamentally different from executing a batch query on a schedule, because no event is ever reprocessed and latency scales with ingestion lag rather than query complexity.
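Because each rule view is continuously maintained, an analyst can inspect the current alert state at any moment with a plain read, for example:

```sql
-- Most recent brute-force windows, straight from the live view
SELECT window_start, actor_ip, actor_user, failure_count
FROM alert_brute_force
ORDER BY window_start DESC
LIMIT 20;
```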

How Do You Route Alerts to Downstream Systems?

A detection pipeline is only valuable if alerts reach the teams and tools that act on them. RisingWave supports sinking materialized view output directly to Kafka, enabling downstream consumers such as ticketing systems, SOAR platforms, and dashboards to subscribe to the alert stream.

-- Sink high-severity alerts to a Kafka topic for the SOAR platform.
-- CREATE SINK takes either a relation name (FROM mv_name) or a query
-- (AS SELECT ...). NOW() is not usable in a streaming SELECT list, so the
-- window/event timestamps stand in for the firing time. alert_brute_force
-- has no target_resource column, so that field is a typed NULL.
CREATE SINK soar_alerts_sink AS
    SELECT
        window_end                             AS alert_fired_at,
        rule_name,
        severity,
        actor_ip,
        actor_user,
        CAST(NULL AS VARCHAR)                  AS target_resource,
        failure_count                          AS signal_value,
        window_start,
        window_end
    FROM alert_brute_force
    WHERE severity IN ('high', 'critical')

    UNION ALL

    SELECT
        time_second                            AS alert_fired_at,
        rule_name,
        severity,
        ip_first                               AS actor_ip,
        actor_user,
        CAST(NULL AS VARCHAR)                  AS target_resource,
        CAST(NULL AS BIGINT)                   AS signal_value,
        time_first                             AS window_start,
        time_second                            AS window_end
    FROM alert_impossible_travel
    WHERE severity = 'critical'
WITH (
    connector = 'kafka',
    topic     = 'siem.alerts.high',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

-- Sink all alerts to an audit log topic for long-term retention
CREATE SINK alert_audit_sink
FROM alert_summary
WITH (
    connector = 'kafka',
    topic     = 'siem.alerts.audit',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

Any SOAR platform, PagerDuty integration, or Elasticsearch ingest pipeline can subscribe to the siem.alerts.high topic and act on alerts within milliseconds of detection.

How Does RisingWave Compare to Flink Plus Elasticsearch?

The traditional open-source approach for real-time SIEM pipelines combines Apache Flink for stream processing with Elasticsearch for storage and search. This combination works, but it introduces significant operational overhead that RisingWave eliminates.

| Dimension | Flink + Elasticsearch | RisingWave |
| --- | --- | --- |
| Programming model | Java or Scala DataStream API, or Flink SQL | Standard SQL (PostgreSQL-compatible) |
| State management | Requires RocksDB state backend, manual checkpoint intervals, savepoint coordination | Automatic; no configuration required |
| Detection latency | Sub-second possible, but requires careful operator chaining and checkpoint tuning | Sub-second by default; materialized views update on each event |
| Schema flexibility | Schema changes require a job restart and state migration | Sources accept added columns online; views can be recreated without stopping the pipeline |
| Queryability | Requires a separate Elasticsearch query layer; no SQL joins across streams | Any PostgreSQL client can query live materialized views directly |
| Operational footprint | Flink cluster (JobManager, TaskManagers) plus Elasticsearch cluster | Single RisingWave cluster handles ingestion, processing, and serving |
| Learning curve | Java/Scala expertise for the DataStream API; SQL dialect differs from PostgreSQL | Security analysts with SQL knowledge can write rules immediately |
| Checkpoint configuration | Checkpoint intervals, state TTL, and parallelism must be tuned to avoid data loss | No checkpoints to configure; RisingWave manages durability internally |

The operational gap is most visible when teams need to update detection rules. In a Flink pipeline, modifying an aggregation means changing a Java or Scala job, recompiling, deploying a new JAR, and potentially restoring from a savepoint if the state schema changed. In RisingWave, updating a detection rule is a DROP and CREATE pair executed from any SQL client in seconds, while the rest of the pipeline keeps running.
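As a concrete example, tightening the brute-force threshold is a drop-and-recreate away; the rest of the pipeline keeps running while the view is rebuilt:

```sql
-- Lower the brute-force threshold from 10 to 5 failures per window.
-- If downstream views or sinks reference this view, drop them first
-- or use DROP ... CASCADE.
DROP MATERIALIZED VIEW alert_brute_force;

CREATE MATERIALIZED VIEW alert_brute_force AS
SELECT
    window_start, window_end, actor_ip, actor_user,
    COUNT(*)      AS failure_count,
    'brute_force' AS rule_name,
    'high'        AS severity
FROM TUMBLE(enriched_events, event_time, INTERVAL '5 minutes')
WHERE event_category = 'authentication'
  AND outcome        = 'failure'
GROUP BY window_start, window_end, actor_ip, actor_user
HAVING COUNT(*) > 5;
```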

Flink's DataStream API does offer lower-level flexibility for highly custom stateful logic, and its mature ecosystem includes connectors for many data sources. But for the normalization, windowing, and threshold-based patterns that cover the majority of SIEM detection use cases, RisingWave's SQL model is strictly more productive.

For teams already heavily invested in Elasticsearch, RisingWave and Elasticsearch are not mutually exclusive. You can sink normalized events from RisingWave into Elasticsearch for historical full-text search while using RisingWave materialized views for real-time detection, getting the benefits of both without running a Flink cluster.
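A hybrid setup can be expressed as one more sink. The connector name and option names below are assumptions; check the RisingWave Elasticsearch sink documentation for the exact spelling in your version:

```sql
-- Sketch: forward normalized events to Elasticsearch for historical
-- full-text search, while detection stays in RisingWave.
CREATE SINK es_events_sink
FROM normalized_events
WITH (
    connector = 'elasticsearch-8',     -- assumed connector name
    url       = 'http://elasticsearch:9200',
    index     = 'siem-normalized-events'
);
```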

What Does the Complete Pipeline Architecture Look Like?

Putting it all together, a production-grade real-time SIEM pipeline with RisingWave has these layers:

  1. Collection: Log shippers (Fluent Bit, Vector, Filebeat) forward events from endpoints, network devices, and cloud services to Kafka topics segmented by source type.
  2. Ingestion: RisingWave CREATE SOURCE statements connect to each Kafka topic and make the raw stream available as a queryable relation.
  3. Normalization: A normalized_events materialized view unifies all sources into the common schema.
  4. Enrichment: An enriched_events view joins the normalized stream against reference data for asset classification and IP geolocation.
  5. Detection: Individual alert materialized views implement detection rules using SQL window functions, joins, and aggregations.
  6. Delivery: CREATE SINK statements route alerts to downstream Kafka topics consumed by SOAR platforms, ticketing systems, and notification channels.
  7. Querying: Security analysts connect via psql or any PostgreSQL-compatible BI tool to query live alert views, run ad-hoc threat hunting queries, and inspect the raw event stream.

The entire pipeline can be defined in a single SQL migration file, version-controlled alongside application code, and reviewed by any engineer familiar with SQL.
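The querying layer is where PostgreSQL compatibility pays off day to day: a hunting query like the one below (illustrative) runs against the live stream with no export step:

```sql
-- Ad-hoc hunt: sources with a recent burst of authentication failures
SELECT actor_user, actor_ip,
       COUNT(*)        AS failures,
       MAX(event_time) AS last_failure
FROM normalized_events
WHERE event_category = 'authentication'
  AND outcome = 'failure'
  AND event_time > NOW() - INTERVAL '1 hour'
GROUP BY actor_user, actor_ip
HAVING COUNT(*) >= 3;
```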

Frequently Asked Questions

What log sources does RisingWave support natively?

RisingWave natively ingests from Kafka, Apache Pulsar, Amazon Kinesis, PostgreSQL CDC, and MySQL CDC. For sources that do not have a native connector, route logs through Kafka first using a log shipper or Kafka Connect.

Can RisingWave replace our existing SIEM platform entirely?

RisingWave handles the real-time detection and normalization layers. Features like case management, threat intelligence enrichment feeds, and compliance reporting that traditional SIEM vendors bundle are better handled by purpose-built tools sitting downstream of the RisingWave alert sinks.

How does RisingWave handle schema evolution when log formats change?

Because RisingWave sources and materialized views are defined in SQL, schema changes follow standard SQL ALTER semantics. You can add nullable columns to sources without restarting the pipeline, and you can recreate detection views with updated logic while the rest of the pipeline continues running.
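For instance, if the identity provider starts emitting a device identifier, the column can be added in place; this assumes a JSON-encoded source without a schema registry, where RisingWave allows columns to be added online:

```sql
-- Add a new nullable field without restarting ingestion
ALTER SOURCE auth_events ADD COLUMN device_id VARCHAR;

-- Existing views keep running; recreate only the views that
-- should use the new field (dropping their dependents first,
-- or using CASCADE, if anything reads from them).
```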

Is RisingWave suitable for high-volume environments?

RisingWave is built in Rust for high throughput and low latency, and it scales horizontally by increasing parallelism and adding compute nodes. Published benchmarks report millions of events per second with sub-second end-to-end latency on commodity hardware.

What happens to alerts when RisingWave restarts?

RisingWave persists materialized view state durably to object storage (S3-compatible). On restart, it recovers state from the last consistent checkpoint and resumes consuming from the Kafka offset where it left off, ensuring no events are missed or double-counted.


Ready to build a real-time SIEM pipeline for your security team? Explore how RisingWave powers threat detection use cases at risingwave.com/cybersecurity-threat-detection/.
