---
title: Real-Time User Profile
sidebar_position: 4
---

# Real-Time User Profile

This tutorial demonstrates how to build a production-grade, real-time user profiling system using Apache Fluss. You will learn how to map high-cardinality string identifiers (like emails) to compact integers and aggregate user behavior directly in the storage layer with exactly-once guarantees.

![arch](/img/user-profile.png)

## How the System Works

### Core Concepts
* **Identity Mapping**: High-cardinality strings (Emails) ➔ Compact 32-bit `INT` UIDs.
* **Offloaded Aggregation**: Computation happens in Fluss TabletServers, keeping Flink state-free.
* **Optimized Storage**: Native [RoaringBitmap](https://roaringbitmap.org/) support for sub-second unique visitor (UV) counts.

### Data Flow
1. **Ingestion**: Raw event streams (e.g., clicks, page views) enter the system from a source like [Apache Kafka](https://kafka.apache.org/).
2. **Mapping**: The [Apache Flink](https://flink.apache.org/) job performs a temporal lookup join against the `user_dict` table. If a user is new, the `insert-if-not-exists` hint triggers Fluss to automatically generate a unique `INT` UID using its Auto-Increment feature.
3. **Merge**: The **Aggregation Merge Engine** updates clicks and bitmaps in the storage layer.
4. **Recovery**: The **Undo Recovery** mechanism ensures exactly-once accuracy during failovers.
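The mapping step (2) can be sketched in Python as follows. This is a hedged illustration of the `insert-if-not-exists` semantics only; `lookup_or_insert` and the in-memory dictionary are stand-ins, not the Fluss API.

```python
# Sketch of 'lookup.insert-if-not-exists' semantics: return the existing
# uid for an email, or mint the next auto-increment uid if it is new.
from itertools import count

_next_uid = count(1)            # auto-increment counter, starts at 1
user_dict: dict[str, int] = {}  # stands in for the user_dict table

def lookup_or_insert(email: str) -> int:
    if email not in user_dict:
        user_dict[email] = next(_next_uid)
    return user_dict[email]

# Enrich raw (email, clicks) events with compact uids
events = [("a@x.com", 3), ("b@x.com", 1), ("a@x.com", 2)]
enriched = [(lookup_or_insert(email), clicks) for email, clicks in events]
print(enriched)  # [(1, 3), (2, 1), (1, 2)]
```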

## Environment Setup

### Prerequisites

Before proceeding, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed.

### Starting the Playground

1. Create a working directory.
```shell
mkdir fluss-user-profile
cd fluss-user-profile
```

2. Set the Fluss version environment variable.
```shell
# Set to 0.9.0 or 0.10-SNAPSHOT for latest features
export FLUSS_DOCKER_VERSION=0.10-SNAPSHOT
export FLINK_VERSION="1.20"
```
:::note
If you open a new terminal window, remember to re-run these export commands.
:::

3. Create a `lib` directory and download the required JARs.
```shell
mkdir lib
# Download Flink Faker for data generation
curl -fL -o lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
# Download Fluss Connector
curl -fL -o "lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" \
"https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/${FLUSS_DOCKER_VERSION}/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar"
```
:::note
If this version of the connector is not yet available on Maven Central, copy the JAR from a local Fluss build instead: `cp ../fluss-flink/fluss-flink-1.20/target/fluss-flink-1.20-0.10-SNAPSHOT.jar lib/`
:::

4. Create a `docker-compose.yml` file.
```yaml
services:
coordinator-server:
image: apache/fluss:${FLUSS_DOCKER_VERSION}
command: coordinatorServer
depends_on:
- zookeeper
environment:
- |
FLUSS_PROPERTIES=
zookeeper.address: zookeeper:2181
bind.listeners: FLUSS://coordinator-server:9123
remote.data.dir: /remote-data
volumes:
- fluss-remote-data:/remote-data
tablet-server:
image: apache/fluss:${FLUSS_DOCKER_VERSION}
command: tabletServer
depends_on:
- coordinator-server
environment:
- |
FLUSS_PROPERTIES=
zookeeper.address: zookeeper:2181
bind.listeners: FLUSS://tablet-server:9123
data.dir: /tmp/fluss/data
remote.data.dir: /remote-data
volumes:
- fluss-remote-data:/remote-data
zookeeper:
restart: always
image: zookeeper:3.9.2
jobmanager:
image: flink:${FLINK_VERSION}
ports:
- "8081:8081"
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh jobmanager"]
volumes:
- ./lib:/tmp/lib
- fluss-remote-data:/remote-data
taskmanager:
image: flink:${FLINK_VERSION}
depends_on:
- jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh taskmanager"]
volumes:
- ./lib:/tmp/lib
- fluss-remote-data:/remote-data
sql-client:
image: flink:${FLINK_VERSION}
depends_on:
- jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh bin/sql-client.sh"]
volumes:
- ./lib:/tmp/lib
- fluss-remote-data:/remote-data

volumes:
fluss-remote-data:
```

5. Start the environment.
```shell
docker compose up -d
```

6. Launch the Flink SQL Client.
```shell
# The service entrypoint copies the JARs from ./lib into /opt/flink/lib,
# so the Fluss connector is already on the classpath
docker compose run sql-client
```

## Step 1: Create the Fluss Catalog

In the SQL Client, create and use the Fluss catalog.

:::tip
Run SQL statements one by one to avoid errors.
:::

```sql
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'coordinator-server:9123'
);
```

```sql
USE CATALOG fluss_catalog;
```

## Step 2: Create the User Dictionary Table

Create the `user_dict` table to map emails to UIDs. We use `auto-increment.fields` to automatically generate unique IDs.

```sql
CREATE TABLE user_dict (
email STRING,
uid INT,
PRIMARY KEY (email) NOT ENFORCED
) WITH (
'connector' = 'fluss',
'auto-increment.fields' = 'uid',
'bucket.num' = '1'
);
```

## Step 3: Create the Aggregated Profile Table

Create the `user_profiles` table using the **Aggregation Merge Engine**. This pushes the aggregation logic (summing clicks, unioning bitmaps) to the Fluss TabletServers.

:::tip
We use `rbm64` ([RoaringBitmap](https://roaringbitmap.org/)) for efficient unique visitor counting. This allows you to store millions of unique IDs in a highly compressed format.
:::

```sql
CREATE TABLE user_profiles (
profile_id INT,
unique_visitors BYTES,
total_clicks BIGINT,
PRIMARY KEY (profile_id) NOT ENFORCED
) WITH (
'connector' = 'fluss',
'table.merge-engine' = 'aggregation',
'fields.unique_visitors.agg' = 'rbm64',
'fields.total_clicks.agg' = 'sum',
'bucket.num' = '1'
);
```

## Step 4: Ingest and Process Data

Create a temporary source table to simulate raw user events using the Faker connector.

```sql
CREATE TEMPORARY TABLE raw_events (
email STRING,
click_count INT,
profile_group_id INT,
proctime AS PROCTIME()
) WITH (
'connector' = 'faker',
'rows-per-second' = '1',
'fields.email.expression' = '#{internet.emailAddress}',
'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}',
'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}'
);
```

Now, run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` on the fly.

```sql
INSERT INTO user_profiles
SELECT
d.uid,
-- Convert INT to BYTES for rbm64.
-- Note: In a real production job, you might use a UDF to ensure correct bitmap initialization.
CAST(CAST(d.uid AS STRING) AS BYTES),
CAST(e.click_count AS BIGINT)
FROM raw_events AS e
JOIN user_dict /*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */
FOR SYSTEM_TIME AS OF e.proctime AS d
ON e.email = d.email;
```

## Step 5: Verify Results

You can query the `user_profiles` table to see the aggregated metrics updating in real-time.

```sql
-- Set result mode to tableau for better visualization
SET 'sql-client.execution.result-mode' = 'tableau';
```

```sql
-- Query the aggregated user profiles
SELECT
profile_id,
total_clicks,
unique_visitors
FROM user_profiles;
```

You should see rows with `profile_id`, `total_clicks`, and `unique_visitors` (as raw bytes representing the bitmap). The `total_clicks` will increase as more events arrive for the same user.

To verify the dictionary mapping:
```sql
SELECT * FROM user_dict;
```

## Clean Up

Exit the SQL Client (`exit;`) and stop the Docker containers.

```shell
docker compose down -v
```

## Architectural Benefits

* **Stateless Flink Jobs:** Offloading the dictionary and aggregation state to Fluss makes Flink jobs lightweight, enabling faster checkpoints and nearly instant recovery.
* **Compact Storage:** Using 4-byte `INT` UIDs instead of `STRING` emails (typically tens of bytes each) reduces the memory footprint of deduplication state by up to 90%.
* **Exactly-Once Accuracy:** Even during failovers, the **Undo Recovery** mechanism ensures replayed data does not result in double-counting, maintaining perfect accuracy.
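The storage figure above can be checked with back-of-envelope arithmetic; the average email length used here is an assumption for illustration.

```python
# Rough footprint comparison: 32-bit INT uid vs. a UTF-8 email string
avg_email_bytes = 40  # assumed average email length in bytes
uid_bytes = 4         # 32-bit INT
savings = 1 - uid_bytes / avg_email_bytes
print(f"{savings:.0%}")  # 90%
```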