From 12ad01953ed9d63341831bb7e4a92c740e44ebf4 Mon Sep 17 00:00:00 2001 From: Subrata Paitandi Date: Thu, 19 Mar 2026 11:12:29 +0530 Subject: [PATCH 1/2] base logic change --- mssql_python/pybind/ddbc_bindings.cpp | 141 ++++++++----- tests/test_013_encoding_decoding.py | 273 ++++++++++++++++++++++---- 2 files changed, 333 insertions(+), 81 deletions(-) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 0933d4fa..ca940779 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -2914,6 +2914,10 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p // Note: wcharEncoding parameter is reserved for future use // Currently WCHAR data always uses UTF-16LE for Windows compatibility (void)wcharEncoding; // Suppress unused parameter warning +#if !defined(__APPLE__) && !defined(__linux__) + // On Windows, VARCHAR is fetched as SQL_C_WCHAR, so charEncoding is unused. + (void)charEncoding; +#endif LOG("SQLGetData: Getting data from %d columns for statement_handle=%p", colCount, (void*)StatementHandle->get()); @@ -2949,6 +2953,8 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: { +#if defined(__APPLE__) || defined(__linux__) + // On Linux/macOS, the ODBC driver returns UTF-8 for SQL_C_CHAR. if (columnSize == SQL_NO_TOTAL || columnSize == 0 || columnSize > SQL_MAX_LOB_SIZE) { LOG("SQLGetData: Streaming LOB for column %d (SQL_C_CHAR) " @@ -2957,34 +2963,16 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p row.append( FetchLobColumnData(hStmt, i, SQL_C_CHAR, false, false, charEncoding)); } else { - // Allocate columnSize * 4 + 1 on ALL platforms (no #if guard). - // - // Why this differs from SQLBindColums / FetchBatchData: - // Those two functions use #if to apply *4 only on Linux/macOS, - // because on Windows with a non-UTF-8 collation (e.g. CP1252) - // each character occupies exactly 1 byte, so *1 suffices and - // saves memory across the entire batch (fetchSize × numCols - // buffers). - // - // SQLGetData_wrap allocates a single temporary buffer per - // column per row, so the over-allocation cost is negligible. - // Using *4 unconditionally here keeps the code simple and - // correct on every platform—including Windows with a UTF-8 - // collation where multi-byte chars could otherwise cause - // truncation at the exact column boundary (e.g. CP1252 é in - // VARCHAR(10)). + // Allocate columnSize * 4 + 1 to accommodate UTF-8 expansion. uint64_t fetchBufferSize = columnSize * 4 + 1 /* null-termination */; std::vector dataBuffer(fetchBufferSize); SQLLEN dataLen; ret = SQLGetData_ptr(hStmt, i, SQL_C_CHAR, dataBuffer.data(), dataBuffer.size(), &dataLen); if (SQL_SUCCEEDED(ret)) { - // columnSize is in chars, dataLen is in bytes if (dataLen > 0) { uint64_t numCharsInData = dataLen / sizeof(SQLCHAR); if (numCharsInData < dataBuffer.size()) { - // SQLGetData will null-terminate the data - // Use Python's codec system to decode bytes. const std::string decodeEncoding = GetEffectiveCharDecoding(charEncoding); py::bytes raw_bytes(reinterpret_cast(dataBuffer.data()), @@ -3001,11 +2989,9 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p LOG_ERROR( "SQLGetData: Failed to decode CHAR column %d with '%s': %s", i, decodeEncoding.c_str(), e.what()); - // Return raw bytes as fallback row.append(raw_bytes); } } else { - // Buffer too small, fallback to streaming LOG("SQLGetData: CHAR column %d data truncated " "(buffer_size=%zu), using streaming LOB", i, dataBuffer.size()); @@ -3037,6 +3023,66 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p row.append(py::none()); } } +#else + // On Windows, request SQL_C_WCHAR so the ODBC driver converts + // from the server's native encoding (e.g. CP1252) to UTF-16. + // This avoids the need to guess the server's code page and + // eliminates the bytes-vs-str inconsistency. + if (columnSize == SQL_NO_TOTAL || columnSize == 0 || + columnSize > SQL_MAX_LOB_SIZE) { + LOG("SQLGetData: Streaming LOB for column %d (VARCHAR as SQL_C_WCHAR) " + "- columnSize=%lu", + i, (unsigned long)columnSize); + row.append(FetchLobColumnData(hStmt, i, SQL_C_WCHAR, true, false, "utf-16le")); + } else { + uint64_t fetchBufferSize = + (columnSize + 1) * sizeof(SQLWCHAR); // +1 for null terminator + std::vector dataBuffer(columnSize + 1); + SQLLEN dataLen; + ret = SQLGetData_ptr(hStmt, i, SQL_C_WCHAR, dataBuffer.data(), fetchBufferSize, + &dataLen); + if (SQL_SUCCEEDED(ret)) { + if (dataLen > 0) { + uint64_t numCharsInData = dataLen / sizeof(SQLWCHAR); + if (numCharsInData < dataBuffer.size()) { + std::wstring wstr(reinterpret_cast(dataBuffer.data())); + row.append(py::cast(wstr)); + LOG("SQLGetData: VARCHAR column %d decoded via SQL_C_WCHAR, " + "length=%lu", + i, (unsigned long)numCharsInData); + } else { + LOG("SQLGetData: VARCHAR column %d data truncated " + "(as WCHAR), using streaming LOB", + i); + row.append(FetchLobColumnData(hStmt, i, SQL_C_WCHAR, true, false, + "utf-16le")); + } + } else if (dataLen == SQL_NULL_DATA) { + LOG("SQLGetData: Column %d is NULL (VARCHAR via WCHAR)", i); + row.append(py::none()); + } else if (dataLen == 0) { + row.append(py::str("")); + } else if (dataLen == SQL_NO_TOTAL) { + LOG("SQLGetData: Cannot determine data length " + "(SQL_NO_TOTAL) for column %d (VARCHAR via WCHAR), " + "returning NULL", + i); + row.append(py::none()); + } else if (dataLen < 0) { + LOG("SQLGetData: Unexpected negative data length " + "for column %d (VARCHAR via WCHAR) - dataLen=%ld", + i, (long)dataLen); + ThrowStdException("SQLGetData returned an unexpected negative " + "data length"); + } + } else { + LOG("SQLGetData: Error retrieving data for column %d " + "(VARCHAR via WCHAR) - SQLRETURN=%d, returning NULL", + i, ret); + row.append(py::none()); + } + } +#endif break; } case SQL_SS_XML: { @@ -3487,29 +3533,26 @@ SQLRETURN SQLBindColums(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& column // TODO: handle variable length data correctly. This logic wont // suffice HandleZeroColumnSizeAtFetch(columnSize); - // Use columnSize * 4 + 1 on Linux/macOS to accommodate UTF-8 - // expansion. The ODBC driver returns UTF-8 for SQL_C_CHAR where - // each character can be up to 4 bytes. #if defined(__APPLE__) || defined(__linux__) + // On Linux/macOS, the ODBC driver returns UTF-8 for SQL_C_CHAR + // where each character can be up to 4 bytes. uint64_t fetchBufferSize = columnSize * 4 + 1 /*null-terminator*/; -#else - uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; -#endif - // TODO: For LONGVARCHAR/BINARY types, columnSize is returned as - // 2GB-1 by SQLDescribeCol. So fetchBufferSize = 2GB. - // fetchSize=1 if columnSize>1GB. So we'll allocate a vector of - // size 2GB. If a query fetches multiple (say N) LONG... - // columns, we will have allocated multiple (N) 2GB sized - // vectors. This will make driver very slow. And if the N is - // high enough, we could hit the OS limit for heap memory that - // we can allocate, & hence get a std::bad_alloc. The process - // could also be killed by OS for consuming too much memory. - // Hence this will be revisited in beta to not allocate 2GB+ - // memory, & use streaming instead buffers.charBuffers[col - 1].resize(fetchSize * fetchBufferSize); ret = SQLBindCol_ptr(hStmt, col, SQL_C_CHAR, buffers.charBuffers[col - 1].data(), fetchBufferSize * sizeof(SQLCHAR), buffers.indicators[col - 1].data()); +#else + // On Windows, the ODBC driver returns bytes in the server's + // native encoding (e.g., CP1252). Rather than guessing the + // code page, we request SQL_C_WCHAR so the driver performs + // the conversion to UTF-16 — exactly matching how NVARCHAR + // columns are already handled. + uint64_t fetchBufferSize = columnSize + 1 /*null-terminator*/; + buffers.wcharBuffers[col - 1].resize(fetchSize * fetchBufferSize); + ret = SQLBindCol_ptr(hStmt, col, SQL_C_WCHAR, buffers.wcharBuffers[col - 1].data(), + fetchBufferSize * sizeof(SQLWCHAR), + buffers.indicators[col - 1].data()); +#endif break; } case SQL_WCHAR: @@ -3675,9 +3718,9 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum HandleZeroColumnSizeAtFetch(columnInfos[col].processedColumnSize); // On Linux/macOS, the ODBC driver returns UTF-8 for SQL_C_CHAR where // each character can be up to 4 bytes. Must match SQLBindColums buffer. -#if defined(__APPLE__) || defined(__linux__) SQLSMALLINT dt = columnInfos[col].dataType; bool isCharType = (dt == SQL_CHAR || dt == SQL_VARCHAR || dt == SQL_LONGVARCHAR); +#if defined(__APPLE__) || defined(__linux__) if (isCharType) { columnInfos[col].fetchBufferSize = columnInfos[col].processedColumnSize * 4 + 1; // *4 for UTF-8, +1 for null terminator @@ -3686,6 +3729,10 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum columnInfos[col].processedColumnSize + 1; // +1 for null terminator } #else + // On Windows, VARCHAR columns are fetched as SQL_C_WCHAR (see + // SQLBindColums). The fetchBufferSize is in SQLWCHAR elements, + // matching the wcharBuffers layout. + (void)isCharType; // same formula for all types on Windows columnInfos[col].fetchBufferSize = columnInfos[col].processedColumnSize + 1; // +1 for null terminator #endif @@ -3740,7 +3787,14 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum case SQL_CHAR: case SQL_VARCHAR: case SQL_LONGVARCHAR: +#if defined(__APPLE__) || defined(__linux__) columnProcessors[col] = ColumnProcessors::ProcessChar; +#else + // On Windows, VARCHAR columns are fetched as SQL_C_WCHAR + // (the driver converts from the server's native encoding to + // UTF-16), so we reuse the NVARCHAR processor. + columnProcessors[col] = ColumnProcessors::ProcessWChar; +#endif break; case SQL_WCHAR: case SQL_WVARCHAR: @@ -4048,7 +4102,8 @@ size_t calculateRowSize(py::list& columnNames, SQLUSMALLINT numCols) { break; case SQL_SS_UDT: rowSize += (static_cast(columnSize) == SQL_NO_TOTAL || columnSize == 0) - ? SQL_MAX_LOB_SIZE : columnSize; + ? SQL_MAX_LOB_SIZE + : columnSize; break; case SQL_BINARY: case SQL_VARBINARY: @@ -4112,8 +4167,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR || dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || dataType == SQL_VARBINARY || - dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML || - dataType == SQL_SS_UDT) && + dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML || dataType == SQL_SS_UDT) && (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) { lobColumns.push_back(i + 1); // 1-based } @@ -4252,8 +4306,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows, if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR || dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || dataType == SQL_VARBINARY || - dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML || - dataType == SQL_SS_UDT) && + dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML || dataType == SQL_SS_UDT) && (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) { lobColumns.push_back(i + 1); // 1-based } diff --git a/tests/test_013_encoding_decoding.py b/tests/test_013_encoding_decoding.py index 034afae6..3fe672a6 100644 --- a/tests/test_013_encoding_decoding.py +++ b/tests/test_013_encoding_decoding.py @@ -4,7 +4,7 @@ This consolidated module provides complete testing for encoding/decoding functionality in mssql-python, thread safety, and connection pooling support. -Total Tests: 131 +Total Tests: 154 Test Categories: ================ @@ -1078,13 +1078,15 @@ def test_setdecoding_with_unicode_data(db_connection): try: # Create test table with NVARCHAR columns for Unicode support - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_decoding_unicode ( id INT IDENTITY(1,1), ascii_col VARCHAR(100), unicode_col NVARCHAR(100) ) - """) + """ + ) # Test ASCII strings in VARCHAR (safe) ascii_strings = [ @@ -1159,7 +1161,8 @@ def test_encoding_decoding_comprehensive_unicode_characters(db_connection): try: # Create test table with different column types - use NVARCHAR for better Unicode support - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_encoding_comprehensive ( id INT PRIMARY KEY, varchar_col VARCHAR(1000), @@ -1167,7 +1170,8 @@ def test_encoding_decoding_comprehensive_unicode_characters(db_connection): text_col TEXT, ntext_col NTEXT ) - """) + """ + ) # Test cases with different Unicode character categories test_cases = [ @@ -1329,7 +1333,8 @@ def test_encoding_decoding_edge_case_data_types(db_connection): try: # Create table with various data types - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_encoding_datatypes ( id INT PRIMARY KEY, varchar_small VARCHAR(50), @@ -1341,7 +1346,8 @@ def test_encoding_decoding_edge_case_data_types(db_connection): text_type TEXT, ntext_type NTEXT ) - """) + """ + ) # Test different encoding configurations test_configs = [ @@ -1633,14 +1639,16 @@ def test_encoding_decoding_large_dataset_performance(db_connection): cursor = db_connection.cursor() try: - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_large_encoding ( id INT PRIMARY KEY, ascii_data VARCHAR(1000), unicode_data NVARCHAR(1000), mixed_data NVARCHAR(MAX) ) - """) + """ + ) # Generate test data - ensure it fits in column sizes ascii_text = "This is ASCII text with numbers 12345." * 10 # ~400 chars @@ -1809,13 +1817,15 @@ def test_encoding_decoding_metadata_columns(db_connection): try: # Create table with Unicode column names if supported - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_metadata ( [normal_col] NVARCHAR(100), [column_with_unicode_测试] NVARCHAR(100), [special_chars_ñáéíóú] INT ) - """) + """ + ) # Test metadata decoding configuration db_connection.setdecoding(mssql_python.SQL_WMETADATA, encoding="utf-16le", ctype=SQL_WCHAR) @@ -1889,7 +1899,8 @@ def test_encoding_decoding_stress_test_comprehensive(db_connection): cursor = db_connection.cursor() try: - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #stress_test_encoding ( id INT IDENTITY(1,1) PRIMARY KEY, ascii_text VARCHAR(500), @@ -1897,7 +1908,8 @@ def test_encoding_decoding_stress_test_comprehensive(db_connection): binary_data VARBINARY(500), mixed_content NVARCHAR(MAX) ) - """) + """ + ) # Generate diverse test data test_datasets = [] @@ -2018,13 +2030,15 @@ def test_encoding_decoding_sql_char_various_encodings(db_connection): try: # Create test table with VARCHAR columns (SQL_CHAR type) - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_sql_char_encodings ( id INT PRIMARY KEY, data_col VARCHAR(100), description VARCHAR(200) ) - """) + """ + ) # Define various encoding types to test with SQL_CHAR encoding_tests = [ @@ -2301,13 +2315,15 @@ def test_encoding_decoding_sql_char_with_unicode_fallback(db_connection): try: # Create test table with both VARCHAR and NVARCHAR - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_unicode_fallback ( id INT PRIMARY KEY, varchar_data VARCHAR(100), nvarchar_data NVARCHAR(100) ) - """) + """ + ) # Test Unicode data unicode_test_cases = [ @@ -2378,13 +2394,15 @@ def test_encoding_decoding_sql_char_native_character_sets(db_connection): try: # Create test table - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_native_chars ( id INT PRIMARY KEY, data VARCHAR(200), encoding_used VARCHAR(50) ) - """) + """ + ) # Test encoding-specific character sets that should work encoding_native_tests = [ @@ -2519,13 +2537,15 @@ def test_encoding_decoding_sql_char_boundary_encoding_cases(db_connection): try: # Create test table - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_encoding_boundaries ( id INT PRIMARY KEY, test_data VARCHAR(500), test_type VARCHAR(100) ) - """) + """ + ) # Test boundary cases for different encodings boundary_tests = [ @@ -2626,14 +2646,16 @@ def test_encoding_decoding_sql_char_unicode_issue_diagnosis(db_connection): try: # Create test table with both VARCHAR and NVARCHAR for comparison - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_unicode_issue ( id INT PRIMARY KEY, varchar_col VARCHAR(100), nvarchar_col NVARCHAR(100), encoding_used VARCHAR(50) ) - """) + """ + ) # Test Unicode strings that commonly cause issues test_strings = [ @@ -2679,9 +2701,11 @@ def test_encoding_decoding_sql_char_unicode_issue_diagnosis(db_connection): ) # Retrieve results - cursor.execute(""" + cursor.execute( + """ SELECT varchar_col, nvarchar_col FROM #test_unicode_issue WHERE id = 1 - """) + """ + ) result = cursor.fetchone() if result: @@ -2736,7 +2760,8 @@ def test_encoding_decoding_sql_char_best_practices_guide(db_connection): try: # Create test table demonstrating different column types - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_best_practices ( id INT PRIMARY KEY, -- ASCII-safe columns (VARCHAR with SQL_CHAR) @@ -2750,7 +2775,8 @@ def test_encoding_decoding_sql_char_best_practices_guide(db_connection): -- Mixed approach column safe_text VARCHAR(200) ) - """) + """ + ) # Configure optimal settings db_connection.setencoding(encoding="utf-8", ctype=SQL_CHAR) # For ASCII data @@ -4966,13 +4992,15 @@ def test_execute_executemany_encoding_consistency(db_connection): try: # Create test table that can handle both VARCHAR and NVARCHAR data - cursor.execute(""" + cursor.execute( + """ CREATE TABLE #test_encoding_consistency ( id INT IDENTITY(1,1) PRIMARY KEY, varchar_col VARCHAR(1000) COLLATE SQL_Latin1_General_CP1_CI_AS, nvarchar_col NVARCHAR(1000) ) - """) + """ + ) # Test data with various encoding challenges # Using ASCII-safe characters that work across different encodings @@ -5025,11 +5053,13 @@ def test_execute_executemany_encoding_consistency(db_connection): ) # Retrieve immediately to verify encoding worked - cursor.execute(""" + cursor.execute( + """ SELECT varchar_col, nvarchar_col FROM #test_encoding_consistency WHERE id = (SELECT MAX(id) FROM #test_encoding_consistency) - """) + """ + ) result = cursor.fetchone() execute_results.append((result[0], result[1])) @@ -5054,11 +5084,13 @@ def test_execute_executemany_encoding_consistency(db_connection): ) # Retrieve all results from executemany - cursor.execute(""" + cursor.execute( + """ SELECT varchar_col, nvarchar_col FROM #test_encoding_consistency ORDER BY id - """) + """ + ) executemany_results = cursor.fetchall() # Verify executemany results match execute results @@ -5095,11 +5127,13 @@ def test_execute_executemany_encoding_consistency(db_connection): test_string, ) - cursor.execute(""" + cursor.execute( + """ SELECT nvarchar_col FROM #test_encoding_consistency WHERE id = (SELECT MAX(id) FROM #test_encoding_consistency) - """) + """ + ) result = cursor.fetchone() unicode_execute_results.append(result[0]) @@ -5126,11 +5160,13 @@ def test_execute_executemany_encoding_consistency(db_connection): unicode_params, ) - cursor.execute(""" + cursor.execute( + """ SELECT nvarchar_col FROM #test_encoding_consistency ORDER BY id - """) + """ + ) unicode_executemany_results = cursor.fetchall() # Compare Unicode results @@ -7256,5 +7292,168 @@ def test_dae_encoding_large_string(db_connection): cursor.close() +# ==================================================================================== +# 11. VARCHAR CP1252 DECODING CONSISTENCY (23 tests) +# ==================================================================================== +# +# Verifies that VARCHAR columns using a CP1252 collation +# (SQL_Latin1_General_CP1_CI_AS) always return ``str`` — not ``bytes`` — +# regardless of platform or fetch method. +# +# Byte values that are valid in CP1252 but form invalid single-byte UTF-8 +# sequences (0x80-0x9F, 0xAD) historically caused a cross-platform +# inconsistency: +# - Linux/macOS : unixODBC converts CP1252 → UTF-8 → ``str``. +# - Windows : ODBC driver returned raw bytes; UTF-8 decode failed → +# ``bytes`` fallback. +# The fix (fetch VARCHAR as SQL_C_WCHAR on Windows) eliminates this. +# ==================================================================================== + +# All CP1252 byte values that are NOT valid as a single UTF-8 byte. +# Each tuple: (byte_value, expected_unicode_char, human_readable_description) +CP1252_PROBLEMATIC_BYTES = [ + (128, "\u20ac", "Euro sign"), + (130, "\u201a", "Single low-9 quotation mark"), + (131, "\u0192", "Latin small f with hook"), + (132, "\u201e", "Double low-9 quotation mark"), + (133, "\u2026", "Horizontal ellipsis"), + (140, "\u0152", "Latin capital OE"), + (142, "\u017d", "Latin capital Z with caron"), + (145, "\u2018", "Left single quotation mark"), + (146, "\u2019", "Right single quotation mark"), + (147, "\u201c", "Left double quotation mark"), + (148, "\u201d", "Right double quotation mark"), + (150, "\u2013", "En dash"), + (151, "\u2014", "Em dash"), + (152, "\u02dc", "Small tilde"), + (153, "\u2122", "Trade mark sign"), + (156, "\u0153", "Latin small oe"), + (158, "\u017e", "Latin small z with caron"), + (159, "\u0178", "Latin capital Y with diaeresis"), + (173, "\u00ad", "Soft hyphen"), +] + + +@pytest.mark.parametrize( + "byte_val,expected_char,description", + CP1252_PROBLEMATIC_BYTES, + ids=[f"{bv}-{desc}" for bv, _, desc in CP1252_PROBLEMATIC_BYTES], +) +def test_cp1252_varchar_byte_returns_str_fetchone( + db_connection, byte_val, expected_char, description +): + """Each problematic CP1252 byte in a VARCHAR column should decode to str via fetchone.""" + cursor = db_connection.cursor() + try: + cursor.execute( + f"CREATE TABLE #t_byte{byte_val} " + f"(val VARCHAR(10) COLLATE SQL_Latin1_General_CP1_CI_AS)" + ) + cursor.execute(f"INSERT INTO #t_byte{byte_val} VALUES (CHAR({byte_val}))") + db_connection.commit() + + cursor.execute(f"SELECT val FROM #t_byte{byte_val}") + row = cursor.fetchone() + assert row is not None + + value = row[0] + if value is None: + pytest.skip( + f"Server returned NULL for CHAR({byte_val}) — " f"collation may not map this byte" + ) + + assert isinstance(value, str), ( + f"Expected str for CHAR({byte_val}) ({description}) but got " + f"{type(value).__name__}: {value!r} (platform={sys.platform})." + ) + finally: + cursor.close() + + +@pytest.mark.parametrize("fetch_method", ["fetchall", "fetchmany"]) +def test_cp1252_varchar_byte173_batch_fetch(db_connection, fetch_method): + """Byte 0xAD must decode to str via fetchall and fetchmany (batch paths).""" + table = f"#t_b173_{fetch_method}" + cursor = db_connection.cursor() + try: + cursor.execute( + f"CREATE TABLE {table} " f"(val VARCHAR(10) COLLATE SQL_Latin1_General_CP1_CI_AS)" + ) + cursor.execute(f"INSERT INTO {table} VALUES (CHAR(173))") + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table}") + if fetch_method == "fetchall": + rows = cursor.fetchall() + else: + rows = cursor.fetchmany(1) + + assert len(rows) == 1 + value = rows[0][0] + assert isinstance(value, str), ( + f"{fetch_method}: expected str but got {type(value).__name__}: " + f"{value!r} (platform={sys.platform})." + ) + finally: + cursor.close() + + +def test_cp1252_varchar_byte173_embedded_in_string(db_connection): + """Byte 0xAD embedded within a longer ASCII string should decode to str.""" + cursor = db_connection.cursor() + try: + cursor.execute( + "CREATE TABLE #t_b173_embed " "(val VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS)" + ) + cursor.execute("INSERT INTO #t_b173_embed VALUES ('hello' + CHAR(173) + 'world')") + db_connection.commit() + + cursor.execute("SELECT val FROM #t_b173_embed") + row = cursor.fetchone() + assert row is not None + value = row[0] + + assert isinstance(value, str), ( + f"Expected str but got {type(value).__name__}: {value!r}. " + f"Embedded byte 0xAD was not decoded (platform={sys.platform})." + ) + assert "hello" in value and "world" in value + finally: + cursor.close() + + +def test_cp1252_varchar_explicit_decoding(db_connection): + """Byte 0xAD with explicit CP1252 decoding returns the correct character (control group).""" + cursor = db_connection.cursor() + original_decoding = db_connection.getdecoding(SQL_CHAR) + try: + cursor.execute( + "CREATE TABLE #t_cp1252_explicit " + "(id INT PRIMARY KEY, val VARCHAR(10) COLLATE SQL_Latin1_General_CP1_CI_AS)" + ) + cursor.execute("INSERT INTO #t_cp1252_explicit VALUES (1, CHAR(173))") + cursor.execute("INSERT INTO #t_cp1252_explicit VALUES (2, 'abc' + CHAR(173) + 'def')") + db_connection.commit() + + db_connection.setdecoding(SQL_CHAR, encoding="cp1252", ctype=SQL_CHAR) + + cursor.execute("SELECT val FROM #t_cp1252_explicit ORDER BY id") + rows = cursor.fetchall() + + assert len(rows) == 2 + assert isinstance(rows[0][0], str) + assert isinstance(rows[1][0], str) + # CP1252 byte 0xAD → U+00AD SOFT HYPHEN + assert rows[0][0] == "\u00ad" + assert rows[1][0] == "abc\u00addef" + finally: + db_connection.setdecoding( + SQL_CHAR, + encoding=original_decoding["encoding"], + ctype=original_decoding["ctype"], + ) + cursor.close() + + if __name__ == "__main__": pytest.main([__file__, "-v"]) From d7e894bbf287d0f761fe1ce7a2f724381b67f56d Mon Sep 17 00:00:00 2001 From: Subrata Paitandi Date: Thu, 19 Mar 2026 11:21:38 +0530 Subject: [PATCH 2/2] linting fix --- tests/test_013_encoding_decoding.py | 108 ++++++++++------------------ 1 file changed, 36 insertions(+), 72 deletions(-) diff --git a/tests/test_013_encoding_decoding.py b/tests/test_013_encoding_decoding.py index 3fe672a6..4ca4b297 100644 --- a/tests/test_013_encoding_decoding.py +++ b/tests/test_013_encoding_decoding.py @@ -1078,15 +1078,13 @@ def test_setdecoding_with_unicode_data(db_connection): try: # Create test table with NVARCHAR columns for Unicode support - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_decoding_unicode ( id INT IDENTITY(1,1), ascii_col VARCHAR(100), unicode_col NVARCHAR(100) ) - """ - ) + """) # Test ASCII strings in VARCHAR (safe) ascii_strings = [ @@ -1161,8 +1159,7 @@ def test_encoding_decoding_comprehensive_unicode_characters(db_connection): try: # Create test table with different column types - use NVARCHAR for better Unicode support - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_encoding_comprehensive ( id INT PRIMARY KEY, varchar_col VARCHAR(1000), @@ -1170,8 +1167,7 @@ def test_encoding_decoding_comprehensive_unicode_characters(db_connection): text_col TEXT, ntext_col NTEXT ) - """ - ) + """) # Test cases with different Unicode character categories test_cases = [ @@ -1333,8 +1329,7 @@ def test_encoding_decoding_edge_case_data_types(db_connection): try: # Create table with various data types - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_encoding_datatypes ( id INT PRIMARY KEY, varchar_small VARCHAR(50), @@ -1346,8 +1341,7 @@ def test_encoding_decoding_edge_case_data_types(db_connection): text_type TEXT, ntext_type NTEXT ) - """ - ) + """) # Test different encoding configurations test_configs = [ @@ -1639,16 +1633,14 @@ def test_encoding_decoding_large_dataset_performance(db_connection): cursor = db_connection.cursor() try: - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_large_encoding ( id INT PRIMARY KEY, ascii_data VARCHAR(1000), unicode_data NVARCHAR(1000), mixed_data NVARCHAR(MAX) ) - """ - ) + """) # Generate test data - ensure it fits in column sizes ascii_text = "This is ASCII text with numbers 12345." * 10 # ~400 chars @@ -1817,15 +1809,13 @@ def test_encoding_decoding_metadata_columns(db_connection): try: # Create table with Unicode column names if supported - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_metadata ( [normal_col] NVARCHAR(100), [column_with_unicode_测试] NVARCHAR(100), [special_chars_ñáéíóú] INT ) - """ - ) + """) # Test metadata decoding configuration db_connection.setdecoding(mssql_python.SQL_WMETADATA, encoding="utf-16le", ctype=SQL_WCHAR) @@ -1899,8 +1889,7 @@ def test_encoding_decoding_stress_test_comprehensive(db_connection): cursor = db_connection.cursor() try: - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #stress_test_encoding ( id INT IDENTITY(1,1) PRIMARY KEY, ascii_text VARCHAR(500), @@ -1908,8 +1897,7 @@ def test_encoding_decoding_stress_test_comprehensive(db_connection): binary_data VARBINARY(500), mixed_content NVARCHAR(MAX) ) - """ - ) + """) # Generate diverse test data test_datasets = [] @@ -2030,15 +2018,13 @@ def test_encoding_decoding_sql_char_various_encodings(db_connection): try: # Create test table with VARCHAR columns (SQL_CHAR type) - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_sql_char_encodings ( id INT PRIMARY KEY, data_col VARCHAR(100), description VARCHAR(200) ) - """ - ) + """) # Define various encoding types to test with SQL_CHAR encoding_tests = [ @@ -2315,15 +2301,13 @@ def test_encoding_decoding_sql_char_with_unicode_fallback(db_connection): try: # Create test table with both VARCHAR and NVARCHAR - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_unicode_fallback ( id INT PRIMARY KEY, varchar_data VARCHAR(100), nvarchar_data NVARCHAR(100) ) - """ - ) + """) # Test Unicode data unicode_test_cases = [ @@ -2394,15 +2378,13 @@ def test_encoding_decoding_sql_char_native_character_sets(db_connection): try: # Create test table - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_native_chars ( id INT PRIMARY KEY, data VARCHAR(200), encoding_used VARCHAR(50) ) - """ - ) + """) # Test encoding-specific character sets that should work encoding_native_tests = [ @@ -2537,15 +2519,13 @@ def test_encoding_decoding_sql_char_boundary_encoding_cases(db_connection): try: # Create test table - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_encoding_boundaries ( id INT PRIMARY KEY, test_data VARCHAR(500), test_type VARCHAR(100) ) - """ - ) + """) # Test boundary cases for different encodings boundary_tests = [ @@ -2646,16 +2626,14 @@ def test_encoding_decoding_sql_char_unicode_issue_diagnosis(db_connection): try: # Create test table with both VARCHAR and NVARCHAR for comparison - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_unicode_issue ( id INT PRIMARY KEY, varchar_col VARCHAR(100), nvarchar_col NVARCHAR(100), encoding_used VARCHAR(50) ) - """ - ) + """) # Test Unicode strings that commonly cause issues test_strings = [ @@ -2701,11 +2679,9 @@ def test_encoding_decoding_sql_char_unicode_issue_diagnosis(db_connection): ) # Retrieve results - cursor.execute( - """ + cursor.execute(""" SELECT varchar_col, nvarchar_col FROM #test_unicode_issue WHERE id = 1 - """ - ) + """) result = cursor.fetchone() if result: @@ -2760,8 +2736,7 @@ def test_encoding_decoding_sql_char_best_practices_guide(db_connection): try: # Create test table demonstrating different column types - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_best_practices ( id INT PRIMARY KEY, -- ASCII-safe columns (VARCHAR with SQL_CHAR) @@ -2775,8 +2750,7 @@ def test_encoding_decoding_sql_char_best_practices_guide(db_connection): -- Mixed approach column safe_text VARCHAR(200) ) - """ - ) + """) # Configure optimal settings db_connection.setencoding(encoding="utf-8", ctype=SQL_CHAR) # For ASCII data @@ -4992,15 +4966,13 @@ def test_execute_executemany_encoding_consistency(db_connection): try: # Create test table that can handle both VARCHAR and NVARCHAR data - cursor.execute( - """ + cursor.execute(""" CREATE TABLE #test_encoding_consistency ( id INT IDENTITY(1,1) PRIMARY KEY, varchar_col VARCHAR(1000) COLLATE SQL_Latin1_General_CP1_CI_AS, nvarchar_col NVARCHAR(1000) ) - """ - ) + """) # Test data with various encoding challenges # Using ASCII-safe characters that work across different encodings @@ -5053,13 +5025,11 @@ def test_execute_executemany_encoding_consistency(db_connection): ) # Retrieve immediately to verify encoding worked - cursor.execute( - """ + cursor.execute(""" SELECT varchar_col, nvarchar_col FROM #test_encoding_consistency WHERE id = (SELECT MAX(id) FROM #test_encoding_consistency) - """ - ) + """) result = cursor.fetchone() execute_results.append((result[0], result[1])) @@ -5084,13 +5054,11 @@ def test_execute_executemany_encoding_consistency(db_connection): ) # Retrieve all results from executemany - cursor.execute( - """ + cursor.execute(""" SELECT varchar_col, nvarchar_col FROM #test_encoding_consistency ORDER BY id - """ - ) + """) executemany_results = cursor.fetchall() # Verify executemany results match execute results @@ -5127,13 +5095,11 @@ def test_execute_executemany_encoding_consistency(db_connection): test_string, ) - cursor.execute( - """ + cursor.execute(""" SELECT nvarchar_col FROM #test_encoding_consistency WHERE id = (SELECT MAX(id) FROM #test_encoding_consistency) - """ - ) + """) result = cursor.fetchone() unicode_execute_results.append(result[0]) @@ -5160,13 +5126,11 @@ def test_execute_executemany_encoding_consistency(db_connection): unicode_params, ) - cursor.execute( - """ + cursor.execute(""" SELECT nvarchar_col FROM #test_encoding_consistency ORDER BY id - """ - ) + """) unicode_executemany_results = cursor.fetchall() # Compare Unicode results