diff --git a/.gitignore b/.gitignore
index c2065bc..eb9b10f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,3 +35,6 @@ out/
### VS Code ###
.vscode/
+
+
+db
\ No newline at end of file
diff --git a/.run/mail8080.run.xml b/.run/mail8080.run.xml
new file mode 100644
index 0000000..676610e
--- /dev/null
+++ b/.run/mail8080.run.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8081.run.xml b/.run/mail8081.run.xml
new file mode 100644
index 0000000..1e5e478
--- /dev/null
+++ b/.run/mail8081.run.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8082.run.xml b/.run/mail8082.run.xml
new file mode 100644
index 0000000..597e4f0
--- /dev/null
+++ b/.run/mail8082.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8083.run.xml b/.run/mail8083.run.xml
new file mode 100644
index 0000000..95c15ea
--- /dev/null
+++ b/.run/mail8083.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8084.run.xml b/.run/mail8084.run.xml
new file mode 100644
index 0000000..d22381a
--- /dev/null
+++ b/.run/mail8084.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8085.run.xml b/.run/mail8085.run.xml
new file mode 100644
index 0000000..84beb42
--- /dev/null
+++ b/.run/mail8085.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8086.run.xml b/.run/mail8086.run.xml
new file mode 100644
index 0000000..9a1675d
--- /dev/null
+++ b/.run/mail8086.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8087.run.xml b/.run/mail8087.run.xml
new file mode 100644
index 0000000..70a5053
--- /dev/null
+++ b/.run/mail8087.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8088.run.xml b/.run/mail8088.run.xml
new file mode 100644
index 0000000..8ab92b8
--- /dev/null
+++ b/.run/mail8088.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8089.run.xml b/.run/mail8089.run.xml
new file mode 100644
index 0000000..23fd19d
--- /dev/null
+++ b/.run/mail8089.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8090.run.xml b/.run/mail8090.run.xml
new file mode 100644
index 0000000..be303c3
--- /dev/null
+++ b/.run/mail8090.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8091.run.xml b/.run/mail8091.run.xml
new file mode 100644
index 0000000..7f2085f
--- /dev/null
+++ b/.run/mail8091.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8092.run.xml b/.run/mail8092.run.xml
new file mode 100644
index 0000000..d8428f9
--- /dev/null
+++ b/.run/mail8092.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8093.run.xml b/.run/mail8093.run.xml
new file mode 100644
index 0000000..3b63c6b
--- /dev/null
+++ b/.run/mail8093.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8094.run.xml b/.run/mail8094.run.xml
new file mode 100644
index 0000000..7c5a44c
--- /dev/null
+++ b/.run/mail8094.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8095.run.xml b/.run/mail8095.run.xml
new file mode 100644
index 0000000..886628e
--- /dev/null
+++ b/.run/mail8095.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8096.run.xml b/.run/mail8096.run.xml
new file mode 100644
index 0000000..15fadc7
--- /dev/null
+++ b/.run/mail8096.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8097.run.xml b/.run/mail8097.run.xml
new file mode 100644
index 0000000..0776cb7
--- /dev/null
+++ b/.run/mail8097.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8098.run.xml b/.run/mail8098.run.xml
new file mode 100644
index 0000000..34c8dc7
--- /dev/null
+++ b/.run/mail8098.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.run/mail8099.run.xml b/.run/mail8099.run.xml
new file mode 100644
index 0000000..7ccc96d
--- /dev/null
+++ b/.run/mail8099.run.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index ee2c891..959e95e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,6 +27,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-mail'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
+ implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
runtimeOnly 'com.mysql:mysql-connector-j'
diff --git a/docker-compose.yml b/docker-compose.yml
index 1cb29f3..193e84e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -22,3 +22,18 @@ services:
ports:
- "11025:1025"
- "18025:8025"
+ zookeeper:
+ image: wurstmeister/zookeeper
+ container_name: zookeeper
+ ports:
+ - "2181:2181"
+ kafka:
+ image: wurstmeister/kafka:2.12-2.5.0
+ container_name: kafka
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
diff --git a/src/main/java/com/aengdulab/distributedmail/DistributedLock.java b/src/main/java/com/aengdulab/distributedmail/DistributedLock.java
new file mode 100644
index 0000000..34e71dd
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/DistributedLock.java
@@ -0,0 +1,45 @@
+package com.aengdulab.distributedmail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DistributedLock {
+
+ public boolean tryLock(Connection connection, String key, int timeout) {
+ String sql = "select get_lock(?, ?)";
+
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, key);
+ preparedStatement.setInt(2, timeout);
+ return getResult(preparedStatement);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean getResult(PreparedStatement preparedStatement) throws SQLException {
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ if (!resultSet.next()) {
+ return false;
+ }
+
+ int result = resultSet.getInt(1);
+ return result == 1;
+ }
+ }
+
+ public void releaseLock(Connection connection, String key) {
+ String sql = "select release_lock(?)";
+
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, key);
+ preparedStatement.execute();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/DistributedMailApplication.java b/src/main/java/com/aengdulab/distributedmail/DistributedMailApplication.java
index 151c0a6..a373ba7 100644
--- a/src/main/java/com/aengdulab/distributedmail/DistributedMailApplication.java
+++ b/src/main/java/com/aengdulab/distributedmail/DistributedMailApplication.java
@@ -1,7 +1,9 @@
package com.aengdulab.distributedmail;
+import java.util.concurrent.CountDownLatch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class DistributedMailApplication {
@@ -10,4 +12,10 @@ public static void main(String[] args) {
SpringApplication.run(DistributedMailApplication.class, args);
}
+ @Bean
+ public GlobalLatch globalLatch() {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ return new GlobalLatch(latch);
+ }
}
diff --git a/src/main/java/com/aengdulab/distributedmail/DistributedSupport.java b/src/main/java/com/aengdulab/distributedmail/DistributedSupport.java
new file mode 100644
index 0000000..e5ebdb2
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/DistributedSupport.java
@@ -0,0 +1,18 @@
+package com.aengdulab.distributedmail;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.stereotype.Component;
+
+@Getter
+@Setter
+@Component
+public class DistributedSupport {
+
+ private int index;
+ private int count;
+
+ public boolean isMine(Long id) {
+ return index == ((id % count) + 1);
+ }
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/FollowerMessage.java b/src/main/java/com/aengdulab/distributedmail/FollowerMessage.java
new file mode 100644
index 0000000..a8ec304
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/FollowerMessage.java
@@ -0,0 +1,4 @@
+package com.aengdulab.distributedmail;
+
+public record FollowerMessage(String address) {
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/FollowerTask.java b/src/main/java/com/aengdulab/distributedmail/FollowerTask.java
new file mode 100644
index 0000000..eb41a6f
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/FollowerTask.java
@@ -0,0 +1,50 @@
+package com.aengdulab.distributedmail;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import org.springframework.core.env.Environment;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class FollowerTask {
+
+ private final ApplicationContext applicationContext;
+ private final GlobalLatch globalLatch;
+ private final DistributedSupport distributedSupport;
+ private final KafkaTemplate kafkaTemplate;
+
+ public void start() {
+ log.info("clientTask started");
+ String address = createAddress();
+ FollowerMessage message = new FollowerMessage(address);
+ kafkaTemplate.send("loyalty", message);
+ globalLatch.await();
+ }
+
+ @KafkaListener(topics = "leader", groupId = "consumerGroup-" + "#{T(java.util.UUID).randomUUID()})")
+ public void receiveMessage(LeaderMessage leaderMessage) {
+ String address = createAddress();
+ if (address.equals(leaderMessage.address())) {
+ distributedSupport.setIndex(leaderMessage.index());
+ distributedSupport.setCount(leaderMessage.total());
+ globalLatch.countDown();
+ }
+ }
+
+ private String createAddress() {
+ try {
+ String ip = InetAddress.getLocalHost().getHostAddress();
+ int port = applicationContext.getBean(Environment.class).getProperty("server.port", Integer.class, 8080);
+ return ip + ":" + port;
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/GlobalLatch.java b/src/main/java/com/aengdulab/distributedmail/GlobalLatch.java
new file mode 100644
index 0000000..9f29165
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/GlobalLatch.java
@@ -0,0 +1,24 @@
+package com.aengdulab.distributedmail;
+
+import java.util.concurrent.CountDownLatch;
+
+public class GlobalLatch {
+
+ private final CountDownLatch latch;
+
+ public GlobalLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public void await() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void countDown() {
+ latch.countDown();
+ }
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/LeaderMessage.java b/src/main/java/com/aengdulab/distributedmail/LeaderMessage.java
new file mode 100644
index 0000000..e0a08e0
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/LeaderMessage.java
@@ -0,0 +1,4 @@
+package com.aengdulab.distributedmail;
+
+public record LeaderMessage(String address, int total, int index) {
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/LeaderTask.java b/src/main/java/com/aengdulab/distributedmail/LeaderTask.java
new file mode 100644
index 0000000..386e9b9
--- /dev/null
+++ b/src/main/java/com/aengdulab/distributedmail/LeaderTask.java
@@ -0,0 +1,45 @@
+package com.aengdulab.distributedmail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class LeaderTask {
+
+ private final DistributedSupport distributedSupport;
+ private final KafkaTemplate kafkaTemplate;
+ private final List messages = Collections.synchronizedList(new ArrayList<>());
+
+ public void start() {
+ log.info("leaderTask started");
+ sleep(5000);
+
+ distributedSupport.setIndex(1);
+ distributedSupport.setCount(messages.size() + 1);
+ for (int i = 0; i < messages.size(); i++) {
+ LeaderMessage leaderMessage = new LeaderMessage(messages.get(i).address(), messages.size() + 1, i + 2);
+ kafkaTemplate.send("leader", leaderMessage);
+ }
+ }
+
+ private void sleep(int millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @KafkaListener(topics = "loyalty", groupId = "consumerGroup-" + "#{T(java.util.UUID).randomUUID()})")
+ public void receiveMessage(FollowerMessage followerMessage) {
+ messages.add(followerMessage);
+ }
+}
diff --git a/src/main/java/com/aengdulab/distributedmail/repository/QuestionRepository.java b/src/main/java/com/aengdulab/distributedmail/repository/QuestionRepository.java
index 233dd88..16fca72 100644
--- a/src/main/java/com/aengdulab/distributedmail/repository/QuestionRepository.java
+++ b/src/main/java/com/aengdulab/distributedmail/repository/QuestionRepository.java
@@ -1,7 +1,7 @@
package com.aengdulab.distributedmail.repository;
-import com.aengdulab.distributedmail.domain.Question;
import java.util.Optional;
+import com.aengdulab.distributedmail.domain.Question;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
diff --git a/src/main/java/com/aengdulab/distributedmail/service/SendQuestionScheduler.java b/src/main/java/com/aengdulab/distributedmail/service/SendQuestionScheduler.java
index d80208d..c64d126 100644
--- a/src/main/java/com/aengdulab/distributedmail/service/SendQuestionScheduler.java
+++ b/src/main/java/com/aengdulab/distributedmail/service/SendQuestionScheduler.java
@@ -1,10 +1,17 @@
package com.aengdulab.distributedmail.service;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import com.aengdulab.distributedmail.DistributedLock;
+import com.aengdulab.distributedmail.DistributedSupport;
+import com.aengdulab.distributedmail.FollowerTask;
+import com.aengdulab.distributedmail.LeaderTask;
import com.aengdulab.distributedmail.domain.Subscribe;
import com.aengdulab.distributedmail.domain.SubscribeQuestionMessage;
import com.aengdulab.distributedmail.repository.SubscribeRepository;
-import java.util.List;
-import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
@@ -18,16 +25,48 @@ public class SendQuestionScheduler {
private final QuestionSender questionSender;
private final SubscribeRepository subscribeRepository;
+ private final DistributedLock distributedLock;
+ private final DataSource dataSource;
+ private final FollowerTask followerTask;
+ private final LeaderTask leaderTask;
+ private final DistributedSupport distributedSupport;
@Transactional
@Scheduled(cron = "0 0 9 * * *", zone = "Asia/Seoul")
public void sendQuestion() {
- List subscribes = subscribeRepository.findAll();
- sendQuestionMails(subscribes);
+ Connection connection;
+ try {
+ connection = dataSource.getConnection();
+ boolean isLeaderNode = distributedLock.tryLock(connection, "leader", 0);
+ detectNodes(isLeaderNode);
+ releaseLock(connection, isLeaderNode);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ log.info("distributed support index = {} count = {}", distributedSupport.getIndex(), distributedSupport.getCount());
+ List subscribes = subscribeRepository.findAll();
+ sendQuestionMails(subscribes);
+ }
+ }
+
+ private void detectNodes(boolean isLeaderNode) {
+ if (isLeaderNode) {
+ leaderTask.start();
+ return;
+ }
+
+ followerTask.start();
+ }
+
+ private void releaseLock(Connection connection, boolean isLeaderNode) {
+ if (isLeaderNode) {
+ distributedLock.releaseLock(connection, "leader");
+ }
}
private void sendQuestionMails(List subscribes) {
subscribes.stream()
+ .filter(subscribe -> distributedSupport.isMine(subscribe.getId()))
.flatMap(subscribe -> choiceQuestion(subscribe).stream())
.forEach(questionSender::sendQuestion);
}
diff --git a/src/main/java/com/aengdulab/distributedmail/service/SubscribeQuestionSequenceScheduler.java b/src/main/java/com/aengdulab/distributedmail/service/SubscribeQuestionSequenceScheduler.java
index 638900a..3259235 100644
--- a/src/main/java/com/aengdulab/distributedmail/service/SubscribeQuestionSequenceScheduler.java
+++ b/src/main/java/com/aengdulab/distributedmail/service/SubscribeQuestionSequenceScheduler.java
@@ -1,10 +1,10 @@
package com.aengdulab.distributedmail.service;
+import java.util.List;
import com.aengdulab.distributedmail.domain.Question;
import com.aengdulab.distributedmail.domain.Subscribe;
import com.aengdulab.distributedmail.repository.QuestionRepository;
import com.aengdulab.distributedmail.repository.SubscribeRepository;
-import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b660901..4bc301d 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -4,7 +4,8 @@ spring:
url: jdbc:mysql://localhost:13306/mail?useSSL=false&serverTimezone=Asia/Seoul&characterEncoding=UTF-8&allowPublicKeyRetrieval=true
username: root
password: root
-
+ hikari:
+ maximum-pool-size: 3
jpa:
properties:
hibernate:
@@ -23,3 +24,13 @@ spring:
auth: false
starttls:
enable: false
+ kafka:
+ bootstrap-servers:
+ - "127.0.0.1:9092"
+ producer:
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ consumer:
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring.json.trusted.packages: "*"
+ spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
diff --git a/src/test/java/com/aengdulab/distributedmail/MultiServerRequestTest.java b/src/test/java/com/aengdulab/distributedmail/MultiServerRequestTest.java
index e4205b0..b7f7884 100644
--- a/src/test/java/com/aengdulab/distributedmail/MultiServerRequestTest.java
+++ b/src/test/java/com/aengdulab/distributedmail/MultiServerRequestTest.java
@@ -37,7 +37,12 @@ class MultiServerRequestTest {
/*
서버를 다중화할 경우, 새로운 요청에 대한 추가 포트를 설정
*/
- private static final List serverPorts = List.of(8080, 9090, 9999);
+ private static final List serverPorts = List.of(
+ 8080, 8081, 8082, 8083, 8084,
+ 8085, 8086, 8087, 8088, 8089,
+ 8090, 8091, 8092, 8093, 8094,
+ 8095, 8096, 8097, 8098, 8099
+ );
@Autowired
private SentMailEventRepository sentMailEventRepository;
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
new file mode 100644
index 0000000..fa9819b
--- /dev/null
+++ b/src/test/resources/application.yml
@@ -0,0 +1,35 @@
+spring:
+ datasource:
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ url: jdbc:mysql://localhost:13306/mail?useSSL=false&serverTimezone=Asia/Seoul&characterEncoding=UTF-8&allowPublicKeyRetrieval=true
+ username: root
+ password: root
+
+ jpa:
+ properties:
+ hibernate:
+ format_sql: true
+ show_sql: false
+ hbm2ddl.auto: none
+ mail:
+ host: localhost
+ port: 11025
+ console-port: 18025
+ username:
+ password:
+ properties:
+ mail:
+ smtp:
+ auth: false
+ starttls:
+ enable: false
+ kafka:
+ bootstrap-servers:
+ - "127.0.0.1:9092"
+ producer:
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ consumer:
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring.json.trusted.packages: "*"
+ spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer