Overview


In this blog, we’ll explore the design of a real-time security monitoring system that combines RisingWave, WarpStream, and Grafana. This architectural design ensures scalability and responsiveness, enabling quick identification and mitigation of security risks in web applications. By leveraging the synergies of RisingWave, WarpStream, and Grafana, the system offers comprehensive security monitoring capabilities for improved threat detection and response.

WarpStream, a data streaming platform compatible with Apache Kafka, serves as the central messaging backbone for collecting and distributing website audit logs in real-time. It is built directly on top of object storage. These logs are seamlessly ingested into RisingWave, a streaming database, enabling continuous analysis and filtering of the incoming data stream. By leveraging RisingWave's powerful processing capabilities, the system effectively detects potential security threats.

Grafana creates a unified real-time dashboard that presents detailed insights into user activities, website referrer analytics by user interaction, user status code analysis, and security profiling of users. This holistic view enables efficient monitoring and analysis of security-related metrics. Additionally, the system can generate alerts and support automated actions in response to detected threats, thereby adopting a proactive approach to security monitoring.


Step 1: Set up WarpStream


First, we generate random website logs and then send all this data to a topic in WarpStream, which is then ingested into RisingWave. For more information, refer to data ingestion from WarpStream into RisingWave in the documentation.

The sample data sent to the WarpStream topic contains information about user activities, requests, and responses on a website.

{
  "request_timestamp": "2024-01-25T12:30:45.678Z",
  "ip_address": "192.168.1.100",
  "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.4567.89 Safari/537.36",
  "url": "<https://streamprocessingdemo.com>",
  "http_method": "GET",
  "status_code": 200,
  "response_time": 120,
  "referrer": "google",
  "user_id": "12345",
  "username": "john_doe",
  "user_action": "view_page",
  "security_level": "low"
}


Step 2: Ingest data streams into RisingWave


Now that we have sent the data stream to WarpStream (in JSON), we can ingest the stream with the following SQL statement. For more information on setting up RisingWave and getting started, refer to the Quick start in RisingWave documentation.


Create source

This query configures a source named website_logs_source to ingest data from the WarpStream topic named website_logs. The query also defines the data schema using JSON format, including fields such as request timestamp, IP address, user agent, URL, HTTP method, status code, response time, and more.

CREATE SOURCE website_logs_source (
    request_timestamp TIMESTAMP,
    ip_address VARCHAR,
    user_agent TEXT,
    url TEXT,
    http_method VARCHAR,
    status_code INTEGER,
    response_time INTEGER,
    referrer TEXT,
    user_id VARCHAR,
    username VARCHAR,
    user_action VARCHAR,
    security_level VARCHAR
)WITH (
  connector='kafka',
  topic = 'website_logs', 
  properties.bootstrap.server = 'message_queue:29092',
  scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;


Create materialized views for data analysis in RisingWave


Analyzing website user metrics


This query establishes a materialized view named website_user_metrics to generate aggregated statistics on user activity using website logs. The materialized view calculates various metrics, including total requests, response times, error counts, login/logout counts, and unique IP addresses, within one-minute intervals. These aggregated statistics are then presented for each user, along with the corresponding time window.

CREATE MATERIALIZED VIEW website_user_metrics AS  
WITH UserActivityStats AS (
    SELECT  
        username,
        COUNT(username) AS total_requests,
        window_start,
        window_end,
        MIN(response_time) AS min_response_time,
        MAX(response_time) AS max_response_time,
        AVG(response_time) AS avg_response_time,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY response_time) AS median_response_time,
        SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) AS total_errors,
        SUM(CASE WHEN user_action = 'login' THEN 1 ELSE 0 END) AS login_count,
        SUM(CASE WHEN user_action = 'logout' THEN 1 ELSE 0 END) AS logout_count,
        COUNT(DISTINCT ip_address) AS unique_ips
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY username, window_start, window_end
)

SELECT
    username,
    total_requests,
    min_response_time,
    max_response_time,
    avg_response_time,
    median_response_time,
    total_errors,
    login_count,
    logout_count,
    unique_ips,
    window_start,
    window_end
FROM UserActivityStats;


Ranking top user actions in a time window


This query generates a materialized view called top_user_actions to identify and rank the three most frequent user actions within one-minute intervals of website logs. It utilizes window functions to assign rankings based on the occurrence of actions. The materialized view presents the action name, the count of occurrences, and the corresponding time window for each ranked action.

CREATE MATERIALIZED VIEW top_user_actions AS
WITH ranked_user_actions AS (
    SELECT  
        user_action,
        COUNT(user_action) AS count_user_activity,
        window_start,
        window_end,
        ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY COUNT(user_action) DESC) AS action_rank
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY user_action, window_start, window_end
)
SELECT
    user_action,
    count_user_activity,
    window_start,
    window_end
FROM ranked_user_actions
WHERE action_rank <= 5
ORDER BY window_start, action_rank;


Analyzing referrer activity of website by user interaction


This query establishes a materialized view called referrer_activity_summary to provide a summary of website activity based on referrers within one-minute intervals. The materialized view aggregates multiple metrics, including referrer visit count, page visits, interactions, content interactions, and window interactions. These metrics are presented alongside the corresponding time window for analysis and evaluation.

CREATE MATERIALIZED VIEW referrer_activity_summary AS
SELECT
    referrer,
    COUNT(referrer) AS referrer_visit_count,
    SUM(CASE WHEN user_action IN ('view_page', 'navigate_page') THEN 1 ELSE 0 END) AS page_visits,
    SUM(CASE WHEN user_action IN ('submit_form', 'login', 'logout') THEN 1 ELSE 0 END) AS interactions,
    SUM(CASE WHEN user_action IN ('scroll_page', 'download_file', 'upload_file') THEN 1 ELSE 0 END) AS content_interactions,
    SUM(CASE WHEN user_action IN ('close_window', 'open_new_tab') THEN 1 ELSE 0 END) AS window_interactions,
    window_start,
    window_end
FROM TUMBLE(website_logs_source, request_timestamp, INTERVAL '1 MINUTES')     
GROUP BY referrer, window_start, window_end;


User HTTP status code analysis


This query establishes a materialized view called status_code_analysis_summary to analyze and summarize the distribution of HTTP status codes within one-minute intervals of website logs. The materialized view calculates various metrics, including the count of each status code, the average response time for each status code, and the cumulative count and percentage of status codes within each time window. These metrics are presented alongside the corresponding time window, ordered in descending order by both the time window and status code.

CREATE MATERIALIZED VIEW status_code_analysis_summary 
WITH Status_Code_Analysis AS (
    SELECT  
        status_code,
        COUNT(status_code) AS count_status_code,
        AVG(response_time) AS avg_response_time,
        SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end ORDER BY status_code) AS cumulative_count,
        100.0 * SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end ORDER BY status_code) / SUM(COUNT(status_code)) OVER (PARTITION BY window_start, window_end) AS cumulative_percentage,
        window_start,
        window_end
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY status_code, window_start, window_end
)

SELECT  
    status_code,
    count_status_code,
    avg_response_time,
    cumulative_count,
    cumulative_percentage,
    window_start,
    window_end
FROM Status_Code_Analysis
ORDER BY window_start DESC, status_code;


User security profiling and security level analysis


This query establishes a materialized view named security_level_analysis_summary to analyze and summarize security levels within one-minute intervals of website logs. The materialized view calculates various metrics, including the count of each security level, the average response time for each security level, and the median count of security levels within each time window. These metrics are presented alongside the corresponding time window, ordered in descending order by both the time window and the median count of security levels.

CREATE MATERIALIZED VIEW security_level_analysis_summary AS
WITH Security_Profiling AS (
    SELECT  
        security_level,
        COUNT(security_level) AS count_security_level,
        AVG(response_time) AS avg_response_time,
        PERCENTILE_DISC (0.5) WITHIN GROUP (ORDER BY security_level) AS median_count_security_level,
        window_start,
        window_end
    FROM TUMBLE (website_logs_source, request_timestamp, INTERVAL '1 MINUTES')
    GROUP BY security_level, window_start, window_end
)

SELECT  
    security_level,
    count_security_level,
    avg_response_time,
    median_count_security_level,
    window_start,
    window_end
FROM Security_Profiling
ORDER BY window_start DESC, median_count_security_level;


Step 3: Send the data from RisingWave to Apache Grafana for visualization


We’ll configure Grafana to read data from RisingWave and build visualizations.


Connect RisingWave to Grafana


To utilize RisingWave as a data source in Grafana and create visualizations and dashboards, follow the instructions provided in Configure Grafana to read data from RisingWave. Once the connection between RisingWave and Grafana is established, you can incorporate materialized views from RisingWave as tables to design charts and build a comprehensive dashboard.


Visualizing data with Grafana: table, charts, and dashboards


This table is generated from the website_logs_source source that was create earlier.

Close
Featured

This chart is generated from referrer_activity_summary materialized view to summarize website activity based on referrers.

Close
Featured

This chart is generated from a materialized view named website_user_metrics to provide aggregated statistics on user activity based on website logs.

Close
Featured

This chart is generated from the security_level_analysis_summary materialized view to analyze and summarize security levels within one-minute intervals in website logs.

Close
Featured

This chart is created on a materialized view top_user_actions to identify and rank the top five user actions based on their frequency within one-minute intervals of website logs.

Close
Featured

This chart is generated from the status_code_analysis_summary materialized view to analyze and summarize the distribution of HTTP status codes over one-minute intervals in website logs.

Close
Featured

This unified dashboard offers a comprehensive collection of charts for real-time monitoring of a website or online platform. It provides valuable insights into user and website activity, including metrics based on referrers, distribution of HTTP status codes, top five user actions, and summaries of security levels. By combining these charts, the dashboard provides a holistic view and enables enhanced threat detection and response through comprehensive security monitoring capabilities.

Close
Featured

Conclusion

In this blog, we have presented the development of a real-time security threat monitoring system that integrates RisingWave, WarpStream, and Grafana. The setup process for the entire system is quite straightforward. To monitor each metric, you only need to create a single materialized view in RisingWave and visualize it in Grafana. The analysis showcased in the blog serves as an example and inspiration. If your data points are readily available and provide the necessary data, you can express sophisticated analytical and transformational logic. Feel free to explore further and reach out to us if you have any questions or need assistance.

Avatar

Fahad Shah

Developer Advocate

The Modern Backbone for Your
Event-Driven Infrastructure
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.