Use Case
Join streaming events with dimension tables, compute derived fields, and transform data on the fly — so downstream systems always work with complete, enriched records.
The Problem
Batch ETL enriches data every few hours. Between runs, your recommendation engine uses stale user profiles, your fraud system misses context from recent transactions, and your analytics dashboards show incomplete records.
With RisingWave
RisingWave performs streaming JOINs — continuously matching events with the latest dimension data. User profiles, product catalogs, geo lookups — all joined in real time with standard SQL.
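As a minimal sketch of what such a streaming JOIN can look like, the statements below enrich a click stream with a user-profile dimension table and add one derived field. The table names (`click_events`, `user_profiles`), their columns, and the `'vip'` segment value are hypothetical and chosen only for illustration.

```sql
-- Hypothetical dimension table; rows are updated in place as profiles change.
CREATE TABLE user_profiles (
    user_id VARCHAR PRIMARY KEY,
    segment VARCHAR,
    country VARCHAR
);

-- Hypothetical event table standing in for a streaming source.
CREATE TABLE click_events (
    user_id VARCHAR,
    url     VARCHAR,
    ts      TIMESTAMPTZ
);

-- The materialized view is maintained incrementally: new events and
-- profile updates both flow through the join.
CREATE MATERIALIZED VIEW enriched_clicks AS
SELECT
    e.user_id,
    e.url,
    e.ts,
    p.segment,
    p.country,
    (p.segment = 'vip') AS is_priority  -- derived field computed on the fly
FROM click_events e
LEFT JOIN user_profiles p
    ON e.user_id = p.user_id;
```

A `LEFT JOIN` is used here so that events from users without a profile row still appear downstream, with the profile columns left `NULL`.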
A DTC brand spends $2.4M/month across Google, Meta, TikTok, and email. The marketing team needs to know which touchpoint combination actually drives purchases — not the last-click attribution that gives 100% credit to the final ad.
| user_id | session_id | channel | campaign | action | revenue | ts |
|---|---|---|---|---|---|---|
| usr_80012 | sess_t1a | tiktok | spring_awareness | video_view | 0 | 2024-03-01T09:14:00.000Z |
| usr_80012 | sess_t1b | | brand_search | click | 0 | 2024-03-05T11:22:00.000Z |
| usr_80012 | sess_t1c | meta | retargeting_q1 | click | 0 | 2024-03-10T16:45:00.000Z |
| usr_80012 | sess_t1d | | abandon_cart_flow | click | 189.99 | 2024-03-12T20:08:00.000Z |
| usr_73204 | sess_t2a | tiktok | spring_awareness | video_view | 0 | 2024-03-02T14:30:00.000Z |
| usr_73204 | sess_t2b | tiktok | creator_collab | click | 0 | 2024-03-07T19:12:00.000Z |
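
-- Column names follow the sample rows above; the types are assumptions.
CREATE SOURCE touchpoints (
    user_id VARCHAR, session_id VARCHAR,
    channel VARCHAR, campaign VARCHAR, action VARCHAR,
    revenue NUMERIC, ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'marketing.touchpoints',
    properties.bootstrap.server = 'broker:9092'
) FORMAT PLAIN ENCODE JSON;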
CREATE MATERIALIZED VIEW user_journeys AS
WITH conversions AS (
    SELECT
        user_id,
        ts AS conversion_ts,
        revenue
    FROM touchpoints
    WHERE revenue > 0
),
journeys AS (
    SELECT
        c.user_id,
        c.conversion_ts,
        c.revenue,
        t.channel,
        t.campaign,
        t.action,
        t.ts AS touch_ts,
        ROW_NUMBER() OVER (
            PARTITION BY c.user_id, c.conversion_ts
            ORDER BY t.ts
        ) AS touch_position,
        COUNT(*) OVER (
            PARTITION BY c.user_id, c.conversion_ts
        ) AS journey_length
    FROM conversions c
    JOIN touchpoints t
        ON c.user_id = t.user_id
        AND t.ts <= c.conversion_ts
        AND t.ts >= c.conversion_ts - INTERVAL '30 DAYS'
)
SELECT
    user_id,
    conversion_ts,
    revenue,
    channel,
    campaign,
    touch_position,
    journey_length,
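    CASE
        WHEN journey_length = 1 THEN 1.0
        -- Two-touch journeys have no middle touches; split evenly so weights sum to 1.
        WHEN journey_length = 2 THEN 0.50
        WHEN touch_position = 1 THEN 0.30
        WHEN touch_position = journey_length THEN 0.30
        ELSE 0.40 / (journey_length - 2)
    END AS position_weight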
FROM journeys;

| channel | last_click_revenue | attributed_revenue | assist_count | avg_position | roas |
|---|---|---|---|---|---|
| | 892400 | 684120 | 14200 | 0.58 | 3.42 |
| meta | 724800 | 571440 | 18400 | 0.65 | 2.86 |
| tiktok | 72000 | 456000 | 31200 | 0.22 | 2.28 |
| | 710800 | 688440 | 8600 | 0.88 | 5.74 |
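
The per-channel figures above are one level of aggregation beyond `user_journeys`, which emits one row per touch. A rollup along the following lines could produce most of those columns; treat it as a sketch under stated assumptions, not the query behind the table. The view name `channel_attribution` is hypothetical, `avg_position` is assumed to mean the touch position normalized by journey length, and `roas` is omitted because it needs ad-spend data that the `touchpoints` stream does not carry.

```sql
-- Hypothetical rollup over user_journeys (one input row per touch).
CREATE MATERIALIZED VIEW channel_attribution AS
SELECT
    channel,
    -- Last-click model: the final touch in the journey gets all the revenue.
    SUM(CASE WHEN touch_position = journey_length THEN revenue ELSE 0 END)
        AS last_click_revenue,
    -- Position-based model: each touch gets its weighted share.
    SUM(revenue * position_weight) AS attributed_revenue,
    -- Touches that preceded the converting touch.
    SUM(CASE WHEN touch_position < journey_length THEN 1 ELSE 0 END)
        AS assist_count,
    -- Assumption: position expressed as a fraction of the journey (near 0 = early, 1 = last).
    AVG(touch_position::DOUBLE PRECISION / journey_length) AS avg_position
FROM user_journeys
GROUP BY channel;
```

For the four-touch journey of `usr_80012` in the sample data, the weights work out to 0.30, 0.20, 0.20, and 0.30, so the 189.99 order is split as 56.997 for the first touch (tiktok), 37.998 for each of the two middle touches, and 56.997 for the converting touch. Under last-click attribution, the converting touch would receive the full 189.99.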
Replace batch enrichment with continuous streaming JOINs that keep every downstream system working with fresh, complete records.