From 454ff396a5f0b10357fa68d6ccef16d8fdd0c1c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?=
Date: Thu, 5 Mar 2026 19:53:36 +0100
Subject: [PATCH 1/9] feat(sanitizers): merge duplicate sanitizers and add
 pre-scan optimisation

- Add merge() to DefaultSanitizer, TokenSanitizer, HeaderSanitizer,
  QueryParamSanitizer, UrlPatternSanitizer, ResponseUrlSanitizer
- Add _dedup_sanitizers() which collapses same-class sanitizers into one,
  so the sanitizer chain makes a single pass over each request/response
  instead of N passes when multiple sanitizers of the same type are present
- Add pre-scan in DefaultSanitizer._sanitize_body: skip the full JSON parse
  when no sensitive field name or value is present in the body (fast path)

Co-Authored-By: Claude Sonnet 4.6
---
 src/keboola/vcr/recorder.py   | 10 +++-
 src/keboola/vcr/sanitizers.py | 93 ++++++++++++++++++++++++++++++++++-
 2 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py
index ebe0a2f..4f3817d 100644
--- a/src/keboola/vcr/recorder.py
+++ b/src/keboola/vcr/recorder.py
@@ -29,7 +29,14 @@
 except ImportError:
     freeze_time = None  # type: ignore
 
-from .sanitizers import BaseSanitizer, CompositeSanitizer, DefaultSanitizer, create_default_sanitizer, extract_values
+from .sanitizers import (
+    BaseSanitizer,
+    CompositeSanitizer,
+    DefaultSanitizer,
+    _dedup_sanitizers,
+    create_default_sanitizer,
+    extract_values,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -223,6 +230,7 @@ def record_debug_run(
         chain: list[BaseSanitizer] = [DefaultSanitizer(config=config, sensitive_values=secret_values)]
         if sanitizers:
             chain.extend(sanitizers)
+        chain = _dedup_sanitizers(chain)
 
         recorder = cls(
             cassette_dir=output_dir,
diff --git a/src/keboola/vcr/sanitizers.py b/src/keboola/vcr/sanitizers.py
index 5e9206c..ee52839 100644
--- a/src/keboola/vcr/sanitizers.py
+++ b/src/keboola/vcr/sanitizers.py
@@ -14,6 +14,7 @@
 from typing import Any
 
+
 class BaseSanitizer(ABC):
     """
     Base class for request/response sanitization.
@@ -146,7 +147,22 @@ def _sanitize_body(self, body: str) -> str:
         if not body:
             return body
 
-        # 1. Try JSON
+        # Quick pre-scan: skip expensive JSON parse if no sensitive content is present.
+        # Use quoted field names for JSON key matching (avoids false positives from substrings).
+        needs_json_sanitization = any(f'"{field}"' in body for field in self.sensitive_fields) or any(
+            value in body for value in self.sensitive_values
+        )
+        if not needs_json_sanitization:
+            # Still apply form-encoded / exact value checks (cheap string ops)
+            result = body
+            for pattern in self._field_patterns:
+                result = pattern.sub(rf"\1{self.replacement}", result)
+            for value in self.sensitive_values:
+                if value in result:
+                    result = result.replace(value, self.replacement)
+            return result
+
+        # 1. Try JSON (only when pre-scan found a potential sensitive field/value)
         try:
             data = json.loads(body)
             changed = [False]
@@ -212,6 +228,22 @@ def _filter_headers(self, headers: dict) -> dict:
                 result[key] = value
         return result
 
+    def merge(self, other: "DefaultSanitizer") -> "DefaultSanitizer":
+        """Merge two DefaultSanitizers into one, unioning all their fields."""
+        merged = DefaultSanitizer.__new__(DefaultSanitizer)
+        merged.sensitive_fields = self.sensitive_fields | other.sensitive_fields
+        merged.safe_headers = self.safe_headers | other.safe_headers
+        merged.replacement = self.replacement
+        seen = set()
+        combined_values = []
+        for v in self.sensitive_values + other.sensitive_values:
+            if v not in seen:
+                seen.add(v)
+                combined_values.append(v)
+        merged.sensitive_values = sorted(combined_values, key=len, reverse=True)
+        merged._field_patterns = [re.compile(rf"({re.escape(f)}=)[^&\"\s]+") for f in merged.sensitive_fields]
+        return merged
+
     # -- Applied uniformly to requests and responses --
 
     def before_record_request(self, request: Any) -> Any:
@@ -292,6 +324,13 @@ def __init__(self, tokens: list[str], replacement: str = "REDACTED"):
         self.tokens = [t for t in tokens if t]  # Filter out empty strings
         self.replacement = replacement
 
+    def merge(self, other: "TokenSanitizer") -> "TokenSanitizer":
+        """Merge two TokenSanitizers into one, unioning their token lists."""
+        return TokenSanitizer(
+            tokens=list(dict.fromkeys(self.tokens + other.tokens)),
+            replacement=self.replacement,
+        )
+
     def _sanitize_string(self, value: str) -> str:
         """Replace all tokens in a string."""
         result = value
@@ -421,6 +460,13 @@ def __init__(
 
         self.headers_to_remove = set(h.lower() for h in (headers_to_remove or []))
 
+    def merge(self, other: "HeaderSanitizer") -> "HeaderSanitizer":
+        """Merge two HeaderSanitizers into one, unioning their header sets."""
+        merged = HeaderSanitizer.__new__(HeaderSanitizer)
+        merged.safe_headers = self.safe_headers | other.safe_headers
+        merged.headers_to_remove = self.headers_to_remove | other.headers_to_remove
+        return merged
+
     def _filter_headers(self, headers: dict) -> dict:
         """Filter headers to only include safe ones."""
         result = {}
@@ -539,6 +585,13 @@ def __init__(self, parameters: list[str] | None = None, replacement: str = "toke
         # Build regex patterns for each parameter
         self._patterns = [re.compile(rf"({re.escape(param)}=)[^&\"\s]+") for param in self.parameters]
 
+    def merge(self, other: "QueryParamSanitizer") -> "QueryParamSanitizer":
+        """Merge two QueryParamSanitizers into one, unioning their parameter lists."""
+        return QueryParamSanitizer(
+            parameters=list(dict.fromkeys(self.parameters + other.parameters)),
+            replacement=self.replacement,
+        )
+
     def _sanitize_string(self, value: str) -> str:
         """Replace parameter values in a string."""
         for pattern in self._patterns:
@@ -579,8 +632,20 @@ def __init__(self, patterns: list[tuple]):
         Pattern is a regex string, replacement is the string to use for matches.
""" + self._raw_patterns = patterns # stored for merge() self.patterns = [(re.compile(p), r) for p, r in patterns] + def merge(self, other: "UrlPatternSanitizer") -> "UrlPatternSanitizer": + """Merge two UrlPatternSanitizers into one, unioning their pattern lists.""" + seen = set() + combined = [] + for p, r in self._raw_patterns + other._raw_patterns: + key = (p, r) + if key not in seen: + seen.add(key) + combined.append((p, r)) + return UrlPatternSanitizer(patterns=combined) + def _sanitize_url(self, url: str) -> str: """Apply all patterns to the URL.""" result = url @@ -620,6 +685,7 @@ def __init__(self, dynamic_params: list[str], url_domains: list[str]): dynamic_params: Query parameter names to strip from matching URLs. url_domains: Domain substrings that identify URLs to sanitize. """ + self.dynamic_params = dynamic_params # stored for merge() self.url_domains = url_domains # Build a single regex that matches any of the dynamic params as # query-string key=value pairs (including leading & or ?). @@ -630,6 +696,13 @@ def __init__(self, dynamic_params: list[str], url_domains: list[str]): domain_alts = "|".join(re.escape(d) for d in url_domains) self._url_re = re.compile(rf'https?://[^\s"\'<>]*(?:{domain_alts})[^\s"\'<>]*') + def merge(self, other: "ResponseUrlSanitizer") -> "ResponseUrlSanitizer": + """Merge two ResponseUrlSanitizers into one, unioning their params and domains.""" + return ResponseUrlSanitizer( + dynamic_params=list(dict.fromkeys(self.dynamic_params + other.dynamic_params)), + url_domains=list(dict.fromkeys(self.url_domains + other.url_domains)), + ) + def _body_has_matching_domain(self, body_str: str) -> bool: """Quick check whether the body contains any target domain.""" return any(domain in body_str for domain in self.url_domains) @@ -785,6 +858,24 @@ def before_record_response(self, response: dict) -> dict: return response +def _dedup_sanitizers(sanitizers: list["BaseSanitizer"]) -> list["BaseSanitizer"]: + """Merge same-class sanitizers to avoid redundant processing passes. + + Preserves order (first occurrence of each class keeps its position). + Sanitizer classes without a ``merge()`` method are kept as-is. + """ + result: list[BaseSanitizer] = [] + by_class: dict[type, int] = {} # class → index in result + for s in sanitizers: + cls = type(s) + if cls in by_class and hasattr(result[by_class[cls]], "merge"): + result[by_class[cls]] = result[by_class[cls]].merge(s) + else: + by_class[cls] = len(result) + result.append(s) + return result + + def create_default_sanitizer(secrets: dict[str, Any]) -> DefaultSanitizer: """ Create a default sanitizer from secrets. From 0b79c174af5445af0c14e6d875fc8091c18a560d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Thu, 5 Mar 2026 19:54:00 +0100 Subject: [PATCH 2/9] fix(oom): prevent RSS growth during VCR recording MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: every HTTPS interaction during recording caused urllib3 to call load_default_certs() on a brand-new ssl.SSLContext, allocating ~480 KB of CA cert data into the OpenSSL C-heap (via jemalloc). jemalloc's high-watermark behaviour means this memory is never returned to the OS, producing ~7 MB RSS growth per 10 interactions and OOM for jobs with 500–2000 interactions. Three root causes, three fixes applied inside _pool_reuse_patch(): 1. Pool proliferation: components that create a new requests.Session per API call (e.g. 
Facebook Ads batch requests) each get their own urllib3.PoolManager and a fresh HTTPSConnectionPool, so N interactions → N pools → N ssl_context allocations. Fix: intercept PoolManager.connection_from_host and return one shared pool per (scheme, host, port) for the lifetime of the recording. 2. ssl_context per connect(): even with a single pool, urllib3 calls _new_conn() → connect() → _ssl_wrap_socket_and_match_hostname( ssl_context=None) for each interaction because requests 2.32.x never puts ssl_context into pool.conn_kw. When ssl_context is None, urllib3 creates a new context and calls load_default_certs() every time. Fix: inject one shared ssl.SSLContext (with load_default_certs() pre-called) into pool.conn_kw["ssl_context"] so all _new_conn() connections share it. 3. Connection churn: VCRConnection.is_connected delegated to real_connection.sock, which is None after Connection: close responses. urllib3 saw is_connected=False and called _new_conn() for every request. Fix: override is_connected on VCRConnection to always return True when real_connection exists (recording) or a VCRFakeSocket is present (replay). Additionally, implement VCRHTTPResponse.release_conn() so urllib3's pool queue is refilled after each response and _new_conn() is not called unnecessarily. Also: write large response body strings to the cassette file in 64 KB chunks (_write_interaction_chunked) to prevent multi-MB transient allocations from fragmenting glibc's heap during recording of APIs with large JSON payloads. Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- src/keboola/vcr/recorder.py | 242 ++++++++++++++++++++++++++++++++++-- uv.lock | 2 +- 3 files changed, 233 insertions(+), 13 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 39e6fc8..621f1a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "keboola.vcr" -version = "0.0.0" +version = "0.1.15" description = "VCR recording, sanitization, and validation for Keboola component HTTP interactions" readme = "README.md" license = {text = "MIT"} diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py index 4f3817d..57febf0 100644 --- a/src/keboola/vcr/recorder.py +++ b/src/keboola/vcr/recorder.py @@ -9,6 +9,7 @@ from __future__ import annotations import base64 +import contextlib import functools import json import logging @@ -20,6 +21,92 @@ try: import vcr from vcr.stubs import VCRHTTPResponse as _VCRHTTPResponse + + # --- urllib3 connection-reuse fix --- + # urllib3's pool calls is_connection_dropped(conn) → not conn.is_connected before + # reusing a connection. VCRConnection doesn't define is_connected, so __getattr__ + # delegates to real_connection.is_connected, which checks real_connection.sock. + # + # During VCR RECORDING, APIs like Facebook respond with Connection: close, which + # closes real_connection.sock. urllib3 then sees is_connected=False and calls + # _new_conn(), creating a NEW VCRHTTPConnection + real_connection + ssl_context + # per interaction. Each ssl_context loads the system CA trust store into OpenSSL + # C-heap (~480 KB), so 1.6 Connection-close responses per interaction × ~480 KB + # ≈ 770 KB/interaction of C-heap growth — matching the observed RSS growth rate. + # This C-heap growth does NOT appear in tracemalloc (Python-only) and is not + # released by gc.collect() + malloc_trim because jemalloc manages it differently. 
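+    # (Illustration, not part of this module: the growth shows up only as process
+    # RSS. Sampling psutil.Process(os.getpid()).memory_info().rss around each
+    # interaction reveals the ~480 KB steps, while tracemalloc.get_traced_memory()
+    # stays flat.)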
+ # + # During VCR REPLAY, VCRFakeSocket replaces the real socket, so is_connected on + # the real_connection is always False even though the VCR stub is perfectly usable. + # + # Fix: always return True when real_connection exists (recording) or a VCRFakeSocket + # is present (replay). http.client.HTTPConnection auto-reconnects on its next + # send() if the socket has been closed — so reusing the VCRHTTPConnection is safe + # and reuses the existing ssl_context instead of loading the CA store again. + try: + from vcr.stubs import VCRFakeSocket as _VCRFakeSocket + + _VCRConnection = _VCRHTTPResponse # sentinel — replaced below + + # Walk to VCRConnection (shared base of VCRHTTPConnection / VCRHTTPSConnection) + from vcr.stubs import VCRHTTPConnection as _VCRHTTPConnection + + _VCRConnection = _VCRHTTPConnection.__bases__[0] + + @property # type: ignore[misc] + def _vcr_is_connected(self) -> bool: + sock = getattr(self, "_sock", None) + if isinstance(sock, _VCRFakeSocket): + return True # VCR replay: fake socket is always "connected" + rc = getattr(self, "real_connection", None) + if rc is not None: + # VCR recording: always claim connected so urllib3 reuses this + # VCRHTTPConnection and its ssl_context instead of calling _new_conn(). + # http.client.HTTPConnection.send() calls connect() automatically when + # the socket has been closed (Connection: close), so the real connection + # is re-established transparently on the next request. + return True + return False + + _VCRConnection.is_connected = _vcr_is_connected # type: ignore[attr-defined] + del _vcr_is_connected, _VCRHTTPConnection, _VCRConnection, _VCRFakeSocket + except Exception: + pass + # --- end urllib3 connection-reuse fix --- + + # --- VCRHTTPResponse.release_conn fix --- + # urllib3 2.x returns VCRHTTPResponse *directly* from _make_request() (it is + # duck-typed as BaseHTTPResponse). urllib3 then stamps + # response._connection = VCRHTTPConnection + # response._pool = HTTPConnectionPool + # on the VCRHTTPResponse instance and returns it straight to requests as + # response.raw. When requests finishes reading the body it calls + # response.raw.release_conn() + # which hits VCRHTTPResponse — NOT urllib3.response.HTTPResponse — so urllib3's + # own release_conn() machinery is bypassed entirely. + # + # Without a working release_conn(), _put_conn() is never called, the pool queue + # stays empty (q=0), and _new_conn() fires for every request. Each _new_conn() + # creates a new VCRHTTPConnection + real urllib3 HTTPSConnection + SSLContext + # (~480 KB of OpenSSL C-heap loaded from the system CA store). That adds up to + # ~8 MB per 10 interactions and causes OOM for jobs with 500+ interactions. + # + # The fix: implement release_conn() to return _connection to the pool, exactly + # mirroring what urllib3.response.HTTPResponse.release_conn() does. This is safe + # because VCRHTTPConnection is just a stub — _put_conn() merely places it back in + # the pool queue so it can be reused on the next request, where vcrpy replaces the + # socket with a VCRFakeSocket / real_connection reconnect as usual. 
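+    # For reference, urllib3 2.x's own HTTPResponse.release_conn() (paraphrased
+    # from its source) performs the same steps as the stand-in below:
+    #     if not self._pool or not self._connection:
+    #         return
+    #     self._pool._put_conn(self._connection)
+    #     self._connection = None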
+ if not hasattr(_VCRHTTPResponse, "release_conn"): + def _vcr_release_conn(self) -> None: # noqa: E306 + pool = getattr(self, "_pool", None) + conn = getattr(self, "_connection", None) + if pool is not None and conn is not None: + pool._put_conn(conn) + self._connection = None + + _VCRHTTPResponse.release_conn = _vcr_release_conn # type: ignore[attr-defined] + # --- end VCRHTTPResponse.release_conn fix --- + except ImportError: vcr = None # type: ignore _VCRHTTPResponse = None # type: ignore @@ -239,6 +326,7 @@ def record_debug_run( freeze_time_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), cassette_file=f"vcr_debug_{component_id}_{config_id}_{timestamp}.json", ) + recorder.record(component_runner) def run( @@ -320,11 +408,12 @@ def record(self, component_runner: Callable[[], None]) -> None: # Suppress the context-exit _save() — we write directly to temp_path cassette._save = lambda force=False: None - if self.freeze_time_at and self.freeze_time_at != "auto": - with freeze_time(self.freeze_time_at): + with _pool_reuse_patch(): + if self.freeze_time_at and self.freeze_time_at != "auto": + with freeze_time(self.freeze_time_at): + component_runner() + else: component_runner() - else: - component_runner() finally: _VCRHTTPResponse.__init__ = _original_vcr_init @@ -433,13 +522,7 @@ def _append_interaction(self, temp_path: Path, cassette_before_record_response, return with open(temp_path, "a") as f: - json.dump( - {"request": filtered_request._to_dict(), "response": filtered_response}, - f, - cls=_BytesEncoder, - sort_keys=True, - ) - f.write("\n") + _write_interaction_chunked(f, filtered_request._to_dict(), filtered_response) def _resolve_freeze_time(self) -> str | None: """Resolve effective freeze_time, reading from metadata if 'auto'.""" @@ -708,6 +791,143 @@ def getbuffer(self): return memoryview(self._data) +def _write_json_string_chunked(f, s: str, chunk_size: int = 65536) -> None: + """Write a JSON-encoded string to f in 64 KB chunks. + + json.dumps(large_string) allocates the entire escaped form at once, which + for multi-MB bodies creates a transient allocation large enough to fragment + glibc's heap. Chunking keeps each escaped piece ≤ ~130 KB so it falls + just above glibc's mmap threshold and the allocator returns it to the OS + immediately on free rather than leaving it stranded on the heap. + """ + f.write('"') + for i in range(0, len(s), chunk_size): + f.write(json.dumps(s[i : i + chunk_size])[1:-1]) # strip enclosing quotes + f.write('"') + + +def _write_interaction_chunked(f, req_dict: dict, response: dict) -> None: + """Serialize a VCR interaction to f as a JSONL record. + + The response body string is written in 64 KB chunks (see _write_json_string_chunked) + to prevent large transient allocations from fragmenting glibc's heap. + All other fields are small and serialized normally. 
+ """ + body_dict = response.get("body", {}) + resp_rest = {k: v for k, v in response.items() if k != "body"} + + f.write('{"request": ') + json.dump(req_dict, f, cls=_BytesEncoder, sort_keys=True) + f.write(', "response": {"body": ') + if "string" in body_dict: + body_string = body_dict["string"] + if isinstance(body_string, bytes): + try: + body_str = body_string.decode("utf-8") + except UnicodeDecodeError: + body_str = base64.b64encode(body_string).decode("ascii") + else: + body_str = body_string if body_string is not None else "" + body_rest = {k: v for k, v in body_dict.items() if k != "string"} + f.write('{"string": ') + _write_json_string_chunked(f, body_str) + if body_rest: + f.write(", ") + f.write(json.dumps(body_rest, cls=_BytesEncoder, sort_keys=True)[1:-1]) + f.write("}") + else: + json.dump(body_dict, f, cls=_BytesEncoder, sort_keys=True) + if resp_rest: + f.write(", ") + f.write(json.dumps(resp_rest, cls=_BytesEncoder, sort_keys=True)[1:-1]) + f.write("}}\n") + + +@contextlib.contextmanager +def _pool_reuse_patch(): + """Prevent urllib3 connection pool proliferation during VCR recording. + + When the component creates a new requests.Session per API query (common with + Facebook Graph API batch requests), each session creates its own + urllib3.PoolManager which creates a fresh HTTPSConnectionPool for + graph.facebook.com:443. For N interactions this results in N pools, each + with its own SSLContext (~480 KB of C-heap loaded from the system CA store). + + This C-heap growth does NOT appear in Python tracemalloc, is NOT collected + by gc.collect(), and adds ~8 MB per 10 interactions — causing OOM for + Facebook Ads jobs with 500–2000 interactions. + + Fix 1 (pool reuse): intercept PoolManager.connection_from_host at the class + level and return the first pool for any given (scheme, host, port) instead of + letting each new PoolManager create its own. + + Fix 2 (ssl_context reuse): inject a single shared ssl.SSLContext into each + pool's conn_kw so that ALL connections created by _new_conn() for that pool + share the same SSLContext. Without this, each _new_conn() + _validate_conn() + sequence calls real_connection.connect() → _ssl_wrap_socket_and_match_hostname + with ssl_context=None → create_urllib3_context() + load_default_certs() for + EVERY interaction. load_default_certs() loads ~480 KB of CA cert data into + OpenSSL's C-heap; jemalloc never returns this to the OS even after the + SSLContext is freed, so RSS grows ~480 KB per interaction. + + With a shared SSLContext, load_default_certs() runs once per host and all + subsequent connect() calls reuse the already-loaded context. + + IMPORTANT: must be applied INSIDE the vcrpy cassette context so that pools + created (and cached) here already carry vcrpy's patched connection stubs. + """ + try: + from urllib3.poolmanager import PoolManager + except ImportError: + yield + return + + _shared_pools: dict = {} + _original_cfh = PoolManager.connection_from_host + + def _inject_shared_ssl_context(pool, scheme: str) -> None: + """Inject a single shared SSLContext into the pool's conn_kw. + + This ensures all connections created by pool._new_conn() reuse one + SSLContext instead of creating a new one (and calling load_default_certs) + on every connect(). 
+ """ + if scheme != "https": + return + if pool.conn_kw.get("ssl_context") is not None: + return + try: + from urllib3.util.ssl_ import create_urllib3_context + ctx = create_urllib3_context() + # load_default_certs() is only called by urllib3 when it creates its own + # context (default_ssl_context=True). Since we supply one, we must call + # it ourselves so that certificate verification works correctly. + ctx.load_default_certs() + pool.conn_kw["ssl_context"] = ctx + except Exception as exc: + logger.warning("VCR: could not inject shared ssl_context into pool for %s: %s", pool.host, exc) + + def _reusing_cfh(self, host, port=None, scheme="http", pool_kwargs=None, **kw): + key = (scheme, host, port) + if key not in _shared_pools: + pool = _original_cfh(self, host, port=port, scheme=scheme, pool_kwargs=pool_kwargs, **kw) + _inject_shared_ssl_context(pool, scheme) + _shared_pools[key] = pool + return _shared_pools[key] + + PoolManager.connection_from_host = _reusing_cfh + try: + yield + finally: + PoolManager.connection_from_host = _original_cfh + for pool in _shared_pools.values(): + try: + pool.close() + except Exception: + pass + _shared_pools.clear() + + def _zero_copy_vcr_response_init(self_resp, recorded_response, *, original_init) -> None: """Patch target for VCRHTTPResponse.__init__ during recording. diff --git a/uv.lock b/uv.lock index 53e0410..aa6d6ab 100644 --- a/uv.lock +++ b/uv.lock @@ -164,7 +164,7 @@ wheels = [ [[package]] name = "keboola-vcr" -version = "0.0.0" +version = "0.1.15" source = { editable = "." } dependencies = [ { name = "freezegun" }, From 6895f156db0201a083723d277cf1beb680cf38ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Thu, 5 Mar 2026 19:57:27 +0100 Subject: [PATCH 3/9] fix(lint): keep _VCRFakeSocket in module scope; fix import sort _vcr_is_connected uses _VCRFakeSocket via a global lookup (module-level function, not a closure). Deleting it with `del` before the property is ever called would raise NameError at runtime. Remove it from the del list so it stays available as a module-level name. Also fix ruff I001: import block sort order in sanitizers.py. Co-Authored-By: Claude Sonnet 4.6 --- src/keboola/vcr/recorder.py | 2 +- src/keboola/vcr/sanitizers.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py index 57febf0..f6d0e11 100644 --- a/src/keboola/vcr/recorder.py +++ b/src/keboola/vcr/recorder.py @@ -69,7 +69,7 @@ def _vcr_is_connected(self) -> bool: return False _VCRConnection.is_connected = _vcr_is_connected # type: ignore[attr-defined] - del _vcr_is_connected, _VCRHTTPConnection, _VCRConnection, _VCRFakeSocket + del _vcr_is_connected, _VCRHTTPConnection, _VCRConnection except Exception: pass # --- end urllib3 connection-reuse fix --- diff --git a/src/keboola/vcr/sanitizers.py b/src/keboola/vcr/sanitizers.py index ee52839..72f7950 100644 --- a/src/keboola/vcr/sanitizers.py +++ b/src/keboola/vcr/sanitizers.py @@ -14,7 +14,6 @@ from typing import Any - class BaseSanitizer(ABC): """ Base class for request/response sanitization. From 8efc0c255e863d1750bec0d7ca274fa85fb6cce9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Fri, 6 Mar 2026 18:24:53 +0100 Subject: [PATCH 4/9] style(recorder): add IO[str] type annotation to f parameter Addresses review comment: _write_json_string_chunked and _write_interaction_chunked were missing the type annotation for the file argument. 
Co-Authored-By: Claude Sonnet 4.6 --- src/keboola/vcr/recorder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py index f6d0e11..0fda239 100644 --- a/src/keboola/vcr/recorder.py +++ b/src/keboola/vcr/recorder.py @@ -16,7 +16,7 @@ import os from datetime import datetime, timezone from pathlib import Path -from typing import Any, Callable +from typing import IO, Any, Callable try: import vcr @@ -791,7 +791,7 @@ def getbuffer(self): return memoryview(self._data) -def _write_json_string_chunked(f, s: str, chunk_size: int = 65536) -> None: +def _write_json_string_chunked(f: IO[str], s: str, chunk_size: int = 65536) -> None: """Write a JSON-encoded string to f in 64 KB chunks. json.dumps(large_string) allocates the entire escaped form at once, which @@ -806,7 +806,7 @@ def _write_json_string_chunked(f, s: str, chunk_size: int = 65536) -> None: f.write('"') -def _write_interaction_chunked(f, req_dict: dict, response: dict) -> None: +def _write_interaction_chunked(f: IO[str], req_dict: dict, response: dict) -> None: """Serialize a VCR interaction to f as a JSONL record. The response body string is written in 64 KB chunks (see _write_json_string_chunked) From ec06129c8921f74f22e19372403298d537ae2e80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Fri, 6 Mar 2026 18:25:42 +0100 Subject: [PATCH 5/9] refactor(recorder): remove _write_json_string_chunked, simplify writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The chunked string writing optimization added complexity without measurable benefit — the SSLContext sharing fix is the primary OOM prevention. Replace the manual body-splitting logic with a single json.dump call; _BytesEncoder already handles bytes-to-str conversion. Co-Authored-By: Claude Sonnet 4.6 --- src/keboola/vcr/recorder.py | 53 +++---------------------------------- 1 file changed, 3 insertions(+), 50 deletions(-) diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py index 0fda239..cc07800 100644 --- a/src/keboola/vcr/recorder.py +++ b/src/keboola/vcr/recorder.py @@ -791,57 +791,10 @@ def getbuffer(self): return memoryview(self._data) -def _write_json_string_chunked(f: IO[str], s: str, chunk_size: int = 65536) -> None: - """Write a JSON-encoded string to f in 64 KB chunks. - - json.dumps(large_string) allocates the entire escaped form at once, which - for multi-MB bodies creates a transient allocation large enough to fragment - glibc's heap. Chunking keeps each escaped piece ≤ ~130 KB so it falls - just above glibc's mmap threshold and the allocator returns it to the OS - immediately on free rather than leaving it stranded on the heap. - """ - f.write('"') - for i in range(0, len(s), chunk_size): - f.write(json.dumps(s[i : i + chunk_size])[1:-1]) # strip enclosing quotes - f.write('"') - - def _write_interaction_chunked(f: IO[str], req_dict: dict, response: dict) -> None: - """Serialize a VCR interaction to f as a JSONL record. - - The response body string is written in 64 KB chunks (see _write_json_string_chunked) - to prevent large transient allocations from fragmenting glibc's heap. - All other fields are small and serialized normally. 
- """ - body_dict = response.get("body", {}) - resp_rest = {k: v for k, v in response.items() if k != "body"} - - f.write('{"request": ') - json.dump(req_dict, f, cls=_BytesEncoder, sort_keys=True) - f.write(', "response": {"body": ') - if "string" in body_dict: - body_string = body_dict["string"] - if isinstance(body_string, bytes): - try: - body_str = body_string.decode("utf-8") - except UnicodeDecodeError: - body_str = base64.b64encode(body_string).decode("ascii") - else: - body_str = body_string if body_string is not None else "" - body_rest = {k: v for k, v in body_dict.items() if k != "string"} - f.write('{"string": ') - _write_json_string_chunked(f, body_str) - if body_rest: - f.write(", ") - f.write(json.dumps(body_rest, cls=_BytesEncoder, sort_keys=True)[1:-1]) - f.write("}") - else: - json.dump(body_dict, f, cls=_BytesEncoder, sort_keys=True) - if resp_rest: - f.write(", ") - f.write(json.dumps(resp_rest, cls=_BytesEncoder, sort_keys=True)[1:-1]) - f.write("}}\n") - + """Serialize a VCR interaction to f as a JSONL record.""" + json.dump({"request": req_dict, "response": response}, f, cls=_BytesEncoder, sort_keys=True) + f.write("\n") @contextlib.contextmanager def _pool_reuse_patch(): From dc747d47ae3fb51ee0229f072253851a66f7c688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Fri, 6 Mar 2026 18:25:58 +0100 Subject: [PATCH 6/9] style(recorder): rename _write_interaction_chunked to _write_interaction The "chunked" suffix no longer applies after the simplification. Cleaner name that matches what the function actually does. Co-Authored-By: Claude Sonnet 4.6 --- src/keboola/vcr/recorder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py index cc07800..99fb21e 100644 --- a/src/keboola/vcr/recorder.py +++ b/src/keboola/vcr/recorder.py @@ -522,7 +522,7 @@ def _append_interaction(self, temp_path: Path, cassette_before_record_response, return with open(temp_path, "a") as f: - _write_interaction_chunked(f, filtered_request._to_dict(), filtered_response) + _write_interaction(f, filtered_request._to_dict(), filtered_response) def _resolve_freeze_time(self) -> str | None: """Resolve effective freeze_time, reading from metadata if 'auto'.""" @@ -791,7 +791,7 @@ def getbuffer(self): return memoryview(self._data) -def _write_interaction_chunked(f: IO[str], req_dict: dict, response: dict) -> None: +def _write_interaction(f: IO[str], req_dict: dict, response: dict) -> None: """Serialize a VCR interaction to f as a JSONL record.""" json.dump({"request": req_dict, "response": response}, f, cls=_BytesEncoder, sort_keys=True) f.write("\n") From 98d452a955449898852008f3a44b213bb6cfe40a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Fri, 6 Mar 2026 18:26:21 +0100 Subject: [PATCH 7/9] style(sanitizers): use PEP 563 lazy annotations; drop quoted class names The module already imports from __future__ import annotations, so forward-reference string quotes on all merge() signatures are redundant. 
Co-Authored-By: Claude Sonnet 4.6 --- src/keboola/vcr/sanitizers.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/keboola/vcr/sanitizers.py b/src/keboola/vcr/sanitizers.py index 72f7950..713f77e 100644 --- a/src/keboola/vcr/sanitizers.py +++ b/src/keboola/vcr/sanitizers.py @@ -227,7 +227,7 @@ def _filter_headers(self, headers: dict) -> dict: result[key] = value return result - def merge(self, other: "DefaultSanitizer") -> "DefaultSanitizer": + def merge(self, other: DefaultSanitizer) -> DefaultSanitizer: """Merge two DefaultSanitizers into one, unioning all their fields.""" merged = DefaultSanitizer.__new__(DefaultSanitizer) merged.sensitive_fields = self.sensitive_fields | other.sensitive_fields @@ -323,7 +323,7 @@ def __init__(self, tokens: list[str], replacement: str = "REDACTED"): self.tokens = [t for t in tokens if t] # Filter out empty strings self.replacement = replacement - def merge(self, other: "TokenSanitizer") -> "TokenSanitizer": + def merge(self, other: TokenSanitizer) -> TokenSanitizer: """Merge two TokenSanitizers into one, unioning their token lists.""" return TokenSanitizer( tokens=list(dict.fromkeys(self.tokens + other.tokens)), @@ -459,7 +459,7 @@ def __init__( self.headers_to_remove = set(h.lower() for h in (headers_to_remove or [])) - def merge(self, other: "HeaderSanitizer") -> "HeaderSanitizer": + def merge(self, other: HeaderSanitizer) -> HeaderSanitizer: """Merge two HeaderSanitizers into one, unioning their header sets.""" merged = HeaderSanitizer.__new__(HeaderSanitizer) merged.safe_headers = self.safe_headers | other.safe_headers @@ -584,7 +584,7 @@ def __init__(self, parameters: list[str] | None = None, replacement: str = "toke # Build regex patterns for each parameter self._patterns = [re.compile(rf"({re.escape(param)}=)[^&\"\s]+") for param in self.parameters] - def merge(self, other: "QueryParamSanitizer") -> "QueryParamSanitizer": + def merge(self, other: QueryParamSanitizer) -> QueryParamSanitizer: """Merge two QueryParamSanitizers into one, unioning their parameter lists.""" return QueryParamSanitizer( parameters=list(dict.fromkeys(self.parameters + other.parameters)), @@ -634,7 +634,7 @@ def __init__(self, patterns: list[tuple]): self._raw_patterns = patterns # stored for merge() self.patterns = [(re.compile(p), r) for p, r in patterns] - def merge(self, other: "UrlPatternSanitizer") -> "UrlPatternSanitizer": + def merge(self, other: UrlPatternSanitizer) -> UrlPatternSanitizer: """Merge two UrlPatternSanitizers into one, unioning their pattern lists.""" seen = set() combined = [] @@ -695,7 +695,7 @@ def __init__(self, dynamic_params: list[str], url_domains: list[str]): domain_alts = "|".join(re.escape(d) for d in url_domains) self._url_re = re.compile(rf'https?://[^\s"\'<>]*(?:{domain_alts})[^\s"\'<>]*') - def merge(self, other: "ResponseUrlSanitizer") -> "ResponseUrlSanitizer": + def merge(self, other: ResponseUrlSanitizer) -> ResponseUrlSanitizer: """Merge two ResponseUrlSanitizers into one, unioning their params and domains.""" return ResponseUrlSanitizer( dynamic_params=list(dict.fromkeys(self.dynamic_params + other.dynamic_params)), From 08c0ed0897e0d31227c53ee74f9cd4d89d856ee9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Fri, 6 Mar 2026 18:28:04 +0100 Subject: [PATCH 8/9] refactor(sanitizers): simplify DefaultSanitizer.merge to use __init__ Replace __new__ + manual field assignment with a clean __init__ call. 
dict.fromkeys() deduplicates sensitive_values while preserving order; __init__ handles sorting, lowercasing, and pattern compilation. Co-Authored-By: Claude Sonnet 4.6 --- src/keboola/vcr/sanitizers.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/keboola/vcr/sanitizers.py b/src/keboola/vcr/sanitizers.py index 713f77e..ff5baa3 100644 --- a/src/keboola/vcr/sanitizers.py +++ b/src/keboola/vcr/sanitizers.py @@ -229,19 +229,13 @@ def _filter_headers(self, headers: dict) -> dict: def merge(self, other: DefaultSanitizer) -> DefaultSanitizer: """Merge two DefaultSanitizers into one, unioning all their fields.""" - merged = DefaultSanitizer.__new__(DefaultSanitizer) - merged.sensitive_fields = self.sensitive_fields | other.sensitive_fields - merged.safe_headers = self.safe_headers | other.safe_headers - merged.replacement = self.replacement - seen = set() - combined_values = [] - for v in self.sensitive_values + other.sensitive_values: - if v not in seen: - seen.add(v) - combined_values.append(v) - merged.sensitive_values = sorted(combined_values, key=len, reverse=True) - merged._field_patterns = [re.compile(rf"({re.escape(f)}=)[^&\"\s]+") for f in merged.sensitive_fields] - return merged + merged_values = list(dict.fromkeys(self.sensitive_values + other.sensitive_values)) + return DefaultSanitizer( + sensitive_fields=list(self.sensitive_fields | other.sensitive_fields), + safe_headers=list(self.safe_headers | other.safe_headers), + sensitive_values=merged_values, + replacement=self.replacement, + ) # -- Applied uniformly to requests and responses -- From 4a98c8a0bce821e41edd7d483bf89d193fe66618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Jir=C3=A1t?= Date: Fri, 6 Mar 2026 18:28:25 +0100 Subject: [PATCH 9/9] revert(pyproject): restore version = "0.0.0" Versions are managed via git tags and CI release automation, not by manually editing pyproject.toml. Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 621f1a0..39e6fc8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "keboola.vcr" -version = "0.1.15" +version = "0.0.0" description = "VCR recording, sanitization, and validation for Keboola component HTTP interactions" readme = "README.md" license = {text = "MIT"}
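
A minimal sketch of the deduplication behaviour the series introduces, using
hypothetical token values (assumes the package import path
keboola.vcr.sanitizers and the internal helper _dedup_sanitizers from
PATCH 1/9):

    from keboola.vcr.sanitizers import TokenSanitizer, _dedup_sanitizers

    chain = [
        TokenSanitizer(tokens=["tok-aaa"]),
        TokenSanitizer(tokens=["tok-bbb", "tok-aaa"]),
    ]
    deduped = _dedup_sanitizers(chain)

    assert len(deduped) == 1                            # one merged TokenSanitizer
    assert deduped[0].tokens == ["tok-aaa", "tok-bbb"]  # order-preserving union

Because both entries are the same class and define merge(), the chain collapses
to a single sanitizer and each request/response is scanned once instead of twice.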