From 0f77d0b30c267c880f5a5ddd4f00a543730c47e4 Mon Sep 17 00:00:00 2001 From: River-Walras Date: Thu, 4 Jun 2026 19:38:01 +0800 Subject: [PATCH 1/4] Add client-level HTTP headers Need: clients behind Cloudflare Access or similar HTTP gateways must send authentication headers on every request, including the initialization queries that run during client creation. Design: expose an explicit headers keyword on sync and async factories instead of reusing transport_settings, because these values are actual HTTP headers and transport_settings already has overloaded request-level meaning. Merge after driver defaults so users can intentionally override headers like Authorization or User-Agent, while per-request transport_settings still wins for one-off requests. Tests: cover sync and async header merge behavior plus client_factory construction with Cloudflare-style headers. --- clickhouse_connect/driver/__init__.py | 30 ++++++- clickhouse_connect/driver/asyncclient.py | 3 + clickhouse_connect/driver/httpclient.py | 3 + tests/integration_tests/test_client.py | 13 +++ .../unit_tests/test_driver/test_httpclient.py | 90 ++++++++++++++++++- 5 files changed, 135 insertions(+), 4 deletions(-) diff --git a/clickhouse_connect/driver/__init__.py b/clickhouse_connect/driver/__init__.py index d0fe9f1d..fbe54ba2 100644 --- a/clickhouse_connect/driver/__init__.py +++ b/clickhouse_connect/driver/__init__.py @@ -93,6 +93,7 @@ def create_client( secure: bool | str = False, dsn: str | None = None, settings: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, generic_args: dict[str, Any] | None = None, **kwargs, ) -> Client: @@ -115,6 +116,9 @@ def create_client( :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user) will be extracted from this string if not set otherwise. :param settings: ClickHouse server settings to be used with the session/every request + :param headers: Additional HTTP headers to send with every request. This can be used for proxy or gateway + authentication, such as Cloudflare Access service token headers. These headers are applied after driver defaults, + so they can intentionally override headers such as Authorization, User-Agent, or Accept-Encoding. :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. It is not recommended to use this parameter externally. @@ -169,7 +173,9 @@ def create_client( if generic_args: client_params = signature(HttpClient).parameters for name, value in generic_args.items(): - if name in client_params: + if name == "headers": + headers = value + elif name in client_params: kwargs[name] = value elif name == "compression": if "compress" not in kwargs: @@ -178,7 +184,18 @@ def create_client( if name.startswith("ch_"): name = name[3:] settings[name] = value - return HttpClient(interface, host, port, username, password, database, access_token, settings=settings, **kwargs) + return HttpClient( + interface, + host, + port, + username, + password, + database, + access_token, + settings=settings, + headers=headers, + **kwargs, + ) raise ProgrammingError(f"Unrecognized client type {interface}") @@ -194,6 +211,7 @@ async def create_async_client( secure: bool | str = False, dsn: str | None = None, settings: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, generic_args: dict[str, Any] | None = None, connector_limit: int = 100, connector_limit_per_host: int = 20, @@ -221,6 +239,9 @@ async def create_async_client( :param dsn: A string in standard DSN (Data Source Name) format. Other connection values (such as host or user) will be extracted from this string if not set otherwise. :param settings: ClickHouse server settings to be used with the session/every request + :param headers: Additional HTTP headers to send with every request. This can be used for proxy or gateway + authentication, such as Cloudflare Access service token headers. These headers are applied after driver defaults, + so they can intentionally override headers such as Authorization, User-Agent, or Accept-Encoding. :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. It is not recommended to use this parameter externally :param connector_limit: Maximum number of allowable connections to the server @@ -286,7 +307,9 @@ async def create_async_client( if generic_args: client_params = signature(_AsyncClient).parameters for name, value in generic_args.items(): - if name in client_params: + if name == "headers": + headers = value + elif name in client_params: kwargs[name] = value elif name == "compression": if "compress" not in kwargs: @@ -308,6 +331,7 @@ async def create_async_client( database=database, access_token=access_token, settings=settings, + headers=headers, connector_limit=connector_limit, connector_limit_per_host=connector_limit_per_host, keepalive_timeout=keepalive_timeout, diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index 1cadab3e..9d9fafed 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -224,6 +224,7 @@ def __init__( autogenerate_query_id: bool | None = None, form_encode_query_params: bool = False, rename_response_column: str | None = None, + headers: dict[str, str] | None = None, ): """ Async HTTP Client using aiohttp. Initialization is handled via _initialize(). @@ -329,6 +330,8 @@ def __init__( self._reported_libs = set() self._last_pool_reset = None self.headers["User-Agent"] = self.headers["User-Agent"].replace("mode:sync;", "mode:async;") + if headers: + self.headers.update(headers) # Store aiohttp-specific params for deferred initialization self._compress_param = compress diff --git a/clickhouse_connect/driver/httpclient.py b/clickhouse_connect/driver/httpclient.py index 8040cd27..81a80592 100644 --- a/clickhouse_connect/driver/httpclient.py +++ b/clickhouse_connect/driver/httpclient.py @@ -104,6 +104,7 @@ def __init__( proxy_path: str = "", form_encode_query_params: bool = False, rename_response_column: str | None = None, + headers: dict[str, str] | None = None, ): """ Create an HTTP ClickHouse Connect client @@ -157,6 +158,8 @@ def __init__( self._reported_libs = set() self.headers["User-Agent"] = common.build_client_name(client_name) + if headers: + self.headers.update(headers) self._read_format = self._write_format = "Native" self._transform = NativeTransform() diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index de22f95a..4999bce4 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -58,6 +58,19 @@ def test_transport_settings(param_client, call): assert len(result.result_set) > 0 +def test_client_headers(client_factory, call): + client = client_factory( + headers={ + "CF-Access-Client-Id": "test_client_id", + "CF-Access-Client-Secret": "test_client_secret", + } + ) + + assert client.headers["CF-Access-Client-Id"] == "test_client_id" + assert client.headers["CF-Access-Client-Secret"] == "test_client_secret" + assert call(client.command, "SELECT 79") == 79 + + def test_none_database(param_client, call): old_db = param_client.database test_db = call(param_client.command, "select currentDatabase()") diff --git a/tests/unit_tests/test_driver/test_httpclient.py b/tests/unit_tests/test_driver/test_httpclient.py index 90e1c8c9..549fcd96 100644 --- a/tests/unit_tests/test_driver/test_httpclient.py +++ b/tests/unit_tests/test_driver/test_httpclient.py @@ -1,9 +1,11 @@ import logging from typing import Any -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest +from clickhouse_connect.driver.asyncclient import AsyncClient +from clickhouse_connect.driver.client import Client from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError from clickhouse_connect.driver.external import ExternalData from clickhouse_connect.driver.httpclient import HttpClient, ex_header @@ -21,6 +23,92 @@ def create_mock_response(status=500, headers=None, data=None): return response +class TestHttpClientHeaders: + """Test client-level HTTP header configuration.""" + + def test_headers_are_available_during_initialization(self): + init_headers = {} + + def capture_headers(client, _tz_source): + init_headers.update(client.headers) + + with patch.object(Client, "_init_common_settings", autospec=True, side_effect=capture_headers): + HttpClient( + interface="http", + host="localhost", + port=8123, + username="default", + password="", + database="default", + headers={ + "CF-Access-Client-Id": "test_client_id", + "CF-Access-Client-Secret": "test_client_secret", + }, + ) + + assert init_headers["CF-Access-Client-Id"] == "test_client_id" + assert init_headers["CF-Access-Client-Secret"] == "test_client_secret" + assert "Authorization" in init_headers + assert "User-Agent" in init_headers + + def test_request_headers_override_client_headers(self): + response = create_mock_response(status=200) + pool_mgr = Mock() + pool_mgr.request.return_value = response + + with patch.object(Client, "_init_common_settings", autospec=True): + client = HttpClient( + interface="http", + host="localhost", + port=8123, + username="default", + password="", + database="default", + pool_mgr=pool_mgr, + headers={"X-Trace": "client", "X-Gateway": "cloudflare"}, + ) + + client._raw_request(b"", {}, headers={"X-Trace": "request"}) + + request_headers = pool_mgr.request.call_args.kwargs["headers"] + assert request_headers["X-Trace"] == "request" + assert request_headers["X-Gateway"] == "cloudflare" + assert request_headers["Authorization"] == client.headers["Authorization"] + assert request_headers["User-Agent"] == client.headers["User-Agent"] + + +class TestAsyncClientHeaders: + """Test async client-level HTTP header configuration.""" + + @pytest.mark.asyncio + async def test_request_headers_override_client_headers(self): + client = AsyncClient( + interface="http", + host="localhost", + port=8123, + username="default", + password="", + database="default", + headers={"X-Trace": "client", "X-Gateway": "cloudflare"}, + ) + session = Mock() + session.closed = False + response = Mock() + response.status = 200 + response.headers = {} + session.request = AsyncMock(return_value=response) + client._session = session + + await client._raw_request(None, {}, headers={"X-Trace": "request"}) + + request_headers = session.request.call_args.kwargs["headers"] + assert request_headers["X-Trace"] == "request" + assert request_headers["X-Gateway"] == "cloudflare" + assert request_headers["Authorization"] == client.headers["Authorization"] + assert request_headers["User-Agent"] == client.headers["User-Agent"] + assert request_headers["Accept-Encoding"] == client.headers["Accept-Encoding"] + + class TestHttpClientErrorHandler: """Test the error handling functionality of HttpClient""" From 7f3353f156ceed0c2cf29e55d644f1a2e4f33829 Mon Sep 17 00:00:00 2001 From: River-Walras Date: Fri, 5 Jun 2026 22:29:01 +0800 Subject: [PATCH 2/4] Add header validation and parsing for client creation --- clickhouse_connect/driver/__init__.py | 24 +++++++++++-- .../unit_tests/test_driver/test_httpclient.py | 36 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/clickhouse_connect/driver/__init__.py b/clickhouse_connect/driver/__init__.py index fbe54ba2..6426bc34 100644 --- a/clickhouse_connect/driver/__init__.py +++ b/clickhouse_connect/driver/__init__.py @@ -81,6 +81,20 @@ def _validate_access_token(access_token: str | None, username: str | None, passw raise ProgrammingError("Cannot use both access_token and username/password") +def _pop_headers_arg(headers: Any | None, kwargs: dict[str, Any]) -> Any | None: + """Hoist headers parsed through generic kwargs while preserving explicit headers.""" + if "headers" in kwargs: + kwargs_headers = kwargs.pop("headers") + if headers is None: + headers = kwargs_headers + return headers + + +def _validate_headers(headers: Any | None) -> None: + if headers is not None and not isinstance(headers, dict): + raise ProgrammingError("headers must be a dictionary of HTTP header names and values") + + def create_client( *, host: str | None = None, @@ -166,6 +180,7 @@ def create_client( host, username, password, port, database, interface = _parse_connection_params( host, username, password, port, database, interface, secure, dsn, kwargs ) + headers = _pop_headers_arg(headers, kwargs) _validate_access_token(access_token, username, password) settings = settings or {} @@ -174,7 +189,8 @@ def create_client( client_params = signature(HttpClient).parameters for name, value in generic_args.items(): if name == "headers": - headers = value + if headers is None: + headers = value elif name in client_params: kwargs[name] = value elif name == "compression": @@ -184,6 +200,7 @@ def create_client( if name.startswith("ch_"): name = name[3:] settings[name] = value + _validate_headers(headers) return HttpClient( interface, host, @@ -301,6 +318,7 @@ async def create_async_client( host, username, password, port, database, interface = _parse_connection_params( host, username, password, port, database, interface, secure, dsn, kwargs ) + headers = _pop_headers_arg(headers, kwargs) _validate_access_token(access_token, username, password) settings = settings or {} @@ -308,7 +326,8 @@ async def create_async_client( client_params = signature(_AsyncClient).parameters for name, value in generic_args.items(): if name == "headers": - headers = value + if headers is None: + headers = value elif name in client_params: kwargs[name] = value elif name == "compression": @@ -322,6 +341,7 @@ async def create_async_client( if "autogenerate_session_id" not in kwargs: kwargs["autogenerate_session_id"] = False + _validate_headers(headers) client = _AsyncClient( interface=interface, host=host, diff --git a/tests/unit_tests/test_driver/test_httpclient.py b/tests/unit_tests/test_driver/test_httpclient.py index 549fcd96..911c431a 100644 --- a/tests/unit_tests/test_driver/test_httpclient.py +++ b/tests/unit_tests/test_driver/test_httpclient.py @@ -4,9 +4,10 @@ import pytest +from clickhouse_connect.driver import create_async_client, create_client from clickhouse_connect.driver.asyncclient import AsyncClient from clickhouse_connect.driver.client import Client -from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError +from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError, ProgrammingError from clickhouse_connect.driver.external import ExternalData from clickhouse_connect.driver.httpclient import HttpClient, ex_header from clickhouse_connect.driver.query import QueryContext @@ -76,6 +77,24 @@ def test_request_headers_override_client_headers(self): assert request_headers["Authorization"] == client.headers["Authorization"] assert request_headers["User-Agent"] == client.headers["User-Agent"] + def test_dsn_headers_query_param_must_be_dict(self): + with pytest.raises(ProgrammingError, match="headers must be a dictionary"): + create_client(dsn="http://localhost:8123/default?headers=not_a_dict") + + def test_explicit_headers_override_dsn_headers_query_param(self): + init_headers = {} + + def capture_headers(client, _tz_source): + init_headers.update(client.headers) + + with patch.object(Client, "_init_common_settings", autospec=True, side_effect=capture_headers): + create_client( + dsn="http://localhost:8123/default?headers=not_a_dict", + headers={"X-Gateway": "cloudflare"}, + ) + + assert init_headers["X-Gateway"] == "cloudflare" + class TestAsyncClientHeaders: """Test async client-level HTTP header configuration.""" @@ -108,6 +127,21 @@ async def test_request_headers_override_client_headers(self): assert request_headers["User-Agent"] == client.headers["User-Agent"] assert request_headers["Accept-Encoding"] == client.headers["Accept-Encoding"] + @pytest.mark.asyncio + async def test_dsn_headers_query_param_must_be_dict(self): + with pytest.raises(ProgrammingError, match="headers must be a dictionary"): + await create_async_client(dsn="http://localhost:8123/default?headers=not_a_dict") + + @pytest.mark.asyncio + async def test_explicit_headers_override_dsn_headers_query_param(self): + with patch.object(AsyncClient, "_initialize", new=AsyncMock()): + client = await create_async_client( + dsn="http://localhost:8123/default?headers=not_a_dict", + headers={"X-Gateway": "cloudflare"}, + ) + + assert client.headers["X-Gateway"] == "cloudflare" + class TestHttpClientErrorHandler: """Test the error handling functionality of HttpClient""" From 5eee390651cec8b97d7c850f8d40b71522824a5c Mon Sep 17 00:00:00 2001 From: River-Walras Date: Sat, 6 Jun 2026 10:00:55 +0800 Subject: [PATCH 3/4] Fix ping client headers --- clickhouse_connect/driver/__init__.py | 4 +-- clickhouse_connect/driver/httpclient.py | 7 ++++- .../unit_tests/test_driver/test_httpclient.py | 27 +++++++++++++++++++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/clickhouse_connect/driver/__init__.py b/clickhouse_connect/driver/__init__.py index 6426bc34..5c284ff6 100644 --- a/clickhouse_connect/driver/__init__.py +++ b/clickhouse_connect/driver/__init__.py @@ -132,7 +132,7 @@ def create_client( :param settings: ClickHouse server settings to be used with the session/every request :param headers: Additional HTTP headers to send with every request. This can be used for proxy or gateway authentication, such as Cloudflare Access service token headers. These headers are applied after driver defaults, - so they can intentionally override headers such as Authorization, User-Agent, or Accept-Encoding. + so they can intentionally override headers such as Authorization or User-Agent. :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. It is not recommended to use this parameter externally. @@ -258,7 +258,7 @@ async def create_async_client( :param settings: ClickHouse server settings to be used with the session/every request :param headers: Additional HTTP headers to send with every request. This can be used for proxy or gateway authentication, such as Cloudflare Access service token headers. These headers are applied after driver defaults, - so they can intentionally override headers such as Authorization, User-Agent, or Accept-Encoding. + so they can intentionally override headers such as Authorization or User-Agent. :param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings. It is not recommended to use this parameter externally :param connector_limit: Maximum number of allowable connections to the server diff --git a/clickhouse_connect/driver/httpclient.py b/clickhouse_connect/driver/httpclient.py index 81a80592..6d59d8b1 100644 --- a/clickhouse_connect/driver/httpclient.py +++ b/clickhouse_connect/driver/httpclient.py @@ -742,7 +742,12 @@ def ping(self) -> bool: See BaseClient doc_string for this method """ try: - response = self.http.request("GET", f"{self.url}/ping", timeout=3, preload_content=True) + headers = dict_copy(self.headers) + kwargs = {"headers": headers, "timeout": 3, "preload_content": True} + if self.server_host_name: + kwargs["assert_same_host"] = False + headers["Host"] = self.server_host_name + response = self.http.request("GET", f"{self.url}/ping", **kwargs) return 200 <= response.status < 300 except HTTPError: logger.debug("ping failed", exc_info=True) diff --git a/tests/unit_tests/test_driver/test_httpclient.py b/tests/unit_tests/test_driver/test_httpclient.py index 911c431a..b2c50dad 100644 --- a/tests/unit_tests/test_driver/test_httpclient.py +++ b/tests/unit_tests/test_driver/test_httpclient.py @@ -77,6 +77,33 @@ def test_request_headers_override_client_headers(self): assert request_headers["Authorization"] == client.headers["Authorization"] assert request_headers["User-Agent"] == client.headers["User-Agent"] + def test_ping_uses_client_headers(self): + response = create_mock_response(status=200) + pool_mgr = Mock() + pool_mgr.request.return_value = response + + with patch.object(Client, "_init_common_settings", autospec=True): + client = HttpClient( + interface="http", + host="localhost", + port=8123, + username="default", + password="", + database="default", + pool_mgr=pool_mgr, + server_host_name="clickhouse.example.com", + headers={"X-Gateway": "cloudflare"}, + ) + + assert client.ping() is True + + request_headers = pool_mgr.request.call_args.kwargs["headers"] + assert request_headers["X-Gateway"] == "cloudflare" + assert request_headers["Authorization"] == client.headers["Authorization"] + assert request_headers["User-Agent"] == client.headers["User-Agent"] + assert request_headers["Host"] == "clickhouse.example.com" + assert pool_mgr.request.call_args.kwargs["assert_same_host"] is False + def test_dsn_headers_query_param_must_be_dict(self): with pytest.raises(ProgrammingError, match="headers must be a dictionary"): create_client(dsn="http://localhost:8123/default?headers=not_a_dict") From 9333bdef964a805893c641bdc5099c7e85e103a8 Mon Sep 17 00:00:00 2001 From: Joe S Date: Mon, 8 Jun 2026 15:09:52 -0700 Subject: [PATCH 4/4] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f4ed829..dc787784 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Improvements - SQLAlchemy: opt-in server-side bind parameters via `create_engine(url, server_side_params=True)`. The dialect then emits ClickHouse native `{name:Type}` / `{name:Array(Type)}` placeholders instead of client-side string interpolation. Off by default. Closes [#735](https://github.com/ClickHouse/clickhouse-connect/issues/735). - Added a `token_provider` client option (sync and async). It accepts a callable returning an access token string; the callable is invoked once for the initial token and again to fetch a fresh token whenever the server rejects the current one (authentication failure), retrying the request once. Mutually exclusive with `access_token` and `username`/`password`. +- Added a `headers` option to `create_client`/`create_async_client` for attaching custom HTTP headers to every request, including the initialization queries sent during client creation. Useful for HTTP gateways that require auth headers such as Cloudflare Access service tokens. ### Bug Fixes - A `datetime` bound to a server-side `{name:DateTime64(...)}` placeholder now keeps its sub-second precision instead of being truncated to seconds. The declared parameter type drives this, so no `_64` name suffix or manual `DT64Param` wrapper is needed, and it applies through `Array` and `Tuple` hints. Plain `DateTime` binds are unchanged. Closes [#739](https://github.com/ClickHouse/clickhouse-connect/issues/739).