Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/commerce-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
// add-ons
implementation(project(":modules:jpa"))
implementation(project(":modules:redis"))
implementation(project(":modules:kafka"))
implementation(project(":supports:jackson"))
implementation(project(":supports:logging"))
implementation(project(":supports:monitoring"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import com.loopers.domain.payment.PaymentApproveInfo;
import com.loopers.domain.payment.PaymentApproveResponse;
import com.loopers.domain.payment.PaymentApproveResponse.Meta.Result;
import com.loopers.domain.payment.PaymentEvent.PaymentPaid;
import com.loopers.domain.payment.PaymentEvent.PaymentFailed;
import com.loopers.domain.payment.PaymentEventPublisher;
import com.loopers.domain.payment.PaymentGateway;
import com.loopers.domain.payment.PaymentService;
import com.loopers.domain.payment.TransactionStatus;
Expand Down Expand Up @@ -43,6 +46,7 @@ public class PaymentFacade {
private final PaymentGateway paymentGateway;
private final ProductCacheService productCacheService;
private final PointService pointService;
private final PaymentEventPublisher paymentEventPublisher;

@Transactional
public PaymentInfo requestPaidPayment(Long userId, Long paymentId) {
Expand Down Expand Up @@ -86,6 +90,7 @@ private PaymentApproveResponse approvePayment(Payment payment) {

private void paymentSuccess(final Payment payment) {
payment.paid();
paymentEventPublisher.publish(PaymentPaid.from(payment));
orderService.paid(payment.getOrderId());
Order order = orderService.findById(payment.getOrderId());
List<OrderItem> orderItems = orderService.getOrderItemsByOrderId(order.getId());
Expand Down Expand Up @@ -114,6 +119,7 @@ private void paymentSuccess(final Payment payment) {
}
private void paymentFail(final Payment payment, final String message) {
payment.fail("PG ์Šน์ธ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: " + message);
paymentEventPublisher.publish(PaymentFailed.from(payment));
orderService.fail(payment.getOrderId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import com.loopers.domain.brand.Brand;
import com.loopers.domain.brand.BrandService;
import com.loopers.domain.product.EventType;
import com.loopers.domain.product.Product;
import com.loopers.domain.product.ProductEvent;
import com.loopers.domain.product.ProductEventPublisher;
import com.loopers.domain.product.ProductEventService;
import com.loopers.domain.product.ProductOutboxEvent;
import com.loopers.domain.product.ProductService;
import java.time.Duration;
import java.util.List;
Expand All @@ -13,6 +18,7 @@
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@RequiredArgsConstructor
Expand All @@ -23,7 +29,9 @@ public class ProductFacade {

private final ProductCacheService productCacheService;
private final ProductService productService;
private final ProductEventService productEventService;
private final BrandService brandService;
private final ProductEventPublisher productEventPublisher;

public ProductWithBrandInfo getProductDetail(final Long productId) {
ProductWithBrandInfo cached = productCacheService.readDetail(productId);
Expand All @@ -36,7 +44,7 @@ public ProductWithBrandInfo getProductDetail(final Long productId) {
ProductWithBrandInfo result = ProductWithBrandInfo.from(product, brand);

productCacheService.createOrUpdateDetail(productId, result, TTL);

productEventPublisher.publish(ProductEvent.ProductViewed.from(productId));
return result;
}

Expand Down Expand Up @@ -70,4 +78,13 @@ private Map<Long, Brand> getBrandMapByBrandIds(final Set<Long> brandIds) {
.collect(Collectors.toMap(Brand::getId, b -> b));
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void productViewOutboxHandle(final ProductEvent.ProductViewed event) {
productEventService.saveOutboxEvent(ProductOutboxEvent.create(EventType.VIEWED, event.productId()));
}

@Transactional
public void productViewPublishKafka(final ProductEvent.ProductViewed event) {
productEventService.findByProductEvent(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.loopers.domain.payment;

public class PaymentEvent {
public record PaymentPaid(Payment payment) {
public static PaymentPaid from(final Payment payment) {
return new PaymentPaid(payment);
}
}
public record PaymentFailed(Payment payment) {
public static PaymentFailed from(final Payment payment) {
return new PaymentFailed(payment);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.loopers.domain.payment;

import com.loopers.domain.payment.PaymentEvent.PaymentPaid;

public interface PaymentEventPublisher {
void publish(PaymentPaid paymentCreated);
void publish(PaymentEvent.PaymentFailed paymentFailed);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.loopers.domain.product;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public enum EventType {
VIEWED("์ƒํ’ˆ ์ƒ์„ธ ์กฐํšŒ"),
;

private final String description;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.loopers.domain.product;

public record ProductEvent() {
public record ProductViewed(Long productId) {
public static ProductViewed from(Long productId) {
return new ProductViewed(productId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.loopers.domain.product;

public interface ProductEventPublisher {
void publish(ProductEvent.ProductViewed event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.loopers.domain.product;

import com.loopers.domain.product.ProductEvent.ProductViewed;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@RequiredArgsConstructor
@Transactional(readOnly = true)
@Component
public class ProductEventService {
private final ProductOutboxEventRepository productOutboxEventRepository;

@Transactional
public ProductOutboxEvent saveOutboxEvent(final ProductOutboxEvent productOutboxEvent) {
return productOutboxEventRepository.save(productOutboxEvent);
}

public void findByProductEvent(final ProductViewed event) {
productOutboxEventRepository.findBy
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.loopers.domain.product;

import com.loopers.domain.BaseEntity;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Table;
import lombok.Getter;

@Getter
@Table(name = "product_outbox_events")
@Entity
public class ProductOutboxEvent extends BaseEntity {

@Enumerated(EnumType.STRING)
private EventType eventType;

@Enumerated(EnumType.STRING)
private ProductOutboxStatus productOutboxStatus;

private Long productId;

public static ProductOutboxEvent create(final EventType eventType, final Long productId) {
ProductOutboxEvent productOutboxEvent = new ProductOutboxEvent();
productOutboxEvent.eventType = eventType;
productOutboxEvent.productOutboxStatus = ProductOutboxStatus.PENDING;
productOutboxEvent.productId = productId;
return productOutboxEvent;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.loopers.domain.product;

public interface ProductOutboxEventRepository {
ProductOutboxEvent save(ProductOutboxEvent productOutboxEvent);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.loopers.domain.product;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public enum ProductOutboxStatus {
PENDING("์ด๋ฒคํŠธ ๋ฐœํ–‰ ๋Œ€๊ธฐ ์ค‘"),
PUBLISHED("์ด๋ฒคํŠธ ๋ฐœํ–‰ ์™„๋ฃŒ"),
FAILED("์ด๋ฒคํŠธ ๋ฐœํ–‰ ์‹คํŒจ"),
;

private final String description;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.loopers.infrastructure.payment;

import com.loopers.domain.payment.PaymentEvent.PaymentPaid;
import com.loopers.domain.payment.PaymentEvent.PaymentFailed;
import com.loopers.domain.payment.PaymentEventPublisher;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
public class PaymentCoreEventPublisher implements PaymentEventPublisher {

private static final String paidTopic = "payment.paid";
private static final String failedTopic = "payment.failed";
private final KafkaTemplate<Object, Object> kafkaTemplate;


@Override
public void publish(final PaymentPaid paymentCreated) {
kafkaTemplate.send(paidTopic, paymentCreated.payment().getId(), paymentCreated);
}

@Override
public void publish(final PaymentFailed paymentFailed) {
kafkaTemplate.send(failedTopic, paymentFailed.payment().getId(), paymentFailed);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.loopers.infrastructure.product;

import com.loopers.domain.product.ProductEvent;
import com.loopers.domain.product.ProductEventPublisher;
import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
public class ProductCoreEventPublisher implements ProductEventPublisher {

private final ApplicationEventPublisher applicationEventPublisher;
// private final KafkaTemplate<Object, Object> kafkaTemplate;

@Override
public void publish(final ProductEvent.ProductViewed event) {
applicationEventPublisher.publishEvent(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.loopers.infrastructure.product;

import com.loopers.domain.product.ProductOutboxEvent;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;

public interface ProductOutboxEventJpaRepository extends JpaRepository<ProductOutboxEvent, Long> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.loopers.infrastructure.product;

import com.loopers.domain.product.ProductOutboxEvent;
import com.loopers.domain.product.ProductOutboxEventRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
public class ProductOutboxEventRepositoryImpl implements ProductOutboxEventRepository {
private final ProductOutboxEventJpaRepository productOutboxEventJpaRepository;

@Override
public ProductOutboxEvent save(final ProductOutboxEvent productOutboxEvent) {
return productOutboxEventJpaRepository.save(productOutboxEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.loopers.interfaces.event.order;

import com.loopers.confg.kafka.KafkaConfig;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class OrderEventListener {

@KafkaListener(
topics = {"payment.paid"},
containerFactory = KafkaConfig.BATCH_LISTENER,
groupId = "order"
)
public void orderPaidListener(
List<ConsumerRecord<Object,Object>> messages,
Acknowledgment acknowledgment
){
// messages.stream()
acknowledgment.acknowledge();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.loopers.interfaces.event.product;

import com.loopers.application.product.ProductFacade;
import com.loopers.confg.kafka.KafkaConfig;
import com.loopers.domain.product.ProductEvent;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@RequiredArgsConstructor
@Component
public class ProductEventListener {

private final ProductFacade productFacade;

@KafkaListener(
topics = {"payment.paid"},
containerFactory = KafkaConfig.BATCH_LISTENER,
groupId = "product"
)
public void handle(
List<ConsumerRecord<Object,Object>> messages,
Acknowledgment acknowledgment
){
// messages.stream()
acknowledgment.acknowledge();
}

@EventListener
public void productViewOutboxHandle(ProductEvent.ProductViewed event) {
productFacade.productViewOutboxHandle(event);
}

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void productViewPublishKafka(ProductEvent.ProductViewed event) {
productFacade.productViewPublishKafka(event);
}
}
5 changes: 4 additions & 1 deletion modules/kafka/src/main/resources/kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ spring:
request.timeout.ms: 20000
retry.backoff.ms: 500
auto:
create.topics.enable: false
create.topics.enable: true
register.schemas: false
offset.reset: latest
use.latest.version: true
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 3
acks: all
properties:
enable.idempotence: true
Comment on lines +19 to +20
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

โš ๏ธ Potential issue | ๐Ÿ”ด Critical

Kafka ํ”„๋กœ๋“€์„œ ์†์„ฑ์˜ ํ‘œ๊ธฐ๋ฒ•์„ ์ˆ˜์ •ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

Spring Boot์˜ spring.kafka.producer.properties ๋งต ๋‚ด๋ถ€์—์„œ๋Š” ์ (dot)์„ ํฌํ•จํ•œ Kafka ํด๋ผ์ด์–ธํŠธ ์†์„ฑ ์ด๋ฆ„์„ ๋Œ€๊ด„ํ˜ธ ํ‘œ๊ธฐ๋ฒ•์œผ๋กœ ์ž‘์„ฑํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ํ˜„์žฌ enable.idempotence: true๋Š” YAML ํŒŒ์„œ๊ฐ€ ์ค‘์ฒฉ๋œ ํ‚ค๋กœ ํ•ด์„ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Based on learnings, Kafka ํด๋ผ์ด์–ธํŠธ ์†์„ฑ์€ ์ •ํ™•ํ•œ ์ด๋ฆ„์œผ๋กœ ์ธ์‹๋˜๋„๋ก ๋Œ€๊ด„ํ˜ธ๋กœ ๊ฐ์‹ธ์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๐Ÿ”Ž ์ˆ˜์ • ์ œ์•ˆ
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
       retries: 3
       acks: all
       properties:
-        enable.idempotence: true
+        "[enable.idempotence]": true
๐Ÿ“ Committable suggestion

โ€ผ๏ธ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
properties:
enable.idempotence: true
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 3
acks: all
properties:
"[enable.idempotence]": true
๐Ÿค– Prompt for AI Agents
In modules/kafka/src/main/resources/kafka.yml around lines 19 to 20, the Kafka
producer property key is written as enable.idempotence which YAML may parse as a
nested key; update the key to use Spring Boot's bracket notation for Kafka
client properties (i.e., wrap the full dotted property name in brackets and
quotes as the map key) so the client receives the correct "enable.idempotence"
setting and not a nested YAML structure.

consumer:
group-id: loopers-default-consumer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Expand Down