Skip to content

Add shared AMQP connection registry with generation-based channel invalidation#117

Open
jwage wants to merge 7 commits into
mainfrom
feature/connection-registry
Open

Add shared AMQP connection registry with generation-based channel invalidation#117
jwage wants to merge 7 commits into
mainfrom
feature/connection-registry

Conversation

@jwage

@jwage jwage commented May 7, 2026

Copy link
Copy Markdown
Owner

fixes #116

@jwage jwage self-assigned this May 7, 2026
@jwage jwage added the improvement Not quite an enhancement, but an improvement to something label May 7, 2026
@jwage jwage force-pushed the feature/connection-registry branch from c26fd6b to 94be5f1 Compare May 7, 2026 16:48
@jwage

jwage commented May 7, 2026

Copy link
Copy Markdown
Owner Author

Would it be possible to get some help reviewing and testing this? @kraz @TamasSzigeti TamasSzigeti @dantleech

@kraz kraz left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the first sight, this new functionality is intended to be an "opt-in", but in reality the default state (none) actually changes how the connections are being managed, compared to before the connection_reuse was introduced.

Also, I remember there was a reported issue somewhere that when running the symfony messenger consumer from a frankenphp cli it forks the current process. So, it might be a good idea to use the process id as additional discriminator in AmqpConnectionRegistry::get along side with the generation so it does not reuse the same key.

Comment on lines 67 to 72
public function isConnected(): bool
{
return $this->connection !== null && $this->connection->isConnected();
return $this->amqpConnectionRegistry
->get($this->registryKey, $this->connectionConfig)
->isConnected();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should not introduce side effects. May be its better for the connection registry to have a has method, so it does not mutate its state unexpectedly.

public function close(): void
{
$this->consumer?->stop();
$this->connection?->close();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now all the connections will remain in memory until the process is terminated? This does not seem like an opt-in functionality.

Comment on lines 82 to 88
public function reconnect(): void
{
$this->connection?->reconnect();
$this->channel = null;
$this->amqpConnectionRegistry->reconnect($this->registryKey, $this->connectionConfig);
$this->clearChannelState();
$this->consumer?->stop();
$this->consumer = null;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even on the old variant of the code, I am wondering why the connection is reinitialized before stopping the consumer. In my opinion it might be better to first terminate the consumer and then reinitialize the connection:

try { $this->consumer?->stop(); } catch (\Throwable) {}
$this->consumer = null;
$this->amqpConnectionRegistry->reconnect($this->connectionIdentity, $this->connectionConfig);
$this->clearChannelState();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

improvement Not quite an enhancement, but an improvement to something

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Reuse AMQP connections across Messenger transports in-process

2 participants