
Implementation proposal for #3 (#4)

Open · hi117 wants to merge 4 commits into jackc:master from hi117:master

Conversation

hi117 commented Jul 21, 2025

This change has some breaking changes, so might require a v2, but I couldn't sanely get around that.

Let me know about any other changes/feedback on this.

hi117 (Author) commented Jul 21, 2025

Linking #3 here, since putting it in the title didn't create the link.

jackc (Owner) commented Aug 2, 2025

We wouldn't need a v2 because we are still pre v1.

But with regards to the actual change, would it be possible to test this new functionality?

hi117 (Author) commented Aug 6, 2025

Ok, I found an issue with the current implementation. Because listen() waits forever, we can't run subsequent Exec() calls to listen on new channels.

A way around this is to make the context we pass to WaitForNotification cancelable, and cancel that context when we want to listen on a new channel. That introduces a race condition, though: a new Handle() call and WaitForNotification race over who gets to send the next postgres message. If we had a mutex that got unlocked only after WaitForNotification started waiting, that could work, but I don't want to just call sleep() in normal code. Do you have any ideas on how we could unlock a mutex only after WaitForNotification wins the race? @jackc

jackc (Owner) commented Aug 9, 2025

Yeah, it does get a lot more complicated... That's why my initial implementation for pgxlisten doesn't allow changes once started. 🤷

FWIW, at the pgx level notifications are buffered. So it would be possible to stop listening, send a new listen command, then resume listening and not lose any. The concurrency issues inside of pgxlisten get messy though.
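A loose stdlib analogy for the buffering described above: pgx parses and queues notifications that arrive while nobody is in WaitForNotification, much as a buffered channel holds sends while the receiver is busy elsewhere. The buffer size and names here are illustrative, not pgx internals.

```go
package main

import "fmt"

// drainAfterPause shows why pausing the wait loses nothing: notifications
// that arrive while the consumer is away issuing LISTEN sit in the buffer
// and are delivered once the consumer resumes waiting.
func drainAfterPause() []string {
	notifications := make(chan string, 16) // rough analogue of pgx's buffer

	// These arrive while the consumer is "away" sending a LISTEN command.
	notifications <- "n1"
	notifications <- "n2"

	// The consumer resumes and still sees everything, in order.
	var got []string
	for len(notifications) > 0 {
		got = append(got, <-notifications)
	}
	return got
}

func main() { fmt.Println(drainAfterPause()) }
```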

I'm curious about the underlying need for this feature as well. In the use case I had for it, all channels are known when the application starts. And in all the time I've used PostgreSQL I don't think I've had a case for changing what channels are listened to at run time.

Along with that, would it be possible for your application to replace pgxlisteners whenever the channels you want to listen to change?

hi117 (Author) commented Aug 9, 2025

> FWIW, at the pgx level notifications are buffered. So it would be possible to stop listening, send a new listen command, then resume listening and not lose any. The concurrency issues inside of pgxlisten get messy though.

Good because I think I found a way to do it.

We use two normal mutexes: one prevents the listener thread from winning the race after we call context cancel, and the other is the actual lock on the connection.

When we want to add a new channel to listen on, we first pull the mutex that prevents the listener from winning the race, then we cancel the context, then we pull the actual connection mutex, run our command, and then let go of both mutexes.

The listener thread pulls the prevent-listener mutex, then pulls the connection mutex, then releases the prevent-listener mutex, then listens, then releases the connection mutex.

This should ensure that there are no race conditions and no errors from two things trying to use the connection at once.

Here's a diagram of the process:

```mermaid
sequenceDiagram
    participant l as listener
    participant n as someNewThing
    participant lm as listenMux
    participant cm as connMux
    participant ctx as context
    activate ctx
    l -> lm: listener pulls listener mutex
    activate lm
    l -> cm: listener pulls connection mutex
    activate cm
    l -> lm: listener releases listener mutex
    deactivate lm
    n -> lm: new thing pulls listener mutex
    activate lm
    n -> ctx: new thing cancels context
    deactivate ctx
    ctx -> l: context tells listener to wake up
    n --> cm: new thing waits for the connection mutex
    l -> cm: listener releases connection mutex
    deactivate cm
    l --> lm: listener waits for listener mutex
    n -> cm: new thing pulls the connection mutex
    activate cm
    n -> cm: new thing runs its command
    n -> cm: new thing releases connection mutex
    deactivate cm
    n -> lm: new thing releases listener mutex
    deactivate lm
    l -> lm: listener pulls listener mutex
    activate lm
    l -> cm: listener pulls connection mutex
    activate cm
    l -> lm: listener releases listener mutex
    deactivate lm
    l -> ctx: listens for events with new context
    activate ctx
    deactivate cm
    deactivate ctx
```
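The handshake above can be sketched in runnable Go, with the blocking WaitForNotification call replaced by a stub that parks on its context so no database is needed. The type and method names (listener, waitOnce, addChannel, replies_42) are illustrative, not pgxlisten API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type listener struct {
	listenMu sync.Mutex         // keeps the listener from re-entering the wait
	connMu   sync.Mutex         // exclusive use of the (simulated) connection
	cancel   context.CancelFunc // cancels the in-flight wait
	waiting  chan struct{}      // closed once the listener is parked in the wait
	cmds     []string           // commands "sent" on the connection
}

// waitOnce models one pass of the listener loop from the diagram:
// pull listenMu, pull connMu, release listenMu, wait, release connMu.
func (l *listener) waitOnce() {
	l.listenMu.Lock()
	l.connMu.Lock()
	ctx, cancel := context.WithCancel(context.Background())
	l.cancel = cancel
	l.listenMu.Unlock()
	close(l.waiting)

	<-ctx.Done() // stand-in for conn.WaitForNotification(ctx)
	l.connMu.Unlock()
}

// addChannel models someNewThing: pull listenMu so the listener cannot win
// the race, cancel the wait, pull connMu, run the command, release both.
func (l *listener) addChannel(name string) {
	l.listenMu.Lock()
	l.cancel()
	l.connMu.Lock()
	l.cmds = append(l.cmds, "listen "+name) // stand-in for conn.Exec
	l.connMu.Unlock()
	l.listenMu.Unlock()
}

func demo() []string {
	l := &listener{waiting: make(chan struct{})}
	done := make(chan struct{})
	go func() { l.waitOnce(); close(done) }()
	<-l.waiting // the listener is now parked in its wait
	l.addChannel("replies_42")
	<-done
	return l.cmds
}

func main() {
	fmt.Println(demo()) // prints [listen replies_42]
}
```

Because addChannel holds listenMu across the cancel, the woken listener cannot re-enter the wait until the new LISTEN has been sent, which is exactly the ordering the diagram shows.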

> I'm curious about the underlying need for this feature as well.

What I want to make is a sort of command and response system. I want to be able to send a message to a set of other hosts and give them a channel to send the reply back on. Because I'll be generating a new reply channel every time, I'll need to listen on new channels after program start. There are other things too when it comes to sending commands (for instance, to clients of a web server connected via websocket) where it can make sense to do this.
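The reply-channel scheme can be sketched as helpers that mint a fresh channel name per request and build the SQL around it. Note that LISTEN takes an identifier, not a parameter, so a dynamic name must be quoted as an identifier; the responder can use pg_notify(text, text) instead, which takes ordinary parameters. The "reply_" prefix and helper names are assumptions for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// newReplyChannel mints a unique channel name for one request/response pair.
func newReplyChannel() (string, error) {
	var b [8]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return "reply_" + hex.EncodeToString(b[:]), nil
}

// quoteIdent quotes a PostgreSQL identifier: wrap it in double quotes and
// double any embedded double quotes.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

func main() {
	ch, err := newReplyChannel()
	if err != nil {
		panic(err)
	}
	// Requester side: run via conn.Exec, then wait for the notification.
	fmt.Println("listen " + quoteIdent(ch))
	// Responder side: channel name travels as a plain text parameter.
	fmt.Println("select pg_notify($1, $2) -- $1 =", ch)
}
```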

jackc (Owner) commented Aug 30, 2025

This seems pretty complicated both on the pgxlisten side and on the side of your system architecture.

On the pgxlisten side, I wonder if it would make more sense in your use case to use pgx directly instead. My original use case for pgxlisten was I had multiple separate kinds of notifications, but I only wanted to use a single PostgreSQL connection. So I needed a multiplexer. In your case, you have many channels, but only a single kind of notification. I suspect that a few dozen lines of code using pgx.Conn directly would probably be simpler than adapting pgxlisten and give you closer to what you are looking for.
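A rough sketch of that suggestion, driving a dedicated pgx.Conn directly with a periodic wake-up to pick up newly requested channels. It needs a live database (a DATABASE_URL environment variable is assumed here) and the pgx module, so it is illustrative rather than tested; error handling is minimal.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	defer conn.Close(ctx)

	// Dynamically requested channels arrive here; LISTEN runs between waits.
	newChannels := make(chan string, 8)
	newChannels <- "replies_initial"

	for {
		// Drain pending subscriptions while we are not mid-wait.
		for len(newChannels) > 0 {
			ch := <-newChannels
			if _, err := conn.Exec(ctx, "listen "+pgx.Identifier{ch}.Sanitize()); err != nil {
				fmt.Fprintln(os.Stderr, err)
				return
			}
		}
		// Wake up periodically to check for new channels; notifications that
		// arrive in between are buffered by pgx, so nothing is lost.
		waitCtx, cancel := context.WithTimeout(ctx, time.Second)
		n, err := conn.WaitForNotification(waitCtx)
		cancel()
		if err != nil {
			if waitCtx.Err() != nil {
				continue // timeout: loop around and drain newChannels
			}
			fmt.Fprintln(os.Stderr, err)
			return
		}
		fmt.Println(n.Channel, n.Payload)
	}
}
```

Polling with a short timeout trades a little latency for much simpler locking than the two-mutex handshake; a production version might instead cancel the wait on demand, as discussed earlier in the thread.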

On the system side, I would suggest building a proof of concept for that type of usage of channels. I know notifications can have some performance issues at high usage, and I would be curious if they also have any performance cliffs with large numbers of channels.
