From 6d2779de91b78c158c1610d550092bc33e17b896 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 30 Mar 2026 09:11:31 -0400 Subject: [PATCH 1/4] ENH: Improve zarr upload retry resilience and diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Batch-level retry backoff: replace linear sleep(1*N) with exponential backoff (10s, 20s, 40s, 80s, 120s) plus random jitter, giving S3 time to recover from throttling - Reduce parallelism on batch retry: halve ThreadPoolExecutor workers on each retry (5→3→2→1), reducing concurrent connections that may trigger S3 prefix-level throttling - Better failure diagnostics in _handle_failed_items_and_raise: log a summary line with failed/total counts, exception types grouped by count, and a "systematic" flag when all failures share the same exception type. This makes it immediately clear whether failures are random (flaky network) or deterministic (server-side issue) Motivated by investigation of #1821 where a previously-aborted OME-Zarr upload consistently fails with ConnectionAbortedError on every retry attempt for all 188 level-0 chunks (100% failure rate, 2259 connection attempts), while level-1 chunks and other fresh zarr uploads succeed fine from the same machine. 
Co-Authored-By: Claude Code 2.1.81 / Claude Opus 4.6 --- dandi/files/zarr.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index b9668e2b0..f3751a9da 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -1,6 +1,7 @@ from __future__ import annotations from base64 import b64encode +from collections import Counter from collections.abc import Generator, Iterator from concurrent.futures import Future, ThreadPoolExecutor, as_completed from contextlib import closing @@ -8,9 +9,11 @@ from datetime import datetime from enum import Enum import json +import math import os import os.path from pathlib import Path +import random from time import sleep from typing import Any, Optional import urllib.parse @@ -745,6 +748,7 @@ def mkzarr() -> str: items_to_upload = list(items) max_retries = 5 retry_count = 0 + current_jobs = jobs or 5 # Add all items to checksum tree (only done once) for it in items_to_upload: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) @@ -774,7 +778,7 @@ def mkzarr() -> str: r = client.post(f"/zarr/{zarr_id}/files/", json=uploading) # Upload files in parallel - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + with ThreadPoolExecutor(max_workers=current_jobs) as executor: futures = [ executor.submit( _upload_zarr_file, @@ -817,14 +821,22 @@ def mkzarr() -> str: # Prepare for next iteration with retry items if items_to_upload := retry_items: retry_count += 1 + current_jobs = max(1, math.ceil(current_jobs / 2)) if retry_count <= max_retries: lgr.info( - "%s: %s got 403 errors, requesting new URLs", + "%s: %s got 403 errors, requesting new URLs" + " (attempt %d/%d, workers: %d)", asset_path, pluralize(len(items_to_upload), "file"), + retry_count, + max_retries, + current_jobs, + ) + # Exponential backoff with jitter before retry + sleep( + min(2**retry_count * 5, 120) + + random.uniform(0, 5) ) - # Small delay before retry - sleep(1 * 
retry_count) # Check if we exhausted retries if items_to_upload: @@ -899,6 +911,18 @@ def _handle_failed_items_and_raise( # Log all failures for item, error in failed_items: lgr.error("Failed to upload %s: %s", item.filepath, error) + + # Summary diagnostics + exc_counts = Counter(type(error).__name__ for _, error in failed_items) + exc_summary = ", ".join(f"{k}: {v}" for k, v in exc_counts.most_common()) + lgr.error( + "Upload failure summary: %d/%d files failed; exception types: {%s}%s", + len(failed_items), + len(futures), + exc_summary, + " (systematic — all same exception type)" if len(exc_counts) == 1 else "", + ) + # Raise the first error raise failed_items[0][1] From 4c99954c556fc2c23ade5faef06fe626fa960a25 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Mon, 30 Mar 2026 10:54:57 -0400 Subject: [PATCH 2/4] BF: Set explicit timeout for zarr chunk upload PUT requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, zarr chunk uploads used no explicit timeout, relying on OS TCP defaults. For large zarr chunks (e.g., 2.6 GB level-0 OME-Zarr chunks), the upload can take many minutes at typical home upload speeds (~7 min at 50 Mbps, ~70 min at 5 Mbps). Without a timeout, the client either hangs indefinitely on network failures, or the OS TCP stack's default keepalive/idle timeout (often ~120s on Windows) kills the connection before a large upload can complete. Set an explicit requests timeout of (60, 7200) — 60 seconds for TCP connect, 2 hours for response read. The read timeout covers the period after the full request body is sent while waiting for S3's response, which is where ConnectionAbortedError was observed in #1821. Note: this does NOT limit the upload body transfer time itself. The requests library's read timeout only applies to waiting for response data, not to sending request data. So even very slow multi-hour uploads will not be interrupted by this timeout. 
Ref: #1821 Co-Authored-By: Claude Code 2.1.81 / Claude Opus 4.6 --- dandi/files/zarr.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index f3751a9da..8d20334b1 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -969,6 +969,7 @@ def _upload_zarr_file( json_resp=False, retry_if=_retry_zarr_file, headers=headers, + timeout=(60, 7200), ) except requests.HTTPError as e: post_upload_size_check(item.filepath, item.size, True) From a682735c4e483c5c2d7a8362b05190e8ae9d70b0 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Thu, 2 Apr 2026 16:44:48 -0400 Subject: [PATCH 3/4] TST: Add tests for zarr upload batch retry and failure diagnostics - test_zarr_upload_403_batch_retry_reduces_parallelism: exercises the batch-level retry loop with reduced parallelism (workers: N logged) and exponential backoff by mocking 403 responses at the request() level so _upload_zarr_file returns RETRY_NEEDED - test_zarr_upload_connection_error_diagnostics: exercises _handle_failed_items_and_raise summary logging by mocking ConnectionError on all S3 PUTs, verifying "Upload failure summary" includes exception type counts and "systematic" flag Covers code added in befe4a93 and 4a2da705. 
Co-Authored-By: Claude Code 2.1.81 / Claude Opus 4.6 --- dandi/tests/test_upload.py | 99 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/dandi/tests/test_upload.py b/dandi/tests/test_upload.py index 602e75ab9..c0020aebe 100644 --- a/dandi/tests/test_upload.py +++ b/dandi/tests/test_upload.py @@ -626,6 +626,105 @@ def mock_request(self, method, path, **kwargs): ), f"URL {url} should not have been retried but had {request_attempts[url]} attempts" +@pytest.mark.ai_generated +def test_zarr_upload_403_batch_retry_reduces_parallelism( + new_dandiset: SampleDandiset, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that 403 errors trigger batch-level retry with reduced parallelism + and exponential backoff, exercising the batch retry loop in iter_upload.""" + zarr_path = new_dandiset.dspath / "test.zarr" + zarr.save(zarr_path, np.arange(100), np.arange(100, 0, -1)) + + # Mock at the request() level so that _upload_zarr_file sees the 403 + # response and returns RETRY_NEEDED, triggering the batch retry loop. 
+ request_attempts: defaultdict[str, int] = defaultdict(int) + original_request = RESTFullAPIClient.request + + def mock_request(self, method, path, **kwargs): + urlpath = urlparse(path).path if path.startswith("http") else path + request_attempts[urlpath] += 1 + + # Simulate 403 on first attempt for files containing "arr_1" + if method == "PUT" and "arr_1" in path and request_attempts[urlpath] == 1: + resp = Mock(spec=requests.Response) + resp.status_code = 403 + resp.ok = False + resp.text = "Forbidden" + resp.headers = {} + error = requests.HTTPError("403 Forbidden", response=resp) + error.response = resp + raise error + + return original_request(self, method, path, **kwargs) + + monkeypatch.setattr(RESTFullAPIClient, "request", mock_request) + # Speed up the test by removing the exponential backoff sleep + monkeypatch.setattr("dandi.files.zarr.sleep", lambda _: None) + + with caplog.at_level("INFO", logger="dandi"): + new_dandiset.upload() + + # Verify the upload succeeded + (asset,) = new_dandiset.dandiset.get_assets() + assert isinstance(asset, RemoteZarrAsset) + assert asset.path == "test.zarr" + + # Verify the batch retry log message with worker count + retry_msgs = [ + r.message for r in caplog.records if "requesting new URLs" in r.message + ] + assert len(retry_msgs) > 0, "Expected batch retry log message" + assert "workers:" in retry_msgs[0], "Expected reduced worker count in retry log" + + +@pytest.mark.ai_generated +def test_zarr_upload_connection_error_diagnostics( + new_dandiset: SampleDandiset, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that ConnectionError failures produce diagnostic summary logging.""" + zarr_path = new_dandiset.dspath / "test.zarr" + zarr.save(zarr_path, np.arange(100), np.arange(100, 0, -1)) + + # Mock put() to raise ConnectionError for all S3 uploads. 
+ # This bypasses tenacity retries (they would take too long) and directly + # exercises _upload_zarr_file's except-Exception path and the + # _handle_failed_items_and_raise diagnostics. + original_put = RESTFullAPIClient.put + + def mock_put(self, url, **kwargs): + if "dandi-dandisets" in url: + raise requests.ConnectionError( + "('Connection aborted.', ConnectionAbortedError(10053, " + "'An established connection was aborted by the software " + "in your host machine'))" + ) + return original_put(self, url, **kwargs) + + monkeypatch.setattr(RESTFullAPIClient, "put", mock_put) + + with caplog.at_level("ERROR", logger="dandi"), pytest.raises( + requests.ConnectionError + ): + new_dandiset.upload() + + # Verify the diagnostic summary was logged + summary_msgs = [ + r.message for r in caplog.records if "Upload failure summary" in r.message + ] + assert ( + len(summary_msgs) == 1 + ), f"Expected 1 summary message, got {len(summary_msgs)}" + summary = summary_msgs[0] + assert "ConnectionError" in summary + assert ( + "systematic" in summary + ), "All-same-type failures should be flagged as systematic" + + @pytest.mark.ai_generated def test_upload_rejects_dandidownload_paths( new_dandiset: SampleDandiset, tmp_path: Path From 1ecb8aa8e22cd84a8813ece3529b15dc90f2cdd3 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Thu, 2 Apr 2026 16:55:03 -0400 Subject: [PATCH 4/4] ENH: Reject zarr chunks >5 GiB and improve upload failure logging - Add S3_MAX_SINGLE_PART_UPLOAD (5 GiB) constant and check file size in UploadItem.from_entry() before attempting upload. S3 rejects single-part PUTs larger than 5 GiB, and since the server mints presigned URLs without knowing the file size, the client must guard against this. Raises ValueError with a clear message about the multipart upload limitation. 
- Log file size alongside filepath in all upload failure paths: _upload_zarr_file now logs at WARNING level for both HTTPError (non-403) and generic Exception cases, making it immediately clear from logs which file failed and how large it was. - Include file size in _handle_failed_items_and_raise per-item error log lines. Motivated by #1821 where 2.6 GiB level-0 zarr chunks failed with ConnectionAbortedError but the logs didn't show the file sizes, making it hard to diagnose the size-related nature of the failure. Co-Authored-By: Claude Code 2.1.81 / Claude Opus 4.6 --- dandi/consts.py | 5 +++++ dandi/files/zarr.py | 31 ++++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/dandi/consts.py b/dandi/consts.py index 933c59aba..d2f07f591 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -193,6 +193,11 @@ def urls(self) -> Iterator[str]: #: MIME type assigned to & used to identify Zarr assets ZARR_MIME_TYPE = "application/x-zarr" +#: Maximum file size for a single S3 PUT upload (5 GiB). +#: S3 rejects single-part PUTs larger than this; such files would need +#: multipart upload which is not yet supported for zarr chunks. 
+S3_MAX_SINGLE_PART_UPLOAD = 5 * 1024**3 + #: Maximum number of Zarr directory entries to upload at once ZARR_UPLOAD_BATCH_SIZE = 255 diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 8d20334b1..40e5f9975 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -27,6 +27,7 @@ from dandi import get_logger from dandi.consts import ( MAX_ZARR_DEPTH, + S3_MAX_SINGLE_PART_UPLOAD, ZARR_DELETE_BATCH_SIZE, ZARR_MIME_TYPE, ZARR_UPLOAD_BATCH_SIZE, @@ -910,7 +911,7 @@ def _handle_failed_items_and_raise( # Log all failures for item, error in failed_items: - lgr.error("Failed to upload %s: %s", item.filepath, error) + lgr.error("Failed to upload %s (%d bytes): %s", item.filepath, item.size, error) # Summary diagnostics exc_counts = Counter(type(error).__name__ for _, error in failed_items) @@ -976,17 +977,29 @@ def _upload_zarr_file( # Check if this is a 403 error that we should retry with a new URL if e.response is not None and e.response.status_code == 403: lgr.debug( - "Got 403 error uploading %s, will retry with new URL: %s", + "Got 403 error uploading %s (%d bytes), will retry with new URL: %s", item.filepath, + item.size, str(e), ) return UploadResult(item=item, status=UploadStatus.RETRY_NEEDED, error=e) else: - # Other HTTP error - don't retry + lgr.warning( + "HTTP error uploading %s (%d bytes): %s", + item.filepath, + item.size, + e, + ) return UploadResult(item=item, status=UploadStatus.FAILED, error=e) except Exception as e: post_upload_size_check(item.filepath, item.size, True) - # Non-HTTP error - don't retry + lgr.warning( + "Error uploading %s (%d bytes): %s: %s", + item.filepath, + item.size, + type(e).__name__, + e, + ) return UploadResult(item=item, status=UploadStatus.FAILED, error=e) else: post_upload_size_check(item.filepath, item.size, False) @@ -1081,11 +1094,19 @@ def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem: content_type = "application/json" else: content_type = None + size = pre_upload_size_check(e.filepath) + if 
size > S3_MAX_SINGLE_PART_UPLOAD: + raise ValueError( + f"Zarr chunk {e.filepath} is {size / 1024**3:.2f} GiB," + f" exceeding the S3 single-part upload limit of" + f" {S3_MAX_SINGLE_PART_UPLOAD / 1024**3:.0f} GiB." + f" Multipart upload for zarr chunks is not yet supported." + ) return cls( entry_path=str(e), filepath=e.filepath, digest=digest, - size=pre_upload_size_check(e.filepath), + size=size, content_type=content_type, )