Apache Iceberg tables that receive continuous writes from RisingWave require regular maintenance to prevent performance degradation: compaction merges small checkpoint files into larger ones, snapshot expiration reclaims S3 storage, and orphan file cleanup removes abandoned Parquet files. These operations are safe to run concurrently with live streaming ingestion.
The Small File Problem in Streaming Lakehouses
Every time RisingWave checkpoints (typically every 30–60 seconds), it writes one or more Parquet files to the Iceberg table. Over a 24-hour period, a single active sink produces thousands of small files. Small files hurt query performance dramatically:
- File listing overhead: S3
ListObjectscalls scale with file count, not file size - Parquet footer reads: Each file requires a separate HTTP request to read metadata
- Parallelism waste: Trino/Spark may spin up thousands of tiny tasks for small files
A table with 10,000 files of 1 MB each queries 10x–100x slower than the same data in 100 files of 100 MB each. Iceberg's maintenance procedures exist to solve this problem without blocking readers or writers.
Maintenance Operations Overview
| Operation | Purpose | Frequency | Safe During Writes? |
optimize / compaction | Merge small files | Daily or continuous | Yes |
expire_snapshots | Remove old snapshot metadata | Weekly | Yes |
remove_orphan_files | Delete unreferenced files | Weekly | Yes (with delay) |
rewrite_manifests | Consolidate manifest files | Monthly | Yes |
Setting Up the Streaming Pipeline
Before configuring maintenance, establish the streaming ingestion:
-- High-frequency IoT sensor stream
CREATE SOURCE sensor_readings (
sensor_id VARCHAR,
device_type VARCHAR,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
pressure DOUBLE PRECISION,
reading_ts TIMESTAMPTZ
)
WITH (
connector = 'kafka',
topic = 'iot.sensors',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE JSON;
-- 1-minute aggregations per sensor
CREATE MATERIALIZED VIEW sensor_metrics AS
SELECT
sensor_id,
device_type,
window_start,
window_end,
AVG(temperature) AS avg_temp,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(humidity) AS avg_humidity,
AVG(pressure) AS avg_pressure,
COUNT(*) AS reading_count
FROM TUMBLE(sensor_readings, reading_ts, INTERVAL '1 MINUTE')
GROUP BY sensor_id, device_type, window_start, window_end;
-- Write to Iceberg — this creates many small files over time
CREATE SINK sensor_metrics_sink AS
SELECT * FROM sensor_metrics
WITH (
connector = 'iceberg',
type = 'append-only',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
warehouse.path = 's3://iot-lakehouse/warehouse',
s3.region = 'us-east-1',
database.name = 'iot',
table.name = 'sensor_metrics'
);
At 1,000 sensors writing every minute, this sink produces 1,440,000 files per day in the worst case (one file per minute per partition). In practice, RisingWave batches writes more efficiently, but compaction is still essential.
File Compaction (Optimize)
Run compaction in Trino to merge small files into target-size files:
-- In Trino: compact files smaller than 128MB into ~128MB files
ALTER TABLE iceberg.iot.sensor_metrics
EXECUTE optimize(file_size_threshold => '128MB');
In Spark (PySpark):
from pyiceberg.catalog import load_catalog
catalog = load_catalog("rest", uri="http://iceberg-catalog:8181")
table = catalog.load_table("iot.sensor_metrics")
# Compact files (PyIceberg 0.6+)
table.rewrite_data_files()
Compaction is safe during active writes: RisingWave continues writing new files while compaction rewrites old ones. Iceberg's snapshot isolation means readers and writers never block each other. The compaction job produces a new snapshot that replaces the small files with larger ones; RisingWave's next checkpoint will build on top of this new snapshot.
Snapshot Expiration
Each RisingWave checkpoint produces a new Iceberg snapshot. Snapshots accumulate metadata in the catalog and keep Parquet files from being garbage collected. Expire old snapshots to allow cleanup:
-- In Trino: expire snapshots older than 7 days
ALTER TABLE iceberg.iot.sensor_metrics
EXECUTE expire_snapshots(retention_threshold => '7d');
After expiration, the data files referenced only by expired snapshots become eligible for garbage collection. They are not deleted immediately — that requires the orphan file cleanup step.
Warning: Never expire snapshots that your query engine relies on for time travel. If your SLA requires 30 days of historical queries, retain at least 30 days of snapshots.
Orphan File Cleanup
Orphan files are Parquet files on S3 that are not referenced by any Iceberg snapshot. They arise from:
- Failed RisingWave checkpoint commits (files written but snapshot not committed)
- Partial compaction jobs that were interrupted
- Bugs in older engine versions
-- In Trino: remove orphan files older than 3 days
-- The 3-day buffer ensures files from in-progress operations are safe
ALTER TABLE iceberg.iot.sensor_metrics
EXECUTE remove_orphan_files(retention_threshold => '3d');
Always set a generous retention threshold (at least 24 hours) to avoid accidentally deleting files from in-flight transactions.
Automating Maintenance
For production pipelines, automate maintenance with a scheduled job. Here is a Python script suitable for Airflow or any cron-based scheduler:
import subprocess
from datetime import datetime
TRINO_HOST = "trino.internal:8080"
TABLES = [
"iceberg.iot.sensor_metrics",
"iceberg.analytics.session_summary",
]
def run_trino(query):
result = subprocess.run(
["trino", f"--server={TRINO_HOST}", "--execute", query],
capture_output=True, text=True
)
return result.stdout
for table in TABLES:
# Daily: compaction
run_trino(f"ALTER TABLE {table} EXECUTE optimize(file_size_threshold => '128MB')")
# Weekly: snapshot expiration
if datetime.now().weekday() == 0: # Monday
run_trino(f"ALTER TABLE {table} EXECUTE expire_snapshots(retention_threshold => '7d')")
run_trino(f"ALTER TABLE {table} EXECUTE remove_orphan_files(retention_threshold => '3d')")
print(f"Maintenance complete for {table}")
Monitoring Table Health
Track file count and average file size to know when compaction is needed:
-- In Trino: inspect file statistics for an Iceberg table
SELECT
partition,
file_count,
total_size / 1024 / 1024 AS total_size_mb,
total_size / file_count / 1024 / 1024 AS avg_file_size_mb
FROM iceberg.iot."sensor_metrics$files"
ORDER BY file_count DESC
LIMIT 20;
Flag any partition with average file size below 32 MB for compaction.
FAQ
Q: Does compaction interrupt active RisingWave writes? A: No. Compaction produces a new snapshot alongside the write stream. RisingWave's next checkpoint will commit on top of the compacted snapshot. There is no blocking or locking.
Q: How often should I run compaction for a high-frequency sink? A: For sinks writing every 30–60 seconds, daily compaction is a good starting point. Monitor average file size; if it drops below 32 MB, increase compaction frequency or consider a longer checkpoint interval.
Q: Can I use Iceberg's auto-compaction service instead of scheduled jobs? A: Some catalog implementations (e.g., Tabular, AWS Glue with Iceberg integration) offer automatic compaction. For open-source stacks, scheduled jobs (Airflow, cron) are the standard approach.
Q: What happens if I run expire_snapshots too aggressively? A: If you expire a snapshot that a long-running Trino query is using for read isolation, the query may fail with a "snapshot not found" error. Use a retention threshold at least 2x your longest expected query runtime.
Q: Does manifest rewriting help query performance? A: Yes. As files accumulate, the manifest files (which list data files) also grow large. Rewriting manifests consolidates them, reducing the catalog reads needed before each query. Run this monthly for busy tables.
Get Started
Keep your Iceberg streaming lakehouse healthy with RisingWave:

