Service bus std #14

Open
JC-wk wants to merge 12 commits into main from service-bus-std

@JC-wk JC-wk commented Feb 26, 2026

Description
This PR introduces the Claim-Check pattern for Azure Service Bus to handle messages that exceed the size limits of the Standard SKU (256 KB). While the
Premium SKU supports up to 100 MB, large messages (e.g., extensive deployment logs or error traces) can still pose challenges for downstream consumers
like Cosmos DB.

This implementation allows components to offload large message payloads to an Azure Storage account, sending only a reference (claim-check) in the
Service Bus message.
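
The round trip described above can be sketched in a few lines. This is only an illustration, not the PR's implementation: an in-memory dictionary stands in for the Azure Storage container, `wrap_payload` and `unwrap_payload` are hypothetical names, and the byte value used for the threshold is an assumption. The `claim_check` key and the `msg-<uuid>.json` blob naming follow the helper code quoted in the review further down.

```python
import json
import uuid

# Hypothetical stand-in for the blob container; the PR uses Azure Blob Storage.
BLOB_STORE: dict[str, str] = {}

# Assumption: the 200KB default is interpreted as 200 * 1024 bytes.
OFFLOAD_THRESHOLD_BYTES = 200 * 1024


def wrap_payload(content: str, threshold: int = OFFLOAD_THRESHOLD_BYTES) -> str:
    """Return the body to send: the payload itself, or a small claim check."""
    if len(content.encode("utf-8")) <= threshold:
        return content
    blob_path = f"sb-messages/msg-{uuid.uuid4()}.json"
    BLOB_STORE[blob_path] = content  # the "upload"
    return json.dumps({"claim_check": blob_path})


def unwrap_payload(body: str) -> str:
    """Resolve a claim check back to its payload; pass other bodies through."""
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return body
    if isinstance(parsed, dict) and "claim_check" in parsed:
        return BLOB_STORE[parsed["claim_check"]]  # the "download"
    return body
```

A payload under the threshold travels unchanged, while a larger one is replaced by a reference of a few dozen bytes that the consumer resolves before processing.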

Key Changes

  • Infrastructure Updates:
    • Updated Terraform to create a sb-messages storage container.
    • Added RBAC roles (Storage Blob Data Contributor) for API and Processor identities to access the offload storage.
    • Exposed SERVICE_BUS_MESSAGES_STORAGE_ACCOUNT_NAME and other configurations to the API and Processors.
  • Component Enhancements:
    • API App: Automatically offloads large resource request messages.
    • Resource Processor: Consumes claim-check messages and downloads payloads before processing.
    • Airlock Processor: Updated to use the async-compatible claim-check helpers and improved event loop performance by converting I/O operations to
      async.
  • Configuration:
    • Introduced SERVICE_BUS_MESSAGE_OFFLOAD_THRESHOLD (default: 200KB) to control when offloading occurs.
    • Made the offload container name configurable via SERVICE_BUS_MESSAGES_CONTAINER_NAME.
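
As a rough sketch of how a component might read these two settings, assuming the threshold is supplied as an integer number of bytes and that `sb-messages` is the fallback container name (neither detail is confirmed by the description), something like the following could work. `load_offload_settings` is a hypothetical helper, not a function from this PR.

```python
import os

# Assumption: "200KB" means 200 * 1024 bytes and the variable holds an integer.
DEFAULT_OFFLOAD_THRESHOLD_BYTES = 200 * 1024
# Assumption: the Terraform-created container doubles as the default name.
DEFAULT_CONTAINER_NAME = "sb-messages"


def load_offload_settings(env=None) -> dict:
    """Read the offload settings from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    raw = env.get("SERVICE_BUS_MESSAGE_OFFLOAD_THRESHOLD")
    threshold = int(raw) if raw else DEFAULT_OFFLOAD_THRESHOLD_BYTES
    if threshold <= 0:
        raise ValueError("offload threshold must be a positive number of bytes")
    container = env.get("SERVICE_BUS_MESSAGES_CONTAINER_NAME") or DEFAULT_CONTAINER_NAME
    return {"threshold_bytes": threshold, "container_name": container}
```

Keeping the default below the 256 KB Standard limit leaves headroom for message headers and properties, which count toward the limit as well.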

Validation Results

  • Verified that Standard Service Bus successfully offloads and processes messages larger than 256 KB.
  • Confirmed that Premium Service Bus remains fully compatible and its default behavior is unchanged.
  • Consolidated unit tests for api_app and airlock_processor pass with the refactored shared logic.
  • Manually verified the async performance improvements in the Airlock Processor.

@gemini-code-assist

Summary of Changes

Hello @JC-wk, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the message handling capabilities by introducing a 'claim check' pattern for Service Bus and Event Grid, which intelligently offloads large payloads to blob storage. It also provides flexibility by making the Service Bus SKU configurable, allowing environments to balance cost and performance. These changes ensure more robust and scalable inter-service communication.

Highlights

  • Service Bus Message Offloading (Claim Check Pattern): Implemented a 'claim check' pattern for Service Bus messages and Event Grid events. Large message payloads are now automatically offloaded to Azure Blob Storage, with the message itself containing a reference to the blob, preventing message size limits from being hit.
  • Configurable Service Bus SKU: The Azure Service Bus SKU is now configurable between 'Standard' and 'Premium'. This impacts network rule sets, private endpoint availability, and maximum message sizes, allowing for cost optimization in dev/test environments.
  • Asynchronous Operations: Many functions across the Airlock Processor, API App, and Resource Processor have been converted to asynchronous operations (async/await) to improve performance and responsiveness, especially when interacting with external services like Blob Storage.
  • Terraform Infrastructure Updates: Terraform configurations were updated to support the new configurable Service Bus SKU, provision a dedicated storage container for offloaded messages, and assign necessary 'Storage Blob Data Contributor' roles to relevant identities (API, Resource Processor, Airlock).
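
To illustrate why the async conversion mentioned in the highlights can help, here is a minimal sketch in which `asyncio.sleep` stands in for an awaited blob download. The function names are hypothetical and nothing here is taken from the PR's code; it only shows that awaited I/O lets the event loop start a second operation while the first is still waiting.

```python
import asyncio


async def fetch_payload(name: str, delay: float, log: list) -> str:
    """Pretend to download a payload; the sleep stands in for network I/O."""
    log.append(f"start {name}")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"done {name}")
    return name.upper()


async def process_all() -> tuple:
    """Run two downloads concurrently and record the order of events."""
    log: list = []
    results = await asyncio.gather(
        fetch_payload("a", 0.2, log),
        fetch_payload("b", 0.01, log),
    )
    return results, log
```

Running `asyncio.run(process_all())` returns the results in call order, while the log shows both downloads starting before either finishes. A blocking call in place of the `await` would serialize them.
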
Changelog
  • airlock_processor/BlobCreatedTrigger/__init__.py
    • Imported 'sb_helpers' for message processing.
    • Converted 'main' and 'send_delete_event' functions to asynchronous.
    • Updated message processing to retrieve payload using 'sb_helpers.receive_message_payload'.
    • Modified event output setting to wrap data for offloading using 'sb_helpers.wrap_payload_for_offloading'.
  • airlock_processor/DataDeletionTrigger/__init__.py
    • Imported 'sb_helpers' for message processing.
    • Converted 'delete_blob_and_container_if_last_blob' and 'main' functions to asynchronous.
    • Updated blob operations to use asynchronous calls.
    • Modified message processing to retrieve payload using 'sb_helpers.receive_message_payload'.
  • airlock_processor/ScanResultTrigger/__init__.py
    • Imported 'sb_helpers' for message processing.
    • Converted 'main' function to asynchronous.
    • Updated message processing to retrieve payload using 'sb_helpers.receive_message_payload'.
    • Modified event output setting to wrap data for offloading using 'sb_helpers.wrap_payload_for_offloading'.
  • airlock_processor/StatusChangedQueueTrigger/__init__.py
    • Imported 'sb_helpers' for message processing.
    • Converted several functions ('main', 'handle_status_changed', 'extract_properties', 'set_output_event_to_report_failure', 'set_output_event_to_report_request_files', 'set_output_event_to_trigger_container_deletion') to asynchronous.
    • Updated message processing to retrieve payload using 'sb_helpers.receive_message_payload'.
    • Modified event output settings to wrap data for offloading using 'sb_helpers.wrap_payload_for_offloading'.
  • airlock_processor/shared_code/constants.py
    • Added new constants for Service Bus message offloading configuration.
  • airlock_processor/shared_code/sb_helpers.py
    • Added new file containing asynchronous helper functions for Service Bus message offloading and retrieval.
  • api_app/core/config.py
    • Added new configuration variables for Service Bus message offloading settings.
  • api_app/event_grid/helpers.py
    • Implemented claim check pattern for publishing Event Grid events, offloading large event data to blob storage.
  • api_app/service_bus/airlock_request_status_update.py
    • Imported 'receive_message_payload' and updated message processing to use it.
  • api_app/service_bus/deployment_status_updater.py
    • Imported 'receive_message_payload' and updated message processing to use it.
  • api_app/service_bus/helpers.py
    • Implemented claim check pattern for sending Service Bus messages, offloading large message bodies to blob storage.
    • Added helper functions for offloading, receiving, and downloading Service Bus message payloads from blob storage.
  • config.sample.yaml
    • Added a commented-out option for 'service_bus_sku: Premium'.
  • config_schema.json
    • Added 'service_bus_sku' property with 'Standard' and 'Premium' as allowed enum values.
  • core/terraform/airlock/airlock_processor.tf
    • Added 'SERVICE_BUS_MESSAGES_STORAGE_ACCOUNT_NAME' environment variable to the Airlock Function App.
  • core/terraform/airlock/outputs.tf
    • Added 'airlock_id_principal_id' as an output.
  • core/terraform/airlock/variables.tf
    • Added 'service_bus_messages_storage_account_name' variable.
  • core/terraform/api-webapp.tf
    • Added 'SERVICE_BUS_MESSAGES_STORAGE_ACCOUNT_NAME' environment variable to the API Web App.
  • core/terraform/main.tf
    • Passed 'service_bus_messages_storage_account_name' to the 'airlock_resources' and 'resource_processor_vmss_porter' modules.
    • Passed 'storage_endpoint_suffix' to the 'resource_processor_vmss_porter' module.
  • core/terraform/modules_move_definitions.tf
    • Removed a 'moved' block related to 'azurerm_servicebus_namespace_network_rule_set'.
  • core/terraform/network/network.tf
    • Added 'Microsoft.ServiceBus' service endpoint to 'app_service_subnet', 'resource_processor_subnet', and 'airlock_events_subnet'.
  • core/terraform/resource_processor/vmss_porter/cloud-config.yaml
    • Added 'SERVICE_BUS_MESSAGES_STORAGE_ACCOUNT_NAME', 'STORAGE_ENDPOINT_SUFFIX', and 'SERVICE_BUS_MESSAGE_OFFLOAD_THRESHOLD' environment variables.
  • core/terraform/resource_processor/vmss_porter/locals.tf
    • Added 'service_bus_messages_storage_account_name' and 'storage_endpoint_suffix' to local variables.
  • core/terraform/resource_processor/vmss_porter/outputs.tf
    • Added 'vmss_msi_principal_id' as an output.
  • core/terraform/resource_processor/vmss_porter/variables.tf
    • Added 'service_bus_messages_storage_account_name' and 'storage_endpoint_suffix' variables.
  • core/terraform/servicebus.tf
    • Made Service Bus SKU configurable via 'var.service_bus_sku'.
    • Adjusted 'premium_messaging_partitions', 'capacity', and 'max_message_size_in_kilobytes' based on the selected SKU.
    • Dynamically configured 'network_rule_set' to include subnets based on SKU, enabling direct access for Standard SKU where private endpoints are not available.
    • Made the 'azurerm_private_endpoint.sbpe' resource conditional on the 'Premium' SKU.
  • core/terraform/storage.tf
    • Added 'azurerm_storage_container' for 'sb-messages' to store offloaded Service Bus messages.
    • Added 'azurerm_role_assignment' resources to grant 'Storage Blob Data Contributor' role to API, Resource Processor, and Airlock identities on the storage account.
  • core/terraform/variables.tf
    • Added 'service_bus_sku' variable with a default of 'Premium' and validation for allowed values.
  • docs/tre-admins/environment-variables.md
    • Added documentation for the new 'SERVICE_BUS_SKU' environment variable.
  • resource_processor/shared/config.py
    • Added 'service_bus_messages_storage_account_name', 'storage_endpoint_suffix', and 'service_bus_message_offload_threshold' to the configuration.
  • resource_processor/shared/sb_helpers.py
    • Added new file containing asynchronous helper functions for Service Bus message sending, offloading, and retrieval.
  • resource_processor/vmss_porter/requirements.txt
    • Added 'azure-storage-blob' dependency.
  • resource_processor/vmss_porter/runner.py
    • Imported 'sb_helpers'.
    • Updated 'receive_message' to use 'sb_helpers.receive_message_payload' for message retrieval.
    • Updated 'invoke_porter_action' to use 'sb_helpers.send_message' for sending status updates.
Activity
  • The pull request introduces a 'claim check' pattern to handle large Service Bus messages and Event Grid events by offloading them to Azure Blob Storage.
  • It makes the Service Bus SKU configurable (Standard or Premium), which affects network configurations and message size limits.
  • Existing Python Azure Functions (BlobCreatedTrigger, DataDeletionTrigger, ScanResultTrigger, StatusChangedQueueTrigger) have been refactored to use asynchronous operations and the new Service Bus helper functions for message processing.
  • The API application's Event Grid and Service Bus helpers have been updated to implement the claim check pattern for both sending and receiving messages/events.
  • Terraform configurations have been extensively modified to provision the necessary storage account and permissions for message offloading, and to dynamically configure the Service Bus namespace based on the chosen SKU.
  • Documentation for environment variables has been updated to reflect the new Service Bus SKU option.

gemini-code-assist[bot]

This comment was marked as outdated.

@JC-wk
Owner Author

JC-wk commented Feb 27, 2026

/gemini review

gemini-code-assist[bot]

This comment was marked as outdated.

@JC-wk
Owner Author

JC-wk commented Feb 27, 2026

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces significant changes to support Azure Service Bus Standard SKU, primarily by refactoring the codebase to be asynchronous and implementing the claim-check pattern for handling large messages. The introduction of a supervisor pattern for Service Bus consumers is a commendable improvement for service reliability. However, I've identified a critical configuration issue in the Terraform code related to the Service Bus queue size that will cause deployment failures with the Standard SKU. Additionally, there's a high-severity maintainability concern due to significant code duplication of the new claim-check helper logic across different application components, which should be refactored into a shared library.

Comment on lines +58 to +92
async def _offload_to_blob(content: str) -> str:
    account_url = f"https://{config.SERVICE_BUS_MESSAGES_STORAGE_ACCOUNT_NAME}.blob.{config.STORAGE_ENDPOINT_SUFFIX}"
    async with credentials.get_credential_async_context() as credential:
        blob_service_client = BlobServiceClient(account_url, credential=credential)
        async with blob_service_client:
            blob_name = f"msg-{uuid.uuid4()}.json"
            blob_client = blob_service_client.get_blob_client(container=config.SERVICE_BUS_MESSAGES_CONTAINER_NAME, blob=blob_name)
            await blob_client.upload_blob(content)
            return f"{config.SERVICE_BUS_MESSAGES_CONTAINER_NAME}/{blob_name}"


async def receive_message_payload(msg: ServiceBusMessage) -> str:
    body_str = b"".join(msg.body).decode("utf-8")
    try:
        body_json = json.loads(body_str)
        if "claim_check" in body_json:
            blob_path = body_json["claim_check"]
            logger.info(f"Message has claim check: {blob_path}. Downloading from blob storage.")
            return await _download_from_blob(blob_path)
    except json.JSONDecodeError:
        pass

    return body_str


async def _download_from_blob(blob_path: str) -> str:
    account_url = f"https://{config.SERVICE_BUS_MESSAGES_STORAGE_ACCOUNT_NAME}.blob.{config.STORAGE_ENDPOINT_SUFFIX}"
    container_name, blob_name = blob_path.split("/", 1)
    async with credentials.get_credential_async_context() as credential:
        blob_service_client = BlobServiceClient(account_url, credential=credential)
        async with blob_service_client:
            blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
            download_stream = await blob_client.download_blob()
            content = await download_stream.readall()
            return content.decode("utf-8")

high

There is significant code duplication for the Service Bus claim-check helper functions (_offload_to_blob, receive_message_payload, _download_from_blob) across multiple components:

  • api_app/service_bus/helpers.py
  • airlock_processor/shared_code/sb_helpers.py
  • resource_processor/shared/sb_helpers.py

This duplication will increase maintenance overhead and could lead to inconsistencies in the future. Please consider refactoring this shared logic into a common library that all components can import.
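
One possible shape for such a shared module is sketched below, purely as an illustration. The idea is to keep the component-specific values in a small settings object and put the pure claim-check logic in functions every component can import, leaving only the Azure I/O to the caller. `ClaimCheckSettings`, `build_claim_check`, and `parse_claim_check` are hypothetical names that do not exist in the PR; the `claim_check` key and the `container/blob` path format follow the code quoted above.

```python
import json
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ClaimCheckSettings:
    """Values each component currently reads from its own config module."""
    storage_account_name: str
    storage_endpoint_suffix: str
    container_name: str

    @property
    def account_url(self) -> str:
        return f"https://{self.storage_account_name}.blob.{self.storage_endpoint_suffix}"


def build_claim_check(settings: ClaimCheckSettings, blob_name: str) -> str:
    """Serialize the reference that replaces an offloaded message body."""
    return json.dumps({"claim_check": f"{settings.container_name}/{blob_name}"})


def parse_claim_check(body: str) -> Optional[Tuple[str, str]]:
    """Return (container, blob) if the body is a claim check, else None."""
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return None
    # Guard against JSON bodies that are lists, strings, or numbers.
    if not isinstance(parsed, dict) or "claim_check" not in parsed:
        return None
    container, _, blob = str(parsed["claim_check"]).partition("/")
    return (container, blob) if blob else None
```

With this split, each component would construct one `ClaimCheckSettings` from its own configuration and reuse the same parsing rules, so a change to the claim-check format is made in a single place.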
