Towards Proactive Agent: Build Event-Driven Agent with Streaming Databases

Towards Proactive Agent: Build Event-Driven Agent with Streaming Databases

Q&A agent only generates response when users ask it a question. Does it work? It works very well. LLM is already a part of my life. It told me everything I need to know about going to an unfamiliar country. It also helps me to improve my presentation. But all these magic only happen when I give the LLM the good question and the precise description about the current situation. Is it possible to implement a proactive agent which is able to know what to do without instructions from human? To achieve that, the agent needs to know what’s happening in real-time. There are countless events happening every moment. The agent needs “eyes and ears” to monitor the changing environment, so that it can take action when something happen, before we come up with the good question and the precise description about the current environment.

Proactive Agent

Let’s take a look at examples about the proactive agent:

: Send an email to me when the sale amount break the monthly record.
: Turn on the AC as I approach home.
: Sell 40% TSLA stocks when the return is above 3%.

Most of the LLMs can send emails for you, it can turn on your AC if it have access to the Home Assistant APIs. It can also sell TSLA stocks for you if you allow it. But these things can only happen when you ask it to do it. It cannot be invoked by some specific events, like when the return rate is above three percents. Or turn on the AC as you’re approaching home. Of course we can let our brilliant developers to finish these features. But what if we want it to sell stocks when some other factors touch the threshold? Or maybe LLMs can process every single events for us.

Letting the LLM process every single event is not possible for the moment. Making the LLM handle every single event is inefficient. It’s slow and expensive. Not to mention that there might be thousands, maybe millions of events happening every second. What the LLM need is actually letting it create event listeners.

The Architecture

What would you do if you want to wake up at 7 AM. Do you need to check the clock every 5 minutes while you’re sleeping? You simply set an alarm. The alarm will “invoke” the buzzer to wake you up. The event here is the time, the interesting event is “The time is 7 AM”. Same thing for the LLM, it can set up an event listener, which will invoke the LLM when the interesting event occurs.

Featured

The figure above shows the flow of the event-driven agent. Streaming database is used here for building an event-driven agent, it can store, process, transform events and also allow users to query the data.

Featured

First the user illustrate the event to watch, for example, the user can say: “tell me when @risingwave.com folks register”. Apparently, the user’s agent is an assistant for managing the user’s website. The agent has access to some data that can help it to determine who just registered in the website. It could be the log in the backend, or a committed INSERT statement in the USER table. Let’s assume the event looks like this:

{"id": "x7cj7Hjis52-H", "payload": {"email": "peter.shen@example.com"}, "timestamp": 2024-11-14T06:00:00""}
{"id": "gdgs+52djKFO", "payload": {"email": "john.doe@example.com"}, "timestamp": 2024-11-14T07:00:00""}
{"id": "289ghnwNFoiu9dK", "payload": {"email": "mike.wang@risingwave.com"}, "timestamp": 2024-11-14T08:00:00""}
......

All these events will be ingested to the streaming database. The LLM will create some rules in the streaming database to filter events. And when something interesting happens, the streaming database will notify the event handler. Then, the event handler invoke the LLM with the prompt containing the event and the context. The prompt can be like:

Send a message to the user: mike.wang@risingwave.com just registerred.

Then the LLM can use tool calls to send a message to user about the event.

Let’s take a look at the SQL statement:

SELECT
  'Send message to user: ' | (payload).email | 'just registered.' AS prompt
FROM register_events
WHERE created_at > '2024-11-14 20:00:00' 
  AND (payload).email LIKE '@%risingwave.com'

This SQL first filter out all historical events by created_at > '2024-11-14 20:00:00', because the user want to be informed when @risingwave.com folks register from now. Then, it only gets event whose email field in the payload ends with risingwave.com , then transforms this register event to a prompt to invoke the LLM. This is the “eyes and ears” for a proactive agent.

You may wonder why this SQL statement can filter event and invoke LLM. This SQL is used to construct a materialized view. The full SQL statement looks like:

CREATE MATERIALIZED VIEW event_listener_risingwavecom_register AS
SELECT ... FROM ...

The materialized view stores the latest result of a query. You can run query on this materialized view, or create a new materialized view based on this materialized view:

SELECT * FROM event_listener_risingwavecom_register

In streaming databases, the materialized view is updated automatically and incrementally when the upstream data changes. So, when a new event is inserted into the register_events table, the streaming database will process this insert event using the rules defined by the SQL statement. If the the payload of the new inserted row ends with “@risingwave.com”, then a new prompt will be inserted into this materialized view. The update of the materialized view is also an event, RisingWave provides publish/subscribe mechanism to notify the downstream when the materialized view is updated.

Why Streaming Database

The streaming database is where the magic happens. It has everything we need: stream processing engine, data storage, data serving, and the SQL interface. There are a lot of stream processing engines, but one with SQL interface can help us to save a lot of time to build an event-driven agent. RisingWave is one of the suitable choices.

SQL Interface is essential

SQL is the language to describe the result you want, you don’t need to write the computation procedure. The database will generate an optimized computation graph according to the SQL. If you want to get the top 10 sales transaction record by the sales amount, you can simply put:

SELECT * FROM sales_global ORDER BY amount_usd LIMIT 10;

You can use MATERIALIZED VIEW to maintain the latest view of this query:

CREATE MATERAILIZED VIEW top_10_sales AS
SELECT * FROM sales_global ORDER BY amount_usd LIMIT 10;

Every time you query the materialized view SELECT * FROM top_10_sales , you will always get the latest top 10 sales transaction by the sale amount. The materialized view is incremental updated when the upstream data is changed.

How about letting the LLM write some Python code to get what we want? Well, the LLM would have to write the correct computation procedure. In this case, the LLM should write an incremental-update distributed top-K algorithm to process events. This requires heavy prompt engineering effort to make the LLM an expert in stream processing and distributed system. While using SQL, the LLM only needs to describe the result it want. That means less LLM thinking steps, less prompt engineering, less domain knowledge, and thus, less cost.

Input & Output

RisingWave is Postgres-compatible, meaning it can seamlessly fit in the Postgres ecosystem. You can access it with tools like psql, dbt, or the Postgres client libraries in programming languages. It can be used like a Postgres database, like serving as a Grafana Postgres Data Source, or a Postgres FDW. It also allows you to create sources to fetch data from the external data stores, or sink data into the stores, like Kafka, MySQL, ClickHouse, Iceberg tables, etc.

In the case discussed above, we will use the insert statement to ingest data, and use subscription to notify the event handler.

Streaming Joins

RisingWave is powerful in cases with the streaming join, which is a common use case in event-driven applications, as RisingWave is tailored for stream processing. Let’s take a look at an example about the streaming join:

Imagine you owns an international corporation and your products are sold all over the world. Now you’re sitting in your headquarter, located at Beijing, China. You want to monitor your real-time sales amount in CNY. And there are two tables in your database you can use:

  • sales_global : All sales transaction records.

  • exchange_rate : The latest known exchange rate at a specific timestamp.

Featured

Since the sales happen globally, the sales can be settled in any currency. Before summing up the sales amount, we need to covert the amount in a sales record with the latest known exchange rate at that moment. For example, to convert the amount of the record 2024-11-05T07:00:00, 10 USD , we need to first find the exchange_rates rows whose current_from is USD, current_to is CNY, and ts is the largest timestamp that cannot exceeds 2024-11-05T07:00:00 . The SQL looks like:

Featured

This is a streaming join example that the columnar-storage database cannot handle efficiently. In the real-time event processing, things can be more complicated, there might be a lot of joins instead of just one like this example. Especially you have a lot of dimension tables in your database.

The Demo

I built a demo to help you understand how the LLM can leverage the streaming database to monitor events. Check out the link for more details.

Featured

The Modern Backbone for Your
Event-Driven Infrastructure
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.