More Query Examples
This document demonstrates some typical query patterns that you can achieve in Timeplus to solve various use cases.
A docker-compose file is created to bundle proton image with a data generator.
- Download the docker-compose.yml and put into a new folder.
- Open a terminal and run docker compose up in this folder.
- Wait for few minutes to pull all required images and start the containers.
- Use proton-client to run SQL to query the data:
docker exec -it <folder>-proton-1 proton-client
You can get the container name viadocker ps
.
Customer Scenario and Data Model
You are the lead business analyst in a carsharing company. Sensors are equipped in each car to report car locations. The customers use the mobile app to find available cars nearby, book them, unlock them and hit the road. At the end of the trip, the customer parks the car, locks it, and ends the trip. The payment will proceed automatically with the registered credit card.
Some of the typical use cases for time-sensitive insights are:
- How many cars are being driven by users in certain locations? Do we need to move some cars from less busy locations to those hot zones?
- Which cars are being driven too fast or running low on fuel? The service team may need to take action.
- Which users keep booking cars then canceling them? Shall we send real-time notification to those users to avoid abuse.
There are multiple data streams in the systems:
dim_user_info
A relative static stream with all registered user information.
Column | Type | Sample Value |
---|---|---|
uid | string | u00001 |
first_name | string | Foo |
last_name | string | Bar |
string | a@timeplus.io | |
credit_card | string | 371712345678910 |
gender | string | F |
birthday | string | 1990-01-15 |
dim_car_info
A relative static stream with all registered cars
Column | Comment | Type | Sample Value |
---|---|---|---|
cid | car ID | string | c00001 |
license_plate_no | string | KM235L | |
in_service | False if the car is suspended (retried or in maintenance) | bool | True |
car_live_data
A data stream with the latest data from car sensors. When the car engine is started, report data every second. Otherwise, report data every half an hour.
Column | Comment | Type | Sample Value |
---|---|---|---|
time | datetime of the sensor data | datetime | 2022-01-12 23:00:58.476 |
cid | Car ID | string | c00001 |
longitude | current position | float | 40.75896 |
latitude | current position | float | -73.985195 |
gas_percent | percentage of gas level, 100 means full tank | decimal | 86.12 |
speed_kmh | current driving speed in KM/hour | int | 56 |
total_km | this car's total distance in km. Keep increasing after trips | float | 3536 |
locked | whether the car is locked | bool | True |
in_use | whether someone is using the car | bool | True |
bookings
A data stream with trip details and payment info. Each row is generated during the booking lifecycle
- when the user books the car, a new event with action=add, with the booking_time=now, expire=now+30m
- when the user unlock the car, a new event with action=service
- when the user completes the trip and lock the car, a new event with action=end
- when the user cancels the booking, a new event with action=cancel
- when the user extends the booking for another 30 min, a new event with action=extend, and update the expire field
- if the user doesn't unlock the car before the expire time, then new event is added with action=expire
Column | Comment | Type | Sample Value |
---|---|---|---|
time | When the event happens | datetime | 2022-01-12 13:00:58.476 |
bid | booking ID | string | b00001 |
booking_time | When the user books the car. Expire in 30min | datetime | 2022-01-12 13:30:58.476 |
uid | User ID | string | u00001 |
cid | Car ID | string | c00001 |
action | One of the values: add, cancel, extend, service, expire,end | string | add |
expire | When the booking will be expired | datetime | 2022-01-12 13:30:58.476 |
trips
A data stream with trip details and payment info. Each row is generated at the end of the trip
Column | Comment | Type | Sample Value |
---|---|---|---|
tid | Trip ID | string | t00001 |
start_time | When the trip starts | datetime | 2022-01-12 13:00:58.476 |
end_time | When the trip ends | datetime | 2022-01-12 24:00:58.476 |
bid | booking ID | string | b00001 |
start_lon | Start location | float | 40.75896 |
start_lat | Start location | float | -73.985195 |
end_lon | End location | float | 42.75896 |
end_lat | End location | float | -71.985195 |
distance | distance drove in km | float | 23.2 |
amount | how much the user should pay for the trip | decimal | 40.75 |
The following sections show how to query Timeplus to understand the business.
Streaming Analysis
S-TAIL: Showing raw data with or without filter conditions
Use Case: to start the data exploration, the analyst wants to show all recently reported car iot data
SELECT * FROM car_live_data
or focusing on which cars are almost running out of gas (so that they can send service team to fill gas or suspend the car)
SELECT time,cid,gas_percent FROM car_live_data WHERE gas_percent < 25
Result:
time | cid | gas_percent | in_use |
---|---|---|---|
2022-01-12 23:00:58.476 | c00001 | 18 | false |
S-DOWNSAMPLING: Converting detailed data points to high level data
Use Case: The sensors on each car may report data from half a second to every 10 seconds. The analyst may reduce the granularity and only need to save per-minute data to downstream
SELECT window_start,cid, avg(gas_percent) AS avg_gas_percent,avg(speed_kmh) AS avg_speed FROM
tumble(car_live_data,1m) GROUP BY window_start, cid
Result:
window_start | cid | avg_gas_percent | avg_speed |
---|---|---|---|
2022-01-12 23:01:00.000 | c00001 | 34 | 35 |
More practically, the user can create a materialized view to automatically put downsampled data into a new stream/view.
CREATE MATERIALIZED VIEW car_live_data_1min as
SELECT window_start AS time,cid, avg(gas_percent) AS avg_gas,avg(speed_kmh) AS avg_speed
FROM tumble(car_live_data,1m) GROUP BY window_start, cid
Then the user can search the data via
SELECT * FROM car_live_data_1min
Result:
time | cid | avg_gas | avg_speed |
---|---|---|---|
2022-01-12 23:01:00.000 | c00001 | 34 | 35 |
S-AGG-RECENT: Showing aggregation for the recent data
Use Case: the analyst wants to monitor the total revenue for the past 1 hour.
Timeplus provides a special syntax to get such result easily
SELECT sum(amount) FROM trips EMIT LAST 1h
Once the query is submitted, it will show quite a few rows based on the past day, then show new results in a streaming fashion.
Result:
sum(amount) |
---|
3500.42 |
There are other ways to get similar results, with more verbose queries
-
We can apply a global aggregation for data in a recent 1 hour window.
select sum(amount) from trips where end_time > date_sub(now(), 1h)
-
The other solution is to use hop window aggregation. Similar to the
tumble
window in S-DOWNSAMPLING ,the data are grouped per a fixed size time window, such an hour. Tumble windows are not overlapped to each other, so it's ideal for downsampling without data duplication (for example, forcount
aggregation, no data will be counted twice) For hop window, it will be shifted to the left or right(past or future in the timeline) with a sliding step. For example, the following query will use the hop window to get total revenue for the past 1 hour, the result will be sent out every second.select window_start,window_end, sum(amount) from hop(trips,end_time,1s,1h) group by window_start,window_end
S-SESSION: analyzing activities with active sessions
Use Case: The analyst wants to track the daily movement of the cars. The sensors on the cars report data every second while the engine is started, and report data every half an hour when the engine is off. If the server doesn't receive the data for a running car for 5 seconds, the car is considered disconnected. We can run the following query to show the trip distances for each running cars
SELECT cid,window_start,window_end,max(total_km)-min(total_km) AS trip_km
FROM session(car_live_data, time, 5s, cid)
GROUP BY __tp_session_id, cid, window_start, window_end
HAVING trip_km > 0
Result:
cid | window_start | window_end | trip_km |
---|---|---|---|
c00040 | 2022-03-23 21:42:08.000 | 2022-03-23 21:42:12.000 | 0.05395412226778262 |
c00078 | 2022-03-23 21:42:08.000 | 2022-03-23 21:42:33.000 | 0.4258001818272703 |
More complex queries can be created to aggregate the data by car id and trip ending time.
with query_1 AS (
select cid,window_start AS w_start,window_end AS w_end,max(total_km)-min(total_km) AS trip_km
from session(car_live_data,time,20m, cid) group by __tp_session_id, cid, window_start, window_end
)
select cid,window_start,window_end,sum(trip_km)
from tumble(query_1,w_end,1h) group by cid,window_start,window_end
Result:
cid | window_start | window_end | trip_km |
---|---|---|---|
c00001 | 2022-01-12 00:00:00.000 | 2022-01-12 23:59:59.999 | 17.2 |
c00002 | 2022-01-12 00:00:00.000 | 2022-01-12 23:59:59.999 | 4.1 |
This query is a continuously streaming query. Every hour (or every day, depending on tumble window size), the analysis results can be sent to email/slack or a Kafka topic for further processing.
S-TIME-TRAVEL: Going back to a past time and run analysis since then
Use Case: the analysts don't need to keep watching the streaming charts or dashboards. They can rewind to a past time to run the streaming analysis since that moment. This could help them to better understand what happened a few hours ago (such as midnight).
For example, the analyst wants to understand how the users book the car 2 hours ago
SELECT window_start,count(*) FROM tumble(bookings,15m)
WHERE action='add' GROUP BY window_start
EMIT LAST 2h
Or they can specify an exactly timestamp, e.g.
SELECT window_start,count(*) FROM tumble(bookings,15m)
WHERE action='add' and _tp_time>='2022-01-12 06:00:00.000' GROUP BY window_start
Result:
window_start | count(*) |
---|---|
2022-01-12 06:00:00.000 | 34 |
2022-01-12 06:15:00.000 | 23 |
Not only the past data will be analyzed, but also the latest incoming data will be processed continuously.
S-MVIEW: Creating materialized view to keep latest analysis result and cache for other systems to query
Use Case: Unlike the traditional SQL queries, streaming queries never end until the user cancels it. The analysis results are kept pushing to the web UI or slack/kafka destinations. The analysts want to run advanced streaming queries in Timeplus and cache the results as a materialized view. So that they can use regular SQL tools/systems to get the streaming insights as regular tables. Materialized views are also useful to downsample the data to reduce the data volume for future analysis and storage
CREATE MATERIALIZED VIEW today_revenue as
SELECT sum(amount) FROM trips WHERE end_time > today();
-- in Timeplus or other connected SQL clients
SELECT * FROM today_revenue
S-DROP-LATE: Dropping late events to get real-time aggregation insights
Use Case: The streaming data may arrive late for many reasons, such as network latency, iot sensor malfunction, etc. When we run streaming analysis (such as payment per minute), we aggregate the data based on their event time (when the payment actually happened, instead of when Timeplus receives the data), and we don't want to wait for events which are significantly too late.
Watermark is a common mechanism in the streaming processing world to set the bar on how late the events can be. Unlike other systems, Timeplus makes it very easy to identify late events without explicitly setting a watermark policy.
For a query like this
SELECT window_start,window_end,sum(amount),count(*)
FROM tumble(trips,end_time,1m) GROUP BY window_start,window_end
It will show the total payment every minute, for example
window_start | window_end | sum(amount) | count(*) |
---|---|---|---|
2022-01-12 10:00:00.000 | 2022-01-12 10:01:00.000 | 200 | 42 |
2022-01-12 10:01:00.000 | 2022-01-12 10:02:00.000 | 300 | 67 |
Considering two cars are returned at the same time at 10:00:10. For tripA and trip, both of them are supposed to be calculated into the first time window. However, for some reason, the data point tripA arrives in Timeplus on 10:01:15, and tripB data point arrives on 10:01:16. Timeplus will accept tripA data and add it into the 1st window aggregation, and also close the first window. The watermark will be sent to 10:01:00. So when tripB data point arrives, it's considered to be too late and won't be calculated in the streaming result. But it'll still be available when we run a historical query.
data point | event time | arrive time | note |
---|---|---|---|
tripA | 2022-01-12 10:00:10.000 | 2022-01-12 10:01:15.000 | included in 1st window, trigger the watermark change |
tripB | 2022-01-12 10:00:10.000 | 2022-01-12 10:01:16.000 | its time is lower than the watermark. 1st window has been closed(not accepting more data) The data is dropped for streaming analysis. Still can be analyzed with historical searches |
S-WAIT-LATE: Waiting for extra time for late events
Use Case: for more advanced use for the above case, it might be desirable for the analysts to set an extra time to wait for late events. This will make the streaming analysis not so real-time, however can include more data points, if the network latency is unpredictable.
Given the similar scenario, the query with the advanced setting is
SELECT window_start,window_end,sum(amount),count(*)
FROM tumble(trips,end_time,1m) GROUP BY window_start,window_end
EMIT AFTER WATERMARK AND DELAY 30s
S-TOP-K: Getting the most common value for each streaming window
Use Case: the analysts want to understand which cars are booked most often every day or every hour
SELECT window_start,top_k(cid,3) AS popular_cars FROM tumble(bookings,1h) GROUP BY window_start
This will generate a daily report like this
window_start | popular_cars |
---|---|
2022-01-12 00:00:00.000 | ['car1','car2','car3'] |
2022-01-13 00:00:00.000 | ['car2','car3','car4'] |
S-MAX-K: Getting the maximum value for each streaming window
Use Case: the analysts want to understand which trips are longest every day
SELECT window_start,max_k(amount,3,bid,distance) AS longest_trips FROM tumble(trips,1d) GROUP BY window_start
This will generate a daily report like this
window_start | longest_trips |
---|---|
2022-01-12 00:00:00.000 | [(7.62,'b01',13.8),(2.45,'b02',2.37),(12.66,'b03',22.6)] |
To get the booking id for the 2nd longest trip, you can select ..,longest_trips[2].2 AS bookingId
S-MIN-K: Getting the minimal value for each streaming window
Use Case: the analysts want to understand which trips are shortest every day
SELECT window_start,min_k(amount,3,bid,distance) AS shortest_trips FROM tumble(trips,1d) GROUP BY window_start
This will generate a daily report like this
window_start | shortest_trips |
---|---|
2022-01-12 00:00:00.000 | [(2.56,'b06',3.10),(7.68,'b07',10.8),(10.24,'b08',15.36)] |
S-OVER-TIME: Getting the difference/gaps for results in each time window
Use Case: with Timeplus, the analysts can easily compare the current minute data with last minute data.
For example, the user wants to understand how many cars are being used in each minute and how it is different than the last minute
SELECT window_start,count(*) AS num_of_trips,
lag(num_of_trips) AS last_min_trips,num_of_trips-last_min_trips AS gap
FROM tumble(trips,1m) GROUP BY window_start
Result
window_start | num_of_trips | last_min_trips | gap |
---|---|---|---|
2022-01-12 10:00:00.000 | 88 | 0 | 88 |
2022-01-12 10:01:00.000 | 80 | 88 | -8 |
2022-01-12 10:02:00.000 | 90 | 80 | 10 |
This is a very powerful and useful capability. Besides comparing the last aggregation result, the analysts can also compare the data for the past. For example this second with the same second in the last minute or last hour.
The following query comparing the number of car sensor data by each second, comparing the number of events in last m
SELECT window_start,count(*) AS num_of_events,
lag(window_start,60) AS last_min,
lag(num_of_events,60) AS last_min_events,
num_of_events-last_min_events AS gap,
concat(to_string(to_decimal(gap*100/num_of_events,2)),'%') AS change
FROM tumble(car_live_data,1s) GROUP BY window_start
Once the query starts running, for the first 1 minute, only the newer result is available. Then we can get the result from 60 windows back, so that we can compare the difference.
Result
window_start | num_of_events | last_min | last_min_events | gap | change |
---|---|---|---|---|---|
2022-01-12 10:01:00.000 | 88 | 2022-01-12 10:00:00.000 | 83 | 5 | 5 |
2022-01-12 10:01:01.000 | 80 | 2022-01-12 10:00:01.000 | 87 | -7 | -8.75% |
S-UNION-STREAMS: Merging multiple streams in same schema to a single stream
Use Case: There can be some data streams in the same data schema but intentionally put into different streams, such as one stream for one city, or a country (for performance or regulation considerations, for example). We would like to merge the data to understand the big picture.
For example, the car sharing company starts their business in Vancouver BC first. Then expand it to Victoria, BC. Per local city government's regulation requirements, two systems are set up. The headquarter wants to show streaming analysis for both cities.
SELECT * FROM trips_vancouver
UNION
SELECT * FROM trips_victoria
S-JOIN-STREAMS: Querying multiple data streams in the same time
Use Case: Data keeps changing, and each type of changing data is a stream. It's a common requirement to query multiple kinds of data at the same time to enrich the data, get more context and understand their correlation.
For example, we want to understand how many minutes on average between the user booking the car and starting the trip. The booking information in bookings stream and trips stream contains the trip start time and end time
SELECT avg(gap) FROM
( SELECT
date_diff('second', bookings.booking_time, trips.start_time) AS gap
FROM bookings
INNER JOIN trips ON (bookings.bid = trips.bid)
AND date_diff_within(2m, bookings.booking_time, trips.start_time)
) WHERE _tp_time >= now()-1d
Other streaming queries we can run for this demo set
Get number of in-use cars
Each car will report their status to the car_live_data stream, including the in_use
bool flag. For the same car id, it may report in_use=false
2 minutes ago, then report in_use=true
1 minute ago. Then this car should be considered as in-use. We should not run global aggregation since we only care about the current status, not the accumulated data (each running car should report data twice a second). tumble
window should be okay with 1 second as window size.
SELECT window_start, count(distinct cid) FROM tumble(car_live_data,1s)
WHERE in_use GROUP BY window_start
Get the top 10 cars order by revenue
We probably want to understand which cars help the company earn the most revenue or which cars are not gaining enough revenue. This can be done with the following query
SELECT cid,sum(amount) AS revenue from trips
INNER JOIN bookings on trips.bid=bookings.bid
WHERE end_time > today() GROUP BY cid
ORDER BY revenue DESC LIMIT 10
settings query_mode='table'
The result is like this
cid | revenue |
---|---|
c87850 | 675.9 |
c30765 | 637.48 |
c72990 | 487.36 |
c00956 | 481.66 |
c21898 | 479.64 |
c96280 | 476.62 |
c59872 | 461.2 |
c51800 | 451.14 |
c20995 | 445.48 |
c04604 | 445.3 |
You can further enrich the data by looking up the car license plate from dim_car_info
WITH top10cars AS (
SELECT cid,sum(amount) AS revenue FROM trips
INNER JOIN bookings on trips.bid=bookings.bid
WHERE end_time > today() GROUP BY cid
ORDER BY revenue DESC LIMIT 10
)
SELECT cid,revenue,license_plate_no FROM top10cars
INNER JOIN dim_car_info on top10cars.cid=dim_car_info.cid
settings query_mode='table'
The result is
cid | revenue | license_plate_no |
---|---|---|
c30765 | 749.06 | OPIH21 |
c87850 | 701.14 | 384K9Z |
c00956 | 522.66 | 3JHAYX |
c68812 | 497.12 | LR3AF9 |
c51363 | 495.98 | B8UVY6 |
c72990 | 487.36 | W6IZOB |
c91904 | 486.14 | LIHUZW |
c21898 | 479.64 | 1KTBIJ |
c20995 | 477.38 | 6GN7SP |
c96280 | 476.62 | YSK84J |
Data Transformation/filter/cleaning
T-MASK: Scrubbing sensitive fields
Use Case: Scrub sensitive fields of data(such AS PII or payment information) to hide from downstream consumers
In this example, only the first and last 4 digits of the credit card numbers are shown during user activity analysis.
SELECT uid,replace_regex(credit_card,'(\\d{4})(\\d*)(\\d{4})','\\1***\\3') AS card
FROM user_info
Result:
uid | card |
---|---|
u00001 | 3717***8910 |
T-DERIVE: Computing derived columns from raw data
Use Case: Create new columns to combine information from multiple columns in the raw data, or turn data in certain columns in another format to make them ready to be displayed.
SELECT uid, concat(first_name,' ',last_name) AS full_name,
year(today())-year(to_date(birthday)) AS age FROM user_info
Result:
uid | full_name | age |
---|---|---|
u00001 | Foo Bar | 32 |
T-LOOKUP: Converting identifiers to more readable information
Use Case: While checking the car IoT data, we want to convert the car ID to its license plate number.
SELECT time, cid, c.license_plate_no AS license,gas_percent,speed_kmh FROM car_live_data
INNER JOIN car_info AS c
ON car_live_data.cid=c.cid
Result:
time | cid | license | gas_percent | speed_kmh |
---|---|---|---|---|
2022-01-12 23:00:58.476 | c00001 | KM235L | 55 | 50 |