Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
PlanSubmitted,
PlanTableScanRequest,
ScanTasks,
StorageCredential,
)
from pyiceberg.exceptions import (
AuthorizationExpiredError,
Expand Down Expand Up @@ -256,6 +257,7 @@ class TableResponse(IcebergBaseModel):
metadata_location: str | None = Field(alias="metadata-location", default=None)
metadata: TableMetadata
config: Properties = Field(default_factory=dict)
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)


class CreateTableRequest(IcebergBaseModel):
Expand Down Expand Up @@ -391,6 +393,26 @@ def _create_session(self) -> Session:

return session

@staticmethod
def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties:
    """Pick the storage credential whose prefix is the longest match for a location.

    Follows the same selection rule as the Java S3FileIO.clientForStoragePath(),
    which walks the storage credential prefixes and keeps the longest one that
    the path starts with.

    See: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java

    Args:
        storage_credentials: Candidate credentials, each exposing ``prefix`` and ``config``.
        location: The path to match against the credential prefixes.

    Returns:
        The ``config`` of the best-matching credential, or an empty dict when there
        are no credentials, no location, or no prefix matches.
    """
    if not (storage_credentials and location):
        return {}

    candidates = [credential for credential in storage_credentials if location.startswith(credential.prefix)]
    if not candidates:
        return {}

    # max() returns the first maximal item, so among equally long prefixes
    # the credential that appears earliest in the list is used.
    winner = max(candidates, key=lambda credential: len(credential.prefix))
    return winner.config

def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
merged_properties = {**self.properties, **properties}
if self._auth_manager:
Expand Down Expand Up @@ -729,24 +751,34 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
session.mount(self.uri, SigV4Adapter(**self.properties))

def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
    """Build a ``Table`` from a REST catalog table response.

    The FileIO properties are layered so that later sources override earlier
    ones: table metadata properties, then the response ``config``, then the
    ``config`` of the storage credential matched against the metadata location.

    Args:
        identifier_tuple: Identifier of the table as a tuple of name parts.
        table_response: Parsed table response from the REST catalog.

    Returns:
        A ``Table`` bound to this catalog with a FileIO built from the merged properties.
    """
    # Per Iceberg spec: storage-credentials take precedence over config
    # NOTE: when metadata_location is None, _resolve_storage_credentials returns {}
    # and no storage credential is applied.
    credential_config = self._resolve_storage_credentials(
        table_response.storage_credentials, table_response.metadata_location
    )
    return Table(
        identifier=identifier_tuple,
        metadata_location=table_response.metadata_location,  # type: ignore
        metadata=table_response.metadata,
        io=self._load_file_io(
            # credential_config is unpacked last so its keys win on conflict.
            {**table_response.metadata.properties, **table_response.config, **credential_config},
            table_response.metadata_location,
        ),
        catalog=self,
        config=table_response.config,
    )

def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> StagedTable:
    """Build a ``StagedTable`` from a REST catalog table response.

    The FileIO properties are layered so that later sources override earlier
    ones: table metadata properties, then the response ``config``, then the
    ``config`` of the storage credential matched against the metadata location.

    Args:
        identifier_tuple: Identifier of the table as a tuple of name parts.
        table_response: Parsed table response from the REST catalog.

    Returns:
        A ``StagedTable`` bound to this catalog with a FileIO built from the merged properties.
    """
    # Per Iceberg spec: storage-credentials take precedence over config
    # NOTE: when metadata_location is None, _resolve_storage_credentials returns {}
    # and no storage credential is applied.
    credential_config = self._resolve_storage_credentials(
        table_response.storage_credentials, table_response.metadata_location
    )
    return StagedTable(
        identifier=identifier_tuple,
        metadata_location=table_response.metadata_location,  # type: ignore
        metadata=table_response.metadata,
        io=self._load_file_io(
            # credential_config is unpacked last so its keys win on conflict.
            {**table_response.metadata.properties, **table_response.config, **credential_config},
            table_response.metadata_location,
        ),
        catalog=self,
    )
Expand Down
82 changes: 82 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2351,3 +2351,85 @@ def test_table_uuid_check_on_refresh(rest_mock: Mocker, example_table_metadata_v
assert "Table UUID does not match" in str(exc_info.value)
assert f"current={original_uuid}" in str(exc_info.value)
assert f"refreshed={different_uuid}" in str(exc_info.value)


def test_resolve_storage_credentials_longest_prefix_wins() -> None:
    """With two matching prefixes, the config of the longer one is returned."""
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    broad = StorageCredential(prefix="s3://warehouse/", config={"s3.access-key-id": "short-prefix-key"})
    narrow = StorageCredential(prefix="s3://warehouse/database/table", config={"s3.access-key-id": "long-prefix-key"})
    location = "s3://warehouse/database/table/metadata/00001.json"

    resolved = RestCatalog._resolve_storage_credentials([broad, narrow], location)

    assert resolved == {"s3.access-key-id": "long-prefix-key"}


def test_resolve_storage_credentials_no_match() -> None:
    """A credential whose prefix does not match the location yields an empty config."""
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    unrelated = StorageCredential(prefix="s3://other-bucket/", config={"s3.access-key-id": "no-match"})
    location = "s3://warehouse/database/table/metadata/00001.json"

    resolved = RestCatalog._resolve_storage_credentials([unrelated], location)

    assert resolved == {}


def test_resolve_storage_credentials_empty() -> None:
    """An empty credential list resolves to an empty config, with or without a location."""
    for location in ("s3://warehouse/foo", None):
        assert RestCatalog._resolve_storage_credentials([], location) == {}


def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
    """Loading a table applies the vended storage credential over the response config."""
    vended_config = {
        "s3.access-key-id": "vended-key",
        "s3.secret-access-key": "vended-secret",
        "s3.session-token": "vended-token",
    }
    response_body = {
        "metadata-location": "s3://warehouse/database/table/metadata/00001.metadata.json",
        "metadata": example_table_metadata_with_snapshot_v1,
        "config": {
            "s3.access-key-id": "from-config",
            "s3.secret-access-key": "from-config-secret",
        },
        "storage-credentials": [
            {
                "prefix": "s3://warehouse/database/table",
                "config": vended_config,
            }
        ],
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=response_body,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    table = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "table"))

    # Storage credentials should override config values
    io_properties = table.io.properties
    assert io_properties["s3.access-key-id"] == "vended-key"
    assert io_properties["s3.secret-access-key"] == "vended-secret"
    assert io_properties["s3.session-token"] == "vended-token"


def test_load_table_without_storage_credentials(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
) -> None:
    """A response without storage credentials loads a table equal to one built with a default FileIO."""
    response_body = example_table_metadata_with_snapshot_v1_rest_json
    identifier = ("fokko", "table")
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=response_body,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    actual = catalog.load_table(identifier)

    expected = Table(
        identifier=identifier,
        metadata_location=response_body["metadata-location"],
        metadata=TableMetadataV1(**response_body["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )
    assert actual.metadata.model_dump() == expected.metadata.model_dump()
    assert actual == expected
Loading