diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py
index 802be28565..b4eec8ef86 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -40,6 +40,7 @@
     PlanSubmitted,
     PlanTableScanRequest,
     ScanTasks,
+    StorageCredential,
 )
 from pyiceberg.exceptions import (
     AuthorizationExpiredError,
@@ -256,6 +257,7 @@ class TableResponse(IcebergBaseModel):
     metadata_location: str | None = Field(alias="metadata-location", default=None)
     metadata: TableMetadata
     config: Properties = Field(default_factory=dict)
+    storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)
 
 
 class CreateTableRequest(IcebergBaseModel):
@@ -391,6 +393,35 @@ def _create_session(self) -> Session:
 
         return session
 
+    @staticmethod
+    def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties:
+        """Resolve the best-matching storage credential by longest prefix match.
+
+        Mirrors the Java implementation in S3FileIO.clientForStoragePath() which iterates
+        over storage credential prefixes and selects the one with the longest match.
+
+        See: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+
+        Args:
+            storage_credentials: Credentials from the table response, each scoped to a location prefix.
+            location: Location to match against the credential prefixes; may be None.
+
+        Returns:
+            The config of the credential whose prefix is the longest one that ``location`` starts
+            with (the first such credential wins a tie), or an empty dict if there are no
+            credentials, no location, or no matching prefix.
+        """
+        if not storage_credentials or not location:
+            return {}
+
+        best_match: StorageCredential | None = None
+        for cred in storage_credentials:
+            if location.startswith(cred.prefix):
+                if best_match is None or len(cred.prefix) > len(best_match.prefix):
+                    best_match = cred
+
+        return best_match.config if best_match else {}
+
     def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
         merged_properties = {**self.properties, **properties}
         if self._auth_manager:
@@ -729,24 +760,35 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
         session.mount(self.uri, SigV4Adapter(**self.properties))
 
     def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
+        # Per Iceberg spec: storage-credentials take precedence over config
+        credential_config = self._resolve_storage_credentials(
+            table_response.storage_credentials, table_response.metadata_location
+        )
         return Table(
             identifier=identifier_tuple,
             metadata_location=table_response.metadata_location,  # type: ignore
             metadata=table_response.metadata,
             io=self._load_file_io(
-                {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
+                {**table_response.metadata.properties, **table_response.config, **credential_config},
+                table_response.metadata_location,
             ),
             catalog=self,
             config=table_response.config,
         )
 
     def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> StagedTable:
+        # Per Iceberg spec: storage-credentials take precedence over config
+        # A staged table may not have a metadata-location yet; fall back to the table location
+        credential_config = self._resolve_storage_credentials(
+            table_response.storage_credentials, table_response.metadata_location or table_response.metadata.location
+        )
         return StagedTable(
             identifier=identifier_tuple,
             metadata_location=table_response.metadata_location,  # type: ignore
             metadata=table_response.metadata,
             io=self._load_file_io(
-                {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
+                {**table_response.metadata.properties, **table_response.config, **credential_config},
+                table_response.metadata_location,
             ),
             catalog=self,
         )
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 9fb1fa9af5..ad137925e3 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -2351,3 +2351,85 @@ def test_table_uuid_check_on_refresh(rest_mock: Mocker, example_table_metadata_v
     assert "Table UUID does not match" in str(exc_info.value)
     assert f"current={original_uuid}" in str(exc_info.value)
     assert f"refreshed={different_uuid}" in str(exc_info.value)
+
+
+def test_resolve_storage_credentials_longest_prefix_wins() -> None:
+    from pyiceberg.catalog.rest.scan_planning import StorageCredential
+
+    credentials = [
+        StorageCredential(prefix="s3://warehouse/", config={"s3.access-key-id": "short-prefix-key"}),
+        StorageCredential(prefix="s3://warehouse/database/table", config={"s3.access-key-id": "long-prefix-key"}),
+    ]
+    result = RestCatalog._resolve_storage_credentials(credentials, "s3://warehouse/database/table/metadata/00001.json")
+    assert result == {"s3.access-key-id": "long-prefix-key"}
+
+
+def test_resolve_storage_credentials_no_match() -> None:
+    from pyiceberg.catalog.rest.scan_planning import StorageCredential
+
+    credentials = [
+        StorageCredential(prefix="s3://other-bucket/", config={"s3.access-key-id": "no-match"}),
+    ]
+    result = RestCatalog._resolve_storage_credentials(credentials, "s3://warehouse/database/table/metadata/00001.json")
+    assert result == {}
+
+
+def test_resolve_storage_credentials_empty() -> None:
+    assert RestCatalog._resolve_storage_credentials([], "s3://warehouse/foo") == {}
+    assert RestCatalog._resolve_storage_credentials([], None) == {}
+
+
+def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
+    metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/fokko/tables/table",
+        json={
+            "metadata-location": metadata_location,
+            "metadata": example_table_metadata_with_snapshot_v1,
+            "config": {
+                "s3.access-key-id": "from-config",
+                "s3.secret-access-key": "from-config-secret",
+            },
+            "storage-credentials": [
+                {
+                    "prefix": "s3://warehouse/database/table",
+                    "config": {
+                        "s3.access-key-id": "vended-key",
+                        "s3.secret-access-key": "vended-secret",
+                        "s3.session-token": "vended-token",
+                    },
+                }
+            ],
+        },
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    table = catalog.load_table(("fokko", "table"))
+
+    # Storage credentials should override config values
+    assert table.io.properties["s3.access-key-id"] == "vended-key"
+    assert table.io.properties["s3.secret-access-key"] == "vended-secret"
+    assert table.io.properties["s3.session-token"] == "vended-token"
+
+
+def test_load_table_without_storage_credentials(
+    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
+) -> None:
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/fokko/tables/table",
+        json=example_table_metadata_with_snapshot_v1_rest_json,
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    actual = catalog.load_table(("fokko", "table"))
+    expected = Table(
+        identifier=("fokko", "table"),
+        metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+        metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+        io=load_file_io(),
+        catalog=catalog,
+    )
+    assert actual.metadata.model_dump() == expected.metadata.model_dump()
+    assert actual == expected