HCMPRE-3432:: USER-DEACTIVATION-FIX#1341
Conversation
📝 WalkthroughWalkthroughThe egov-user service gains user status change event emission: configuration exposes a Kafka topic and feature flag, a new UserProducer service sends events, and UserService detects when a user's active flag changes during updates and publishes events containing user UUID, tenant id, status, and timestamp. ChangesUser Status Change Event Emission
Estimated code review effort🎯 2 (Simple) | ⏱️ ~12 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@core-services/egov-user/src/main/java/org/egov/user/domain/service/UserService.java`:
- Around line 385-396: The Kafka publish in UserService (around the block that
builds userStatusChangeEvent and calls userProducer.push) must be made
fail-open: wrap the userProducer.push(...) call in a try-catch that catches the
CustomException (or Exception if CustomException type isn't accessible) and logs
the failure with contextual info (topic name from
userServiceConstants.getUserStatusChangeTopic(), user.getUuid(),
user.getTenantId(), user.getActive()) instead of letting the exception
propagate; do not rethrow so the DB update remains successful even if Kafka
publish fails. Ensure the try-catch is local to the publish logic so
transactional semantics of the surrounding update methods are unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6c493d2d-9af0-4d2a-a2e8-ca6c7831151b
📒 Files selected for processing (4)
core-services/egov-user/src/main/java/org/egov/user/config/UserServiceConstants.javacore-services/egov-user/src/main/java/org/egov/user/domain/service/UserService.javacore-services/egov-user/src/main/java/org/egov/user/kafka/UserProducer.javacore-services/egov-user/src/main/resources/application.properties
| // Check if active status has changed and feature flag is enabled | ||
| if (userServiceConstants.isUserStatusChangeEventEnabled() && existingUserActiveStatus != user.getActive()) { | ||
| // Store user status change details in a map | ||
| Map<String, String> userStatusChangeEvent = new HashMap<>(); | ||
| userStatusChangeEvent.put(USER_UUID_KEY, user.getUuid()); | ||
| userStatusChangeEvent.put(TENANT_ID_KEY, user.getTenantId()); | ||
| userStatusChangeEvent.put(ACTIVE_KEY, String.valueOf(user.getActive())); | ||
| userStatusChangeEvent.put(EFFECTIVE_DATE_KEY, String.valueOf(System.currentTimeMillis())); | ||
|
|
||
| // Publish user status change event to Kafka | ||
| userProducer.push(userServiceConstants.getUserStatusChangeTopic(), user.getUuid(), userStatusChangeEvent); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In Spring Kafka 1.3.x / Kafka client 1.1.1, under what conditions can KafkaTemplate.send(...) throw synchronously before returning (e.g., serialization/config/topic metadata errors)?
💡 Result:
In Spring Kafka 1.3.x (using Kafka client 1.1.1), the KafkaTemplate.send(...) method is designed to be asynchronous and typically returns a ListenableFuture without throwing exceptions immediately [1]. However, it can throw exceptions synchronously before returning under specific conditions, primarily related to the underlying KafkaProducer's initialization and record preparation [2][3]. The primary conditions under which KafkaTemplate.send(...) may throw synchronously include: 1. Serialization Errors: If the configured key or value serializers fail to serialize the record, the underlying KafkaProducer.send call will throw a SerializationException [3]. Because this occurs during the preparation of the record before it is added to the producer's buffer, it propagates synchronously to the caller of KafkaTemplate.send(...) [2]. 2. Producer State/Lifecycle Errors: If the producer is in an invalid state—such as being closed or having encountered a fatal, irrecoverable error—the KafkaProducer.send method may throw an IllegalStateException or other KafkaException synchronously [4][3]. 3. Buffer/Metadata Blocking (Configuration Dependent): While KafkaProducer.send is generally asynchronous, it may block synchronously if the producer's internal buffer is full or if it is waiting for metadata (e.g., topic partition information) and the max.block.ms timeout is reached [5][6]. In such cases, the call will throw a TimeoutException synchronously [6]. It is important to note that many common "errors" (such as network connectivity issues or broker-side failures) are typically handled asynchronously [6]. These errors are reported via the ListenableFuture (e.g., through a callback or by calling.get) rather than being thrown synchronously from the send method itself [7][1]. To ensure robust error handling, it is recommended to wrap the send operation in a try-catch block to handle immediate synchronous exceptions, and to use the returned Future to handle asynchronous completion or failure [8][6].
Citations:
- 1: https://docs.spring.io/spring-kafka/docs/1.3.0.RC1/reference/html/_reference.html
- 2: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
- 3: https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
- 4: https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
- 5: https://stackoverflow.com/questions/45529308/how-to-send-messages-synchronously-in-kafka
- 6: KafkaTemplate.send(...) does not invoke FailureCallback when broker is down spring-projects/spring-kafka#1807
- 7: https://stackoverflow.com/questions/60458161/exception-handling-on-sending-a-message-via-kafka-template
- 8: https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html
🏁 Script executed:
find . -type f -name "UserProducer.java" | head -5Repository: egovernments/Digit-Core
Length of output: 145
🏁 Script executed:
# Search for UserProducer class definition
rg "class UserProducer" -t javaRepository: egovernments/Digit-Core
Length of output: 171
🏁 Script executed:
# Find the updateWithoutOtpValidation method to understand transaction context
rg "updateWithoutOtpValidation" -t java -A 30 | head -100Repository: egovernments/Digit-Core
Length of output: 14161
🏁 Script executed:
cat -n core-services/egov-user/src/main/java/org/egov/user/kafka/UserProducer.javaRepository: egovernments/Digit-Core
Length of output: 790
🏁 Script executed:
# Find the updateWithoutOtpValidation method in UserService
rg "public.*updateWithoutOtpValidation" -t java -A 50 | head -150Repository: egovernments/Digit-Core
Length of output: 7472
🏁 Script executed:
# Check for `@Transactional` annotations in UserService
rg "`@Transactional`|public.*updateWithoutOtpValidation|userProducer.push" -t java -B 2 -A 10 core-services/egov-user/src/main/java/org/egov/user/domain/service/UserService.java | head -200Repository: egovernments/Digit-Core
Length of output: 1478
🏁 Script executed:
# Check for `@Transactional` and class-level decorators on UserService
rg "class UserService|`@Transactional`|`@Service`" -t java core-services/egov-user/src/main/java/org/egov/user/domain/service/UserService.java | head -20Repository: egovernments/Digit-Core
Length of output: 103
🏁 Script executed:
# Search for CustomKafkaTemplate implementation to understand exception behavior
find . -type f -name "CustomKafkaTemplate.java" | head -5Repository: egovernments/Digit-Core
Length of output: 161
🏁 Script executed:
# Look for other uses of userProducer.push() in the codebase to see if there's error handling elsewhere
rg "userProducer.push|\.push\(" -t java -B 2 -A 2 core-services/egov-user/ | head -100Repository: egovernments/Digit-Core
Length of output: 675
🏁 Script executed:
cat -n ./core-services/libraries/tracer/src/main/java/org/egov/tracer/kafka/CustomKafkaTemplate.javaRepository: egovernments/Digit-Core
Length of output: 2441
🏁 Script executed:
# Check for error handling patterns in other Kafka producers in the codebase
rg "try.*catch|kafkaTemplate.send|KafkaTemplate" -t java -B 2 -A 3 core-services/ | grep -A 5 -B 5 "catch\|push\|send" | head -150Repository: egovernments/Digit-Core
Length of output: 17884
Guard Kafka publish failure from breaking a successful user update.
At line 395, Kafka event emission runs after the DB update commits. Since CustomKafkaTemplate.send() blocks with .get() and throws CustomException on any failure, if publish fails the API returns error even though user status is already persisted in the database. This causes clients to perceive failure and potentially retry, while the state was actually updated. Handle publish exceptions here (fail-open with logging) rather than fail-closed.
Proposed fix
- if (userServiceConstants.isUserStatusChangeEventEnabled() && existingUserActiveStatus != user.getActive()) {
+ if (userServiceConstants.isUserStatusChangeEventEnabled() && existingUserActiveStatus != user.getActive()) {
// Store user status change details in a map
Map<String, String> userStatusChangeEvent = new HashMap<>();
userStatusChangeEvent.put(USER_UUID_KEY, user.getUuid());
userStatusChangeEvent.put(TENANT_ID_KEY, user.getTenantId());
userStatusChangeEvent.put(ACTIVE_KEY, String.valueOf(user.getActive()));
userStatusChangeEvent.put(EFFECTIVE_DATE_KEY, String.valueOf(System.currentTimeMillis()));
// Publish user status change event to Kafka
- userProducer.push(userServiceConstants.getUserStatusChangeTopic(), user.getUuid(), userStatusChangeEvent);
+ try {
+ userProducer.push(userServiceConstants.getUserStatusChangeTopic(), user.getUuid(), userStatusChangeEvent);
+ } catch (Exception ex) {
+ log.error("Failed to publish user status change event for userUuid: {}", user.getUuid(), ex);
+ }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Check if active status has changed and feature flag is enabled | |
| if (userServiceConstants.isUserStatusChangeEventEnabled() && existingUserActiveStatus != user.getActive()) { | |
| // Store user status change details in a map | |
| Map<String, String> userStatusChangeEvent = new HashMap<>(); | |
| userStatusChangeEvent.put(USER_UUID_KEY, user.getUuid()); | |
| userStatusChangeEvent.put(TENANT_ID_KEY, user.getTenantId()); | |
| userStatusChangeEvent.put(ACTIVE_KEY, String.valueOf(user.getActive())); | |
| userStatusChangeEvent.put(EFFECTIVE_DATE_KEY, String.valueOf(System.currentTimeMillis())); | |
| // Publish user status change event to Kafka | |
| userProducer.push(userServiceConstants.getUserStatusChangeTopic(), user.getUuid(), userStatusChangeEvent); | |
| } | |
| // Check if active status has changed and feature flag is enabled | |
| if (userServiceConstants.isUserStatusChangeEventEnabled() && existingUserActiveStatus != user.getActive()) { | |
| // Store user status change details in a map | |
| Map<String, String> userStatusChangeEvent = new HashMap<>(); | |
| userStatusChangeEvent.put(USER_UUID_KEY, user.getUuid()); | |
| userStatusChangeEvent.put(TENANT_ID_KEY, user.getTenantId()); | |
| userStatusChangeEvent.put(ACTIVE_KEY, String.valueOf(user.getActive())); | |
| userStatusChangeEvent.put(EFFECTIVE_DATE_KEY, String.valueOf(System.currentTimeMillis())); | |
| // Publish user status change event to Kafka | |
| try { | |
| userProducer.push(userServiceConstants.getUserStatusChangeTopic(), user.getUuid(), userStatusChangeEvent); | |
| } catch (Exception ex) { | |
| log.error("Failed to publish user status change event for userUuid: {}", user.getUuid(), ex); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@core-services/egov-user/src/main/java/org/egov/user/domain/service/UserService.java`
around lines 385 - 396, The Kafka publish in UserService (around the block that
builds userStatusChangeEvent and calls userProducer.push) must be made
fail-open: wrap the userProducer.push(...) call in a try-catch that catches the
CustomException (or Exception if CustomException type isn't accessible) and logs
the failure with contextual info (topic name from
userServiceConstants.getUserStatusChangeTopic(), user.getUuid(),
user.getTenantId(), user.getActive()) instead of letting the exception
propagate; do not rethrow so the DB update remains successful even if Kafka
publish fails. Ensure the try-catch is local to the publish logic so
transactional semantics of the surrounding update methods are unchanged.
Summary by CodeRabbit
Release Notes