[docs] Add Real-Time User Profile quickstart tutorial #2669
Prajwal-banakar wants to merge 6 commits into apache:main from Prajwal-banakar:docs-newQuickstart
+279
−0
Commits (6):
- be13af5 Add Real-Time User Profile quickstart tutorial (Prajwal-banakar)
- 8881dbd Added Diagram (Prajwal-banakar)
- ee09dbc enhanced guide by adding the environment setup (Prajwal-banakar)
- d481162 enhanced guide by adding the environment setup (Prajwal-banakar)
- 0e8af82 verified environment setup (Prajwal-banakar)
- 361a094 improved (Prajwal-banakar)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,279 @@ | ||
| --- | ||
| title: Real-Time User Profile | ||
| sidebar_position: 4 | ||
| --- | ||
|
|
||
| # Real-Time User Profile | ||
|
|
||
| This tutorial demonstrates how to build a production-grade, real-time user profiling system using Apache Fluss. You will learn how to map high-cardinality string identifiers (like emails) to compact integers and aggregate user behavior directly in the storage layer with exactly-once guarantees. | ||
|
|
||
|  | ||
|
|
||
| ## How the System Works | ||
|
|
||
| ### Core Concepts | ||
| * **Identity Mapping**: High-cardinality strings (emails) ➔ compact 32-bit `INT` UIDs. | ||
| * **Offloaded Aggregation**: Computation happens in Fluss TabletServers, keeping Flink state-free. | ||
| * **Optimized Storage**: Native [RoaringBitmap](https://roaringbitmap.org/) support for sub-second unique visitor (UV) counts. | ||
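The payoff of these concepts can be sketched in plain Python (a conceptual illustration with a `set` standing in for a RoaringBitmap, not Fluss internals): once emails are dictionary-encoded to small integers, deduplication happens on compact ints rather than full strings.

```python
# Conceptual sketch: why INT UIDs beat raw emails for unique-visitor counting.
# A Python set of small ints stands in for a RoaringBitmap; not Fluss code.

emails = ["a@example.com", "b@example.com", "a@example.com"]

# Without dictionary encoding: dedupe full strings.
unique_strings = set(emails)

# With dictionary encoding: each email is assigned a small int once
# (auto-increment style), and dedup happens on the ints -- exactly the
# kind of value a bitmap stores efficiently.
uid_of = {}
bitmap = set()
for email in emails:
    uid = uid_of.setdefault(email, len(uid_of))
    bitmap.add(uid)

assert len(unique_strings) == len(bitmap) == 2
assert sorted(bitmap) == [0, 1]
```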
|
|
||
| ### Data Flow | ||
| 1. **Ingestion**: Raw event streams (e.g., clicks, page views) enter the system from a source like [Apache Kafka](https://kafka.apache.org/). | ||
| 2. **Mapping**: The [Apache Flink](https://flink.apache.org/) job performs a temporal lookup join against the `user_dict` table. If a user is new, the `insert-if-not-exists` hint triggers Fluss to automatically generate a unique `INT` UID using its Auto-Increment feature. | ||
| 3. **Merge**: The **Aggregation Merge Engine** updates clicks and bitmaps in the storage layer. | ||
| 4. **Recovery**: The **Undo Recovery** mechanism ensures exactly-once accuracy during failovers. | ||
|
|
||
| ## Environment Setup | ||
|
|
||
| ### Prerequisites | ||
|
|
||
| Before proceeding, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed. | ||
|
|
||
| ### Starting the Playground | ||
|
|
||
| 1. Create a working directory. | ||
| ```shell | ||
| mkdir fluss-user-profile | ||
| cd fluss-user-profile | ||
| ``` | ||
|
|
||
| 2. Set the Fluss version environment variable. | ||
| ```shell | ||
| # Set to 0.9.0 or 0.10-SNAPSHOT for latest features | ||
| export FLUSS_DOCKER_VERSION=0.10-SNAPSHOT | ||
| export FLINK_VERSION="1.20" | ||
| ``` | ||
| :::note | ||
| If you open a new terminal window, remember to re-run these export commands. | ||
| ::: | ||
|
|
||
| 3. Create a `lib` directory and download the required JARs. | ||
| ```shell | ||
| mkdir lib | ||
| # Download Flink Faker for data generation | ||
| curl -fL -o lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar | ||
| # Download Fluss Connector | ||
| curl -fL -o "lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" \ | ||
| "https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/${FLUSS_DOCKER_VERSION}/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" | ||
| ``` | ||
| :::note | ||
| If the snapshot connector JAR is not yet published to Maven Central, build Fluss from source and copy the JAR instead: `cp ../fluss-flink/fluss-flink-1.20/target/fluss-flink-1.20-0.10-SNAPSHOT.jar lib/` | ||
| ::: | ||
|
|
||
| 4. Create a `docker-compose.yml` file. | ||
| ```yaml | ||
| services: | ||
| coordinator-server: | ||
| image: apache/fluss:${FLUSS_DOCKER_VERSION} | ||
| command: coordinatorServer | ||
| depends_on: | ||
| - zookeeper | ||
| environment: | ||
| - | | ||
| FLUSS_PROPERTIES= | ||
| zookeeper.address: zookeeper:2181 | ||
| bind.listeners: FLUSS://coordinator-server:9123 | ||
| remote.data.dir: /remote-data | ||
| volumes: | ||
| - fluss-remote-data:/remote-data | ||
| tablet-server: | ||
| image: apache/fluss:${FLUSS_DOCKER_VERSION} | ||
| command: tabletServer | ||
| depends_on: | ||
| - coordinator-server | ||
| environment: | ||
| - | | ||
| FLUSS_PROPERTIES= | ||
| zookeeper.address: zookeeper:2181 | ||
| bind.listeners: FLUSS://tablet-server:9123 | ||
| data.dir: /tmp/fluss/data | ||
| remote.data.dir: /remote-data | ||
| volumes: | ||
| - fluss-remote-data:/remote-data | ||
| zookeeper: | ||
| restart: always | ||
| image: zookeeper:3.9.2 | ||
| jobmanager: | ||
| image: flink:${FLINK_VERSION} | ||
| ports: | ||
| - "8081:8081" | ||
| environment: | ||
| - | | ||
| FLINK_PROPERTIES= | ||
| jobmanager.rpc.address: jobmanager | ||
| entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh jobmanager"] | ||
| volumes: | ||
| - ./lib:/tmp/lib | ||
| - fluss-remote-data:/remote-data | ||
| taskmanager: | ||
| image: flink:${FLINK_VERSION} | ||
| depends_on: | ||
| - jobmanager | ||
| environment: | ||
| - | | ||
| FLINK_PROPERTIES= | ||
| jobmanager.rpc.address: jobmanager | ||
| taskmanager.numberOfTaskSlots: 2 | ||
| entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh taskmanager"] | ||
| volumes: | ||
| - ./lib:/tmp/lib | ||
| - fluss-remote-data:/remote-data | ||
| sql-client: | ||
| image: flink:${FLINK_VERSION} | ||
| depends_on: | ||
| - jobmanager | ||
| environment: | ||
| - | | ||
| FLINK_PROPERTIES= | ||
| jobmanager.rpc.address: jobmanager | ||
| rest.address: jobmanager | ||
| entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh bin/sql-client.sh"] | ||
| volumes: | ||
| - ./lib:/tmp/lib | ||
| - fluss-remote-data:/remote-data | ||
|
|
||
| volumes: | ||
| fluss-remote-data: | ||
| ``` | ||
|
|
||
| 5. Start the environment. | ||
| ```shell | ||
| docker compose up -d | ||
| ``` | ||
|
|
||
| 6. Launch the Flink SQL Client. | ||
| ```shell | ||
| # Explicitly include the Fluss JAR in the classpath for the session | ||
| docker compose run sql-client bin/sql-client.sh -j /tmp/lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar | ||
| ``` | ||
|
|
||
| ## Step 1: Create the Fluss Catalog | ||
|
|
||
| In the SQL Client, create and use the Fluss catalog. | ||
|
|
||
| :::tip | ||
| Run SQL statements one by one to avoid errors. | ||
| ::: | ||
|
|
||
| ```sql | ||
| CREATE CATALOG fluss_catalog WITH ( | ||
| 'type' = 'fluss', | ||
| 'bootstrap.servers' = 'coordinator-server:9123' | ||
| ); | ||
| ``` | ||
|
|
||
| ```sql | ||
| USE CATALOG fluss_catalog; | ||
| ``` | ||
|
|
||
| ## Step 2: Create the User Dictionary Table | ||
|
|
||
| Create the `user_dict` table to map emails to UIDs. We use `auto-increment.fields` to automatically generate unique IDs. | ||
|
|
||
| ```sql | ||
| CREATE TABLE user_dict ( | ||
| email STRING, | ||
| uid INT, | ||
| PRIMARY KEY (email) NOT ENFORCED | ||
| ) WITH ( | ||
| 'connector' = 'fluss', | ||
| 'auto-increment.fields' = 'uid', | ||
| 'bucket.num' = '1' | ||
| ); | ||
| ``` | ||
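The behavior of `auto-increment.fields` combined with the `insert-if-not-exists` lookup can be modeled as a simple get-or-create map (a hypothetical in-memory stand-in for illustration; in Fluss the UID is assigned server-side):

```python
# Conceptual model of the user_dict table with an auto-increment 'uid' column.
# Hypothetical in-memory stand-in; Fluss assigns UIDs in the TabletServer.

class UserDict:
    def __init__(self):
        self._by_email = {}
        self._next_uid = 1

    def get_or_create(self, email):
        """insert-if-not-exists: a known email keeps its uid, a new one
        receives the next auto-increment value."""
        if email not in self._by_email:
            self._by_email[email] = self._next_uid
            self._next_uid += 1
        return self._by_email[email]

d = UserDict()
assert d.get_or_create("alice@example.com") == 1
assert d.get_or_create("bob@example.com") == 2
assert d.get_or_create("alice@example.com") == 1  # mapping is stable
```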
|
|
||
| ## Step 3: Create the Aggregated Profile Table | ||
|
|
||
| Create the `user_profiles` table using the **Aggregation Merge Engine**. This pushes the aggregation logic (summing clicks, unioning bitmaps) to the Fluss TabletServers. | ||
|
|
||
| :::tip | ||
| We use `rbm64` ([RoaringBitmap](https://roaringbitmap.org/)) for efficient unique visitor counting. This allows you to store millions of unique IDs in a highly compressed format. | ||
| ::: | ||
|
|
||
| ```sql | ||
| CREATE TABLE user_profiles ( | ||
| profile_id INT, | ||
| unique_visitors BYTES, | ||
| total_clicks BIGINT, | ||
| PRIMARY KEY (profile_id) NOT ENFORCED | ||
| ) WITH ( | ||
| 'connector' = 'fluss', | ||
| 'table.merge-engine' = 'aggregation', | ||
| 'fields.unique_visitors.agg' = 'rbm64', | ||
| 'fields.total_clicks.agg' = 'sum', | ||
| 'bucket.num' = '1' | ||
| ); | ||
| ``` | ||
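The merge semantics declared in this DDL can be sketched as follows (a conceptual model only: a Python `set` stands in for the `rbm64` bitmap, and the merge runs client-side here rather than in the TabletServer):

```python
# Conceptual sketch of the aggregation merge engine for user_profiles:
# per primary key, 'sum' adds click totals and 'rbm64' unions visitor bitmaps.
# Column names mirror the DDL above; a set stands in for the rbm64 bitmap.

profiles = {}  # profile_id -> {"unique_visitors": set, "total_clicks": int}

def merge(profile_id, uid, clicks):
    row = profiles.setdefault(
        profile_id, {"unique_visitors": set(), "total_clicks": 0}
    )
    row["unique_visitors"].add(uid)   # rbm64: bitmap union
    row["total_clicks"] += clicks     # sum

merge(1, uid=42, clicks=3)
merge(1, uid=42, clicks=5)  # same visitor again: clicks grow, UV does not
merge(1, uid=7, clicks=1)

assert profiles[1]["total_clicks"] == 9
assert len(profiles[1]["unique_visitors"]) == 2  # the UV count
```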
|
|
||
| ## Step 4: Ingest and Process Data | ||
|
|
||
| Create a temporary source table to simulate raw user events using the Faker connector. | ||
|
|
||
| ```sql | ||
| CREATE TEMPORARY TABLE raw_events ( | ||
| email STRING, | ||
| click_count INT, | ||
| profile_group_id INT, | ||
| proctime AS PROCTIME() | ||
| ) WITH ( | ||
| 'connector' = 'faker', | ||
| 'rows-per-second' = '1', | ||
| 'fields.email.expression' = '#{internet.emailAddress}', | ||
| 'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}', | ||
| 'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}' | ||
| ); | ||
| ``` | ||
|
|
||
| Now, run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` on the fly. | ||
|
|
||
| ```sql | ||
| INSERT INTO user_profiles | ||
| SELECT | ||
| d.uid, | ||
| -- Convert INT to BYTES for rbm64. | ||
| -- Note: In a real production job, you might use a UDF to ensure correct bitmap initialization. | ||
| CAST(CAST(d.uid AS STRING) AS BYTES), | ||
| CAST(e.click_count AS BIGINT) | ||
| FROM raw_events AS e | ||
| JOIN user_dict /*+ OPTIONS('lookup.insert-if-not-exists' = 'true') */ | ||
| FOR SYSTEM_TIME AS OF e.proctime AS d | ||
| ON e.email = d.email; | ||
| ``` | ||
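End to end, the statement above behaves like the following sketch (in-memory stand-ins for the two Fluss tables, not the Flink runtime): each event's email is resolved through `user_dict` with insert-if-not-exists semantics, and the resulting row is merged into `user_profiles`.

```python
# End-to-end sketch of the INSERT pipeline: lookup (or create) the uid for
# each event's email, then merge into the aggregated profile keyed by uid.
# In-memory stand-ins for Fluss tables; event fields mirror raw_events.

user_dict = {}  # email -> uid (auto-increment stand-in)
profiles = {}   # uid -> [visitor_bitmap_as_set, total_clicks]

def process(event):
    email, clicks = event["email"], event["click_count"]
    uid = user_dict.setdefault(email, len(user_dict) + 1)  # insert-if-not-exists
    row = profiles.setdefault(uid, [set(), 0])
    row[0].add(uid)   # rbm64 union
    row[1] += clicks  # sum

for ev in [{"email": "a@x.com", "click_count": 4},
           {"email": "a@x.com", "click_count": 2},
           {"email": "b@x.com", "click_count": 1}]:
    process(ev)

uid_a = user_dict["a@x.com"]
assert profiles[uid_a][1] == 6  # clicks for the same email accumulate
```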
|
|
||
| ## Step 5: Verify Results | ||
|
|
||
| You can query the `user_profiles` table to see the aggregated metrics updating in real-time. | ||
|
|
||
| ```sql | ||
| -- Set result mode to tableau for better visualization | ||
| SET 'sql-client.execution.result-mode' = 'tableau'; | ||
| ``` | ||
|
|
||
| ```sql | ||
| -- Query the aggregated user profiles | ||
| SELECT | ||
| profile_id, | ||
| total_clicks, | ||
| unique_visitors | ||
| FROM user_profiles; | ||
| ``` | ||
|
|
||
| You should see rows with `profile_id`, `total_clicks`, and `unique_visitors` (as raw bytes representing the bitmap). The `total_clicks` will increase as more events arrive for the same user. | ||
|
|
||
| To verify the dictionary mapping: | ||
| ```sql | ||
| SELECT * FROM user_dict; | ||
| ``` | ||
|
|
||
| ## Clean Up | ||
|
|
||
| Exit the SQL Client (`exit;`) and stop the Docker containers. | ||
|
|
||
| ```shell | ||
| docker compose down -v | ||
| ``` | ||
|
|
||
| ## Architectural Benefits | ||
|
|
||
| * **Stateless Flink Jobs:** Offloading the dictionary and aggregation state to Fluss makes Flink jobs lightweight, enabling faster checkpoints and nearly instant recovery. | ||
| * **Compact Storage:** Using `INT` UIDs instead of `STRING` emails reduces the memory footprint for deduplication tasks by up to 90%. | ||
| * **Exactly-Once Accuracy:** Even during failovers, the **Undo Recovery** mechanism ensures replayed data does not result in double-counting, maintaining perfect accuracy. | ||
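The undo-recovery idea can be illustrated with a minimal sketch (an assumption-laden simplification, not Fluss's actual implementation): each merge records the previous value, and on failover the uncheckpointed merges are rolled back before the source replays the same events.

```python
# Conceptual sketch of undo-based exactly-once recovery. Before applying a
# merge, record the previous value; on failover, undo merges made since the
# last checkpoint, then let the source replay them -- no double counting.

state = {"total_clicks": 0}
undo_log = []  # (key, previous_value) entries since the last checkpoint

def apply(clicks):
    undo_log.append(("total_clicks", state["total_clicks"]))
    state["total_clicks"] += clicks

apply(3)
apply(5)  # suppose a failure happens here, before a checkpoint completes

# Recovery: roll back the uncheckpointed merges...
while undo_log:
    key, prev = undo_log.pop()
    state[key] = prev

# ...then the source replays the same events.
apply(3)
apply(5)
assert state["total_clicks"] == 8  # counted exactly once
```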
I believe we need the `to_rbm` and `from_rbm` Flink UDFs to process the data correctly. Without these functions, the results would be meaningless and users would not understand the purpose of this feature. However, shipping Flink UDFs falls outside the scope of the Fluss project. I will coordinate with members of the Flink community to contribute these UDFs and identify an appropriate location to open-source and publish the UDF JARs. Once available, we can reference these functions in our documentation and examples.
That said, the Lunar New Year holiday is approaching in China, so we likely will not be able to start this work until March. Until then, we may need to suspend this PR. Thank you for your quick updates.
Exactly. We also need functions such as `RB_CARDINALITY` and `RB_OR_AGG` for aggregating the result bitmaps.