Skip to content

Commit ad367ed

Browse files
committed
feat: add support for psycopg
Psycopg does not support providing a pre-connected socket in its connect API as there is no matching API in the underlying libpq C interface. To work around this limitation, this commit creates an in-memory proxy that copies data between a Unix domain socket and the secure SSL Socket created by the Connector. This approach works, but suffers from poor performance and makes the driver comparable to pg8000 (a pure Python implementation). The best solution would be for psycopg (and libpq) to expose a way to provide a socket creator function as we have in the other drivers. Until then, this is better than nothing. Fixes #377.
1 parent a07e0c3 commit ad367ed

13 files changed

Lines changed: 720 additions & 8 deletions

README.md

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ Python applications. It provides:
3232

3333
[iam-db-authn]: https://cloud.google.com/alloydb/docs/manage-iam-authn
3434

35-
**Supported drivers:** [`pg8000`](https://codeberg.org/tlocke/pg8000) (sync) · [`asyncpg`](https://magicstack.github.io/asyncpg) (async)
35+
**Supported drivers:** [`pg8000`][pg8000] (sync) · [`psycopg`][psycopg] (sync) · [`asyncpg`][asyncpg] (async)
36+
37+
[pg8000]: https://codeberg.org/tlocke/pg8000
38+
[psycopg]: https://www.psycopg.org/
39+
[asyncpg]: https://magicstack.github.io/asyncpg
3640

3741
## Quickstart
3842

@@ -67,6 +71,37 @@ with Connector() as connector:
6771
print(result)
6872
```
6973

74+
### Sync (psycopg + SQLAlchemy)
75+
76+
**Install:**
77+
```sh
78+
pip install "google-cloud-alloydb-connector[psycopg]" sqlalchemy
79+
```
80+
81+
**Connect:**
82+
```python
83+
import sqlalchemy
84+
from google.cloud.alloydbconnector import Connector
85+
86+
INSTANCE_URI = "projects/MY_PROJECT/locations/MY_REGION/clusters/MY_CLUSTER/instances/MY_INSTANCE"
87+
88+
with Connector() as connector:
89+
pool = sqlalchemy.create_engine(
90+
"postgresql+psycopg://",
91+
creator=lambda: connector.connect(
92+
INSTANCE_URI,
93+
"psycopg",
94+
user="my-user",
95+
password="my-password",
96+
db="my-db",
97+
),
98+
)
99+
100+
with pool.connect() as conn:
101+
result = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
102+
print(result)
103+
```
104+
70105
### Async (asyncpg + SQLAlchemy)
71106

72107
**Install:**
@@ -188,7 +223,7 @@ database user][add-iam-user].
188223
```python
189224
connector.connect(
190225
INSTANCE_URI,
191-
"pg8000", # or "asyncpg"
226+
"pg8000", # or "psycopg" (sync) / "asyncpg" (async)
192227
user="service-account@my-project.iam", # omit .gserviceaccount.com suffix
193228
db="my-db",
194229
enable_iam_auth=True,

google/cloud/alloydbconnector/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def __init__(
9797
self._is_sync = False
9898
if client:
9999
self._client = client
100-
elif driver == "pg8000":
100+
elif driver == "pg8000" or driver == "psycopg":
101101
self._client = v1beta.AlloyDBAdminClient(
102102
credentials=credentials,
103103
transport="grpc",
@@ -126,7 +126,11 @@ def __init__(
126126
self._credentials = credentials
127127
# asyncpg does not currently support using metadata exchange
128128
# only use metadata exchange for pg8000 driver
129-
self._use_metadata = True if driver == "pg8000" else False
129+
use_metadata = True
130+
if driver in ("asyncpg", None):
131+
use_metadata = False
132+
133+
self._use_metadata = use_metadata
130134
self._user_agent = user_agent
131135

132136
async def _get_metadata(

google/cloud/alloydbconnector/connector.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import struct
2525
from threading import Thread
2626
from types import TracebackType
27-
from typing import Any, Optional, TYPE_CHECKING
27+
from typing import Any, Callable, Optional, TYPE_CHECKING
2828

2929
from google.auth import default
3030
from google.auth.credentials import TokenState
@@ -39,6 +39,7 @@
3939
from google.cloud.alloydbconnector.instance import RefreshAheadCache
4040
from google.cloud.alloydbconnector.lazy import LazyRefreshCache
4141
import google.cloud.alloydbconnector.pg8000 as pg8000
42+
import google.cloud.alloydbconnector.psycopg as psycopg
4243
from google.cloud.alloydbconnector.static import StaticConnectionInfoCache
4344
from google.cloud.alloydbconnector.types import CacheTypes
4445
from google.cloud.alloydbconnector.utils import generate_keys
@@ -227,8 +228,9 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
227228
self._cache[instance_uri] = cache
228229
logger.debug(f"['{instance_uri}']: Connection info added to cache")
229230

230-
connect_func = {
231+
connect_func: dict[str, Callable[..., Any]] = {
231232
"pg8000": pg8000.connect,
233+
"psycopg": psycopg.connect,
232234
}
233235
# only accept supported database drivers
234236
try:
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
import os
17+
import socket
18+
import ssl
19+
import tempfile
20+
import threading
21+
from typing import Any, TYPE_CHECKING
22+
23+
if TYPE_CHECKING:
24+
import psycopg
25+
26+
logger = logging.getLogger(name=__name__)
27+
28+
_CHUNK_SIZE = 8 * 1024 # bytes per recv() call inside the proxy forwarding loop
29+
30+
31+
def _proxy(local: socket.socket, remote: "ssl.SSLSocket") -> None:
32+
"""Bidirectionally proxy bytes between a local Unix socket and a remote
33+
SSL socket.
34+
35+
Spawns one daemon thread for the remote→local direction and runs the
36+
local→remote direction in the calling thread. Blocks until the calling
37+
thread's direction reaches EOF or a socket error, at which point both
38+
sockets are closed so the other thread also unblocks and exits.
39+
40+
Args:
41+
local: The Unix domain socket connected to the database driver.
42+
remote: The SSL socket connected to the AlloyDB proxy server.
43+
"""
44+
45+
def forward(src: Any, dst: Any) -> None:
46+
buf = bytearray(_CHUNK_SIZE)
47+
view = memoryview(buf)
48+
try:
49+
while True:
50+
n = src.recv_into(view)
51+
if n == 0:
52+
logger.debug("psycopg proxy: EOF on %s, closing both sockets", src)
53+
break
54+
dst.sendall(view[:n])
55+
except (OSError, ssl.SSLError) as e:
56+
logger.debug("psycopg proxy: socket error on %s: %s", src, e)
57+
finally:
58+
# Close both ends so the sibling thread also unblocks.
59+
for s in (local, remote):
60+
try:
61+
s.close()
62+
except OSError:
63+
pass
64+
65+
threading.Thread(target=forward, args=(remote, local), daemon=True).start()
66+
forward(local, remote) # run in calling thread rather than spawning a third
67+
68+
69+
def connect(remote_sock: "ssl.SSLSocket", **kwargs: Any) -> "psycopg.Connection":
70+
"""Create a psycopg DBAPI connection object.
71+
72+
Because psycopg does not accept a pre-connected socket, this function
73+
creates a temporary Unix domain socket, tells psycopg to connect there,
74+
and runs a background proxy that forwards bytes between that socket and
75+
the already-established AlloyDB TLS connection.
76+
77+
Args:
78+
remote_sock (ssl.SSLSocket): SSL/TLS secure socket stream connected to the
79+
AlloyDB proxy server.
80+
81+
Returns:
82+
psycopg.Connection: A psycopg Connection object for the AlloyDB instance.
83+
"""
84+
try:
85+
import psycopg
86+
except ImportError:
87+
raise ImportError(
88+
'Unable to import module "psycopg." Please install and try again.'
89+
)
90+
91+
tmpdir = tempfile.mkdtemp()
92+
socket_path = os.path.join(tmpdir, ".s.PGSQL.5432")
93+
logger.debug("psycopg: created Unix socket at %s", socket_path)
94+
95+
local_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
96+
local_sock.bind(socket_path)
97+
local_sock.listen(1)
98+
99+
def _accept_and_proxy() -> None:
100+
"""Accept one connection then proxy bytes until the connection closes."""
101+
try:
102+
unix_conn, _ = local_sock.accept()
103+
local_sock.close()
104+
logger.debug("psycopg proxy: accepted connection, starting proxy")
105+
except OSError as e:
106+
logger.debug("psycopg proxy: accept failed: %s", e)
107+
return
108+
_proxy(unix_conn, remote_sock)
109+
110+
threading.Thread(target=_accept_and_proxy, daemon=True).start()
111+
112+
user = kwargs.pop("user")
113+
db = kwargs.pop("db")
114+
passwd = kwargs.pop("password", None)
115+
# SSL is already handled by the underlying SSLSocket; disable it on the
116+
# Unix socket so psycopg does not attempt a second TLS handshake.
117+
kwargs.pop("sslmode", None)
118+
119+
logger.debug("psycopg: connecting as user=%s dbname=%s", user, db)
120+
try:
121+
conn = psycopg.connect(
122+
user=user,
123+
dbname=db,
124+
password=passwd,
125+
host=tmpdir,
126+
port=5432,
127+
sslmode="disable",
128+
**kwargs,
129+
)
130+
logger.debug("psycopg: connection established")
131+
return conn
132+
except Exception as e:
133+
logger.debug("psycopg: connection failed: %s", e)
134+
# psycopg never connected (or failed mid-handshake); close the server
135+
# socket so the proxy thread unblocks and exits cleanly.
136+
try:
137+
local_sock.close()
138+
except OSError:
139+
pass
140+
try:
141+
remote_sock.close()
142+
except OSError:
143+
pass
144+
raise
145+
finally:
146+
# The socket file and its parent directory are only needed during the
147+
# initial connect() call; remove them now regardless of outcome.
148+
try:
149+
os.remove(socket_path)
150+
except OSError:
151+
pass
152+
try:
153+
os.rmdir(tmpdir)
154+
except OSError:
155+
pass

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Changelog = "https://github.com/GoogleCloudPlatform/alloydb-python-connector/blo
6060
[project.optional-dependencies]
6161
pg8000 = ["pg8000>=1.31.1"]
6262
asyncpg = ["asyncpg>=0.31.0"]
63+
psycopg = ["psycopg>=3.1.0"]
6364

6465
[tool.setuptools.dynamic]
6566
version = { attr = "google.cloud.alloydbconnector.version.__version__" }

requirements-test.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ asyncpg==0.31.0
22
mock==5.2.0
33
pg8000==1.31.5
44
psycopg2-binary==2.9.11
5+
psycopg==3.3.3
6+
psycopg-binary==3.3.3
57
pytest==9.0.2
68
pytest-asyncio==1.3.0
79
pytest-cov==7.0.0
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from datetime import datetime
16+
import os
17+
18+
# [START alloydb_sqlalchemy_connect_connector_psycopg]
19+
import sqlalchemy
20+
21+
from google.cloud.alloydbconnector import Connector
22+
23+
24+
def create_sqlalchemy_engine(
25+
inst_uri: str,
26+
user: str,
27+
password: str,
28+
db: str,
29+
refresh_strategy: str = "background",
30+
) -> tuple[sqlalchemy.engine.Engine, Connector]:
31+
"""Creates a connection pool for an AlloyDB instance and returns the pool
32+
and the connector. Callers are responsible for closing the pool and the
33+
connector.
34+
35+
A sample invocation looks like:
36+
37+
engine, connector = create_sqlalchemy_engine(
38+
inst_uri,
39+
user,
40+
password,
41+
db,
42+
)
43+
with engine.connect() as conn:
44+
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
45+
conn.commit()
46+
curr_time = time[0]
47+
# do something with query result
48+
connector.close()
49+
50+
Args:
51+
instance_uri (str):
52+
The instance URI specifies the instance relative to the project,
53+
region, and cluster. For example:
54+
"projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
55+
user (str):
56+
The database user name, e.g., postgres
57+
password (str):
58+
The database user's password, e.g., secret-password
59+
db (str):
60+
The name of the database, e.g., mydb
61+
refresh_strategy (Optional[str]):
62+
Refresh strategy for the AlloyDB Connector. Can be one of "lazy"
63+
or "background". For serverless environments use "lazy" to avoid
64+
errors resulting from CPU being throttled.
65+
"""
66+
connector = Connector(refresh_strategy=refresh_strategy)
67+
68+
# create SQLAlchemy connection pool
69+
engine = sqlalchemy.create_engine(
70+
"postgresql+psycopg://",
71+
creator=lambda: connector.connect(
72+
inst_uri,
73+
"psycopg",
74+
user=user,
75+
password=password,
76+
db=db,
77+
),
78+
)
79+
return engine, connector
80+
81+
82+
# [END alloydb_sqlalchemy_connect_connector_psycopg]
83+
84+
85+
def test_psycopg_connection() -> None:
86+
"""Basic test to get time from database."""
87+
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
88+
user = os.environ["ALLOYDB_USER"]
89+
password = os.environ["ALLOYDB_PASS"]
90+
db = os.environ["ALLOYDB_DB"]
91+
92+
engine, connector = create_sqlalchemy_engine(inst_uri, user, password, db)
93+
with engine.connect() as conn:
94+
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
95+
conn.commit()
96+
curr_time = time[0]
97+
assert type(curr_time) is datetime
98+
connector.close()
99+
100+
101+
def test_lazy_psycopg_connection() -> None:
102+
"""Basic test to get time from database."""
103+
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
104+
user = os.environ["ALLOYDB_USER"]
105+
password = os.environ["ALLOYDB_PASS"]
106+
db = os.environ["ALLOYDB_DB"]
107+
108+
engine, connector = create_sqlalchemy_engine(inst_uri, user, password, db, "lazy")
109+
with engine.connect() as conn:
110+
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
111+
conn.commit()
112+
curr_time = time[0]
113+
assert type(curr_time) is datetime
114+
connector.close()

0 commit comments

Comments
 (0)