Manage Flows
Each flow is a continuous aggregation query in GreptimeDB.
It continuously updates the aggregated data based on the incoming data.
This document describes how to create, and delete a flow.
Create a sink table
Before creating a flow, you need a sink table to store the aggregated data generated by the flow. While it is the same to a regular time series table, there are a few important considerations:
- Column order and type: Ensure the order and type of the columns in the sink table match the query result of the flow.
- Time index: Specify the TIME INDEXfor the sink table, typically using the time window column generated by the time window function.
- Specify update_atas the last column of the schema: The flow automatically writes the update time of the data to theupdate_atcolumn. Ensure this column is the last one in the sink table schema.
- Tags: Use PRIMARY KEYto specify Tags, which together with the time index serves as a unique identifier for row data and optimizes query performance.
For example:
/* Create sink table */
CREATE TABLE temp_alerts (
  sensor_id INT,
  loc STRING,
  max_temp DOUBLE,
  time_window TIMESTAMP TIME INDEX,
  update_at TIMESTAMP,
  PRIMARY KEY(sensor_id, loc)
);
CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin(INTERVAL '10 seconds', ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window
HAVING max_temp > 100;
The sink table has the columns sensor_id, loc, max_temp, time_window, and update_at.
- The first four columns correspond to the query result columns of flow: sensor_id,loc,max(temperature)anddate_bin(INTERVAL '10 seconds', ts)respectively.
- The time_windowcolumn is specified as theTIME INDEXfor the sink table.
- The update_atcolumn is the last one in the schema to store the update time of the data.
- The PRIMARY KEYat the end of the schema definition specifiessensor_idandlocas the tag columns. This means the flow will insert or update data based on the tagssensor_idandlocalong with the time indextime_window.
Create a flow
The grammar to create a flow is:
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
AS 
<SQL>;
- flow-nameis an unique identifier in the catalog level.
- sink-table-nameis the table name where the materialized aggregated data is stored. It can be an existing table or a new one.- flowwill create the sink table if it doesn't exist.
- EXPIRE AFTERis an optional interval to expire the data from the Flow engine. For more details, please refer to the- EXPIRE AFTERpart.
- COMMENTis the description of the flow.
- SQLpart defines the continuous aggregation query. It defines the source tables provide data for the flow. Each flow can have multiple source tables. Please Refer to Write a Query for the details.
A simple example to create a flow:
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT 'My first flow in GreptimeDB'
AS
SELECT
    max(temperature) as max_temp,
    date_bin(INTERVAL '10 seconds', ts) as time_window,
FROM temp_sensor_data
GROUP BY time_window;
The created flow will compute max(temperature) for every 10 seconds and store the result in my_sink_table. All data comes within 1 hour will be used in the flow.
EXPIRE AFTER clause
The EXPIRE AFTER clause specifies the interval after which data will expire from the flow engine.
This expiration only affects the data in the flow engine and does not impact the data in the source table.
When the flow engine processes the aggregation operation (the update_at time),
data with a time index older than the specified interval will expire.
For example, if the flow engine processes the aggregation at 10:00:00 and the INTERVAL '1 hour' is set,
any data older than 1 hour (before 09:00:00) will expire.
Only data timestamped from 09:00:00 onwards will be used in the aggregation.
Write a SQL query
The SQL part of the flow is similar to a standard SELECT clause with a few differences. The syntax of the query is as follows:
SELECT AGGR_FUNCTION(column1, column2,..), TIME_WINDOW_FUNCTION() as time_window FROM <source_table> GROUP BY time_window;
Only the following types of expressions are allowed after the SELECT keyword:
- Aggregate functions: Refer to the Expression documentation for details.
- Time window functions: Refer to the define time window section for details.
- Scalar functions: Such as col,to_lowercase(col),col + 1, etc. This part is the same as in a standardSELECTclause in GreptimeDB.
The following points should be noted about the rest of the query syntax:
- The query must include a FROMclause to specify the source table. As join clauses are currently not supported, the query can only aggregate columns from a single table.
- WHEREand- HAVINGclauses are supported. The- WHEREclause filters data before aggregation, while the- HAVINGclause filters data after aggregation.
- DISTINCTcurrently only works with the- SELECT DISTINCT column1 ..syntax. It is used to remove duplicate rows from the result set. Support for- SELECT count(DISTINCT column1) ...is not available yet but will be added in the future.
- The GROUP BYclause works the same as a standard queries, grouping data by specified columns. The time window column in theGROUP BYclause is crucial for continuous aggregation scenarios. Other expressions inGROUP BYcan include literals, columns, or scalar expressions.
- ORDER BY,- LIMIT, and- OFFSETare not supported.
Refer to Usecase Examples for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards.
Define time window
A time window is a crucial attribute of your continuous aggregation query. It determines how data is aggregated within the flow. These time windows are left-closed and right-open intervals.
A time window represents a specific range of time. Data from the source table is mapped to the corresponding window based on the time index column. The time window also defines the scope for each calculation of an aggregation expression, resulting in one row per time window in the result table.
You can use date_bin() after the SELECT keyword to define fixed time windows.
For example:
SELECT
    max(temperature) as max_temp,
    date_bin(INTERVAL '10 seconds', ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;
In this example, the date_bin(INTERVAL '10 seconds', ts) function creates 10-second time windows starting from UTC 00:00:00.
The max(temperature) function calculates the maximum temperature value within each time window.
For more details on the behavior of the function,
please refer to date_bin.
Currently, the internal state of the flow, such as the accumulator's value for incremental query results (e.g., the accumulator for count(col) which records the current count), is not persistently stored. To minimize data loss in case of internal state failure, it is advisable to use smaller time windows.
This internal state loss does not affect the existing data in the sink table.
Flush a flow
The flow engine automatically processes aggregation operations within 1 second when new data arrives in the source table.
However, you can manually trigger the flow engine to process the aggregation operation immediately using the ADMIN FLUSH_FLOW command.
ADMIN FLUSH_FLOW('<flow-name>')
Delete a flow
To delete a flow, use the following DROP FLOW clause:
DROP FLOW [IF EXISTS] <name>
For example:
DROP FLOW IF EXISTS my_flow;