From 0677f8eda483ec8be840e3931296911161bc66a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?=
Date: Thu, 12 Feb 2026 11:02:03 +0100
Subject: [PATCH] Add option to disable Hive Metastore locking (lock-enabled)

---
 pyiceberg/catalog/hive.py  | 191 ++++++++++++++++++++-----------------
 tests/catalog/test_hive.py |  67 +++++++++++++
 2 files changed, 171 insertions(+), 87 deletions(-)

diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 1bec186ca8..6a3292c15f 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -127,6 +127,9 @@
 HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name"
 HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive"
 
+LOCK_ENABLED = "lock-enabled"
+DEFAULT_LOCK_ENABLED = True
+
 LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
 LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
 LOCK_CHECK_RETRIES = "lock-check-retries"
@@ -301,6 +304,7 @@ def __init__(self, name: str, **properties: str):
         super().__init__(name, **properties)
         self._client = self._create_hive_client(properties)
 
+        self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED)
         self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
         self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
         self._lock_check_retries = property_as_float(
@@ -499,6 +503,91 @@ def _do_wait_for_lock() -> LockResponse:
 
         return _do_wait_for_lock()
 
+    def _do_commit(
+        self, open_client: Client, table_identifier: Identifier, database_name: str, table_name: str,
+        requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...],
+    ) -> CommitTableResponse:
+        """Perform the actual commit (get table, update, write metadata, alter/create in HMS).
+
+        This method contains the core commit logic, separated from the locking handled in commit_table.
+        """
+        hive_table: HiveTable | None
+        current_table: Table | None
+        try:
+            hive_table = self._get_hive_table(open_client, database_name, table_name)
+            current_table = self._convert_hive_into_iceberg(hive_table)
+        except NoSuchTableError:
+            hive_table = None
+            current_table = None
+
+        updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
+        if current_table and updated_staged_table.metadata == current_table.metadata:
+            # no changes, do nothing
+            return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
+        self._write_metadata(
+            metadata=updated_staged_table.metadata,
+            io=updated_staged_table.io,
+            metadata_path=updated_staged_table.metadata_location,
+        )
+
+        if hive_table and current_table:
+            # Table exists, update it.
+
+            # Note on table properties:
+            # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
+            # - Updates are reflected in both locations.
+            # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
+            #
+            # While it is possible to modify HMS table properties through this API, it is not recommended:
+            # - Mixing HMS-specific properties in Iceberg metadata can cause confusion.
+            # - New/updated HMS table properties will also be stored in Iceberg metadata (even though they are HMS-specific).
+            # - HMS-native properties (set outside Iceberg) cannot be deleted, since they are not visible to Iceberg.
+            #   (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
+            #   and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS.)
+            new_iceberg_properties = _construct_parameters(
+                metadata_location=updated_staged_table.metadata_location,
+                previous_metadata_location=current_table.metadata_location,
+                metadata_properties=updated_staged_table.properties,
+            )
+            # Detect properties that were removed from Iceberg metadata
+            deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()
+
+            # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
+            existing_hms_parameters = dict(hive_table.parameters or {})
+            for key in deleted_iceberg_properties:
+                existing_hms_parameters.pop(key, None)
+            existing_hms_parameters.update(new_iceberg_properties)
+            hive_table.parameters = existing_hms_parameters
+
+            # Update hive's schema and properties
+            hive_table.sd = _construct_hive_storage_descriptor(
+                updated_staged_table.schema(),
+                updated_staged_table.location(),
+                property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
+            )
+            open_client.alter_table_with_environment_context(
+                dbname=database_name,
+                tbl_name=table_name,
+                new_tbl=hive_table,
+                environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
+            )
+        else:
+            # Table does not exist, create it.
+            hive_table = self._convert_iceberg_into_hive(
+                StagedTable(
+                    identifier=(database_name, table_name),
+                    metadata=updated_staged_table.metadata,
+                    metadata_location=updated_staged_table.metadata_location,
+                    io=updated_staged_table.io,
+                    catalog=self,
+                )
+            )
+            self._create_hive_table(open_client, hive_table)
+
+        return CommitTableResponse(
+            metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
+        )
+
     def commit_table(
         self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
     ) -> CommitTableResponse:
@@ -521,95 +610,23 @@ def commit_table(
         # commit to hive
         # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
         with self._client as open_client:
-            lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
+            if self._lock_enabled:
+                lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))
 
-            try:
-                if lock.state != LockState.ACQUIRED:
-                    if lock.state == LockState.WAITING:
-                        self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
-                    else:
-                        raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")
-
-                hive_table: HiveTable | None
-                current_table: Table | None
                 try:
-                    hive_table = self._get_hive_table(open_client, database_name, table_name)
-                    current_table = self._convert_hive_into_iceberg(hive_table)
-                except NoSuchTableError:
-                    hive_table = None
-                    current_table = None
-
-                updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
-                if current_table and updated_staged_table.metadata == current_table.metadata:
-                    # no changes, do nothing
-                    return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
-                self._write_metadata(
-                    metadata=updated_staged_table.metadata,
-                    io=updated_staged_table.io,
-                    metadata_path=updated_staged_table.metadata_location,
-                )
-
-                if hive_table and current_table:
-                    # Table exists, update it.
-
-                    # Note on table properties:
-                    # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON.
-                    # - Updates are reflected in both locations
-                    # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved.
-                    #
-                    # While it is possible to modify HMS table properties through this API, it is not recommended:
-                    # - Mixing HMS-specific properties in Iceberg metadata can cause confusion
-                    # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific)
-                    # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg
-                    #   (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata,
-                    #   and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS)
-                    new_iceberg_properties = _construct_parameters(
-                        metadata_location=updated_staged_table.metadata_location,
-                        previous_metadata_location=current_table.metadata_location,
-                        metadata_properties=updated_staged_table.properties,
-                    )
-                    # Detect properties that were removed from Iceberg metadata
-                    deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys()
-
-                    # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties
-                    existing_hms_parameters = dict(hive_table.parameters or {})
-                    for key in deleted_iceberg_properties:
-                        existing_hms_parameters.pop(key, None)
-                    existing_hms_parameters.update(new_iceberg_properties)
-                    hive_table.parameters = existing_hms_parameters
-
-                    # Update hive's schema and properties
-                    hive_table.sd = _construct_hive_storage_descriptor(
-                        updated_staged_table.schema(),
-                        updated_staged_table.location(),
-                        property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
-                    )
-                    open_client.alter_table_with_environment_context(
-                        dbname=database_name,
-                        tbl_name=table_name,
-                        new_tbl=hive_table,
-                        environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}),
-                    )
-                else:
-                    # Table does not exist, create it.
-                    hive_table = self._convert_iceberg_into_hive(
-                        StagedTable(
-                            identifier=(database_name, table_name),
-                            metadata=updated_staged_table.metadata,
-                            metadata_location=updated_staged_table.metadata_location,
-                            io=updated_staged_table.io,
-                            catalog=self,
-                        )
-                    )
-                    self._create_hive_table(open_client, hive_table)
-            except WaitingForLockException as e:
-                raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
-            finally:
-                open_client.unlock(UnlockRequest(lockid=lock.lockid))
-
-            return CommitTableResponse(
-                metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
-            )
+                    if lock.state != LockState.ACQUIRED:
+                        if lock.state == LockState.WAITING:
+                            self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
+                        else:
+                            raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}")
+
+                    return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
+                except WaitingForLockException as e:
+                    raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e
+                finally:
+                    open_client.unlock(UnlockRequest(lockid=lock.lockid))
+            else:
+                return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates)
 
     def load_table(self, identifier: str | Identifier) -> Table:
         """Load the table's metadata and return the table instance.
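
A minimal usage sketch of the new property (not part of the diff). The catalog name, metastore URI, and table below are hypothetical; "lock-enabled" is read through property_as_bool, so the usual string booleans work:

    from pyiceberg.catalog import load_catalog
    from pyiceberg.types import TimestampType

    # Hypothetical catalog configuration; only "lock-enabled" is new in this patch.
    # With "false", commit_table() skips the HMS lock/check_lock/unlock RPCs and
    # calls _do_commit() directly.
    catalog = load_catalog(
        "hive",
        **{
            "type": "hive",
            "uri": "thrift://metastore:9083",
            "lock-enabled": "false",
        },
    )

    table = catalog.load_table("default.my_table")
    # Any committing operation now runs without taking a metastore lock.
    with table.update_schema() as update:
        update.add_column("event_ts", TimestampType())

Note that with locking disabled, nothing in this code path serializes concurrent writers; disabling the lock presumes the deployment guarantees commit atomicity some other way, as with the analogous lock-free option in Iceberg's Java Hive catalog.
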
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index 88b653e44f..29c1ccf943 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -51,6 +51,7 @@
     LOCK_CHECK_MAX_WAIT_TIME,
     LOCK_CHECK_MIN_WAIT_TIME,
     LOCK_CHECK_RETRIES,
+    LOCK_ENABLED,
     HiveCatalog,
     _construct_hive_storage_descriptor,
     _HiveClient,
@@ -1407,3 +1408,69 @@ def test_create_hive_client_with_kerberos_using_context_manager(
     # closing and re-opening work as expected.
     with client as open_client:
         assert open_client._iprot.trans.isOpen()
+
+
+def test_lock_enabled_defaults_to_true() -> None:
+    """Verify that lock-enabled defaults to True for backward compatibility."""
+    prop = {"uri": HIVE_METASTORE_FAKE_URL}
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
+    assert catalog._lock_enabled is True
+
+
+def test_lock_enabled_can_be_disabled() -> None:
+    """Verify that lock-enabled can be set to false."""
+    prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
+    assert catalog._lock_enabled is False
+
+
+def test_commit_table_skips_locking_when_lock_disabled() -> None:
+    """When lock-enabled is false, commit_table must not call lock, check_lock, or unlock."""
+    prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
+    catalog._client = MagicMock()
+
+    mock_table = MagicMock()
+    mock_table.name.return_value = ("default", "my_table")
+
+    mock_do_commit = MagicMock()
+    mock_do_commit.return_value = MagicMock()
+
+    with patch.object(catalog, "_do_commit", mock_do_commit):
+        catalog.commit_table(mock_table, requirements=(), updates=())
+
+    # The core commit logic should still be called
+    mock_do_commit.assert_called_once()
+
+    # But no locking operations should have been performed
+    catalog._client.__enter__().lock.assert_not_called()
+    catalog._client.__enter__().check_lock.assert_not_called()
+    catalog._client.__enter__().unlock.assert_not_called()
+
+
+def test_commit_table_uses_locking_when_lock_enabled() -> None:
+    """When lock-enabled is true (default), commit_table must call lock and unlock."""
+    lockid = 99999
+    prop = {"uri": HIVE_METASTORE_FAKE_URL}
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
+
+    mock_client = MagicMock()
+    mock_client.__enter__ = MagicMock(return_value=mock_client)
+    mock_client.__exit__ = MagicMock(return_value=False)
+    mock_client.lock.return_value = LockResponse(lockid=lockid, state=LockState.ACQUIRED)
+    catalog._client = mock_client
+
+    mock_table = MagicMock()
+    mock_table.name.return_value = ("default", "my_table")
+
+    mock_do_commit = MagicMock()
+    mock_do_commit.return_value = MagicMock()
+
+    with patch.object(catalog, "_do_commit", mock_do_commit):
+        catalog.commit_table(mock_table, requirements=(), updates=())
+
+    # Locking operations should have been performed
+    mock_client.lock.assert_called_once()
+    mock_client.unlock.assert_called_once()
+    # The core commit logic should still be called
+    mock_do_commit.assert_called_once()
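
One behavior worth pinning down in a follow-up test, sketched here in the same style as the tests above (not part of the diff; pytest, CommitFailedException, and the mock helpers are assumed to already be imported in test_hive.py): with locking enabled, the finally block must release the lock even when the commit body fails.

    def test_commit_table_unlocks_on_failure_when_lock_enabled() -> None:
        """Sketch: the lock must be released even if the commit itself fails."""
        catalog = HiveCatalog(HIVE_CATALOG_NAME, **{"uri": HIVE_METASTORE_FAKE_URL})

        mock_client = MagicMock()
        mock_client.__enter__ = MagicMock(return_value=mock_client)
        mock_client.__exit__ = MagicMock(return_value=False)
        mock_client.lock.return_value = LockResponse(lockid=12345, state=LockState.ACQUIRED)
        catalog._client = mock_client

        mock_table = MagicMock()
        mock_table.name.return_value = ("default", "my_table")

        # _do_commit raising simulates a failed metadata write or alter_table call
        with patch.object(catalog, "_do_commit", side_effect=CommitFailedException("boom")):
            with pytest.raises(CommitFailedException):
                catalog.commit_table(mock_table, requirements=(), updates=())

        # unlock must still run via the finally block
        mock_client.unlock.assert_called_once()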