Add send event to kafaka about post create#3536
Add send event to kafaka about post create#3536Pavel-Lukyanov wants to merge 3 commits intotriton-master-stream12from
Conversation
|
|
|
|
|
|
|
|
||
| do { | ||
| Pageable pageable = PageRequest.of(page, PAGE_SIZE); | ||
| subscriberPage = userRepository.findFollowerIdsPaged(event.getAuthorId(), pageable); |
There was a problem hiding this comment.
Душка, но чисто добавлю, что по сути ты должен ходить в сервис юзеров и оттуда брать id юзеров
|
|
||
| ack.acknowledge(); | ||
| log.info("Finished processing PostCreatedEvent for postId={}", event.getId()); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Слишком обобщенное исключение, много операций и не понятно что может вылететь, мб несколько сценариев как-то обработать
| private Map<OutboxEventType, OutboxEventPublisher> publisherMap; | ||
|
|
||
| @PostConstruct | ||
| void init() { |
| public class OutboxPublisher { | ||
|
|
||
| private final OutboxRepository outboxRepository; | ||
| private final List<OutboxEventPublisher> publishers; |
There was a problem hiding this comment.
Ставлю класс, задумка на перспективу
| Map<String, Object> props = new HashMap<>(); | ||
| props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); | ||
| props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
| props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
There was a problem hiding this comment.
Если в топике отправляются JSON объекты, они будут десериализованы как строки, а не как объекты.
There was a problem hiding this comment.
Да, это действительно так, в слушателе я потом делаю десериализацию в объект, т.к дублировать конфиг консьюмера под каждый топик в дальнейшим мне кажется излишним из-за типов данных.
Направь пожалуйста в правильное русло, т.к тут действительно пробел в знаниях имеется)
| private final String topic; | ||
|
|
||
| @Async | ||
| public void publish(Object message) { |
There was a problem hiding this comment.
Спасибо большое! Учел этот момент, теперь понимаю зачем дженерики нужны, чтобы нельзя было другой объект подсунуть и на этапе компиляции можно было отловить ошибку типов. Поправил)
| .whenComplete((result, ex) -> { | ||
| if (ex != null) { | ||
| log.error("Failed to send message: {}", message, ex); | ||
| throw new KafkaSendMessageException( |
There was a problem hiding this comment.
нормально ошибка выбрасывается? она точно тут нужна?
There was a problem hiding this comment.
Нет, из-за асинхронности отправитель не ловит ее, переделал на синхронную отправку
| objectMapper.readValue(payload, PostToFeedEvent.class); | ||
|
|
||
| postToFeedProducer.publish(event); | ||
| } catch (Exception e) { |
|
|
||
| @Transactional | ||
| @KafkaListener(topics = "${kafka.topics.post-created:post.created}", groupId = "user-service-group") | ||
| public void listen(String message, Acknowledgment ack) { |
There was a problem hiding this comment.
При большом количестве подписчиков будет создано много outbox событий в одной транзакции
saveAndFlush на каждой итерации сбрасывает изменения в БД, что очень медленно
Вся обработка в одной транзакции может привести к:
Длинным блокировкам
Out of memory
There was a problem hiding this comment.
Действительно, хотел оптимизировать, а получилось как всегда)
Поправил
|
|
||
| @Transactional | ||
| @KafkaListener(topics = "${kafka.topics.post-created:post.created}", groupId = "user-service-group") | ||
| public void listen(String message, Acknowledgment ack) { |
There was a problem hiding this comment.
что будет если будет ошибка на бд? и транзакция откатится
There was a problem hiding this comment.
Будет плохо)
Переделал, чтобы каждая пачка подписчиков была в отдельной транзакции
No description provided.