Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit aee206e

Browse files
committed
add req_type property to _process_requests
1 parent c69e9c9 commit aee206e

2 files changed

Lines changed: 20 additions & 23 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import logging
2222
import threading
2323
import typing
24-
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
24+
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple, Union, Literal
2525
import uuid
2626

2727
from opentelemetry import trace
@@ -220,6 +220,7 @@ def _process_requests(
220220
ack_reqs_dict: Dict[str, requests.AckRequest],
221221
errors_dict: Optional[Dict[str, str]],
222222
ack_histogram: Optional[histogram.Histogram] = None,
223+
req_type: Union[Literal["ack"], Literal["modack"]] = "ack",
223224
):
224225
"""Process requests when exactly-once delivery is enabled by referring to
225226
error_status and errors_dict.
@@ -230,38 +231,36 @@ def _process_requests(
230231
"""
231232
requests_completed = []
232233
requests_to_retry = []
233-
for ack_id in ack_reqs_dict:
234+
for ack_id, ack_request in ack_reqs_dict.items():
234235
# Debug logging: slow acks
235-
if ack_histogram and ack_reqs_dict[
236-
ack_id
237-
].time_to_ack > ack_histogram.percentile(percent=99):
236+
if req_type == "ack" and ack_histogram and ack_request.time_to_ack > ack_histogram.percentile(percent=99):
238237
_SLOW_ACK_LOGGER.debug(
239238
"Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
240-
ack_reqs_dict[ack_id].message_id,
241-
ack_reqs_dict[ack_id].ack_id,
239+
ack_request.message_id,
240+
ack_request.ack_id,
242241
)
243242

244243
# Handle special errors returned for ack/modack RPCs via the ErrorInfo
245244
# sidecar metadata when exactly-once delivery is enabled.
246245
if errors_dict and ack_id in errors_dict:
247246
exactly_once_error = errors_dict[ack_id]
248247
if exactly_once_error.startswith("TRANSIENT_"):
249-
requests_to_retry.append(ack_reqs_dict[ack_id])
248+
requests_to_retry.append(ack_request)
250249
else:
251250
if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
252251
exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
253252
else:
254253
exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
255-
future = ack_reqs_dict[ack_id].future
254+
future = ack_request.future
256255
if future is not None:
257256
future.set_exception(exc)
258-
requests_completed.append(ack_reqs_dict[ack_id])
257+
requests_completed.append(ack_request)
259258
# Temporary GRPC errors are retried
260259
elif (
261260
error_status
262261
and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
263262
):
264-
requests_to_retry.append(ack_reqs_dict[ack_id])
263+
requests_to_retry.append(ack_request)
265264
# Other GRPC errors are NOT retried
266265
elif error_status:
267266
if error_status.code == code_pb2.PERMISSION_DENIED:
@@ -270,20 +269,20 @@ def _process_requests(
270269
exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
271270
else:
272271
exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
273-
future = ack_reqs_dict[ack_id].future
272+
future = ack_request.future
274273
if future is not None:
275274
future.set_exception(exc)
276-
requests_completed.append(ack_reqs_dict[ack_id])
275+
requests_completed.append(ack_request)
277276
# Since no error occurred, requests with futures are completed successfully.
278-
elif ack_reqs_dict[ack_id].future:
279-
future = ack_reqs_dict[ack_id].future
277+
elif ack_request.future:
278+
future = ack_request.future
280279
# success
281280
assert future is not None
282281
future.set_result(AcknowledgeStatus.SUCCESS)
283-
requests_completed.append(ack_reqs_dict[ack_id])
282+
requests_completed.append(ack_request)
284283
# All other requests are considered completed.
285284
else:
286-
requests_completed.append(ack_reqs_dict[ack_id])
285+
requests_completed.append(ack_request)
287286

288287
return requests_completed, requests_to_retry
289288

@@ -743,7 +742,7 @@ def send_unary_ack(
743742

744743
if self._exactly_once_delivery_enabled():
745744
requests_completed, requests_to_retry = _process_requests(
746-
error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram
745+
error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram, "ack"
747746
)
748747
else:
749748
requests_completed = []
@@ -837,7 +836,7 @@ def send_unary_modack(
837836

838837
if self._exactly_once_delivery_enabled():
839838
requests_completed, requests_to_retry = _process_requests(
840-
error_status, ack_reqs_dict, modack_errors_dict, self.ack_histogram
839+
error_status, ack_reqs_dict, modack_errors_dict, self.ack_histogram, "modack"
841840
)
842841
else:
843842
requests_completed = []

samples/snippets/subscriber_test.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def subscriber_client() -> Generator[pubsub_v1.SubscriberClient, None, None]:
164164
def _publish_messages(
165165
publisher_client: pubsub_v1.PublisherClient,
166166
topic: str,
167-
message_num: int = 2,
167+
message_num: int = 5,
168168
**attrs: Any, # noqa: ANN401
169169
) -> List[str]:
170170
message_ids = []
@@ -1013,9 +1013,7 @@ def test_receive_messages_with_exactly_once_delivery_enabled(
10131013
PROJECT_ID, subscription_eod_for_receive_name, 200
10141014
)
10151015

1016-
out, err = capsys.readouterr()
1017-
if err:
1018-
print(err)
1016+
out, _ = capsys.readouterr()
10191017
assert subscription_eod_for_receive_name in out
10201018
for message_id in message_ids:
10211019
assert message_id in out

0 commit comments

Comments
 (0)