
Bedrock integration backup #677

Open

vaquarkhan wants to merge 9 commits into apache:main from vaquarkhan:bedrock-integration-backup

Conversation

@vaquarkhan

[Short description explaining the high-level reason for the pull request]

Changes

How I tested this

Notes

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

@vaquarkhan vaquarkhan force-pushed the bedrock-integration-backup branch from 995a1ba to f593ee3 Compare March 16, 2026 08:28
@andreahlert (Collaborator) left a comment

Two features in one PR; this would've been easier to review split in two. The Bedrock side is straightforward, but the SQS consumer needs work.

if s3_key and s3_key.endswith(".jsonl"):
await self._handle_s3_event(s3_key, event_time)

await sqs_client.delete_message(

nit: s3_key = None and event_time = None are dead: they're assigned here but never read; the actual values come from s3_keys_with_times on line 855.


async def indexing_jobs(
self, offset: int = 0, limit: int = 100, filter_empty: bool = True
) -> Sequence[schema.IndexingJob]:

blocker: if _handle_s3_event raises on the Nth record of a multi-record SQS message, records 0..N-1 get re-indexed on retry with no dedup. Make _handle_s3_event idempotent (check if s3_path exists in LogFile before insert).
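A minimal sketch of the dedup guard, with an in-memory set standing in for the suggested LogFile existence check (class and method names here are illustrative, not the project's actual API):

```python
class EventConsumer:
    """Illustrative consumer; only the dedup guard mirrors the suggestion."""

    def __init__(self):
        # Stand-in for a LogFile table lookup keyed on s3_path.
        self._indexed_paths = set()

    async def _handle_s3_event(self, s3_key: str, event_time) -> bool:
        # Skip records that were already indexed, so a retried
        # multi-record SQS message does not re-index records 0..N-1
        # after a failure at record N.
        if s3_key in self._indexed_paths:
            return False
        self._indexed_paths.add(s3_key)
        # ... insert into LogFile and kick off indexing here ...
        return True
```

With a real database the membership check would be a query on LogFile.s3_path inside the same transaction as the insert.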

self._name = name
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"

guardrail_version or "DRAFT" silently defaults to DRAFT when only guardrail_id is set. Is that intentional? It feels risky for prod: someone could set a guardrail ID and not realize they're running against an unpublished draft. Same at line 191.
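If the default is not intended, a fail-fast check is one option (a sketch, not the PR's code; the function name is made up):

```python
def resolve_guardrail_version(guardrail_id, guardrail_version):
    """Require an explicit version whenever a guardrail is configured."""
    if guardrail_id is not None and guardrail_version is None:
        raise ValueError(
            "guardrail_id is set but guardrail_version is not; "
            "pass a published version number or 'DRAFT' explicitly"
        )
    return guardrail_version
```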

return result, new_state


class BedrockStreamingAction(StreamingAction):

IMHO BedrockStreamingAction is a copy-paste of BedrockAction. __init__, _get_client, reads/writes/name properties are all identical, ~70 lines duplicated. Pull them into a _BedrockBase and let each subclass just implement its execution method.

self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
self._inference_config = inference_config or {"maxTokens": 4096}

just a small nit: inference_config or {"maxTokens": 4096} means inference_config={} gives you the default because empty dict is falsy. Use if inference_config is not None if you want to allow empty configs.
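Sketched as a standalone helper (a hypothetical function, just to show the pattern):

```python
def resolve_inference_config(inference_config=None):
    # `is not None` preserves an explicitly passed empty dict, whereas
    # `inference_config or {...}` would replace {} with the default
    # because an empty dict is falsy.
    if inference_config is not None:
        return inference_config
    return {"maxTokens": 4096}
```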

- name: Install dependencies
run: |
python -m pip install -e ".[tests,tracking-client,graphviz]"
python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"

tracking-server-s3 in the main CI install and apache-burr[bedrock] in [tests] (pyproject.toml:101) means every contributor now pulls boto3, aiobotocore, tortoise-orm, aerich even for unrelated PRs. Keep AWS deps in a separate CI job and test group, like the existing test-persister-dbs pattern.

"""
if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not self._sqs_queue_url:
logger.info("Event consumer not configured, skipping")
return

Is the data/{project}/... prefix structure guaranteed for all event sources? WDYT about getting the project name from data_file.prefix (you already parse it via DataFile.from_path one line above) instead of a raw split?

response = await sqs_client.receive_message(
QueueUrl=self._sqs_queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=self._sqs_wait_time_seconds,

Nice, auto-creating the project from the S3 event is a good call for the event-driven flow.

})
}

resource "aws_sqs_queue_redrive_policy" "main" {

+1 on the DLQ + redrive policy setup, clean separation into modules.
