191 changes: 104 additions & 87 deletions pyiceberg/catalog/hive.py
@@ -127,6 +127,9 @@
HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name"
HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive"

LOCK_ENABLED = "lock-enabled"
DEFAULT_LOCK_ENABLED = True

LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
LOCK_CHECK_RETRIES = "lock-check-retries"
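
Taken together, these properties control whether and how a commit takes a Hive Metastore lock. All catalog properties are plain strings; per the tests below, "false" disables locking. A minimal configuration sketch — the URI and tuning values here are hypothetical:

    from pyiceberg.catalog.hive import HiveCatalog

    catalog = HiveCatalog(
        "hive",
        **{
            "uri": "thrift://localhost:9083",   # hypothetical metastore endpoint
            "lock-enabled": "false",            # skip lock/check_lock/unlock on commit
            "lock-check-min-wait-time": "0.1",  # retry tuning, used only when locking is enabled
            "lock-check-max-wait-time": "60",
            "lock-check-retries": "4",
        },
    )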
@@ -301,6 +304,7 @@ def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = self._create_hive_client(properties)

self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED)
self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
self._lock_check_retries = property_as_float(
@@ -499,6 +503,91 @@ def _do_wait_for_lock() -> LockResponse:

return _do_wait_for_lock()

def _do_commit(
self, open_client: Client, table_identifier: Identifier, database_name: str, table_name: str,
requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...],
) -> CommitTableResponse:
"""Perform the actual commit logic (get table, update, write metadata, alter/create in HMS).

This method contains the core commit logic, separated from locking concerns.
"""
hive_table: HiveTable | None
current_table: Table | None
try:
hive_table = self._get_hive_table(open_client, database_name, table_name)
current_table = self._convert_hive_into_iceberg(hive_table)
except NoSuchTableError:
hive_table = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
self._write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)

if hive_table and current_table:
# Table exists, update it.

# Note on table properties:
# - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
# - Updates are reflected in both locations.
# - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
#
# While it is possible to modify HMS table properties through this API, it is not recommended:
# - Mixing HMS-specific properties in Iceberg metadata can cause confusion
# - New/updated HMS table properties will also be stored in Iceberg metadata (even though they are HMS-specific)
# - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg
# (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
# and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS)
new_iceberg_properties = _construct_parameters(
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=current_table.metadata_location,
metadata_properties=updated_staged_table.properties,
)
# Detect properties that were removed from Iceberg metadata
deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()

# Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
existing_hms_parameters = dict(hive_table.parameters or {})
for key in deleted_iceberg_properties:
existing_hms_parameters.pop(key, None)
existing_hms_parameters.update(new_iceberg_properties)
hive_table.parameters = existing_hms_parameters

# Update hive's schema and properties
hive_table.sd = _construct_hive_storage_descriptor(
updated_staged_table.schema(),
updated_staged_table.location(),
property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
)
open_client.alter_table_with_environment_context(
dbname=database_name,
tbl_name=table_name,
new_tbl=hive_table,
environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
)
else:
# Table does not exist, create it.
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(database_name, table_name),
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location,
io=updated_staged_table.io,
catalog=self,
)
)
self._create_hive_table(open_client, hive_table)

return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

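To make the parameter merge in _do_commit concrete, a minimal standalone sketch with hypothetical property values (not part of the diff):

    # HMS-native properties alongside previously tracked Iceberg properties.
    existing_hms_parameters = {"external": "TRUE", "owner": "spark", "write.wap.enabled": "true"}
    # Key removed from Iceberg metadata by this commit.
    deleted_iceberg_properties = {"write.wap.enabled"}
    # Parameters newly constructed from the staged Iceberg metadata.
    new_iceberg_properties = {"owner": "alice", "metadata_location": "s3://bucket/metadata/v2.json"}

    for key in deleted_iceberg_properties:
        existing_hms_parameters.pop(key, None)               # drop deleted Iceberg keys
    existing_hms_parameters.update(new_iceberg_properties)   # apply new Iceberg values

    # HMS-native "external" survives, the deleted key is gone, "owner" takes the Iceberg value.
    assert existing_hms_parameters == {
        "external": "TRUE",
        "owner": "alice",
        "metadata_location": "s3://bucket/metadata/v2.json",
    }
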
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
@@ -521,95 +610,23 @@ def commit_table(
# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
if self._lock_enabled:
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))

try:
if lock.state != LockState.ACQUIRED:
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

hive_table: HiveTable | None
current_table: Table | None
try:
hive_table = self._get_hive_table(open_client, database_name, table_name)
current_table = self._convert_hive_into_iceberg(hive_table)
except NoSuchTableError:
hive_table = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
self._write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)

if hive_table and current_table:
# Table exists, update it.

# Note on table properties:
# - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
# - Updates are reflected in both locations.
# - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
#
# While it is possible to modify HMS table properties through this API, it is not recommended:
# - Mixing HMS-specific properties in Iceberg metadata can cause confusion
# - New/updated HMS table properties will also be stored in Iceberg metadata (even though they are HMS-specific)
# - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg
# (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
# and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS)
new_iceberg_properties = _construct_parameters(
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=current_table.metadata_location,
metadata_properties=updated_staged_table.properties,
)
# Detect properties that were removed from Iceberg metadata
deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()

# Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
existing_hms_parameters = dict(hive_table.parameters or {})
for key in deleted_iceberg_properties:
existing_hms_parameters.pop(key, None)
existing_hms_parameters.update(new_iceberg_properties)
hive_table.parameters = existing_hms_parameters

# Update hive's schema and properties
hive_table.sd = _construct_hive_storage_descriptor(
updated_staged_table.schema(),
updated_staged_table.location(),
property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
)
open_client.alter_table_with_environment_context(
dbname=database_name,
tbl_name=table_name,
new_tbl=hive_table,
environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
)
else:
# Table does not exist, create it.
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(database_name, table_name),
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location,
io=updated_staged_table.io,
catalog=self,
)
)
self._create_hive_table(open_client, hive_table)
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)
if lock.state != LockState.ACQUIRED:
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")

return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))
else:
return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)

def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and return the table instance.
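For context on how the two branches of commit_table are reached in practice, a hedged end-to-end sketch — the table name and property are hypothetical, and the transaction API is the standard pyiceberg Table surface:

    table = catalog.load_table("default.my_table")
    with table.transaction() as tx:
        tx.set_properties(owner="data-eng")  # hypothetical property update
    # Committing the transaction calls catalog.commit_table(...); with
    # lock-enabled=false it invokes _do_commit directly, otherwise the same
    # call is wrapped in HMS lock / check_lock / unlock RPCs.
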
67 changes: 67 additions & 0 deletions tests/catalog/test_hive.py
@@ -51,6 +51,7 @@
LOCK_CHECK_MAX_WAIT_TIME,
LOCK_CHECK_MIN_WAIT_TIME,
LOCK_CHECK_RETRIES,
LOCK_ENABLED,
HiveCatalog,
_construct_hive_storage_descriptor,
_HiveClient,
@@ -1407,3 +1408,69 @@ def test_create_hive_client_with_kerberos_using_context_manager(
# closing and re-opening work as expected.
with client as open_client:
assert open_client._iprot.trans.isOpen()


def test_lock_enabled_defaults_to_true() -> None:
"""Verify that lock-enabled defaults to True for backward compatibility."""
prop = {"uri": HIVE_METASTORE_FAKE_URL}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
assert catalog._lock_enabled is True


def test_lock_enabled_can_be_disabled() -> None:
"""Verify that lock-enabled can be set to false."""
prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
assert catalog._lock_enabled is False


def test_commit_table_skips_locking_when_lock_disabled() -> None:
"""When lock-enabled is false, commit_table must not call lock, check_lock, or unlock."""
prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
catalog._client = MagicMock()

mock_table = MagicMock()
mock_table.name.return_value = ("default", "my_table")

mock_do_commit = MagicMock()
mock_do_commit.return_value = MagicMock()

with patch.object(catalog, "_do_commit", mock_do_commit):
catalog.commit_table(mock_table, requirements=(), updates=())

# The core commit logic should still be called
mock_do_commit.assert_called_once()

# But no locking operations should have been performed
catalog._client.__enter__().lock.assert_not_called()
catalog._client.__enter__().check_lock.assert_not_called()
catalog._client.__enter__().unlock.assert_not_called()


def test_commit_table_uses_locking_when_lock_enabled() -> None:
"""When lock-enabled is true (default), commit_table must call lock and unlock."""
lockid = 99999
prop = {"uri": HIVE_METASTORE_FAKE_URL}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)

mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.lock.return_value = LockResponse(lockid=lockid, state=LockState.ACQUIRED)
catalog._client = mock_client

mock_table = MagicMock()
mock_table.name.return_value = ("default", "my_table")

mock_do_commit = MagicMock()
mock_do_commit.return_value = MagicMock()

with patch.object(catalog, "_do_commit", mock_do_commit):
catalog.commit_table(mock_table, requirements=(), updates=())

# Locking operations should have been performed
mock_client.lock.assert_called_once()
mock_client.unlock.assert_called_once()
# The core commit logic should still be called
mock_do_commit.assert_called_once()