Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 265 additions & 2 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.blob import _quote
from google.cloud.storage.blob import Blob
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.constants import (
_DEFAULT_TIMEOUT,
ENFORCEMENT_MODE_FULLY_RESTRICTED,
ENFORCEMENT_MODE_NOT_RESTRICTED,
)
from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS
from google.cloud.storage.constants import COLDLINE_STORAGE_CLASS
from google.cloud.storage.constants import DUAL_REGION_LOCATION_TYPE
Expand All @@ -65,7 +69,6 @@
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED


_UBLA_BPO_ENABLED_MESSAGE = (
"Pass only one of 'uniform_bucket_level_access_enabled' / "
"'bucket_policy_only_enabled' to 'IAMConfiguration'."
Expand Down Expand Up @@ -2538,6 +2541,25 @@ def cors(self, entries):
:rtype: bool or ``NoneType``
"""

@property
def encryption(self):
"""Retrieve encryption configuration for this bucket.

:rtype: :class:`BucketEncryption`
:returns: an instance for managing the bucket's encryption configuration.
"""
info = self._properties.get("encryption", {})
return BucketEncryption.from_api_repr(info, self)

@encryption.setter
def encryption(self, value):
"""Set encryption configuration for this bucket.

:type value: :class:`BucketEncryption` or dict
:param value: The encryption configuration.
"""
self._patch_property("encryption", value)

@property
def default_kms_key_name(self):
"""Retrieve / set default KMS encryption key for objects in the bucket.
Expand Down Expand Up @@ -3965,6 +3987,247 @@ def ip_filter(self, value):
self._patch_property(_IP_FILTER_PROPERTY, value)


class EncryptionEnforcementConfig(dict):
"""Map a bucket's encryption enforcement configuration.

:type restriction_mode: str
:param restriction_mode:
(Optional) The restriction mode for the encryption type.
When set to ``FullyRestricted``, the bucket will only allow objects encrypted with the encryption type corresponding to this configuration.
When set to ``NotRestricted``, the bucket will allow objects encrypted with any encryption type.

:type effective_time: :class:`datetime.datetime`
:param effective_time:
(Output only) The time when the encryption enforcement configuration became effective.
"""

def __init__(self, restriction_mode=None):
data = {}
if restriction_mode is not None:
# Validate input against allowed constants
allowed = (
ENFORCEMENT_MODE_FULLY_RESTRICTED,
ENFORCEMENT_MODE_NOT_RESTRICTED,
)
if restriction_mode not in allowed:
raise ValueError(
f"Invalid restriction_mode: {restriction_mode}. "
f"Must be one of {allowed}"
)
data["restrictionMode"] = restriction_mode

super().__init__(data)

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct instance from resource.

:type resource: dict
:param resource: mapping as returned from API call.

:rtype: :class:`EncryptionEnforcementConfig`
:returns: Instance created from resource.
"""
instance = cls()
instance.update(resource)
return instance

@property
def restriction_mode(self):
"""Get the restriction mode.

:rtype: str or ``NoneType``
:returns: The restriction mode or ``None`` if the property is not set.
"""
return self.get("restrictionMode")

@restriction_mode.setter
def restriction_mode(self, value):
"""Set the restriction mode.

:type value: str
:param value: The restriction mode.
"""
self["restrictionMode"] = value

@property
def effective_time(self):
"""Get the effective time.

:rtype: datetime.datetime or ``NoneType``
:returns: point-in time at which the configuration is effective,
or ``None`` if the property is not set.
"""
timestamp = self.get("effectiveTime")
if timestamp is not None:
return _rfc3339_nanos_to_datetime(timestamp)


class BucketEncryption(dict):
"""Map a bucket's encryption configuration.

:type bucket: :class:`Bucket`
:param bucket: Bucket for which this instance is the policy.

:type default_kms_key_name: str
:param default_kms_key_name:
(Optional) Resource name of KMS key used to encrypt bucket's content.

:type google_managed_encryption_enforcement_config: :class:`EncryptionEnforcementConfig`
:param google_managed_encryption_enforcement_config:
(Optional) Encryption enforcement configuration for Google managed encryption.

:type customer_managed_encryption_enforcement_config: :class:`EncryptionEnforcementConfig`
:param customer_managed_encryption_enforcement_config:
(Optional) Encryption enforcement configuration for Customer managed encryption.

:type customer_supplied_encryption_enforcement_config: :class:`EncryptionEnforcementConfig`
:param customer_supplied_encryption_enforcement_config:
(Optional) Encryption enforcement configuration for Customer supplied encryption.
"""

def __init__(
self,
bucket,
default_kms_key_name=None,
google_managed_encryption_enforcement_config=None,
customer_managed_encryption_enforcement_config=None,
customer_supplied_encryption_enforcement_config=None,
):
data = {}
if default_kms_key_name is not None:
data["defaultKmsKeyName"] = default_kms_key_name

if google_managed_encryption_enforcement_config is not None:
data["googleManagedEncryptionEnforcementConfig"] = (
google_managed_encryption_enforcement_config
)

if customer_managed_encryption_enforcement_config is not None:
data["customerManagedEncryptionEnforcementConfig"] = (
customer_managed_encryption_enforcement_config
)

if customer_supplied_encryption_enforcement_config is not None:
data["customerSuppliedEncryptionEnforcementConfig"] = (
customer_supplied_encryption_enforcement_config
)

super().__init__(data)
self._bucket = bucket

@classmethod
def from_api_repr(cls, resource, bucket):
"""Factory: construct instance from resource.

:type resource: dict
:param resource: mapping as returned from API call.

:type bucket: :class:`Bucket`
:params bucket: Bucket for which this instance is the policy.

:rtype: :class:`BucketEncryption`
:returns: Instance created from resource.
"""
instance = cls(bucket)
instance.update(resource)
return instance

@property
def bucket(self):
"""Bucket for which this instance is the policy.

:rtype: :class:`Bucket`
:returns: the instance's bucket.
"""
return self._bucket

@property
def default_kms_key_name(self):
"""Retrieve default KMS encryption key for objects in the bucket.

:rtype: str or ``NoneType``
:returns: Default KMS encryption key, or ``None`` if not set.
"""
return self.get("defaultKmsKeyName")

@default_kms_key_name.setter
def default_kms_key_name(self, value):
"""Set default KMS encryption key for objects in the bucket.

:type value: str or None
:param value: new KMS key name (None to clear any existing key).
"""
self["defaultKmsKeyName"] = value
self.bucket._patch_property("encryption", self)

@property
def google_managed_encryption_enforcement_config(self):
"""Retrieve the encryption enforcement configuration for Google managed encryption.

:rtype: :class:`EncryptionEnforcementConfig`
:returns: The configuration instance.
"""
data = self.get("googleManagedEncryptionEnforcementConfig")
if data:
return EncryptionEnforcementConfig.from_api_repr(data)
return None

@google_managed_encryption_enforcement_config.setter
def google_managed_encryption_enforcement_config(self, value):
"""Set the encryption enforcement configuration for Google managed encryption.

:type value: :class:`EncryptionEnforcementConfig` or dict
:param value: The configuration instance or dictionary.
"""
self["googleManagedEncryptionEnforcementConfig"] = value
self.bucket._patch_property("encryption", self)

@property
def customer_managed_encryption_enforcement_config(self):
"""Retrieve the encryption enforcement configuration for Customer managed encryption.

:rtype: :class:`EncryptionEnforcementConfig`
:returns: The configuration instance.
"""
data = self.get("customerManagedEncryptionEnforcementConfig")
if data:
return EncryptionEnforcementConfig.from_api_repr(data)
return None

@customer_managed_encryption_enforcement_config.setter
def customer_managed_encryption_enforcement_config(self, value):
"""Set the encryption enforcement configuration for Customer managed encryption.

:type value: :class:`EncryptionEnforcementConfig` or dict
:param value: The configuration instance or dictionary.
"""
self["customerManagedEncryptionEnforcementConfig"] = value
self.bucket._patch_property("encryption", self)

@property
def customer_supplied_encryption_enforcement_config(self):
"""Retrieve the encryption enforcement configuration for Customer supplied encryption.

:rtype: :class:`EncryptionEnforcementConfig`
:returns: The configuration instance.
"""
data = self.get("customerSuppliedEncryptionEnforcementConfig")
if data:
return EncryptionEnforcementConfig.from_api_repr(data)
return None

@customer_supplied_encryption_enforcement_config.setter
def customer_supplied_encryption_enforcement_config(self, value):
"""Set the encryption enforcement configuration for Customer supplied encryption.

:type value: :class:`EncryptionEnforcementConfig` or dict
:param value: The configuration instance or dictionary.
"""
self["customerSuppliedEncryptionEnforcementConfig"] = value
self.bucket._patch_property("encryption", self)


class SoftDeletePolicy(dict):
"""Map a bucket's soft delete policy.

Expand Down
6 changes: 6 additions & 0 deletions google/cloud/storage/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,9 @@
See: https://cloud.google.com/storage/docs/managing-turbo-replication
"""

ENFORCEMENT_MODE_FULLY_RESTRICTED = "FullyRestricted"
"""Bucket encryption restriction mode where encryption is fully restricted."""

ENFORCEMENT_MODE_NOT_RESTRICTED = "NotRestricted"
"""Bucket encryption restriction mode where encryption is not restricted."""
Loading
Loading