You can build a streaming security audit trail by connecting RisingWave to PostgreSQL and MySQL CDC sources plus Kafka topics, enriching those events with SQL joins, and writing the results into Apache Iceberg tables on S3. Historical compliance queries then run against those tables using Spark or Trino with full time travel support.
Why use Apache Iceberg for security audit trails?
Apache Iceberg gives security teams an immutable, queryable record of every access event without requiring specialized audit databases. Each write creates a new snapshot; older snapshots are never overwritten, so your audit history is tamper-evident by design. Because Iceberg stores data on S3-compatible object storage, retention costs are a fraction of what you would pay for equivalent rows in a relational database. The table format is open: Spark, Trino, Flink, and DuckDB can all query the same files without any data movement. Snapshot history means you can reconstruct the exact state of the audit log at any past point in time, which satisfies the "point-in-time reconstruction" requirements in SOC 2, PCI DSS, and GDPR audits.
What are the limitations of traditional audit log approaches?
Most teams start with one of two approaches, and both have structural weaknesses.
Trigger-based auditing attaches database triggers to every table that needs tracking. Each INSERT, UPDATE, or DELETE fires the trigger synchronously, writing a shadow row to an audit table before the transaction commits. The problems compound at scale. Every write now carries extra latency because the audit write is in the same transaction. Triggers can be disabled or dropped by a privileged user, which means the audit record can be silently suppressed. Audit tables in the same database as production data share the same failure domain: if the database goes down, audit writes stop too.
Batch ETL pipelines poll source tables on a schedule, typically hourly or nightly, and load deltas into a data warehouse. The coverage gap between poll intervals means events that happen and are reversed before the next poll window are never captured. An attacker who creates a privileged account, uses it, and then deletes it within a two-hour batch window leaves no trace in the audit warehouse. Detection latency measured in hours is incompatible with real-time threat response.
How does RisingWave stream events into Iceberg audit tables?
RisingWave reads directly from PostgreSQL and MySQL replication logs using logical replication (CDC). This captures every INSERT, UPDATE, and DELETE with before-and-after row values, with no write latency added to the source database because replication is asynchronous. RisingWave also reads from Kafka topics, which is where application-layer events such as authentication attempts, API calls, and permission changes typically land.
Once RisingWave has both streams, you use SQL to join them in real time. A login event from Kafka, for example, can be enriched with the current employee record from the HR database CDC stream: you get the employee's department, their manager, and their current role attached to every authentication event before it is written to Iceberg.
RisingWave writes to Iceberg through its native Iceberg sink. The sink appends new rows to Iceberg data files on S3, commits a new snapshot for each batch, and updates the Iceberg metadata so that Spark or Trino can immediately query the new data. RisingWave does not query Iceberg tables with time travel. Time travel and historical compliance queries are the job of Spark, Trino, or another Iceberg-compatible query engine running directly against S3.
Step by step: building the audit trail pipeline
Step 1: Create a CDC source for the users table
CREATE SOURCE pg_users
WITH (
connector = 'postgres-cdc',
hostname = 'db.internal',
port = '5432',
username = 'replicator',
password = '${PG_REPL_PASSWORD}',
database.name = 'appdb',
slot.name = 'audit_slot',
publication.name = 'audit_pub'
);
CREATE TABLE users (
user_id BIGINT PRIMARY KEY,
email VARCHAR,
role VARCHAR,
department VARCHAR
) FROM pg_users TABLE 'public.users';
This registers the PostgreSQL replication slot and begins streaming row changes. RisingWave maintains the current state of the users table locally so you can join against it in stream processing.
Step 2: Create a Kafka source for authentication events
CREATE SOURCE auth_events (
event_id VARCHAR,
user_id BIGINT,
event_type VARCHAR, -- LOGIN_SUCCESS, LOGIN_FAILURE, LOGOUT, MFA_BYPASS
ip_address VARCHAR,
user_agent VARCHAR,
resource VARCHAR,
ts TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'auth.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest-offset'
)
FORMAT PLAIN ENCODE JSON;
Step 3: Create a materialized view that enriches events
CREATE MATERIALIZED VIEW enriched_audit_events AS
SELECT
a.event_id,
a.user_id,
a.event_type,
a.ip_address,
a.user_agent,
a.resource,
a.ts AS event_time,
u.email AS user_email,
u.role AS user_role,
u.department AS user_department,
CASE
WHEN a.event_type = 'MFA_BYPASS' THEN 'HIGH'
WHEN a.event_type = 'LOGIN_FAILURE' THEN 'MEDIUM'
ELSE 'LOW'
END AS risk_level
FROM auth_events a
LEFT JOIN users u ON a.user_id = u.user_id;
The join runs continuously. Every new authentication event from Kafka is matched against the current user record from PostgreSQL CDC. If a user's role changes between events, the next event picks up the new role automatically.
Step 4: Write the enriched view to Iceberg
CREATE SINK audit_trail_sink
FROM enriched_audit_events
WITH (
connector = 'iceberg',
type = 'append-only',
catalog.type = 'glue',
s3.region = 'us-east-1',
s3.bucket = 'your-audit-bucket',
database.name = 'security',
table.name = 'audit_trail'
);
RisingWave now continuously appends enriched audit rows to the Iceberg table on S3. Each commit creates an immutable Iceberg snapshot. No rows are ever modified or deleted in the sink table, which is the foundation of a tamper-evident record.
How do you query the audit trail for compliance?
Because RisingWave writes standard Iceberg data files, any Iceberg-compatible engine can query them. Spark and Trino are the most common choices for compliance workloads.
Find all high-risk events in the past 30 days (Trino):
SELECT
event_time,
user_email,
user_role,
event_type,
ip_address,
resource
FROM security.audit_trail
WHERE risk_level = 'HIGH'
AND event_time >= NOW() - INTERVAL '30' DAY
ORDER BY event_time DESC;
Time travel to reconstruct the audit log as of a specific snapshot (Spark SQL):
SELECT *
FROM security.audit_trail
VERSION AS OF 5432109876543
WHERE user_email = 'contractor@example.com'
ORDER BY event_time;
The VERSION AS OF clause tells Spark to read the Iceberg snapshot with that specific snapshot ID rather than the current head. You can list all snapshots for an Iceberg table with:
SELECT snapshot_id, committed_at, operation
FROM security.audit_trail.snapshots
ORDER BY committed_at DESC;
This lets auditors reconstruct exactly what the audit log showed at any past moment, satisfying requirements to demonstrate that the audit record itself has not been modified after the fact.
Identify all access by a terminated employee (Trino):
SELECT
MIN(event_time) AS first_seen,
MAX(event_time) AS last_seen,
COUNT(*) AS total_events,
COUNT(DISTINCT resource) AS resources_accessed
FROM security.audit_trail
WHERE user_email = 'former.employee@example.com'
AND event_time >= TIMESTAMP '2025-01-01 00:00:00';
What compliance requirements does this architecture address?
SOC 2 CC6 (Logical and Physical Access Controls)
SOC 2 CC6.2 requires organizations to log and monitor access to systems. CC6.3 requires that access is removed promptly when no longer needed and that removal is verifiable. The streaming pipeline satisfies both: every login, logout, privilege escalation, and MFA bypass is captured within seconds and stored in an immutable Iceberg table. An auditor can query the table for any date range and confirm no access records have been altered.
GDPR Article 30 (Records of Processing Activities)
Article 30 requires data controllers to maintain records of processing activities, including who accessed personal data and when. The enriched audit trail includes the user identity, department, role, the resource accessed (which can encode the data category), and the timestamp. Because Iceberg snapshots are immutable, the record cannot be retroactively modified, which supports demonstrating compliance to a supervisory authority.
PCI DSS Requirement 10 (Log and Monitor All Access to System Components)
PCI DSS Requirement 10 mandates logging all individual user access to cardholder data, logging all actions taken by any individual with root or administrative privileges, and retaining logs for at least 12 months with the most recent 3 months available for immediate analysis. The Iceberg table satisfies the retention requirement because S3 lifecycle policies can enforce minimum retention. The real-time streaming pipeline satisfies the "immediate analysis" requirement because the lag between an event occurring and it being queryable in Iceberg is typically under 30 seconds.
Frequently asked questions
Does RisingWave support time travel queries on Iceberg tables?
No. RisingWave writes to Iceberg tables but does not query them with time travel. Time travel queries using VERSION AS OF or TIMESTAMP AS OF are executed by Spark, Trino, Flink, or another Iceberg-compatible query engine running directly against your S3 storage.
What happens to audit records if RisingWave restarts?
RisingWave maintains a persistent checkpoint of its processing progress in S3. On restart, it resumes from the last committed checkpoint, so no CDC or Kafka events are lost. The Iceberg sink only commits a new snapshot after the data has been durably written to S3, so there are no partial writes.
Can we audit UPDATE and DELETE operations from the database, not just INSERT?
Yes. RisingWave reads from PostgreSQL and MySQL replication logs, which capture INSERT, UPDATE, and DELETE with both before-and-after row values. You can write a materialized view that surfaces the op field (insert, update-before, update-after, delete) and include it in the Iceberg sink for a complete change history.
How do we prevent the audit records themselves from being tampered with?
The Iceberg sink in RisingWave operates in append-only mode: it never modifies or deletes existing Iceberg data files. Restrict S3 bucket permissions so that only the RisingWave service account has write access, and lock down the Glue or Hive metastore so that table definitions cannot be altered. Enable S3 Object Lock in Compliance mode for the audit bucket to prevent deletion of objects even by privileged AWS users.
Get started
RisingWave is open source under the Apache 2.0 license. You can run it locally with Docker in under five minutes and connect it to a PostgreSQL CDC source immediately.

