Conversation
…he#664)
- Event-driven SQS telemetry: S3 notifications to SQS, near-instant updates
- Buffered S3 persistence: SpooledTemporaryFile fixes seek errors on large files
- Native BedrockAction and BedrockStreamingAction for Bedrock integration
- Terraform module: S3, SQS, IAM with dev/prod tfvars and tutorial
… fixed build error
This reverts commit 61673cd.
Force-pushed from 995a1ba to f593ee3
```python
if s3_key and s3_key.endswith(".jsonl"):
    await self._handle_s3_event(s3_key, event_time)
# …
await sqs_client.delete_message(
```
nit: `s3_key = None` and `event_time = None` are dead assignments: they're set here but never read, and the actual values come from `s3_keys_with_times` on line 855.
```python
async def indexing_jobs(
    self, offset: int = 0, limit: int = 100, filter_empty: bool = True
) -> Sequence[schema.IndexingJob]:
```
blocker: if `_handle_s3_event` raises on the Nth record of a multi-record SQS message, records 0..N-1 get re-indexed on retry with no dedup. Make `_handle_s3_event` idempotent (check whether `s3_path` already exists in `LogFile` before inserting).
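A minimal sketch of that guard, using an in-memory stand-in for the `LogFile` table (the real check would be an ORM exists/get_or_create query; everything here besides the idea is illustrative):

```python
class LogFileStore:
    """In-memory stand-in for the LogFile table (illustrative only)."""

    def __init__(self):
        self._paths = set()
        self.inserts = 0

    def exists(self, s3_path: str) -> bool:
        return s3_path in self._paths

    def insert(self, s3_path: str) -> None:
        self._paths.add(s3_path)
        self.inserts += 1


def handle_s3_event(store: LogFileStore, s3_path: str) -> bool:
    # Idempotency guard: skip records already indexed, so a retried
    # multi-record SQS message cannot re-index records 0..N-1.
    if store.exists(s3_path):
        return False
    store.insert(s3_path)
    return True
```

With this in place, redelivery of the same message is a no-op rather than a duplicate insert.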
```python
self._name = name
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
```
`guardrail_version or "DRAFT"` silently defaults to DRAFT when only `guardrail_id` is set. Is that intentional? It feels risky for prod: someone could set a guardrail ID and not realize they're running against an unpublished draft. Same at line 191.
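One alternative, sketched with a hypothetical helper (`resolve_guardrail_version` is not in the PR), is to fail loudly when an ID is set without a version instead of silently picking the draft:

```python
def resolve_guardrail_version(guardrail_id, guardrail_version):
    # Hypothetical: require an explicit version whenever an ID is set,
    # rather than silently falling back to the unpublished DRAFT.
    if guardrail_id and guardrail_version is None:
        raise ValueError(
            "guardrail_id is set but guardrail_version is not; "
            "pass 'DRAFT' explicitly to run against the unpublished draft"
        )
    return guardrail_version
```

Callers who genuinely want the draft then have to say so in code, which makes the prod-vs-draft choice reviewable.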
```python
return result, new_state


class BedrockStreamingAction(StreamingAction):
```
IMHO `BedrockStreamingAction` is a copy-paste of `BedrockAction`: `__init__`, `_get_client`, and the `reads`/`writes`/`name` properties are all identical, roughly 70 duplicated lines. Pull them into a `_BedrockBase` and let each subclass implement just its execution method.
```python
self._region = region
self._guardrail_id = guardrail_id
self._guardrail_version = guardrail_version or "DRAFT"
self._inference_config = inference_config or {"maxTokens": 4096}
```
just a small nit: `inference_config or {"maxTokens": 4096}` means `inference_config={}` gives you the default, because an empty dict is falsy. Use `if inference_config is not None` if you want to allow empty configs.
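A quick illustration of the difference (helper names are made up for the example):

```python
def merged_config_or(inference_config):
    # Pattern from the diff: {} is falsy, so an explicitly empty
    # config is silently replaced by the default.
    return inference_config or {"maxTokens": 4096}


def merged_config_is_none(inference_config):
    # Suggested fix: only substitute the default when nothing was passed.
    return inference_config if inference_config is not None else {"maxTokens": 4096}
```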
```diff
 - name: Install dependencies
   run: |
-    python -m pip install -e ".[tests,tracking-client,graphviz]"
+    python -m pip install -e ".[tests,tracking-client,graphviz,tracking-server-s3]"
```
`tracking-server-s3` in the main CI install plus `apache-burr[bedrock]` in `[tests]` (pyproject.toml:101) means every contributor now pulls boto3, aiobotocore, tortoise-orm, and aerich even for unrelated PRs. Keep the AWS deps in a separate CI job and test group, like the existing `test-persister-dbs` pattern.
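A possible shape for the split, mirroring the `test-persister-dbs` pattern; the job name and pytest marker here are invented for illustration:

```yaml
# Hypothetical dedicated job so AWS extras stay out of the default install
test-tracking-server-s3:
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v4
    - uses: actions/setup-python@v5
      with:
        python-version: "3.11"
    - name: Install S3 tracking-server deps
      run: python -m pip install -e ".[tests,tracking-server-s3]"
    - name: Run S3 tracking-server tests
      run: python -m pytest tests -m s3  # marker name is an assumption
```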
```python
"""
if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not self._sqs_queue_url:
    logger.info("Event consumer not configured, skipping")
    return
```
Is the `data/{project}/...` prefix structure guaranteed for all event sources? WDYT about getting the project name from `data_file.prefix` (you already parse it via `DataFile.from_path` one line above) instead of a raw split?
```python
response = await sqs_client.receive_message(
    QueueUrl=self._sqs_queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=self._sqs_wait_time_seconds,
```
Nice, auto-creating the project from the S3 event is a good call for the event-driven flow.
```hcl
  })
}

resource "aws_sqs_queue_redrive_policy" "main" {
```
+1 on the DLQ + redrive policy setup, clean separation into modules.