Skip to content
264 changes: 203 additions & 61 deletions tests/test_009_pooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,50 @@
"""

import pytest
import os
import re
import subprocess
import sys
import textwrap
import time
import threading


def _run_in_subprocess(body: str, conn_str: str) -> None:
"""Run a test body in a fresh Python process.

Some tests need to be the *first* to call ``pooling(...)`` in the
process (the C++ ``enable_pooling`` is wrapped in ``std::call_once``
so only the first call's max_size/idle_timeout take effect). Running
them in a subprocess gives each a clean process state.

The subprocess inherits the current ``DB_CONNECTION_STRING`` env var
so the worker uses the same database. ``body`` must be a self-contained
Python snippet that exits non-zero on failure (any uncaught assertion
is fine).
"""
env = os.environ.copy()
env["DB_CONNECTION_STRING"] = conn_str
proc = subprocess.run(
[sys.executable, "-c", textwrap.dedent(body)],
env=env,
capture_output=True,
text=True,
timeout=120,
)
# Sentinel exit code 77 means the subprocess decided to skip
# (e.g. the test prerequisite is unmet on this server, like missing
# KILL permission). The reason is printed to stderr.
if proc.returncode == 77:
pytest.skip(proc.stderr.strip() or "Subprocess requested skip")
if proc.returncode != 0:
pytest.fail(
"Subprocess test body failed\n"
f"--- stdout ---\n{proc.stdout}\n"
f"--- stderr ---\n{proc.stderr}"
)


import statistics
from mssql_python import connect, pooling
from mssql_python.pooling import PoolingManager
Expand Down Expand Up @@ -314,82 +356,182 @@ def test_pool_release_overflow_disconnects_outside_mutex(conn_str):
conn3.close()


@pytest.mark.skip("Flaky test - idle timeout behavior needs investigation")
def test_pool_idle_timeout_removes_connections(conn_str):
"""Test that idle_timeout removes connections from the pool after the timeout."""
pooling(max_size=2, idle_timeout=1)
conn1 = connect(conn_str)
spid_list = []
cursor1 = conn1.cursor()
cursor1.execute("SELECT @@SPID")
spid1 = cursor1.fetchone()[0]
spid_list.append(spid1)
conn1.close()

# Wait for longer than idle_timeout
time.sleep(3)

# Get a new connection, which should not reuse the previous SPID
conn2 = connect(conn_str)
cursor2 = conn2.cursor()
cursor2.execute("SELECT @@SPID")
spid2 = cursor2.fetchone()[0]
spid_list.append(spid2)
conn2.close()

assert spid1 != spid2, "Idle timeout did not remove connection from pool"
"""Test that idle_timeout removes connections from the pool after the timeout.

Run in a subprocess so this test's pooling(idle_timeout=1) is the
first call in the process — the C++ ``enable_pooling`` is wrapped in
``std::call_once``, so only the first call's settings take effect for
the lifetime of the process.

A bare SPID-inequality assertion is unreliable: SQL Server is free to
reassign a recently-freed SPID to the next session. So we identify a
session by the (SPID, login_time) tuple from sys.dm_exec_sessions —
login_time has millisecond resolution and is unique per physical
connection.
"""
_run_in_subprocess(
"""
import os, time
from mssql_python import connect, pooling

conn_str = os.environ["DB_CONNECTION_STRING"]
pooling(max_size=2, idle_timeout=1)

def session_identity(conn):
cur = conn.cursor()
cur.execute(
"SELECT @@SPID, "
" (SELECT login_time FROM sys.dm_exec_sessions "
" WHERE session_id = @@SPID)"
)
spid, login_time = cur.fetchone()
return (spid, login_time)

c1 = connect(conn_str)
id1 = session_identity(c1)
c1.close()

time.sleep(3)

c2 = connect(conn_str)
id2 = session_identity(c2)
c2.close()

assert id1 != id2, (
f"Idle timeout did not remove connection from pool: "
f"got the same session both times {id1}"
)
""",
conn_str,
)


# =============================================================================
# Error Handling and Recovery Tests
# =============================================================================


@pytest.mark.skip(
"Test causes fatal crash - forcibly closing underlying connection leads to undefined behavior"
)
def test_pool_removes_invalid_connections(conn_str):
"""Test that the pool removes connections that become invalid (simulate by closing underlying connection)."""
pooling(max_size=1, idle_timeout=30)
conn = connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT 1")
# Simulate invalidation by forcibly closing the connection at the driver level
try:
# Try to access a private attribute or method to forcibly close the underlying connection
# This is implementation-specific; if not possible, skip
if hasattr(conn, "_conn") and hasattr(conn._conn, "close"):
conn._conn.close()
else:
pytest.skip("Cannot forcibly close underlying connection for this driver")
except Exception:
pass
# Safely close the connection, ignoring errors due to forced invalidation
try:
conn.close()
except RuntimeError as e:
if "not initialized" not in str(e):
"""Pool must replace a pooled connection whose server-side session has died.

Run in a subprocess so this test does not pollute the in-process pool
state for sibling tests (KILL leaves dead pool entries that survive
Python-side teardown because the C++ pool config is locked in for the
lifetime of the process via ``std::call_once``).

Simulates the realistic failure mode (DBA KILL, failover, server-side
idle timeout) by:
1. Opening two connections concurrently (distinct physical sessions)
in autocommit mode.
2. Using one to KILL the other's server-side session out-of-band.
3. Returning both to the pool.
4. Re-acquiring repeatedly: every connection must work and the
killed SPID must never reappear.

Only public APIs are used.
"""
_run_in_subprocess(
"""
import os
import time
from mssql_python import connect, pooling

conn_str = os.environ["DB_CONNECTION_STRING"]
pooling(max_size=2, idle_timeout=30)

def session_identity(conn):
cur = conn.cursor()
cur.execute(
"SELECT @@SPID, "
" (SELECT login_time FROM sys.dm_exec_sessions "
" WHERE session_id = @@SPID)"
)
spid, login_time = cur.fetchone()
return (spid, login_time)

# Step 1: two distinct, autocommit connections. Autocommit avoids
# the implicit rollback in Connection.close(), which would
# otherwise fail on the killed session and leak its pool slot.
victim = connect(conn_str)
admin = connect(conn_str)
victim.autocommit = True
admin.autocommit = True

victim_id = session_identity(victim)
admin_id = session_identity(admin)
assert victim_id != admin_id, (
"Pool handed out the same physical session to two concurrent "
"acquires"
)
victim_spid = victim_id[0]

# Step 2: admin KILLs the victim's session. Requires server
# permission (ALTER ANY CONNECTION or sysadmin); on hosted/CI
# databases the test login often lacks it, so skip gracefully.
try:
admin.cursor().execute(f"KILL {victim_spid}")
except Exception as e:
msg = str(e)
if "permission" in msg.lower() or "KILL" in msg:
import sys as _sys
print(
f"Skipping: KILL not permitted for this login: {msg}",
file=_sys.stderr,
)
victim.close()
admin.close()
_sys.exit(77)
raise
# Now, get a new connection from the pool and ensure it works
new_conn = connect(conn_str)
new_cursor = new_conn.cursor()
try:
new_cursor.execute("SELECT 1")
result = new_cursor.fetchone()
assert result is not None and result[0] == 1, "Pool did not remove invalid connection"
finally:
new_conn.close()

# KILL is processed asynchronously on the server, but we don't
# need to wait for it here. The test's correctness contract is
# "the killed (SPID, login_time) must never reappear in
# subsequent acquires." Any session that gets handed back
# later — whether the same SPID reused by the server or a
# transparently-reconnected one — necessarily has a different
# login_time, so the identity check below catches the only
# failure mode that matters.

# Step 3: return both to the pool.
victim.close()
admin.close()

# Step 4: re-acquire from the pool. Each must be working; the
# killed *physical session* (SPID, login_time) must never come
# back. SQL Server is free to reassign the SPID number to a new
# session, so SPID alone is not a reliable identity.
seen_ids = set()
for _ in range(4):
c = connect(conn_str)
try:
seen_ids.add(session_identity(c))
assert c.cursor().execute("SELECT 1").fetchone()[0] == 1, (
"Pool handed out an unusable connection"
)
finally:
c.close()
assert victim_id not in seen_ids, (
f"Pool returned the killed session {victim_id}; "
f"saw sessions {seen_ids}"
)
""",
conn_str,
)


def test_pool_recovery_after_failed_connection(conn_str):
"""Test that the pool recovers after a failed connection attempt."""
pooling(max_size=1, idle_timeout=30)
# First, try to connect with a bad password (should fail)
if "Pwd=" in conn_str:
bad_conn_str = conn_str.replace("Pwd=", "Pwd=wrongpassword")
elif "Password=" in conn_str:
bad_conn_str = conn_str.replace("Password=", "Password=wrongpassword")
else:
# First, try to connect with a bad password (should fail).
# Match the password keyword case-insensitively since ODBC accepts any case.
bad_conn_str = re.sub(
r"(?i)(\b(?:pwd|password)\s*=)([^;]*)",
r"\1wrongpassword",
conn_str,
count=1,
)
if bad_conn_str == conn_str:
pytest.skip("No password found in connection string to modify")
with pytest.raises(Exception):
connect(bad_conn_str)
Expand Down
Loading