Skip to content

Commit c32b6c5

Browse files
Added event Hyperf\Kafka\Event\FailToAck and Hyperf\Kafka\Event\FailToRequeue. (#7585)
1 parent 35d3172 commit c32b6c5

3 files changed

Lines changed: 77 additions & 2 deletions

File tree

src/ConsumerManager.php

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
use Hyperf\Kafka\Event\AfterConsumerConfigCreated;
2424
use Hyperf\Kafka\Event\BeforeConsume;
2525
use Hyperf\Kafka\Event\BeforeLongLangConsumerCreated;
26+
use Hyperf\Kafka\Event\FailToAck;
2627
use Hyperf\Kafka\Event\FailToConsume;
28+
use Hyperf\Kafka\Event\FailToRequeue;
2729
use Hyperf\Kafka\Exception\InvalidConsumeResultException;
2830
use Hyperf\Process\AbstractProcess;
2931
use Hyperf\Process\ProcessManager;
@@ -131,11 +133,26 @@ function (ConsumeMessage $message) use ($consumer, $consumerConfig) {
131133
}
132134

133135
if ($result === Result::ACK) {
134-
$message->getConsumer()->ack($message);
136+
try {
137+
$message->getConsumer()->ack($message);
138+
} catch (Throwable $exception) {
139+
$this->dispatcher?->dispatch(new FailToAck($consumer, $message, $exception));
140+
throw $exception;
141+
}
135142
}
136143

137144
if ($result === Result::REQUEUE) {
138-
$this->producer->send($message->getTopic(), $message->getValue(), $message->getKey(), $message->getHeaders());
145+
try {
146+
$this->producer->send(
147+
$message->getTopic(),
148+
$message->getValue(),
149+
$message->getKey(),
150+
$message->getHeaders()
151+
);
152+
} catch (Throwable $exception) {
153+
$this->dispatcher?->dispatch(new FailToRequeue($consumer, $message, $exception));
154+
throw $exception;
155+
}
139156
}
140157
}
141158

src/Event/FailToAck.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\Kafka\Event;
14+
15+
use Hyperf\Kafka\AbstractConsumer;
16+
use Throwable;
17+
18+
class FailToAck extends Consume
19+
{
20+
public function __construct(AbstractConsumer $consumer, $data, protected Throwable $throwable)
21+
{
22+
parent::__construct($consumer, $data);
23+
}
24+
25+
public function getThrowable(): Throwable
26+
{
27+
return $this->throwable;
28+
}
29+
}

src/Event/FailToRequeue.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://hyperf.wiki
9+
* @contact group@hyperf.io
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
13+
namespace Hyperf\Kafka\Event;
14+
15+
use Hyperf\Kafka\AbstractConsumer;
16+
use Throwable;
17+
18+
class FailToRequeue extends Consume
19+
{
20+
public function __construct(AbstractConsumer $consumer, $data, protected Throwable $throwable)
21+
{
22+
parent::__construct($consumer, $data);
23+
}
24+
25+
public function getThrowable(): Throwable
26+
{
27+
return $this->throwable;
28+
}
29+
}

0 commit comments

Comments
 (0)