Building a Streaming Compliance Monitoring Pipeline

Compliance monitoring traditionally runs as scheduled batch jobs that scan audit logs overnight. A streaming compliance pipeline evaluates regulatory rules continuously as events occur, cutting audit-to-alert latency from days to seconds. This guide shows how to build one with RisingWave using only SQL, covering PCI DSS, HIPAA, SOX, and GDPR use cases.

What Compliance Violations Require Real-Time Detection?

Batch jobs catch violations after the fact. Most regulatory frameworks now expect organizations to detect and contain incidents quickly, and the gap between a violation occurring and a team being notified is the window during which damage accumulates.

The following regulatory scenarios demand near-real-time detection:

PCI DSS governs cardholder data environments. Violations that need immediate attention include access to cardholder data outside approved business hours, access from IP addresses that fall outside known corporate or VPN ranges, and access by users whose role does not require payment data. A nightly batch report means an attacker who exfiltrates card data at 2 AM goes undetected until morning.

HIPAA requires safeguarding Protected Health Information (PHI). Critical violations include a staff member accessing patient records outside their job function, a contractor querying records for patients they have no care relationship with, and any bulk export of PHI records that exceeds normal operational volumes. HIPAA's Breach Notification Rule imposes strict timelines once an organization becomes aware of a breach, making fast detection directly relevant to legal exposure.

SOX focuses on the integrity of financial records. Unauthorized changes to financial data, modifications to system configuration in financial applications, and access by users who have been offboarded but retain active credentials all fall under SOX audit scope. A streaming pipeline can flag a configuration change to a financial system seconds after it occurs rather than in next quarter's audit report.

GDPR requires controlling access to personal data. Bulk exports of data subject records, access by parties not listed in data processing agreements, and cross-border transfers initiated by unexpected users are all GDPR risk events. Real-time detection lets a data protection officer respond before data leaves the organization's perimeter.

Beyond specific regulations, the same pattern covers any access-policy violation: access outside approved hours, from unapproved geographic locations, through service accounts that should be dormant, or against resource types the user is not permitted to reach. Streaming catches these patterns as they form.

How Does the Streaming Architecture Work?

The architecture has three layers: an event source (Kafka), a stream processing layer (RisingWave), and output sinks (another Kafka topic for alerts, and optionally Apache Iceberg for archival). RisingWave sits in the middle and holds the compliance logic as materialized views written in standard SQL.

Defining the Sources

Start by connecting RisingWave to the audit events topic in Kafka:

CREATE SOURCE audit_events (
    event_id     VARCHAR,
    user_id      VARCHAR,
    resource_type VARCHAR,
    resource_id  VARCHAR,
    action       VARCHAR,
    source_ip    INET,
    event_time   TIMESTAMPTZ
)
WITH (
    connector  = 'kafka',
    topic      = 'audit_events',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Next, create a reference table for user access policies. This table stores per-user rules: which resource types the user is permitted to access, what hours they are allowed to work, and which IP ranges are considered trusted for that user. Unlike the Kafka source, this is a regular RisingWave table that can be updated as policies change.

CREATE TABLE user_access_policies (
    user_id              VARCHAR PRIMARY KEY,
    allowed_resource_types VARCHAR[],
    allowed_hours_start  INT,   -- 0..23 in UTC
    allowed_hours_end    INT,
    allowed_ip_ranges    VARCHAR[]  -- CIDR notation
);

Populate the policy table from your identity and access management system, or load it via a CDC connector pointed at a PostgreSQL policy store.
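
As a sketch, the table can be seeded with plain INSERT statements. The users, resource types, hours, and CIDR ranges below are hypothetical placeholders; in practice these rows come from your IAM export or CDC feed:

INSERT INTO user_access_policies VALUES
    -- user_id, allowed resource types, start hour, end hour, trusted ranges
    ('alice', ARRAY['payment_card', 'invoice'], 8, 18, ARRAY['10.0.0.0/8']),
    ('bob',   ARRAY['patient_record'],          9, 17, ARRAY['192.168.1.0/24', '203.0.113.0/24']);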

How Do You Flag After-Hours and Out-of-Scope Resource Access?

This materialized view joins the live audit stream against the policy table and flags any event where the event time falls outside the user's permitted window or the resource type is not in their allowed list:

CREATE MATERIALIZED VIEW access_policy_violations AS
SELECT
    e.event_id,
    e.user_id,
    e.resource_type,
    e.resource_id,
    e.action,
    e.source_ip,
    e.event_time,
    CASE
        WHEN NOT (e.resource_type = ANY(p.allowed_resource_types))
            THEN 'UNAUTHORIZED_RESOURCE_TYPE'
        -- The hour check assumes allowed_hours_start <= allowed_hours_end
        -- (no overnight windows) and extracts the hour in the session time
        -- zone; keep the session in UTC to match the policy table's columns.
        WHEN EXTRACT(HOUR FROM e.event_time) < p.allowed_hours_start
          OR EXTRACT(HOUR FROM e.event_time) > p.allowed_hours_end
            THEN 'OUTSIDE_ALLOWED_HOURS'
        ELSE 'UNKNOWN_VIOLATION'
    END AS violation_type
FROM audit_events e
JOIN user_access_policies p ON e.user_id = p.user_id
WHERE
    NOT (e.resource_type = ANY(p.allowed_resource_types))
    OR EXTRACT(HOUR FROM e.event_time) < p.allowed_hours_start
    OR EXTRACT(HOUR FROM e.event_time) > p.allowed_hours_end;

This view is always current. Every new audit event is evaluated as soon as it arrives in Kafka. There is no polling interval to tune and no batch window to wait for.
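
Because materialized views are queryable over the PostgreSQL wire protocol like ordinary tables, an analyst can inspect the current state with a plain SELECT. The one-hour lookback and column choices here are illustrative:

SELECT user_id, violation_type, COUNT(*) AS violations
FROM access_policy_violations
WHERE event_time > NOW() - INTERVAL '1 hour'
GROUP BY user_id, violation_type
ORDER BY violations DESC;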

How Do You Detect Bulk Data Export Anomalies?

Bulk access is a HIPAA and GDPR risk indicator. This view counts access events per user per resource type in a rolling five-minute window, surfacing any user who exceeds a threshold:

CREATE MATERIALIZED VIEW bulk_access_alerts AS
SELECT
    user_id,
    resource_type,
    COUNT(*) AS access_count,
    window_start,
    window_end
FROM TUMBLE(audit_events, event_time, INTERVAL '5 MINUTES')
GROUP BY user_id, resource_type, window_start, window_end
HAVING COUNT(*) > 100;

Adjust the threshold and window size to match your data sensitivity. PHI records may warrant a lower threshold than general application logs.
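
A tumbling window can split a burst across two adjacent windows and miss it. A hopping (sliding) window avoids that edge case. This sketch assumes RisingWave's HOP window function with a one-minute slide, plus a hypothetical resource type and lower threshold for PHI:

CREATE MATERIALIZED VIEW phi_bulk_access_alerts AS
SELECT
    user_id,
    COUNT(*) AS access_count,
    window_start,
    window_end
FROM HOP(audit_events, event_time, INTERVAL '1 MINUTE', INTERVAL '5 MINUTES')
WHERE resource_type = 'patient_record'   -- hypothetical PHI resource type
GROUP BY user_id, window_start, window_end
HAVING COUNT(*) > 25;                    -- tighter threshold for PHI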

How Do You Detect Access from Unexpected IP Addresses?

The stream-table join makes IP-based detection straightforward. RisingWave joins each event against the user's allowed IP ranges and flags any event whose source IP does not fall within any approved CIDR block:

CREATE MATERIALIZED VIEW ip_anomaly_alerts AS
SELECT
    e.event_id,
    e.user_id,
    e.source_ip,
    e.resource_type,
    e.event_time,
    'IP_NOT_IN_APPROVED_RANGES' AS violation_type
FROM audit_events e
JOIN user_access_policies p ON e.user_id = p.user_id
WHERE NOT EXISTS (
    SELECT 1
    FROM unnest(p.allowed_ip_ranges) AS allowed_range
    WHERE e.source_ip << allowed_range::inet
);

This relies on the PostgreSQL inet type and its << network containment operator. RisingWave targets PostgreSQL compatibility at the protocol and SQL dialect level; verify that your RisingWave version supports inet and <<, and if it does not, store source_ip as VARCHAR and perform the range check in a UDF instead.

How Do You Stream Violations to a Downstream Alert Topic?

Once violations are materialized, sink them to a Kafka topic so your SIEM, ticketing system, or on-call alerting pipeline can consume them:

CREATE SINK compliance_violations_sink
FROM access_policy_violations
WITH (
    connector = 'kafka',
    topic = 'compliance_violations',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;

You can create additional sinks from bulk_access_alerts and ip_anomaly_alerts to the same topic with a violation_type field differentiating the alert categories, or route them to separate topics if your downstream consumers need independent feeds.
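
For example, routing the IP anomaly view to the same topic is a second CREATE SINK with the same broker settings as above; the ip_anomaly_alerts view already carries a violation_type column, so downstream consumers can filter on it:

CREATE SINK ip_anomaly_sink
FROM ip_anomaly_alerts
WITH (
    connector = 'kafka',
    topic = 'compliance_violations',
    properties.bootstrap.server = 'broker:9092'
)
FORMAT PLAIN ENCODE JSON;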

How Does Streaming Compare to Batch Compliance Jobs?

Traditional compliance pipelines run as scheduled jobs. A typical setup involves an ETL process that collects audit logs from application databases, writes them to a data warehouse, and runs SQL queries against the warehouse on a nightly or hourly schedule. The audit team receives a report the next morning.

The core problem is the detection window. If a violation occurs at 1 AM and the batch job runs at 6 AM, the organization has no awareness of the violation for five hours. In that window, an attacker can complete an exfiltration, an insider can copy sensitive records, and a misconfiguration can propagate across systems.

Streaming eliminates the detection window at the cost of slightly more infrastructure complexity. The table below summarizes the key differences:

Dimension          | Batch Compliance Jobs         | RisingWave Streaming
Detection latency  | Hours to days                 | Seconds
SQL complexity     | Standard SQL on static tables | Streaming SQL with windows and joins
Infrastructure     | Data warehouse + scheduler    | Kafka + RisingWave
Policy updates     | Redeploy job or change query  | UPDATE the reference table
Historical queries | Native warehouse queries      | Delegate to Spark or Trino on Iceberg
Operational load   | Low (scheduled jobs)          | Low (declarative views, no custom operators)
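
The "UPDATE the reference table" row is literal: tightening a user's policy is a single DML statement, effective for every event that arrives afterward. The user and hours below are hypothetical:

UPDATE user_access_policies
SET allowed_hours_start = 9,
    allowed_hours_end   = 17
WHERE user_id = 'alice';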

Flink is a common alternative for streaming compliance pipelines, but it typically requires Java or Scala for complex stateful logic, RocksDB and checkpoint tuning for fault tolerance, and significant infrastructure expertise to operate. RisingWave is written in Rust, stores state in S3-compatible object storage by default, and speaks the PostgreSQL wire protocol, which means compliance logic can be written by anyone who knows SQL, with no JVM toolchain.

How Does RisingWave Support Long-Term Compliance Archival with Apache Iceberg?

Regulatory frameworks impose retention requirements. HIPAA requires retaining certain records for six years. PCI DSS requires audit logs for at least one year with three months of immediate availability. GDPR does not set a universal retention period, but requires retaining personal data only as long as necessary and being able to prove it.

Apache Iceberg on object storage such as Amazon S3 is a cost-effective archival layer. RisingWave can write directly to Iceberg as a sink, creating an immutable, schema-tracked record of every audit event and every detected violation:

CREATE SINK audit_archive_sink
FROM audit_events
WITH (
    connector          = 'iceberg',
    type               = 'append-only',
    warehouse.path     = 's3://your-compliance-bucket/audit-archive',
    s3.region          = 'us-east-1',
    database.name      = 'compliance',
    table.name         = 'audit_events'
);
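
The text above calls for archiving detected violations as well as raw events. A second sink to the same warehouse path handles that; the table name here is a hypothetical choice:

CREATE SINK violations_archive_sink
FROM access_policy_violations
WITH (
    connector          = 'iceberg',
    type               = 'append-only',
    warehouse.path     = 's3://your-compliance-bucket/audit-archive',
    s3.region          = 'us-east-1',
    database.name      = 'compliance',
    table.name         = 'policy_violations'
);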

RisingWave writes to Iceberg continuously. The archive grows in real time rather than in nightly batches. When auditors or investigators need to query historical data spanning months or years, they run those queries against the Iceberg tables using Spark or Trino. These engines are purpose-built for large-scale analytical queries over Iceberg and handle petabyte-scale scans efficiently.

This division of labor keeps each system in its optimal role: RisingWave handles real-time detection, Iceberg provides the immutable archive, and Spark or Trino handle retrospective forensic analysis. You do not need RisingWave to read from Iceberg, and you do not need a separate batch archival job.

FAQ

What compliance frameworks does streaming monitoring support?

A streaming compliance pipeline applies to any framework that requires detecting policy violations promptly. PCI DSS, HIPAA, SOX, GDPR, and ISO 27001 are the most common, but the same pattern applies to internal security policies and contractual obligations. The compliance rules are encoded as SQL materialized views, so adapting them to a new framework means writing new SQL, not rebuilding infrastructure.

How does RisingWave handle stream-table joins for policy enforcement?

RisingWave stores access policies in a regular table, which can be updated at any time without restarting any pipeline. When an audit event arrives in the stream, RisingWave joins it against the policy table using standard SQL JOIN syntax. The result is evaluated immediately and included in the materialized view. Policy changes take effect for new events as soon as the table is updated.

How does streaming compliance monitoring compare to nightly batch jobs?

Batch jobs scan audit logs on a fixed schedule, leaving a detection window that can span hours. A streaming pipeline evaluates every event as it arrives, reducing detection latency to seconds. This is critical for active threats, where early containment limits damage, and for regulated industries, where breach notification timelines begin from the moment the organization becomes aware of an incident.

Can RisingWave write compliance data to Apache Iceberg for long-term retention?

Yes. RisingWave supports Iceberg as a sink and writes audit events and violation records continuously to Iceberg tables in object storage. This creates a low-cost, immutable compliance archive. Historical forensic queries run against the Iceberg tables using Spark or Trino, while RisingWave continues real-time detection upstream. The two layers operate independently and complement each other.

Build Your Compliance Archive on Iceberg

If you are evaluating a streaming compliance pipeline and need a production-ready Iceberg audit trail, RisingWave provides a complete reference architecture. Visit risingwave.com/iceberg-audit-trail/ to see how RisingWave, Iceberg, and Kafka work together to give your compliance team both real-time alerts and long-term archival without two separate pipelines.
