Skip to content

Commit 086daa4

Browse files
committed
feat: add support for psycopg
Psycopg does not support providing a pre-connected socket in its connect API as there is no matching API in the underlying libpq C interface. To work around this limitation, this commit creates an in-memory proxy that copies data between a Unix domain socket and the secure SSL Socket created by the Connector. This approach works, but suffers from poor performance and makes the driver comparable to pg8000 (a pure Python implementation). The best solution would be for psycopg (and libpq) to expose a way to provide a socket creator function as we have in the other drivers. Until then, this is better than nothing. Fixes #377.
1 parent a07e0c3 commit 086daa4

13 files changed

Lines changed: 888 additions & 8 deletions

README.md

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ Python applications. It provides:
3232

3333
[iam-db-authn]: https://cloud.google.com/alloydb/docs/manage-iam-authn
3434

35-
**Supported drivers:** [`pg8000`](https://codeberg.org/tlocke/pg8000) (sync) · [`asyncpg`](https://magicstack.github.io/asyncpg) (async)
35+
**Supported drivers:** [`pg8000`][pg8000] (sync) · [`psycopg`][psycopg] (sync) · [`asyncpg`][asyncpg] (async)
36+
37+
[pg8000]: https://codeberg.org/tlocke/pg8000
38+
[psycopg]: https://www.psycopg.org/
39+
[asyncpg]: https://magicstack.github.io/asyncpg
3640

3741
## Quickstart
3842

@@ -67,6 +71,37 @@ with Connector() as connector:
6771
print(result)
6872
```
6973

74+
### Sync (psycopg + SQLAlchemy)
75+
76+
**Install:**
77+
```sh
78+
pip install "google-cloud-alloydb-connector[psycopg]" sqlalchemy
79+
```
80+
81+
**Connect:**
82+
```python
83+
import sqlalchemy
84+
from google.cloud.alloydbconnector import Connector
85+
86+
INSTANCE_URI = "projects/MY_PROJECT/locations/MY_REGION/clusters/MY_CLUSTER/instances/MY_INSTANCE"
87+
88+
with Connector() as connector:
89+
pool = sqlalchemy.create_engine(
90+
"postgresql+psycopg://",
91+
creator=lambda: connector.connect(
92+
INSTANCE_URI,
93+
"psycopg",
94+
user="my-user",
95+
password="my-password",
96+
db="my-db",
97+
),
98+
)
99+
100+
with pool.connect() as conn:
101+
result = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
102+
print(result)
103+
```
104+
70105
### Async (asyncpg + SQLAlchemy)
71106

72107
**Install:**
@@ -188,7 +223,7 @@ database user][add-iam-user].
188223
```python
189224
connector.connect(
190225
INSTANCE_URI,
191-
"pg8000", # or "asyncpg"
226+
"pg8000", # or "psycopg" (sync) / "asyncpg" (async)
192227
user="service-account@my-project.iam", # omit .gserviceaccount.com suffix
193228
db="my-db",
194229
enable_iam_auth=True,

google/cloud/alloydbconnector/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def __init__(
9797
self._is_sync = False
9898
if client:
9999
self._client = client
100-
elif driver == "pg8000":
100+
elif driver == "pg8000" or driver == "psycopg":
101101
self._client = v1beta.AlloyDBAdminClient(
102102
credentials=credentials,
103103
transport="grpc",
@@ -126,7 +126,11 @@ def __init__(
126126
self._credentials = credentials
127127
# asyncpg does not currently support using metadata exchange
128128
# only use metadata exchange for pg8000 driver
129-
self._use_metadata = True if driver == "pg8000" else False
129+
use_metadata = True
130+
if driver in ("asyncpg", None):
131+
use_metadata = False
132+
133+
self._use_metadata = use_metadata
130134
self._user_agent = user_agent
131135

132136
async def _get_metadata(

google/cloud/alloydbconnector/connector.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import struct
2525
from threading import Thread
2626
from types import TracebackType
27-
from typing import Any, Optional, TYPE_CHECKING
27+
from typing import Any, Callable, Optional, TYPE_CHECKING
2828

2929
from google.auth import default
3030
from google.auth.credentials import TokenState
@@ -39,6 +39,7 @@
3939
from google.cloud.alloydbconnector.instance import RefreshAheadCache
4040
from google.cloud.alloydbconnector.lazy import LazyRefreshCache
4141
import google.cloud.alloydbconnector.pg8000 as pg8000
42+
import google.cloud.alloydbconnector.psycopg as psycopg
4243
from google.cloud.alloydbconnector.static import StaticConnectionInfoCache
4344
from google.cloud.alloydbconnector.types import CacheTypes
4445
from google.cloud.alloydbconnector.utils import generate_keys
@@ -227,8 +228,9 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
227228
self._cache[instance_uri] = cache
228229
logger.debug(f"['{instance_uri}']: Connection info added to cache")
229230

230-
connect_func = {
231+
connect_func: dict[str, Callable[..., Any]] = {
231232
"pg8000": pg8000.connect,
233+
"psycopg": psycopg.connect,
232234
}
233235
# only accept supported database drivers
234236
try:
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
import os
17+
import socket
18+
import ssl
19+
import tempfile
20+
import threading
21+
from typing import Any, TYPE_CHECKING
22+
23+
if TYPE_CHECKING:
24+
import psycopg
25+
26+
logger = logging.getLogger(name=__name__)
27+
28+
_CHUNK_SIZE = 8 * 1024 # bytes per recv() call inside the proxy forwarding loop
29+
30+
31+
def _proxy(local: socket.socket, remote: "ssl.SSLSocket") -> None:
32+
"""Bidirectionally proxy bytes between a local Unix socket and a remote
33+
SSL socket.
34+
35+
Spawns one daemon thread for the remote→local direction and runs the
36+
local→remote direction in the calling thread. Blocks until the calling
37+
thread's direction reaches EOF or a socket error, at which point both
38+
sockets are closed so the other thread also unblocks and exits.
39+
40+
Args:
41+
local: The Unix domain socket connected to the database driver.
42+
remote: The SSL socket connected to the AlloyDB proxy server.
43+
"""
44+
45+
def forward(src: Any, dst: Any) -> None:
46+
buf = bytearray(_CHUNK_SIZE)
47+
view = memoryview(buf)
48+
try:
49+
while True:
50+
n = src.recv_into(view)
51+
if n == 0:
52+
logger.debug("psycopg proxy: EOF on %s, closing both sockets", src)
53+
break
54+
dst.sendall(view[:n])
55+
except (OSError, ssl.SSLError) as e:
56+
logger.debug("psycopg proxy: socket error on %s: %s", src, e)
57+
finally:
58+
# Close both ends so the sibling thread also unblocks.
59+
for s in (local, remote):
60+
try:
61+
# shutdown is required on POSIX systems to forcefully
62+
# interrupt the sibling thread's blocking recv_into() call.
63+
# Simply closing the socket does not interrupt the system
64+
# call and leads to leaked threads.
65+
s.shutdown(socket.SHUT_RDWR)
66+
except OSError:
67+
pass
68+
try:
69+
s.close()
70+
except OSError:
71+
pass
72+
73+
threading.Thread(target=forward, args=(remote, local), daemon=True).start()
74+
forward(local, remote) # run in calling thread rather than spawning a third
75+
76+
77+
def connect(remote_sock: "ssl.SSLSocket", **kwargs: Any) -> "psycopg.Connection":
78+
"""Create a psycopg DBAPI connection object.
79+
80+
Because psycopg does not accept a pre-connected socket, this function
81+
creates a temporary Unix domain socket, tells psycopg to connect there,
82+
and runs a background proxy that forwards bytes between that socket and
83+
the already-established AlloyDB TLS connection.
84+
85+
Args:
86+
remote_sock (ssl.SSLSocket): SSL/TLS secure socket stream connected to the
87+
AlloyDB proxy server.
88+
89+
Returns:
90+
psycopg.Connection: A psycopg Connection object for the AlloyDB instance.
91+
"""
92+
try:
93+
import psycopg
94+
except ImportError:
95+
raise ImportError(
96+
'Unable to import module "psycopg." Please install and try again.'
97+
)
98+
99+
tmpdir = tempfile.mkdtemp()
100+
socket_path = os.path.join(tmpdir, ".s.PGSQL.5432")
101+
logger.debug("psycopg: created Unix socket at %s", socket_path)
102+
103+
local_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
104+
local_sock.bind(socket_path)
105+
local_sock.listen(1)
106+
107+
def _accept_and_proxy() -> None:
108+
"""Accept one connection then proxy bytes until the connection closes."""
109+
try:
110+
unix_conn, _ = local_sock.accept()
111+
local_sock.close()
112+
logger.debug("psycopg proxy: accepted connection, starting proxy")
113+
except OSError as e:
114+
logger.debug("psycopg proxy: accept failed: %s", e)
115+
try:
116+
remote_sock.close()
117+
except OSError:
118+
pass
119+
return
120+
_proxy(unix_conn, remote_sock)
121+
122+
threading.Thread(target=_accept_and_proxy, daemon=True).start()
123+
124+
user = kwargs.pop("user")
125+
db = kwargs.pop("db")
126+
passwd = kwargs.pop("password", None)
127+
# SSL is already handled by the underlying SSLSocket; disable it on the
128+
# Unix socket so psycopg does not attempt a second TLS handshake.
129+
kwargs.pop("sslmode", None)
130+
131+
logger.debug("psycopg: connecting as user=%s dbname=%s", user, db)
132+
try:
133+
conn = psycopg.connect(
134+
user=user,
135+
dbname=db,
136+
password=passwd,
137+
host=tmpdir,
138+
port=5432,
139+
sslmode="disable",
140+
**kwargs,
141+
)
142+
logger.debug("psycopg: connection established")
143+
return conn
144+
except Exception as e:
145+
logger.debug("psycopg: connection failed: %s", e)
146+
# psycopg never connected (or failed mid-handshake); close the server
147+
# socket so the proxy thread unblocks and exits cleanly.
148+
try:
149+
local_sock.close()
150+
except OSError:
151+
pass
152+
try:
153+
remote_sock.close()
154+
except OSError:
155+
pass
156+
raise
157+
finally:
158+
# The socket file and its parent directory are only needed during the
159+
# initial connect() call; remove them now regardless of outcome.
160+
try:
161+
os.remove(socket_path)
162+
except OSError:
163+
pass
164+
try:
165+
os.rmdir(tmpdir)
166+
except OSError:
167+
pass

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Changelog = "https://github.com/GoogleCloudPlatform/alloydb-python-connector/blo
6060
[project.optional-dependencies]
6161
pg8000 = ["pg8000>=1.31.1"]
6262
asyncpg = ["asyncpg>=0.31.0"]
63+
psycopg = ["psycopg>=3.1.0"]
6364

6465
[tool.setuptools.dynamic]
6566
version = { attr = "google.cloud.alloydbconnector.version.__version__" }

requirements-test.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ asyncpg==0.31.0
22
mock==5.2.0
33
pg8000==1.31.5
44
psycopg2-binary==2.9.11
5+
psycopg==3.3.3
6+
psycopg-binary==3.3.3
57
pytest==9.0.2
68
pytest-asyncio==1.3.0
79
pytest-cov==7.0.0

0 commit comments

Comments
 (0)