Streaming SIEM: Process Security Events in Real Time

A traditional Security Information and Event Management (SIEM) platform works like a search engine for logs. Events arrive, get indexed, and sit waiting for an analyst to query them. Detection rules run on a schedule, typically every minute or every five minutes. That gap is where attackers move laterally, escalate privileges, and exfiltrate data before any alert fires.

A streaming SIEM inverts this model. Instead of querying historical data on demand, it maintains continuously updated views of security patterns. When a detection condition is satisfied, an alert fires immediately -- no waiting for the next scheduled rule evaluation.

RisingWave, an open-source PostgreSQL-compatible streaming database, provides the primitives to build a streaming SIEM: Kafka ingestion, SQL-defined detection logic, materialized views for real-time aggregation, and a PostgreSQL-compatible serving layer for analyst queries.

What events does a streaming SIEM process?

A production streaming SIEM correlates events from multiple sources simultaneously:

  • Authentication events: login attempts, MFA challenges, session creation and termination
  • Firewall and network logs: connection attempts, port scans, blocked traffic
  • Endpoint telemetry: process creation, file modifications, registry changes
  • Application logs: API calls, error rates, unusual request patterns
  • Identity provider events: role changes, group membership modifications, privilege grants

Each source becomes a Kafka topic. RisingWave maintains a source connector for each topic and correlates them through materialized view joins.

How to build a streaming SIEM with RisingWave

Step 1: Define event sources

CREATE SOURCE auth_events (
    event_id     VARCHAR,
    user_id      VARCHAR,
    source_ip    VARCHAR,
    event_type   VARCHAR,  -- login_success, login_failure, mfa_bypass
    event_time   TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'auth_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE firewall_logs (
    log_id       VARCHAR,
    src_ip       VARCHAR,
    dst_ip       VARCHAR,
    dst_port     INT,
    action       VARCHAR,  -- allow, deny, drop
    bytes        BIGINT,
    log_time     TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'firewall_logs',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE process_events (
    event_id     VARCHAR,
    host_id      VARCHAR,
    parent_pid   INT,
    child_pid    INT,
    process_name VARCHAR,
    command_line VARCHAR,
    event_time   TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'process_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
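These sources expect flat JSON whose keys match the declared column names. A small Python sketch of a sample auth event (values are illustrative, not a required producer API):

```python
import json
from datetime import datetime, timezone

# Illustrative event matching the auth_events source schema above.
event = {
    "event_id": "evt-001",
    "user_id": "alice",
    "source_ip": "203.0.113.7",
    "event_type": "login_failure",
    "event_time": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc).isoformat(),
}

# This is the byte payload a producer would publish to the auth_events topic.
payload = json.dumps(event).encode("utf-8")

# Round-trip check: JSON keys must line up with the source's columns,
# or RisingWave will ingest NULLs for the missing fields.
decoded = json.loads(payload)
assert set(decoded) == {"event_id", "user_id", "source_ip", "event_type", "event_time"}
```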

Step 2: Detect brute-force attacks

A burst of authentication failures from one IP within a short window is the signature of a brute-force or password spray attack.

CREATE MATERIALIZED VIEW brute_force_detections AS
SELECT
    source_ip,
    window_start,
    window_end,
    COUNT(*) FILTER (WHERE event_type = 'login_failure') AS failures,
    COUNT(DISTINCT user_id)                               AS targeted_accounts
FROM TUMBLE(auth_events, event_time, INTERVAL '5 minutes')
GROUP BY source_ip, window_start, window_end
HAVING COUNT(*) FILTER (WHERE event_type = 'login_failure') > 20;
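To make the TUMBLE semantics concrete, here is a plain-Python sketch of the same windowing logic. It models what the view computes, not how RisingWave executes it incrementally:

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
THRESHOLD = 20

def brute_force_windows(events):
    """Group auth events into 5-minute tumbling windows per source IP and
    return {(source_ip, window_start): (failures, targeted_accounts)} for
    windows whose failure count exceeds the threshold."""
    failures = defaultdict(int)   # (ip, window_start) -> failure count
    targets = defaultdict(set)    # (ip, window_start) -> distinct user_ids
    epoch = datetime(1970, 1, 1)
    for e in events:
        # Tumbling window: truncate event_time down to its window boundary.
        window_start = epoch + ((e["event_time"] - epoch) // WINDOW) * WINDOW
        key = (e["source_ip"], window_start)
        targets[key].add(e["user_id"])
        if e["event_type"] == "login_failure":
            failures[key] += 1
    return {k: (n, len(targets[k])) for k, n in failures.items() if n > THRESHOLD}

# 25 failures from one IP inside a single window should trigger a detection.
base = datetime(2024, 1, 1, 12, 0)
events = [
    {"source_ip": "203.0.113.7", "user_id": f"user{i % 5}",
     "event_type": "login_failure", "event_time": base + timedelta(seconds=i)}
    for i in range(25)
]
hits = brute_force_windows(events)
```

Note that `targeted_accounts` above 1 distinguishes a password spray (many accounts, few attempts each) from a brute force against a single account.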

Step 3: Detect lateral movement via process chains

Lateral movement often involves spawning unusual child processes from common parent processes like PowerShell or cmd.exe.

CREATE MATERIALIZED VIEW suspicious_process_chains AS
SELECT
    e.host_id,
    e.parent_pid,
    e.child_pid,
    e.process_name,
    e.command_line,
    e.event_time
FROM process_events e
WHERE
    e.process_name IN ('cmd.exe', 'powershell.exe', 'bash', 'sh')
    AND (
        e.command_line LIKE '%net user%'
        OR e.command_line LIKE '%whoami%'
        OR e.command_line LIKE '%net localgroup%'
        OR e.command_line LIKE '%mimikatz%'
        OR e.command_line LIKE '%procdump%'
    );
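The filter amounts to a substring predicate, shown here in Python. Like the SQL `LIKE` clauses, matching is case-sensitive and purely lexical; a production deployment would use a curated, regularly updated ruleset:

```python
SHELL_PROCESSES = {"cmd.exe", "powershell.exe", "bash", "sh"}
SUSPICIOUS_SUBSTRINGS = ["net user", "whoami", "net localgroup", "mimikatz", "procdump"]

def is_suspicious(process_name: str, command_line: str) -> bool:
    """Mirror of the LIKE-based filter: flag shell processes whose command
    line contains a known reconnaissance or credential-dumping marker."""
    if process_name not in SHELL_PROCESSES:
        return False
    return any(s in command_line for s in SUSPICIOUS_SUBSTRINGS)

assert is_suspicious("powershell.exe", "powershell -c whoami /all")
assert not is_suspicious("explorer.exe", "whoami")  # not a shell process
```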

Step 4: Correlate network and auth events for multi-stage detection

A login success from an IP that was previously blocked by the firewall is a high-confidence indicator of compromise.

CREATE MATERIALIZED VIEW recently_blocked_ips AS
SELECT src_ip, MAX(log_time) AS last_blocked
FROM firewall_logs
WHERE action IN ('deny', 'drop')
  AND log_time > NOW() - INTERVAL '1 hour'
GROUP BY src_ip;

CREATE MATERIALIZED VIEW auth_from_blocked_ip AS
SELECT
    a.user_id,
    a.source_ip,
    a.event_time   AS auth_time,
    b.last_blocked AS block_time,
    a.event_type
FROM auth_events a
JOIN recently_blocked_ips b ON a.source_ip = b.src_ip
WHERE a.event_type = 'login_success';

Step 5: Aggregate alerts into a unified alert stream

CREATE MATERIALIZED VIEW siem_alerts AS
SELECT
    'brute_force'               AS alert_type,
    source_ip                   AS entity,
    window_end                  AS detected_at,
    failures::varchar || ' failures in 5 min from ' || source_ip AS description
FROM brute_force_detections

UNION ALL

SELECT
    'suspicious_process'        AS alert_type,
    host_id                     AS entity,
    event_time                  AS detected_at,
    'Suspicious command: ' || command_line AS description
FROM suspicious_process_chains

UNION ALL

SELECT
    'auth_from_blocked_ip'      AS alert_type,
    user_id                     AS entity,
    auth_time                   AS detected_at,
    'Login from previously blocked IP ' || source_ip AS description
FROM auth_from_blocked_ip;

Step 6: Route alerts to a response platform

CREATE SINK siem_alerts_sink
FROM siem_alerts
WITH (
    connector = 'kafka',
    topic = 'siem_alerts',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

Security orchestration tools like PagerDuty, Splunk SOAR, or a custom webhook consumer can subscribe to the siem_alerts Kafka topic to trigger automated response playbooks.
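A minimal consumer sketch, assuming the kafka-python client and a hypothetical webhook endpoint (the URL and the severity mapping below are illustrative, not part of the RisingWave schema; field names match the siem_alerts view):

```python
import json
import urllib.request

WEBHOOK_URL = "https://hooks.example.com/siem"  # hypothetical endpoint

def to_webhook_payload(alert: dict) -> bytes:
    """Shape a siem_alerts row into a generic JSON webhook body.
    The severity mapping is an illustrative policy choice."""
    severity = "critical" if alert["alert_type"] == "auth_from_blocked_ip" else "high"
    return json.dumps({
        "severity": severity,
        "type": alert["alert_type"],
        "entity": alert["entity"],
        "summary": alert["description"],
        "detected_at": alert["detected_at"],
    }).encode("utf-8")

def post_alert(alert: dict) -> None:
    req = urllib.request.Request(WEBHOOK_URL, data=to_webhook_payload(alert),
                                 headers={"Content-Type": "application/json"})
    urllib.request.urlopen(req)  # fire-and-forget; add retries in production

def run_consumer():
    """Consume the siem_alerts topic and forward each alert. Requires
    `pip install kafka-python` and a reachable broker; call this from
    your service entry point."""
    from kafka import KafkaConsumer
    consumer = KafkaConsumer("siem_alerts",
                             bootstrap_servers="kafka:9092",
                             value_deserializer=lambda v: json.loads(v))
    for msg in consumer:
        post_alert(msg.value)
```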

How does this compare to Splunk or QRadar?

Splunk and QRadar are indexing platforms. They ingest logs, build inverted indexes for text search, and let analysts write correlation rules that run as scheduled searches. The minimum detection latency is the rule scheduling interval, typically 1 to 5 minutes.

RisingWave is a streaming processor. Detection views evaluate continuously. When the 21st login failure arrives from an IP within a 5-minute window, the brute_force_detections view updates immediately. There is no scheduling interval.

The trade-off: Splunk is better at ad-hoc log search across months of history. RisingWave is better at real-time pattern detection and multi-stream correlation. Most security programs benefit from running both: RisingWave for real-time detection, and a log archive (S3 plus Athena or Iceberg plus Trino) for forensic investigation.

Apache Flink can build the same streaming SIEM, but it imposes a Java or Scala tax on every detection rule. Your security team writes YAML or SQL detection rules today; rewriting them as Flink DataStream jobs requires a JVM engineer. Flink also requires a separate serving database for analysts to query results. RisingWave serves results directly over its PostgreSQL interface.

When a detection threshold changes, Flink requires a full job redeployment with state drain. In RisingWave, updating a rule is a SQL operation: drop the materialized view and recreate it with the new threshold, with no cluster redeployment. Security detection rules change frequently; the operational cost of Flink redeployments compounds quickly.


RisingWave is an open-source PostgreSQL-compatible streaming database. Explore the cybersecurity threat detection landing page for deployment guides and architecture diagrams.
