Real-time threat detection monitors and analyzes data in real time to identify and respond to potential security threats as they occur. In contrast to security measures that rely on periodic scans or retrospective analysis, real-time threat detection systems provide immediate alerts and can initiate automated responses to mitigate risks without significant delays.

Real-time threat detection has many use cases, such as identifying and preventing distributed denial-of-service (DDoS) attacks, detecting threats through log analysis, and providing real-time analysis and reporting based on Common Vulnerabilities and Exposures (CVE) data.

In this article, you'll learn how to use RisingWave and Kafka to build a monitoring solution that can identify cyberthreats in real time.


What Is Real-Time Threat Detection?


Real-time threat detection is achieved by using a combination of event-driven architecture and streaming analytics. Data from different sources (such as network traffic, system logs, and application activity) is continuously collected and processed. This data is then streamed in real time, often through platforms like Kafka, which allows for efficient ingestion of large amounts of data. During this process, sophisticated algorithms and machine learning models can analyze the data stream to look for patterns and anomalies that could indicate potential threats. Once suspicious activity is detected, the system can alert administrators and trigger automated security protocols to mitigate the potential threat. This offers an advanced layer of protection in an increasingly digital world where threats can quickly evolve and cause substantial damage.

Real-time threat detection can identify and prevent DDoS attacks, which are attacks that flood a network or service with excessive traffic to destabilize it and make it unavailable to its intended users. Real-time threat detection systems can recognize the early signs of an attack, like receiving traffic from locations where you typically don't have users or a sudden spike in traffic that is much higher than usual. All of this could allow you to respond quickly with countermeasures to prevent or minimize disruption.
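To make the idea concrete, here is a minimal, hypothetical sketch of a threshold-based spike detector: it counts requests per source IP over a sliding window and flags any source whose request rate exceeds a limit. The window length, request limit, and example IP are illustrative assumptions, not part of any particular product.

from collections import defaultdict, deque
import time

WINDOW_SECONDS = 60            # sliding window length (illustrative assumption)
MAX_REQUESTS_PER_WINDOW = 300  # per-source rate limit (illustrative assumption)

requests_by_ip = defaultdict(deque)  # ip -> timestamps of recent requests

def is_suspicious(ip, now=None):
    """Record one request from `ip` and report whether its rate looks like a flood."""
    now = now if now is not None else time.time()
    window = requests_by_ip[ip]
    window.append(now)
    # Evict timestamps that have fallen out of the sliding window
    while window and now - window[0] > WINDOW_SECONDS:
        window.popleft()
    return len(window) > MAX_REQUESTS_PER_WINDOW

# Example: a single source issuing 500 requests within a minute gets flagged
flagged = any(is_suspicious("203.0.113.7", now=1000.0 + i * 0.1) for i in range(500))
print("flagged:", flagged)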

You can use real-time threat detection to analyze items such as audit logs, which are a rich source of information about your system or network activities. Real-time threat detection tools analyze these logs as they're generated and identify any unusual or suspicious behavior that could indicate security breaches or malicious activities.

Real-time threat detection can also constantly monitor new publications in the CVE database, which is a list of publicly known cybersecurity vulnerabilities. When a new CVE is published, the systems can analyze and determine how the vulnerability could affect the digital assets of the organization, such as by enabling unauthorized access, data theft, or service disruption. Depending on the kind of threat, it could also trigger automated responses like isolating the affected systems, applying security updates, or sending alerts to the cybersecurity team.
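As a simplified illustration of that last step, the toy sketch below matches newly published CVE records against a hypothetical asset inventory and decides whether to raise an alert. The record format, severity threshold, and inventory are assumptions for demonstration only; in practice the records would come from a CVE/NVD feed.

# Hypothetical CVE records (in practice these would come from a CVE/NVD feed)
new_cves = [
    {"id": "CVE-2024-0001", "product": "nginx", "cvss": 9.8},
    {"id": "CVE-2024-0002", "product": "redis", "cvss": 5.3},
]

# Hypothetical inventory of software running in the organization
asset_inventory = {"nginx", "postgresql", "openssl"}

ALERT_THRESHOLD = 7.0  # raise an alert for high-severity findings (assumption)

def triage(cve):
    """Return an action for a CVE based on whether it affects deployed software."""
    if cve["product"] not in asset_inventory:
        return "ignore"
    return "alert_security_team" if cve["cvss"] >= ALERT_THRESHOLD else "track"

for cve in new_cves:
    print(cve["id"], "->", triage(cve))
# CVE-2024-0001 -> alert_security_team
# CVE-2024-0002 -> ignore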


Utilizing RisingWave to Build a Real-Time Threat Detection System


Imagine you are part of an e-commerce company that generates a vast amount of network traffic and server log data. This data encompasses information like timestamps, IP addresses, usernames, actions, resources, and status codes. The company wants you to implement a threat detection system that utilizes RisingWave. You should design the system to identify potential cyberthreats by observing unusual patterns or activities in the server log data.

The threat detection system consists of several parts. First, you need a streaming platform to build real-time data pipelines. For this, you'll use Kafka because it's known for its ability to handle and process large amounts of data. The server log data will be fed into a Kafka topic named log_data, managed within a Docker container, and will act as the main data source for RisingWave.

Following that, you'll set up RisingWave to connect to this topic. In RisingWave, you'll apply specific SQL queries in order to identify entries in the server log data that might indicate a potential cyberthreat. These identified data records will be sent back to another Kafka topic called anomalies.

This architecture diagram depicts how the application works.


Prerequisites


You'll need the following to complete this tutorial:

  • A Python virtual environment created and activated; all Python-related commands should be run in this environment
  • Python 3.11 or higher
  • A recent version of Docker installed on your machine (this tutorial uses Docker 24.0.6)
  • A Kafka instance running on Docker; Apache Kafka is a distributed event streaming platform, and you'll need an instance running for handling data streams
  • The psql Postgres interactive terminal


Generating the Data Set


In this section, you'll generate a synthetic data set for the log analysis use case. To generate the data, create a file called generate_data.py in your home directory and paste the code below into this file:


import json
import random
from datetime import datetime

def generate_synthetic_log_entry():
    # Define a set of fake IP addresses to simulate network traffic
    ip_addresses = ["192.168.1.{}".format(i) for i in range(1, 101)]
    # Define a set of usernames
    users = ["user{}".format(i) for i in range(1, 21)]
    # Define a set of actions
    actions = ["login", "logout", "access", "modify", "delete"]
    # Define a set of resources
    resources = ["/api/data", "/api/user", "/api/admin", "/api/settings", "/api/info"]
    # Define a set of status codes
    status_codes = [200, 301, 400, 401, 404, 500]

    # Generate a random log entry
    log_entry = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "ip_address": random.choice(ip_addresses),
        "user": random.choice(users),
        "action": random.choice(actions),
        "resource": random.choice(resources),
        "status_code": random.choice(status_codes)
    }
    return log_entry

# File to save the synthetic log entries in JSON format
json_file_path = 'synthetic_log_data.json'

# Generate and save synthetic log entries
with open(json_file_path, 'w') as file:
    for _ in range(100):
        log_entry = generate_synthetic_log_entry()
        file.write(json.dumps(log_entry) + '\n')  # Write each JSON object on one line

The script generates synthetic log entries with a timestamp, IP address, username, action, resource, and status code. These entries simulate typical logs you might find in server or application log files. The data is saved as a JSON file called synthetic_log_data.json.

To execute the script and generate the data, open a terminal in your home directory and run the following command in the virtual environment that you created beforehand:

python generate_data.py

After the data set is generated, the first rows in the generated synthetic_log_data.json file should look similar to this:

{"timestamp": "2023-12-15 14:04:38", "ip_address": "192.168.1.41", "user": "user14", "action": "login", "resource": "/api/user", "status_code": 401} 

{"timestamp": "2023-12-15 14:04:38", "ip_address": "192.168.1.71", "user": "user19", "action": "modify", "resource": "/api/user", "status_code": 200} 

{"timestamp": "2023-12-15 14:04:38", "ip_address": "192.168.1.48", "user": "user7", "action": "logout", "resource": "/api/admin", "status_code": 401}


Installing RisingWave in Docker


You're going to run RisingWave in a Docker container. Installing RisingWave with Docker is a straightforward process. Docker simplifies installation by containerizing the RisingWave environment, making it easier to manage and isolate from other system dependencies. To install RisingWave, open a terminal or command prompt and get the latest RisingWave Docker image using this Docker command:

docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground

After the image is pulled and RisingWave is up and running, you can connect to it via the Postgres interactive terminal, psql. To do so, open a new terminal window and run the following command:

psql -h localhost -p 4566 -d dev -U root

If the connection was successful, you should see something like this:

psql (14.10 (Ubuntu 14.10-0ubuntu0.22.04.1), server 9.5.0)

Type "help" for help.

dev=> 
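If you'd rather verify the connection from Python, you can use psycopg2, the same driver used throughout the rest of this tutorial (it's installed in a later step). This optional sketch simply runs a version query against RisingWave, which should return a PostgreSQL-compatible version string:

import psycopg2

# Same connection parameters as the psql command above
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
with conn.cursor() as cur:
    cur.execute("SELECT version();")  # RisingWave answers like a Postgres server
    print(cur.fetchone()[0])
conn.close()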


Creating Kafka Topics


In this section, you'll create two Kafka topics: log_data and anomalies. log_data is the source for the streaming data, and you'll use anomalies later in the tutorial as a sink that receives processed data from RisingWave. To create each topic, execute the following command in your terminal:

docker exec container_name kafka-topics.sh --create --topic topic_name --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092

In this command, container_name should be replaced with the name of your Docker container where Kafka is running. When executing the command, change topic_name to log_data or anomalies accordingly.

To verify that the topics have been successfully created, execute the following command in the terminal:

docker exec container_name kafka-topics.sh --list --bootstrap-server kafka:9092

The result should look like this:

anomalies

log_data

This output verifies that the topics log_data and anomalies were successfully created.
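If you prefer to create the topics programmatically instead of calling kafka-topics.sh inside the container, a sketch using the confluent-kafka AdminClient (the library is installed later in this tutorial) could look like the following. The broker address localhost:9093 is an assumption matching the externally exposed listener used elsewhere in this tutorial.

from confluent_kafka.admin import AdminClient, NewTopic

# Connect to the broker's externally exposed listener (assumption: localhost:9093)
admin = AdminClient({"bootstrap.servers": "localhost:9093"})

topics = [
    NewTopic("log_data", num_partitions=1, replication_factor=1),
    NewTopic("anomalies", num_partitions=1, replication_factor=1),
]

# create_topics() returns a dict mapping topic name -> future
for topic, future in admin.create_topics(topics).items():
    try:
        future.result()  # raises if topic creation failed
        print(f"Created topic {topic}")
    except Exception as err:
        print(f"Failed to create topic {topic}: {err}")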


Connecting RisingWave to Kafka


You'll now create a "source," which is an external data stream or storage system from which RisingWave ingests data for processing. Sources are a fundamental component of RisingWave's architecture: a source acts as an entry point for data and defines how data is ingested from external systems into RisingWave. In this case, the source will be the previously created Kafka topic log_data.

Create a new Python file, create_data_source.py, in the home directory and paste the following code into it:

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True # Set queries to be automatically committed

with conn.cursor() as cur:
    cur.execute("""
    CREATE SOURCE IF NOT EXISTS log_data (
        timestamp varchar,
        ip_address varchar,
        user varchar,
        action varchar,
        resource varchar,
        status_code varchar
    )
    WITH (
        connector='kafka',
        topic='log_data',
        properties.bootstrap.server='localhost:9093',
        scan.startup.mode='earliest'
    ) FORMAT PLAIN ENCODE JSON;""") # Execute the query

conn.close() # Close the connection

The script connects RisingWave and the previously created Kafka topic log_data using psycopg2, a PostgreSQL database adapter for Python. The core of the script is an SQL command to create a new data source named log_data in RisingWave. This source is designed to ingest data from the Kafka topic. The data structure includes fields from the created synthetic data set.

Executing the script create_data_source.py will create the source. But first, you'll need to install psycopg2, a popular PostgreSQL database adapter for the Python programming language. You can install the prebuilt binary package in your activated virtual environment with the following command:

pip install psycopg2-binary

After you have installed the binary, open a terminal in the home directory and execute the script in the virtual environment that you created beforehand by using the following command:

python create_data_source.py
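To confirm that the source was created, you can list the sources RisingWave knows about. A small sketch using the same psycopg2 connection and RisingWave's SHOW SOURCES statement (the output should include log_data):

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
with conn.cursor() as cur:
    cur.execute("SHOW SOURCES;")  # lists all sources registered in RisingWave
    for row in cur.fetchall():
        print(row)  # should include log_data
conn.close()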


Producing Data to a Kafka Topic


You'll also need to create a script that reads the log data from synthetic_log_data.json and produces it to the Kafka topic log_data. But first, you must install the confluent-kafka Python library (this tutorial uses version 2.2.0), which provides the Producer and Consumer classes for working with Kafka from Python. You need to install the library in the Python virtual environment that you created beforehand. To install it, open a terminal and execute the following command in your activated virtual environment:

pip install confluent-kafka

Following this, create a Python file called produce.py in the home directory and paste the following code into it:

import json
from confluent_kafka import Producer

# Configuration for connecting to Kafka
config = {
    'bootstrap.servers': 'localhost:9093',  # Replace with your Kafka server address
}

# Create a producer instance
producer = Producer(config)

# Topic to produce to
topic_name = 'log_data'

# Callback function to check if message delivery was successful
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Function to produce messages
def produce_message(data):
    # Trigger any available delivery report callbacks from previous produce() calls
    producer.poll(0)
    # Asynchronously produce a message; the delivery report callback will be
    # triggered once the message has been successfully produced or has failed
    producer.produce(topic=topic_name, value=json.dumps(data), callback=delivery_report)

# Produce messages from the data file
def produce_data_from_file(file_path):
    with open(file_path, 'r') as file:
        for line in file:
            record = json.loads(line.strip())
            produce_message(record)
    # Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered
    producer.flush()

# Path to your data file
file_path = 'synthetic_log_data.json'  # Replace with your actual file path

# Start producing data to the Kafka topic
produce_data_from_file(file_path)

This Python script sets up a Kafka producer to send messages to the Kafka topic log_data. It begins by configuring the Kafka producer with the necessary server address. The core functionality is in the produce_message function, which sends messages asynchronously to the specified Kafka topic. produce_data_from_file reads data from synthetic_log_data.json, processes each line as a JSON object, and sends these objects as messages to the Kafka topic.

To execute the script, open a terminal in the home directory and run the following command in your virtual environment:

python produce.py

If the data is successfully sent to the Kafka topic, the output in the terminal should look like this:

Message delivered to log_data [0]

…output omitted…

Message delivered to log_data [0]
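If you want to verify the messages from the client side as well, a short consumer sketch can read a few records back from log_data. The consumer group name is a throwaway assumption, and the broker address matches the listener used by the producer above:

import json
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9093',   # same listener used by the producer
    'group.id': 'log-data-check',            # throwaway consumer group (assumption)
    'auto.offset.reset': 'earliest',         # read the topic from the beginning
})
consumer.subscribe(['log_data'])

# Read a handful of messages and print them
received = 0
while received < 5:
    msg = consumer.poll(timeout=5.0)
    if msg is None:
        break  # no more messages arrived within the timeout
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    print(json.loads(msg.value()))
    received += 1

consumer.close()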


Threat Detection with RisingWave


In this section, you'll transform and process the source data for threat detection. Specifically, you'll define an SQL query that filters out the log data from the Kafka source log_data that may indicate a cybersecurity threat, using RisingWave's materialized views. A materialized view is a database object that stores the result of a query for efficient data retrieval. In contrast to a standard view, which dynamically generates results at query time, a materialized view precomputes and saves the query results. This feature complements RisingWave's real-time analytics and stream processing capabilities, as it allows rapid access to frequently queried data. This is useful for complex aggregations or calculations over streaming data.

To generate a materialized view for cyberthreat detection, create a Python file called create_mv.py in your home directory and paste the following code into it:

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True # Set queries to be automatically committed

with conn.cursor() as cur:
    cur.execute("""
    CREATE MATERIALIZED VIEW anomaly_detection_by_error_status AS
    SELECT
        ip_address,
        COUNT(*) AS error_count
    FROM
        log_data
    WHERE
        status_code::integer >= 400
    GROUP BY
        ip_address
    HAVING
        COUNT(*) > 3; -- Threshold for error occurrences
    """) # Execute the query

conn.close() # Close the connection

The materialized view anomaly_detection_by_error_status selects IP addresses with a high frequency of error status codes, which might indicate cyberthreats. In particular, the query filters records from the log_data source where status_code indicates an error (400 and above). It then counts the number of these error occurrences for each ip_address. The HAVING clause ensures that only IP addresses with more than three error occurrences are included in the materialized view.

To create this materialized view, open a terminal and execute the script using the following command in your virtual environment:

python create_mv.py

As previously mentioned, the materialized view stores the result of a query. In order to access the result and print it in the console, create a Python file called show_result.py and paste the following code into it:

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""SELECT * FROM anomaly_detection_by_error_status;""") # Execute the query
    # Fetch all rows from the executed query
    rows = cur.fetchall()
    # Iterate through the rows and print each one
    for row in rows:
        print(row)

conn.close()

As before, run the script in the terminal in your virtual environment by using the following command:

python show_result.py

The result of the query is then printed in the terminal and should look something like this:

('192.168.1.43', 4)
('192.168.1.55', 4)
('192.168.1.59', 4)
('192.168.1.52', 4)
('192.168.1.33', 5)
('192.168.1.62', 4)
('192.168.1.60', 5)
('192.168.1.94', 6)

The output displays each IP address from the Kafka source log_data along with a count of how many times that particular IP address encountered an error code. This could be an indication that a cyberattack has occurred from these specific IP addresses.
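The same pattern extends to more targeted rules. For example, here is a hedged sketch of a second materialized view that flags repeated failed logins (a 401 status code on the login action) per IP address; it assumes the same field names as the log_data source, and the threshold of 2 is chosen only to suit this small synthetic data set:

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""
    CREATE MATERIALIZED VIEW failed_login_detection AS
    SELECT
        ip_address,
        COUNT(*) AS failed_logins
    FROM
        log_data
    WHERE
        action = 'login'
        AND status_code::integer = 401
    GROUP BY
        ip_address
    HAVING
        COUNT(*) > 2; -- threshold chosen for this small synthetic data set
    """)

conn.close()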


Sending the Data Back to Kafka


In RisingWave, a sink is the endpoint for data processed within its stream processing system. It represents where the final output, after real-time analysis and computation, is sent. This can be a database, a file system, or another data management system. In this use case, the previously created Kafka topic anomalies will serve as a sink to which you will send the log data that was identified as a potential cyberthreat.

To set up your sink, create a file called sink.py and paste the following Python script into it:

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""
    CREATE SINK send_data_to_kafka FROM anomaly_detection_by_error_status
    WITH (
        connector='kafka',
        properties.bootstrap.server='localhost:9093',
        topic='anomalies'
    ) FORMAT PLAIN ENCODE JSON (
        force_append_only='true'
    );""") # Execute the query

conn.close()

This Python script connects to RisingWave and executes an SQL command to create a Kafka sink called send_data_to_kafka. This sink is configured to send data from the previously created RisingWave view anomaly_detection_by_error_status to the Kafka topic anomalies. The script can be executed in the terminal by using the following command:

python sink.py

You can execute the following command in a terminal to verify that the data was successfully sent from RisingWave to the specified Kafka topic anomalies:

docker exec -it kafka bash

Executing the command gives you access to the Bash shell of the running container kafka. This allows you to directly interact with the Kafka environment inside the container. Inside the container, execute this command:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic anomalies --from-beginning

By executing this command, you list all the data from the topic anomalies in the terminal. The output should resemble the following:

{"error_count":6,"ip_address":"192.168.1.94"} {"error_count":4,"ip_address":"192.168.1.52"} {"error_count":5,"ip_address":"192.168.1.60"} {"error_count":5,"ip_address":"192.168.1.33"} {"error_count":4,"ip_address":"192.168.1.55"} {"error_count":4,"ip_address":"192.168.1.59"} {"error_count":4,"ip_address":"192.168.1.43"} {"error_count":4,"ip_address":"192.168.1.62"}

As you can see, the data identified as a potential threat was successfully sent from RisingWave to the Kafka topic anomalies.


Streamlining Monitoring and Alerting Pipelines for Threat Response


After you identify potential cyberthreats, you also need to set up proper monitoring and alerting pipelines. To do this, you can integrate the Kafka topic anomalies, which contains potential threat data, with a monitoring tool. The integration would involve building a connection between the Kafka topic and the monitoring tool so that the tool can continuously receive and analyze data from the topic.

You can achieve this with a data pipeline or an API that allows the monitoring tool to subscribe to the Kafka topic. Once connected, the monitoring tool will start to receive data streamed from the anomalies topic. It can then check the incoming data to estimate the severity of the threat. Based on some predetermined criteria, the tool can classify threats into different security categories and prioritize responses based on the category. For example, several failed login attempts from a single source in a location where no traffic is expected might be flagged as a more severe cyberthreat. In contrast, the monitoring tool might classify a series of successful logins to a low-sensitivity internal application outside of normal working hours as a less severe cyberthreat.

Once the monitoring tool has classified the threats into different categories, the next step involves implementing an automated alerting system. The alerting system could immediately notify the cybersecurity team when a potential high-severity threat is identified. The alerts could be tailored to include information about the threat, like its origin, type, and potential impact. All of this information could help the security team quickly assess and prioritize the response to the threat.
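As a starting point, the following hedged sketch consumes the anomalies topic and applies a simple severity rule before alerting. The severity threshold, consumer group name, and notify_security_team placeholder are assumptions you would replace with your own monitoring tool's integration:

import json
from confluent_kafka import Consumer

HIGH_SEVERITY_ERRORS = 5  # classify as high severity at or above this count (assumption)

def notify_security_team(alert):
    # Placeholder: in practice, forward to a pager, chat channel, or SIEM
    print(f"ALERT: {alert}")

consumer = Consumer({
    'bootstrap.servers': 'localhost:9093',
    'group.id': 'threat-alerting',         # hypothetical consumer group
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['anomalies'])

# Run until interrupted (Ctrl+C), classifying each anomaly as it arrives
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    anomaly = json.loads(msg.value())
    if anomaly["error_count"] >= HIGH_SEVERITY_ERRORS:
        notify_security_team(anomaly)
    else:
        print(f"Logged low-severity anomaly: {anomaly}")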

Conclusion

In this tutorial, you learned how to build a real-time threat detection system using RisingWave and Kafka. Using RisingWave for stream processing and Kafka for data transport forms a robust infrastructure for real-time threat detection. The ability to create materialized views in RisingWave provides efficient querying and analysis of streaming data, while Kafka ensures reliable data delivery between different components of the system.

For those interested in a deeper dive, the complete code from this tutorial is available in this GitHub repository, which contains all the necessary scripts and configurations.


Artem Oppermann

Senior AI Research Engineer
