Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions google/cloud/storage/asyncio/retry/reads_resumption_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,28 @@ def update_state_from_response(
self, response: storage_v2.BidiReadObjectResponse, state: Dict[str, Any]
) -> None:
"""Processes a server response, performs integrity checks, and updates state."""
proto = getattr(response, "_pb", response)

# Capture read_handle if provided.
if response.read_handle:
state["read_handle"] = response.read_handle
if proto.HasField("read_handle"):
state["read_handle"] = storage_v2.BidiReadHandle(
handle=proto.read_handle.handle
)

download_states = state["download_states"]

for object_data_range in response.object_data_ranges:
for object_data_range in proto.object_data_ranges:
# Ignore empty ranges or ranges for IDs not in our state
# (e.g., from a previously cancelled request on the same stream).
if not object_data_range.read_range:
if not object_data_range.HasField("read_range"):
logger.warning(
"Received response with missing read_range field; ignoring."
)
continue

read_id = object_data_range.read_range.read_id
read_range_pb = object_data_range.read_range
read_id = read_range_pb.read_id

if read_id not in download_states:
logger.warning(
f"Received data for unknown or stale read_id {read_id}; ignoring."
Expand All @@ -107,7 +112,8 @@ def update_state_from_response(
read_state = download_states[read_id]

# Offset Verification
chunk_offset = object_data_range.read_range.read_offset
# We must validate data before updating state or writing to buffer.
chunk_offset = read_range_pb.read_offset
if chunk_offset != read_state.next_expected_offset:
raise DataCorruption(
response,
Expand All @@ -116,11 +122,11 @@ def update_state_from_response(
)

# Checksum Verification
# We must validate data before updating state or writing to buffer.
data = object_data_range.checksummed_data.content
server_checksum = object_data_range.checksummed_data.crc32c
checksummed_data = object_data_range.checksummed_data
data = checksummed_data.content

if server_checksum is not None:
if checksummed_data.HasField("crc32c"):
server_checksum = checksummed_data.crc32c
client_checksum = int.from_bytes(Checksum(data).digest(), "big")
if server_checksum != client_checksum:
raise DataCorruption(
Expand Down
17 changes: 9 additions & 8 deletions tests/perf/microbenchmarks/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import socket
import psutil

_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show


def publish_benchmark_extra_info(
benchmark: Any,
Expand All @@ -28,7 +29,6 @@ def publish_benchmark_extra_info(
download_bytes_list: Optional[List[int]] = None,
duration: Optional[int] = None,
) -> None:

"""
Helper function to publish benchmark parameters to the extra_info property.
"""
Expand All @@ -48,14 +48,15 @@ def publish_benchmark_extra_info(
benchmark.group = benchmark_group

if download_bytes_list is not None:
assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
assert (
duration is not None
), "Duration must be provided if total_bytes_transferred is provided."
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
median_throughput = statistics.median(throughputs_list)


else:
object_size = params.file_size_bytes
num_files = params.num_files
Expand Down Expand Up @@ -211,13 +212,13 @@ def get_affinity(irq):

def get_primary_interface_name():
primary_ip = None

# 1. Determine the Local IP used for internet access
# We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# connect() to a public IP (Google DNS) to force route resolution
s.connect(('8.8.8.8', 80))
s.connect(("8.8.8.8", 80))
primary_ip = s.getsockname()[0]
except Exception:
# Fallback if no internet
Expand Down Expand Up @@ -248,7 +249,7 @@ def get_irq_affinity():
for irq in irqs:
affinity_str = get_affinity(irq)
if affinity_str != "N/A":
for part in affinity_str.split(','):
if '-' not in part:
for part in affinity_str.split(","):
if "-" not in part:
cpus.add(int(part))
return cpus
2 changes: 1 addition & 1 deletion tests/perf/microbenchmarks/time_based/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
@pytest.fixture
def workload_params(request):
params = request.param
files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)]
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
return params, files_names
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params):


def _download_files_worker(process_idx, filename, params, bucket_type):

if bucket_type == "zonal":
return worker_loop.run_until_complete(
_download_time_based_async(worker_client, filename, params)
Expand Down
18 changes: 9 additions & 9 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer):
writer._is_stream_open = True
writer.write_obj_stream = mock_appendable_writer["mock_stream"]

mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(persisted_size=100)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100)

size = await writer.state_lookup()

Expand Down Expand Up @@ -388,9 +388,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer):
writer.write_obj_stream = mock_appendable_writer["mock_stream"]
writer.bytes_appended_since_last_flush = 100

mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(persisted_size=200)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200)

await writer.flush()

Expand Down Expand Up @@ -431,9 +431,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer):
writer.write_obj_stream = mock_appendable_writer["mock_stream"]

resource = storage_type.Object(size=999)
mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(resource=resource)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource)

res = await writer.finalize()

Expand Down
Loading