Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "keboola.vcr"
version = "0.0.0"
version = "0.1.15"
description = "VCR recording, sanitization, and validation for Keboola component HTTP interactions"
readme = "README.md"
license = {text = "MIT"}
Expand Down
252 changes: 240 additions & 12 deletions src/keboola/vcr/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import base64
import contextlib
import functools
import json
import logging
Expand All @@ -20,6 +21,92 @@
try:
import vcr
from vcr.stubs import VCRHTTPResponse as _VCRHTTPResponse

# --- urllib3 connection-reuse fix ---
# urllib3's pool calls is_connection_dropped(conn) → not conn.is_connected before
# reusing a connection. VCRConnection doesn't define is_connected, so __getattr__
# delegates to real_connection.is_connected, which checks real_connection.sock.
#
# During VCR RECORDING, APIs like Facebook respond with Connection: close, which
# closes real_connection.sock. urllib3 then sees is_connected=False and calls
# _new_conn(), creating a NEW VCRHTTPConnection + real_connection + ssl_context
# per interaction. Each ssl_context loads the system CA trust store into OpenSSL
# C-heap (~480 KB), so 1.6 Connection-close responses per interaction × ~480 KB
# ≈ 770 KB/interaction of C-heap growth — matching the observed RSS growth rate.
# This C-heap growth does NOT appear in tracemalloc (Python-only) and is not
# released by gc.collect() + malloc_trim because jemalloc manages it differently.
#
# During VCR REPLAY, VCRFakeSocket replaces the real socket, so is_connected on
# the real_connection is always False even though the VCR stub is perfectly usable.
#
# Fix: always return True when real_connection exists (recording) or a VCRFakeSocket
# is present (replay). http.client.HTTPConnection auto-reconnects on its next
# send() if the socket has been closed — so reusing the VCRHTTPConnection is safe
# and reuses the existing ssl_context instead of loading the CA store again.
try:
from vcr.stubs import VCRFakeSocket as _VCRFakeSocket

_VCRConnection = _VCRHTTPResponse # sentinel — replaced below

# Walk to VCRConnection (shared base of VCRHTTPConnection / VCRHTTPSConnection)
from vcr.stubs import VCRHTTPConnection as _VCRHTTPConnection

_VCRConnection = _VCRHTTPConnection.__bases__[0]

@property # type: ignore[misc]
def _vcr_is_connected(self) -> bool:
sock = getattr(self, "_sock", None)
if isinstance(sock, _VCRFakeSocket):
return True # VCR replay: fake socket is always "connected"
rc = getattr(self, "real_connection", None)
if rc is not None:
# VCR recording: always claim connected so urllib3 reuses this
# VCRHTTPConnection and its ssl_context instead of calling _new_conn().
# http.client.HTTPConnection.send() calls connect() automatically when
# the socket has been closed (Connection: close), so the real connection
# is re-established transparently on the next request.
return True
return False

_VCRConnection.is_connected = _vcr_is_connected # type: ignore[attr-defined]
del _vcr_is_connected, _VCRHTTPConnection, _VCRConnection
except Exception:
pass
# --- end urllib3 connection-reuse fix ---

# --- VCRHTTPResponse.release_conn fix ---
# urllib3 2.x returns VCRHTTPResponse *directly* from _make_request() (it is
# duck-typed as BaseHTTPResponse). urllib3 then stamps
# response._connection = VCRHTTPConnection
# response._pool = HTTPConnectionPool
# on the VCRHTTPResponse instance and returns it straight to requests as
# response.raw. When requests finishes reading the body it calls
# response.raw.release_conn()
# which hits VCRHTTPResponse — NOT urllib3.response.HTTPResponse — so urllib3's
# own release_conn() machinery is bypassed entirely.
#
# Without a working release_conn(), _put_conn() is never called, the pool queue
# stays empty (q=0), and _new_conn() fires for every request. Each _new_conn()
# creates a new VCRHTTPConnection + real urllib3 HTTPSConnection + SSLContext
# (~480 KB of OpenSSL C-heap loaded from the system CA store). That adds up to
# ~8 MB per 10 interactions and causes OOM for jobs with 500+ interactions.
#
# The fix: implement release_conn() to return _connection to the pool, exactly
# mirroring what urllib3.response.HTTPResponse.release_conn() does. This is safe
# because VCRHTTPConnection is just a stub — _put_conn() merely places it back in
# the pool queue so it can be reused on the next request, where vcrpy replaces the
# socket with a VCRFakeSocket / real_connection reconnect as usual.
if not hasattr(_VCRHTTPResponse, "release_conn"):
def _vcr_release_conn(self) -> None: # noqa: E306
pool = getattr(self, "_pool", None)
conn = getattr(self, "_connection", None)
if pool is not None and conn is not None:
pool._put_conn(conn)
self._connection = None

_VCRHTTPResponse.release_conn = _vcr_release_conn # type: ignore[attr-defined]
# --- end VCRHTTPResponse.release_conn fix ---

except ImportError:
vcr = None # type: ignore
_VCRHTTPResponse = None # type: ignore
Expand All @@ -29,7 +116,14 @@
except ImportError:
freeze_time = None # type: ignore

from .sanitizers import BaseSanitizer, CompositeSanitizer, DefaultSanitizer, create_default_sanitizer, extract_values
from .sanitizers import (
BaseSanitizer,
CompositeSanitizer,
DefaultSanitizer,
_dedup_sanitizers,
create_default_sanitizer,
extract_values,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -223,6 +317,7 @@ def record_debug_run(
chain: list[BaseSanitizer] = [DefaultSanitizer(config=config, sensitive_values=secret_values)]
if sanitizers:
chain.extend(sanitizers)
chain = _dedup_sanitizers(chain)

recorder = cls(
cassette_dir=output_dir,
Expand All @@ -231,6 +326,7 @@ def record_debug_run(
freeze_time_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
cassette_file=f"vcr_debug_{component_id}_{config_id}_{timestamp}.json",
)

recorder.record(component_runner)

def run(
Expand Down Expand Up @@ -312,11 +408,12 @@ def record(self, component_runner: Callable[[], None]) -> None:
# Suppress the context-exit _save() — we write directly to temp_path
cassette._save = lambda force=False: None

if self.freeze_time_at and self.freeze_time_at != "auto":
with freeze_time(self.freeze_time_at):
with _pool_reuse_patch():
if self.freeze_time_at and self.freeze_time_at != "auto":
with freeze_time(self.freeze_time_at):
component_runner()
else:
component_runner()
else:
component_runner()
finally:
_VCRHTTPResponse.__init__ = _original_vcr_init

Expand Down Expand Up @@ -425,13 +522,7 @@ def _append_interaction(self, temp_path: Path, cassette_before_record_response,
return

with open(temp_path, "a") as f:
json.dump(
{"request": filtered_request._to_dict(), "response": filtered_response},
f,
cls=_BytesEncoder,
sort_keys=True,
)
f.write("\n")
_write_interaction_chunked(f, filtered_request._to_dict(), filtered_response)

def _resolve_freeze_time(self) -> str | None:
"""Resolve effective freeze_time, reading from metadata if 'auto'."""
Expand Down Expand Up @@ -700,6 +791,143 @@ def getbuffer(self):
return memoryview(self._data)


def _write_json_string_chunked(f, s: str, chunk_size: int = 65536) -> None:
"""Write a JSON-encoded string to f in 64 KB chunks.

json.dumps(large_string) allocates the entire escaped form at once, which
for multi-MB bodies creates a transient allocation large enough to fragment
glibc's heap. Chunking keeps each escaped piece ≤ ~130 KB so it falls
just above glibc's mmap threshold and the allocator returns it to the OS
immediately on free rather than leaving it stranded on the heap.
"""
f.write('"')
for i in range(0, len(s), chunk_size):
f.write(json.dumps(s[i : i + chunk_size])[1:-1]) # strip enclosing quotes
f.write('"')


def _write_interaction_chunked(f, req_dict: dict, response: dict) -> None:
"""Serialize a VCR interaction to f as a JSONL record.

The response body string is written in 64 KB chunks (see _write_json_string_chunked)
to prevent large transient allocations from fragmenting glibc's heap.
All other fields are small and serialized normally.
"""
body_dict = response.get("body", {})
resp_rest = {k: v for k, v in response.items() if k != "body"}

f.write('{"request": ')
json.dump(req_dict, f, cls=_BytesEncoder, sort_keys=True)
f.write(', "response": {"body": ')
if "string" in body_dict:
body_string = body_dict["string"]
if isinstance(body_string, bytes):
try:
body_str = body_string.decode("utf-8")
except UnicodeDecodeError:
body_str = base64.b64encode(body_string).decode("ascii")
else:
body_str = body_string if body_string is not None else ""
body_rest = {k: v for k, v in body_dict.items() if k != "string"}
f.write('{"string": ')
_write_json_string_chunked(f, body_str)
if body_rest:
f.write(", ")
f.write(json.dumps(body_rest, cls=_BytesEncoder, sort_keys=True)[1:-1])
f.write("}")
else:
json.dump(body_dict, f, cls=_BytesEncoder, sort_keys=True)
if resp_rest:
f.write(", ")
f.write(json.dumps(resp_rest, cls=_BytesEncoder, sort_keys=True)[1:-1])
f.write("}}\n")


@contextlib.contextmanager
def _pool_reuse_patch():
"""Prevent urllib3 connection pool proliferation during VCR recording.

When the component creates a new requests.Session per API query (common with
Facebook Graph API batch requests), each session creates its own
urllib3.PoolManager which creates a fresh HTTPSConnectionPool for
graph.facebook.com:443. For N interactions this results in N pools, each
with its own SSLContext (~480 KB of C-heap loaded from the system CA store).

This C-heap growth does NOT appear in Python tracemalloc, is NOT collected
by gc.collect(), and adds ~8 MB per 10 interactions — causing OOM for
Facebook Ads jobs with 500–2000 interactions.

Fix 1 (pool reuse): intercept PoolManager.connection_from_host at the class
level and return the first pool for any given (scheme, host, port) instead of
letting each new PoolManager create its own.

Fix 2 (ssl_context reuse): inject a single shared ssl.SSLContext into each
pool's conn_kw so that ALL connections created by _new_conn() for that pool
share the same SSLContext. Without this, each _new_conn() + _validate_conn()
sequence calls real_connection.connect() → _ssl_wrap_socket_and_match_hostname
with ssl_context=None → create_urllib3_context() + load_default_certs() for
EVERY interaction. load_default_certs() loads ~480 KB of CA cert data into
OpenSSL's C-heap; jemalloc never returns this to the OS even after the
SSLContext is freed, so RSS grows ~480 KB per interaction.

With a shared SSLContext, load_default_certs() runs once per host and all
subsequent connect() calls reuse the already-loaded context.

IMPORTANT: must be applied INSIDE the vcrpy cassette context so that pools
created (and cached) here already carry vcrpy's patched connection stubs.
"""
try:
from urllib3.poolmanager import PoolManager
except ImportError:
yield
return

_shared_pools: dict = {}
_original_cfh = PoolManager.connection_from_host

def _inject_shared_ssl_context(pool, scheme: str) -> None:
"""Inject a single shared SSLContext into the pool's conn_kw.

This ensures all connections created by pool._new_conn() reuse one
SSLContext instead of creating a new one (and calling load_default_certs)
on every connect().
"""
if scheme != "https":
return
if pool.conn_kw.get("ssl_context") is not None:
return
try:
from urllib3.util.ssl_ import create_urllib3_context
ctx = create_urllib3_context()
# load_default_certs() is only called by urllib3 when it creates its own
# context (default_ssl_context=True). Since we supply one, we must call
# it ourselves so that certificate verification works correctly.
ctx.load_default_certs()
pool.conn_kw["ssl_context"] = ctx
except Exception as exc:
logger.warning("VCR: could not inject shared ssl_context into pool for %s: %s", pool.host, exc)

def _reusing_cfh(self, host, port=None, scheme="http", pool_kwargs=None, **kw):
key = (scheme, host, port)
if key not in _shared_pools:
pool = _original_cfh(self, host, port=port, scheme=scheme, pool_kwargs=pool_kwargs, **kw)
_inject_shared_ssl_context(pool, scheme)
_shared_pools[key] = pool
return _shared_pools[key]

PoolManager.connection_from_host = _reusing_cfh
try:
yield
finally:
PoolManager.connection_from_host = _original_cfh
for pool in _shared_pools.values():
try:
pool.close()
except Exception:
pass
_shared_pools.clear()


def _zero_copy_vcr_response_init(self_resp, recorded_response, *, original_init) -> None:
"""Patch target for VCRHTTPResponse.__init__ during recording.

Expand Down
Loading