diff --git a/dandi/consts.py b/dandi/consts.py index 933c59aba..d2f07f591 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -193,6 +193,11 @@ def urls(self) -> Iterator[str]: #: MIME type assigned to & used to identify Zarr assets ZARR_MIME_TYPE = "application/x-zarr" +#: Maximum file size for a single S3 PUT upload (5 GiB). +#: S3 rejects single-part PUTs larger than this; such files would need +#: multipart upload which is not yet supported for zarr chunks. +S3_MAX_SINGLE_PART_UPLOAD = 5 * 1024**3 + #: Maximum number of Zarr directory entries to upload at once ZARR_UPLOAD_BATCH_SIZE = 255 diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index b9668e2b0..40e5f9975 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -1,6 +1,7 @@ from __future__ import annotations from base64 import b64encode +from collections import Counter from collections.abc import Generator, Iterator from concurrent.futures import Future, ThreadPoolExecutor, as_completed from contextlib import closing @@ -8,9 +9,11 @@ from datetime import datetime from enum import Enum import json +import math import os import os.path from pathlib import Path +import random from time import sleep from typing import Any, Optional import urllib.parse @@ -24,6 +27,7 @@ from dandi import get_logger from dandi.consts import ( MAX_ZARR_DEPTH, + S3_MAX_SINGLE_PART_UPLOAD, ZARR_DELETE_BATCH_SIZE, ZARR_MIME_TYPE, ZARR_UPLOAD_BATCH_SIZE, @@ -745,6 +749,7 @@ def mkzarr() -> str: items_to_upload = list(items) max_retries = 5 retry_count = 0 + current_jobs = jobs or 5 # Add all items to checksum tree (only done once) for it in items_to_upload: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) @@ -774,7 +779,7 @@ def mkzarr() -> str: r = client.post(f"/zarr/{zarr_id}/files/", json=uploading) # Upload files in parallel - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + with ThreadPoolExecutor(max_workers=current_jobs) as executor: futures = [ executor.submit( _upload_zarr_file, @@ -817,14 +822,22 @@ def mkzarr() -> str: # Prepare for next iteration with retry items if items_to_upload := retry_items: retry_count += 1 + current_jobs = max(1, math.ceil(current_jobs / 2)) if retry_count <= max_retries: lgr.info( - "%s: %s got 403 errors, requesting new URLs", + "%s: %s got 403 errors, requesting new URLs" + " (attempt %d/%d, workers: %d)", asset_path, pluralize(len(items_to_upload), "file"), + retry_count, + max_retries, + current_jobs, + ) + # Exponential backoff with jitter before retry + sleep( + min(2**retry_count * 5, 120) + + random.uniform(0, 5) ) - # Small delay before retry - sleep(1 * retry_count) # Check if we exhausted retries if items_to_upload: @@ -898,7 +911,19 @@ def _handle_failed_items_and_raise( # Log all failures for item, error in failed_items: - lgr.error("Failed to upload %s: %s", item.filepath, error) + lgr.error("Failed to upload %s (%d bytes): %s", item.filepath, item.size, error) + + # Summary diagnostics + exc_counts = Counter(type(error).__name__ for _, error in failed_items) + exc_summary = ", ".join(f"{k}: {v}" for k, v in exc_counts.most_common()) + lgr.error( + "Upload failure summary: %d/%d files failed; exception types: {%s}%s", + len(failed_items), + len(futures), + exc_summary, + " (systematic — all same exception type)" if len(exc_counts) == 1 else "", + ) + # Raise the first error raise failed_items[0][1] @@ -945,23 +970,36 @@ def _upload_zarr_file( json_resp=False, retry_if=_retry_zarr_file, headers=headers, + timeout=(60, 7200), ) except requests.HTTPError as e: post_upload_size_check(item.filepath, item.size, True) # Check if this is a 403 error that we should retry with a new URL if e.response is not None and e.response.status_code == 403: lgr.debug( - "Got 403 error uploading %s, will retry with new URL: %s", + "Got 403 error uploading %s (%d bytes), will retry with new URL: %s", item.filepath, + item.size, str(e), ) return UploadResult(item=item, status=UploadStatus.RETRY_NEEDED, error=e) else: - # Other HTTP error - don't retry + lgr.warning( + "HTTP error uploading %s (%d bytes): %s", + item.filepath, + item.size, + e, + ) return UploadResult(item=item, status=UploadStatus.FAILED, error=e) except Exception as e: post_upload_size_check(item.filepath, item.size, True) - # Non-HTTP error - don't retry + lgr.warning( + "Error uploading %s (%d bytes): %s: %s", + item.filepath, + item.size, + type(e).__name__, + e, + ) return UploadResult(item=item, status=UploadStatus.FAILED, error=e) else: post_upload_size_check(item.filepath, item.size, False) @@ -1056,11 +1094,19 @@ def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem: content_type = "application/json" else: content_type = None + size = pre_upload_size_check(e.filepath) + if size > S3_MAX_SINGLE_PART_UPLOAD: + raise ValueError( + f"Zarr chunk {e.filepath} is {size / 1024**3:.2f} GiB," + f" exceeding the S3 single-part upload limit of" + f" {S3_MAX_SINGLE_PART_UPLOAD / 1024**3:.0f} GiB." + f" Multipart upload for zarr chunks is not yet supported." + ) return cls( entry_path=str(e), filepath=e.filepath, digest=digest, - size=pre_upload_size_check(e.filepath), + size=size, content_type=content_type, ) diff --git a/dandi/tests/test_upload.py b/dandi/tests/test_upload.py index 602e75ab9..c0020aebe 100644 --- a/dandi/tests/test_upload.py +++ b/dandi/tests/test_upload.py @@ -626,6 +626,105 @@ def mock_request(self, method, path, **kwargs): ), f"URL {url} should not have been retried but had {request_attempts[url]} attempts" +@pytest.mark.ai_generated +def test_zarr_upload_403_batch_retry_reduces_parallelism( + new_dandiset: SampleDandiset, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that 403 errors trigger batch-level retry with reduced parallelism + and exponential backoff, exercising the batch retry loop in iter_upload.""" + zarr_path = new_dandiset.dspath / "test.zarr" + zarr.save(zarr_path, np.arange(100), np.arange(100, 0, -1)) + + # Mock at the request() level so that _upload_zarr_file sees the 403 + # response and returns RETRY_NEEDED, triggering the batch retry loop. + request_attempts: defaultdict[str, int] = defaultdict(int) + original_request = RESTFullAPIClient.request + + def mock_request(self, method, path, **kwargs): + urlpath = urlparse(path).path if path.startswith("http") else path + request_attempts[urlpath] += 1 + + # Simulate 403 on first attempt for files containing "arr_1" + if method == "PUT" and "arr_1" in path and request_attempts[urlpath] == 1: + resp = Mock(spec=requests.Response) + resp.status_code = 403 + resp.ok = False + resp.text = "Forbidden" + resp.headers = {} + error = requests.HTTPError("403 Forbidden", response=resp) + error.response = resp + raise error + + return original_request(self, method, path, **kwargs) + + monkeypatch.setattr(RESTFullAPIClient, "request", mock_request) + # Speed up the test by removing the exponential backoff sleep + monkeypatch.setattr("dandi.files.zarr.sleep", lambda _: None) + + with caplog.at_level("INFO", logger="dandi"): + new_dandiset.upload() + + # Verify the upload succeeded + (asset,) = new_dandiset.dandiset.get_assets() + assert isinstance(asset, RemoteZarrAsset) + assert asset.path == "test.zarr" + + # Verify the batch retry log message with worker count + retry_msgs = [ + r.message for r in caplog.records if "requesting new URLs" in r.message + ] + assert len(retry_msgs) > 0, "Expected batch retry log message" + assert "workers:" in retry_msgs[0], "Expected reduced worker count in retry log" + + +@pytest.mark.ai_generated +def test_zarr_upload_connection_error_diagnostics( + new_dandiset: SampleDandiset, + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that ConnectionError failures produce diagnostic summary logging.""" + zarr_path = new_dandiset.dspath / "test.zarr" + zarr.save(zarr_path, np.arange(100), np.arange(100, 0, -1)) + + # Mock put() to raise ConnectionError for all S3 uploads. + # This bypasses tenacity retries (they would take too long) and directly + # exercises _upload_zarr_file's except-Exception path and the + # _handle_failed_items_and_raise diagnostics. + original_put = RESTFullAPIClient.put + + def mock_put(self, url, **kwargs): + if "dandi-dandisets" in url: + raise requests.ConnectionError( + "('Connection aborted.', ConnectionAbortedError(10053, " + "'An established connection was aborted by the software " + "in your host machine'))" + ) + return original_put(self, url, **kwargs) + + monkeypatch.setattr(RESTFullAPIClient, "put", mock_put) + + with caplog.at_level("ERROR", logger="dandi"), pytest.raises( + requests.ConnectionError + ): + new_dandiset.upload() + + # Verify the diagnostic summary was logged + summary_msgs = [ + r.message for r in caplog.records if "Upload failure summary" in r.message + ] + assert ( + len(summary_msgs) == 1 + ), f"Expected 1 summary message, got {len(summary_msgs)}" + summary = summary_msgs[0] + assert "ConnectionError" in summary + assert ( + "systematic" in summary + ), "All-same-type failures should be flagged as systematic" + + @pytest.mark.ai_generated def test_upload_rejects_dandidownload_paths( new_dandiset: SampleDandiset, tmp_path: Path