diff --git a/src/keboola/vcr/recorder.py b/src/keboola/vcr/recorder.py index ebe0a2f..99fb21e 100644 --- a/src/keboola/vcr/recorder.py +++ b/src/keboola/vcr/recorder.py @@ -9,17 +9,104 @@ from __future__ import annotations import base64 +import contextlib import functools import json import logging import os from datetime import datetime, timezone from pathlib import Path -from typing import Any, Callable +from typing import IO, Any, Callable try: import vcr from vcr.stubs import VCRHTTPResponse as _VCRHTTPResponse + + # --- urllib3 connection-reuse fix --- + # urllib3's pool calls is_connection_dropped(conn) → not conn.is_connected before + # reusing a connection. VCRConnection doesn't define is_connected, so __getattr__ + # delegates to real_connection.is_connected, which checks real_connection.sock. + # + # During VCR RECORDING, APIs like Facebook respond with Connection: close, which + # closes real_connection.sock. urllib3 then sees is_connected=False and calls + # _new_conn(), creating a NEW VCRHTTPConnection + real_connection + ssl_context + # per interaction. Each ssl_context loads the system CA trust store into OpenSSL + # C-heap (~480 KB), so 1.6 Connection-close responses per interaction × ~480 KB + # ≈ 770 KB/interaction of C-heap growth — matching the observed RSS growth rate. + # This C-heap growth does NOT appear in tracemalloc (Python-only) and is not + # released by gc.collect() + malloc_trim because jemalloc manages it differently. + # + # During VCR REPLAY, VCRFakeSocket replaces the real socket, so is_connected on + # the real_connection is always False even though the VCR stub is perfectly usable. + # + # Fix: always return True when real_connection exists (recording) or a VCRFakeSocket + # is present (replay). http.client.HTTPConnection auto-reconnects on its next + # send() if the socket has been closed — so reusing the VCRHTTPConnection is safe + # and reuses the existing ssl_context instead of loading the CA store again. + try: + from vcr.stubs import VCRFakeSocket as _VCRFakeSocket + + _VCRConnection = _VCRHTTPResponse # sentinel — replaced below + + # Walk to VCRConnection (shared base of VCRHTTPConnection / VCRHTTPSConnection) + from vcr.stubs import VCRHTTPConnection as _VCRHTTPConnection + + _VCRConnection = _VCRHTTPConnection.__bases__[0] + + @property # type: ignore[misc] + def _vcr_is_connected(self) -> bool: + sock = getattr(self, "_sock", None) + if isinstance(sock, _VCRFakeSocket): + return True # VCR replay: fake socket is always "connected" + rc = getattr(self, "real_connection", None) + if rc is not None: + # VCR recording: always claim connected so urllib3 reuses this + # VCRHTTPConnection and its ssl_context instead of calling _new_conn(). + # http.client.HTTPConnection.send() calls connect() automatically when + # the socket has been closed (Connection: close), so the real connection + # is re-established transparently on the next request. + return True + return False + + _VCRConnection.is_connected = _vcr_is_connected # type: ignore[attr-defined] + del _vcr_is_connected, _VCRHTTPConnection, _VCRConnection + except Exception: + pass + # --- end urllib3 connection-reuse fix --- + + # --- VCRHTTPResponse.release_conn fix --- + # urllib3 2.x returns VCRHTTPResponse *directly* from _make_request() (it is + # duck-typed as BaseHTTPResponse). urllib3 then stamps + # response._connection = VCRHTTPConnection + # response._pool = HTTPConnectionPool + # on the VCRHTTPResponse instance and returns it straight to requests as + # response.raw. When requests finishes reading the body it calls + # response.raw.release_conn() + # which hits VCRHTTPResponse — NOT urllib3.response.HTTPResponse — so urllib3's + # own release_conn() machinery is bypassed entirely. + # + # Without a working release_conn(), _put_conn() is never called, the pool queue + # stays empty (q=0), and _new_conn() fires for every request. Each _new_conn() + # creates a new VCRHTTPConnection + real urllib3 HTTPSConnection + SSLContext + # (~480 KB of OpenSSL C-heap loaded from the system CA store). That adds up to + # ~8 MB per 10 interactions and causes OOM for jobs with 500+ interactions. + # + # The fix: implement release_conn() to return _connection to the pool, exactly + # mirroring what urllib3.response.HTTPResponse.release_conn() does. This is safe + # because VCRHTTPConnection is just a stub — _put_conn() merely places it back in + # the pool queue so it can be reused on the next request, where vcrpy replaces the + # socket with a VCRFakeSocket / real_connection reconnect as usual. + if not hasattr(_VCRHTTPResponse, "release_conn"): + def _vcr_release_conn(self) -> None: # noqa: E306 + pool = getattr(self, "_pool", None) + conn = getattr(self, "_connection", None) + if pool is not None and conn is not None: + pool._put_conn(conn) + self._connection = None + + _VCRHTTPResponse.release_conn = _vcr_release_conn # type: ignore[attr-defined] + # --- end VCRHTTPResponse.release_conn fix --- + except ImportError: vcr = None # type: ignore _VCRHTTPResponse = None # type: ignore @@ -29,7 +116,14 @@ except ImportError: freeze_time = None # type: ignore -from .sanitizers import BaseSanitizer, CompositeSanitizer, DefaultSanitizer, create_default_sanitizer, extract_values +from .sanitizers import ( + BaseSanitizer, + CompositeSanitizer, + DefaultSanitizer, + _dedup_sanitizers, + create_default_sanitizer, + extract_values, +) logger = logging.getLogger(__name__) @@ -223,6 +317,7 @@ def record_debug_run( chain: list[BaseSanitizer] = [DefaultSanitizer(config=config, sensitive_values=secret_values)] if sanitizers: chain.extend(sanitizers) + chain = _dedup_sanitizers(chain) recorder = cls( cassette_dir=output_dir, @@ -231,6 +326,7 @@ def record_debug_run( freeze_time_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), cassette_file=f"vcr_debug_{component_id}_{config_id}_{timestamp}.json", ) + recorder.record(component_runner) def run( @@ -312,11 +408,12 @@ def record(self, component_runner: Callable[[], None]) -> None: # Suppress the context-exit _save() — we write directly to temp_path cassette._save = lambda force=False: None - if self.freeze_time_at and self.freeze_time_at != "auto": - with freeze_time(self.freeze_time_at): + with _pool_reuse_patch(): + if self.freeze_time_at and self.freeze_time_at != "auto": + with freeze_time(self.freeze_time_at): + component_runner() + else: component_runner() - else: - component_runner() finally: _VCRHTTPResponse.__init__ = _original_vcr_init @@ -425,13 +522,7 @@ def _append_interaction(self, temp_path: Path, cassette_before_record_response, return with open(temp_path, "a") as f: - json.dump( - {"request": filtered_request._to_dict(), "response": filtered_response}, - f, - cls=_BytesEncoder, - sort_keys=True, - ) - f.write("\n") + _write_interaction(f, filtered_request._to_dict(), filtered_response) def _resolve_freeze_time(self) -> str | None: """Resolve effective freeze_time, reading from metadata if 'auto'.""" @@ -700,6 +791,96 @@ def getbuffer(self): return memoryview(self._data) +def _write_interaction(f: IO[str], req_dict: dict, response: dict) -> None: + """Serialize a VCR interaction to f as a JSONL record.""" + json.dump({"request": req_dict, "response": response}, f, cls=_BytesEncoder, sort_keys=True) + f.write("\n") + +@contextlib.contextmanager +def _pool_reuse_patch(): + """Prevent urllib3 connection pool proliferation during VCR recording. + + When the component creates a new requests.Session per API query (common with + Facebook Graph API batch requests), each session creates its own + urllib3.PoolManager which creates a fresh HTTPSConnectionPool for + graph.facebook.com:443. For N interactions this results in N pools, each + with its own SSLContext (~480 KB of C-heap loaded from the system CA store). + + This C-heap growth does NOT appear in Python tracemalloc, is NOT collected + by gc.collect(), and adds ~8 MB per 10 interactions — causing OOM for + Facebook Ads jobs with 500–2000 interactions. + + Fix 1 (pool reuse): intercept PoolManager.connection_from_host at the class + level and return the first pool for any given (scheme, host, port) instead of + letting each new PoolManager create its own. + + Fix 2 (ssl_context reuse): inject a single shared ssl.SSLContext into each + pool's conn_kw so that ALL connections created by _new_conn() for that pool + share the same SSLContext. Without this, each _new_conn() + _validate_conn() + sequence calls real_connection.connect() → _ssl_wrap_socket_and_match_hostname + with ssl_context=None → create_urllib3_context() + load_default_certs() for + EVERY interaction. load_default_certs() loads ~480 KB of CA cert data into + OpenSSL's C-heap; jemalloc never returns this to the OS even after the + SSLContext is freed, so RSS grows ~480 KB per interaction. + + With a shared SSLContext, load_default_certs() runs once per host and all + subsequent connect() calls reuse the already-loaded context. + + IMPORTANT: must be applied INSIDE the vcrpy cassette context so that pools + created (and cached) here already carry vcrpy's patched connection stubs. + """ + try: + from urllib3.poolmanager import PoolManager + except ImportError: + yield + return + + _shared_pools: dict = {} + _original_cfh = PoolManager.connection_from_host + + def _inject_shared_ssl_context(pool, scheme: str) -> None: + """Inject a single shared SSLContext into the pool's conn_kw. + + This ensures all connections created by pool._new_conn() reuse one + SSLContext instead of creating a new one (and calling load_default_certs) + on every connect(). + """ + if scheme != "https": + return + if pool.conn_kw.get("ssl_context") is not None: + return + try: + from urllib3.util.ssl_ import create_urllib3_context + ctx = create_urllib3_context() + # load_default_certs() is only called by urllib3 when it creates its own + # context (default_ssl_context=True). Since we supply one, we must call + # it ourselves so that certificate verification works correctly. + ctx.load_default_certs() + pool.conn_kw["ssl_context"] = ctx + except Exception as exc: + logger.warning("VCR: could not inject shared ssl_context into pool for %s: %s", pool.host, exc) + + def _reusing_cfh(self, host, port=None, scheme="http", pool_kwargs=None, **kw): + key = (scheme, host, port) + if key not in _shared_pools: + pool = _original_cfh(self, host, port=port, scheme=scheme, pool_kwargs=pool_kwargs, **kw) + _inject_shared_ssl_context(pool, scheme) + _shared_pools[key] = pool + return _shared_pools[key] + + PoolManager.connection_from_host = _reusing_cfh + try: + yield + finally: + PoolManager.connection_from_host = _original_cfh + for pool in _shared_pools.values(): + try: + pool.close() + except Exception: + pass + _shared_pools.clear() + + def _zero_copy_vcr_response_init(self_resp, recorded_response, *, original_init) -> None: """Patch target for VCRHTTPResponse.__init__ during recording. diff --git a/src/keboola/vcr/sanitizers.py b/src/keboola/vcr/sanitizers.py index 5e9206c..ff5baa3 100644 --- a/src/keboola/vcr/sanitizers.py +++ b/src/keboola/vcr/sanitizers.py @@ -146,7 +146,22 @@ def _sanitize_body(self, body: str) -> str: if not body: return body - # 1. Try JSON + # Quick pre-scan: skip expensive JSON parse if no sensitive content is present. + # Use quoted field names for JSON key matching (avoids false positives from substrings). + needs_json_sanitization = any(f'"{field}"' in body for field in self.sensitive_fields) or any( + value in body for value in self.sensitive_values + ) + if not needs_json_sanitization: + # Still apply form-encoded / exact value checks (cheap string ops) + result = body + for pattern in self._field_patterns: + result = pattern.sub(rf"\1{self.replacement}", result) + for value in self.sensitive_values: + if value in result: + result = result.replace(value, self.replacement) + return result + + # 1. Try JSON (only when pre-scan found a potential sensitive field/value) try: data = json.loads(body) changed = [False] @@ -212,6 +227,16 @@ def _filter_headers(self, headers: dict) -> dict: result[key] = value return result + def merge(self, other: DefaultSanitizer) -> DefaultSanitizer: + """Merge two DefaultSanitizers into one, unioning all their fields.""" + merged_values = list(dict.fromkeys(self.sensitive_values + other.sensitive_values)) + return DefaultSanitizer( + sensitive_fields=list(self.sensitive_fields | other.sensitive_fields), + safe_headers=list(self.safe_headers | other.safe_headers), + sensitive_values=merged_values, + replacement=self.replacement, + ) + # -- Applied uniformly to requests and responses -- def before_record_request(self, request: Any) -> Any: @@ -292,6 +317,13 @@ def __init__(self, tokens: list[str], replacement: str = "REDACTED"): self.tokens = [t for t in tokens if t] # Filter out empty strings self.replacement = replacement + def merge(self, other: TokenSanitizer) -> TokenSanitizer: + """Merge two TokenSanitizers into one, unioning their token lists.""" + return TokenSanitizer( + tokens=list(dict.fromkeys(self.tokens + other.tokens)), + replacement=self.replacement, + ) + def _sanitize_string(self, value: str) -> str: """Replace all tokens in a string.""" result = value @@ -421,6 +453,13 @@ def __init__( self.headers_to_remove = set(h.lower() for h in (headers_to_remove or [])) + def merge(self, other: HeaderSanitizer) -> HeaderSanitizer: + """Merge two HeaderSanitizers into one, unioning their header sets.""" + merged = HeaderSanitizer.__new__(HeaderSanitizer) + merged.safe_headers = self.safe_headers | other.safe_headers + merged.headers_to_remove = self.headers_to_remove | other.headers_to_remove + return merged + def _filter_headers(self, headers: dict) -> dict: """Filter headers to only include safe ones.""" result = {} @@ -539,6 +578,13 @@ def __init__(self, parameters: list[str] | None = None, replacement: str = "toke # Build regex patterns for each parameter self._patterns = [re.compile(rf"({re.escape(param)}=)[^&\"\s]+") for param in self.parameters] + def merge(self, other: QueryParamSanitizer) -> QueryParamSanitizer: + """Merge two QueryParamSanitizers into one, unioning their parameter lists.""" + return QueryParamSanitizer( + parameters=list(dict.fromkeys(self.parameters + other.parameters)), + replacement=self.replacement, + ) + def _sanitize_string(self, value: str) -> str: """Replace parameter values in a string.""" for pattern in self._patterns: @@ -579,8 +625,20 @@ def __init__(self, patterns: list[tuple]): Pattern is a regex string, replacement is the string to use for matches. """ + self._raw_patterns = patterns # stored for merge() self.patterns = [(re.compile(p), r) for p, r in patterns] + def merge(self, other: UrlPatternSanitizer) -> UrlPatternSanitizer: + """Merge two UrlPatternSanitizers into one, unioning their pattern lists.""" + seen = set() + combined = [] + for p, r in self._raw_patterns + other._raw_patterns: + key = (p, r) + if key not in seen: + seen.add(key) + combined.append((p, r)) + return UrlPatternSanitizer(patterns=combined) + def _sanitize_url(self, url: str) -> str: """Apply all patterns to the URL.""" result = url @@ -620,6 +678,7 @@ def __init__(self, dynamic_params: list[str], url_domains: list[str]): dynamic_params: Query parameter names to strip from matching URLs. url_domains: Domain substrings that identify URLs to sanitize. """ + self.dynamic_params = dynamic_params # stored for merge() self.url_domains = url_domains # Build a single regex that matches any of the dynamic params as # query-string key=value pairs (including leading & or ?). @@ -630,6 +689,13 @@ def __init__(self, dynamic_params: list[str], url_domains: list[str]): domain_alts = "|".join(re.escape(d) for d in url_domains) self._url_re = re.compile(rf'https?://[^\s"\'<>]*(?:{domain_alts})[^\s"\'<>]*') + def merge(self, other: ResponseUrlSanitizer) -> ResponseUrlSanitizer: + """Merge two ResponseUrlSanitizers into one, unioning their params and domains.""" + return ResponseUrlSanitizer( + dynamic_params=list(dict.fromkeys(self.dynamic_params + other.dynamic_params)), + url_domains=list(dict.fromkeys(self.url_domains + other.url_domains)), + ) + def _body_has_matching_domain(self, body_str: str) -> bool: """Quick check whether the body contains any target domain.""" return any(domain in body_str for domain in self.url_domains) @@ -785,6 +851,24 @@ def before_record_response(self, response: dict) -> dict: return response +def _dedup_sanitizers(sanitizers: list["BaseSanitizer"]) -> list["BaseSanitizer"]: + """Merge same-class sanitizers to avoid redundant processing passes. + + Preserves order (first occurrence of each class keeps its position). + Sanitizer classes without a ``merge()`` method are kept as-is. + """ + result: list[BaseSanitizer] = [] + by_class: dict[type, int] = {} # class → index in result + for s in sanitizers: + cls = type(s) + if cls in by_class and hasattr(result[by_class[cls]], "merge"): + result[by_class[cls]] = result[by_class[cls]].merge(s) + else: + by_class[cls] = len(result) + result.append(s) + return result + + def create_default_sanitizer(secrets: dict[str, Any]) -> DefaultSanitizer: """ Create a default sanitizer from secrets. diff --git a/uv.lock b/uv.lock index 53e0410..aa6d6ab 100644 --- a/uv.lock +++ b/uv.lock @@ -164,7 +164,7 @@ wheels = [ [[package]] name = "keboola-vcr" -version = "0.0.0" +version = "0.1.15" source = { editable = "." } dependencies = [ { name = "freezegun" },