diff --git a/website/docs/quickstart/User-Profile.md b/website/docs/quickstart/User-Profile.md
new file mode 100644
index 0000000000..dad91f277c
--- /dev/null
+++ b/website/docs/quickstart/User-Profile.md
@@ -0,0 +1,279 @@
---
title: Real-Time User Profile
sidebar_position: 4
---

# Real-Time User Profile

This tutorial demonstrates how to build a production-grade, real-time user profiling system with Apache Fluss. You will learn how to map high-cardinality string identifiers (such as emails) to compact integers and aggregate user behavior directly in the storage layer with exactly-once guarantees.

![arch](/img/user-profile.png)

## How the System Works

### Core Concepts
* **Identity Mapping**: High-cardinality strings (emails) ➔ compact 32-bit `INT` UIDs.
* **Offloaded Aggregation**: Computation happens in the Fluss TabletServers, keeping Flink jobs state-free.
* **Optimized Storage**: Native [RoaringBitmap](https://roaringbitmap.org/) support for sub-second unique visitor (UV) counts.

### Data Flow
1. **Ingestion**: Raw event streams (e.g., clicks, page views) enter the system from a source such as [Apache Kafka](https://kafka.apache.org/).
2. **Mapping**: The [Apache Flink](https://flink.apache.org/) job performs a temporal lookup join against the `user_dict` table. If a user is new, the `insert-if-not-exists` hint triggers Fluss to generate a unique `INT` UID using its auto-increment feature.
3. **Merge**: The **Aggregation Merge Engine** updates click counts and bitmaps in the storage layer.
4. **Recovery**: The **Undo Recovery** mechanism ensures exactly-once accuracy during failovers.

## Environment Setup

### Prerequisites

Before proceeding, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed.

### Starting the Playground

1. Create a working directory.
   ```shell
   mkdir fluss-user-profile
   cd fluss-user-profile
   ```

2. Set the Fluss version environment variables.
   ```shell
   # Use 0.9.0 for the latest release, or 0.10-SNAPSHOT for the latest features
   export FLUSS_DOCKER_VERSION=0.10-SNAPSHOT
   export FLINK_VERSION="1.20"
   ```
   :::note
   If you open a new terminal window, remember to re-run these export commands.
   :::

3. Create a `lib` directory and download the required JARs.
   ```shell
   mkdir lib
   # Download Flink Faker for data generation
   curl -fL -o lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
   # Download the Fluss connector
   curl -fL -o "lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" \
     "https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/${FLUSS_DOCKER_VERSION}/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar"
   ```
   :::note
   Snapshot versions are not published to Maven Central. If you use `0.10-SNAPSHOT`, build Fluss from source and copy the connector JAR into `lib/` instead, e.g. `cp ../fluss-flink/fluss-flink-1.20/target/fluss-flink-1.20-0.10-SNAPSHOT.jar lib/`.
   :::

4. Create a `docker-compose.yml` file.
   ```yaml
   services:
     coordinator-server:
       image: apache/fluss:${FLUSS_DOCKER_VERSION}
       command: coordinatorServer
       depends_on:
         - zookeeper
       environment:
         - |
           FLUSS_PROPERTIES=
           zookeeper.address: zookeeper:2181
           bind.listeners: FLUSS://coordinator-server:9123
           remote.data.dir: /remote-data
       volumes:
         - fluss-remote-data:/remote-data
     tablet-server:
       image: apache/fluss:${FLUSS_DOCKER_VERSION}
       command: tabletServer
       depends_on:
         - coordinator-server
       environment:
         - |
           FLUSS_PROPERTIES=
           zookeeper.address: zookeeper:2181
           bind.listeners: FLUSS://tablet-server:9123
           data.dir: /tmp/fluss/data
           remote.data.dir: /remote-data
       volumes:
         - fluss-remote-data:/remote-data
     zookeeper:
       restart: always
       image: zookeeper:3.9.2
     jobmanager:
       image: flink:${FLINK_VERSION}
       ports:
         - "8081:8081"
       environment:
         - |
           FLINK_PROPERTIES=
           jobmanager.rpc.address: jobmanager
       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh jobmanager"]
       volumes:
         - ./lib:/tmp/lib
         - fluss-remote-data:/remote-data
     taskmanager:
       image: flink:${FLINK_VERSION}
       depends_on:
         - jobmanager
       environment:
         - |
           FLINK_PROPERTIES=
           jobmanager.rpc.address: jobmanager
           taskmanager.numberOfTaskSlots: 2
       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh taskmanager"]
       volumes:
         - ./lib:/tmp/lib
         - fluss-remote-data:/remote-data
     sql-client:
       image: flink:${FLINK_VERSION}
       depends_on:
         - jobmanager
       environment:
         - |
           FLINK_PROPERTIES=
           jobmanager.rpc.address: jobmanager
           rest.address: jobmanager
       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh bin/sql-client.sh"]
       volumes:
         - ./lib:/tmp/lib
         - fluss-remote-data:/remote-data

   volumes:
     fluss-remote-data:
   ```

5. Start the environment.
   ```shell
   docker compose up -d
   ```

6. Launch the Flink SQL Client.
   ```shell
   # Explicitly include the Fluss JAR in the classpath for the session
   docker compose run sql-client bin/sql-client.sh -j "/tmp/lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar"
   ```

## Step 1: Create the Fluss Catalog

In the SQL Client, create and use the Fluss catalog.

:::tip
Run the SQL statements one by one to avoid errors.
:::

```sql
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);
```

```sql
USE CATALOG fluss_catalog;
```

## Step 2: Create the User Dictionary Table

Create the `user_dict` table to map emails to UIDs. The `auto-increment.fields` option makes Fluss generate unique IDs automatically.

```sql
CREATE TABLE user_dict (
    email STRING,
    uid INT,
    PRIMARY KEY (email) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'auto-increment.fields' = 'uid',
    'bucket.num' = '1'
);
```

## Step 3: Create the Aggregated Profile Table

Create the `user_profiles` table using the **Aggregation Merge Engine**. This pushes the aggregation logic (summing clicks, unioning bitmaps) down to the Fluss TabletServers.

:::tip
We use `rbm64` ([RoaringBitmap](https://roaringbitmap.org/)) for efficient unique visitor counting. It stores millions of unique IDs in a highly compressed format.
:::

```sql
CREATE TABLE user_profiles (
    profile_id INT,
    unique_visitors BYTES,
    total_clicks BIGINT,
    PRIMARY KEY (profile_id) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'table.merge-engine' = 'aggregation',
    'fields.unique_visitors.agg' = 'rbm64',
    'fields.total_clicks.agg' = 'sum',
    'bucket.num' = '1'
);
```

## Step 4: Ingest and Process Data

Create a temporary source table that simulates raw user events with the Faker connector.
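Before wiring up the source, it helps to have a mental model of what the aggregation merge engine declared in Step 3 does: on every upsert with an existing primary key, each column's configured merge function combines the old and new values. The sketch below is a rough, simplified model in plain Python — not Fluss internals — where an ordinary `set` stands in for the compressed RoaringBitmap.

```python
# Simplified model of the aggregation merge engine for user_profiles:
# 'sum' merges total_clicks, 'rbm64' merges unique_visitors.
# A plain set stands in for the RoaringBitmap (illustration only).

def merge_upsert(state, profile_id, uid, clicks):
    """Merge one incoming row into the per-key aggregate state."""
    visitors, total = state.get(profile_id, (set(), 0))
    visitors = visitors | {uid}    # rbm64: bitmap union of visitor IDs
    total = total + clicks         # sum: accumulate click counts
    state[profile_id] = (visitors, total)
    return state

state = {}
for pid, uid, clicks in [(1, 10, 3), (1, 11, 2), (1, 10, 4)]:
    merge_upsert(state, pid, uid, clicks)

visitors, total_clicks = state[1]
print(len(visitors), total_clicks)  # → 2 unique visitors, 9 total clicks
```

With that model in mind, define the simulated event source.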
```sql
CREATE TEMPORARY TABLE raw_events (
    email STRING,
    click_count INT,
    profile_group_id INT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'faker',
    'rows-per-second' = '1',
    'fields.email.expression' = '#{internet.emailAddress}',
    'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}',
    'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}'
);
```

Now, run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` on the fly. Each event is attributed to a profile via `profile_group_id`, and the resolved `uid` is merged into that profile's visitor bitmap.

```sql
INSERT INTO user_profiles
SELECT
    e.profile_group_id,
    -- Convert the INT uid to BYTES for the rbm64 aggregation.
    -- Note: in a real production job, you might use a UDF to ensure correct bitmap initialization.
    CAST(CAST(d.uid AS STRING) AS BYTES),
    CAST(e.click_count AS BIGINT)
FROM raw_events AS e
JOIN user_dict /*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */
    FOR SYSTEM_TIME AS OF e.proctime AS d
    ON e.email = d.email;
```

## Step 5: Verify Results

Query the `user_profiles` table to watch the aggregated metrics update in real time.

```sql
-- Set the result mode to tableau for better visualization
SET 'sql-client.execution.result-mode' = 'tableau';
```

```sql
-- User profiles
SELECT
    profile_id,
    total_clicks,
    unique_visitors
FROM user_profiles;
```

You should see rows with `profile_id`, `total_clicks`, and `unique_visitors` (raw bytes representing the bitmap). The `total_clicks` value increases as more events arrive for the same profile.

To verify the dictionary mapping:
```sql
SELECT * FROM user_dict;
```

## Clean Up

Exit the SQL Client (`exit;`) and stop the Docker containers.

```shell
docker compose down -v
```

## Architectural Benefits

* **Stateless Flink Jobs:** Offloading the dictionary and aggregation state to Fluss makes Flink jobs lightweight, enabling faster checkpoints and near-instant recovery.
* **Compact Storage:** Using `INT` UIDs instead of `STRING` emails reduces the memory footprint of deduplication tasks by up to 90%.
* **Exactly-Once Accuracy:** Even during failovers, the **Undo Recovery** mechanism ensures that replayed data does not result in double counting, maintaining perfect accuracy.
diff --git a/website/static/img/user-profile.png b/website/static/img/user-profile.png
new file mode 100644
index 0000000000..413ffd6639
Binary files /dev/null and b/website/static/img/user-profile.png differ