Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
@ConfigurationProperties(prefix = "app", ignoreUnknownFields = false)
public record ApplicationConfig(
@NotEmpty
String telegramToken
String telegramToken,
KafkaConfigurationInfo kafkaConfigurationInfo
) {
public record KafkaConfigurationInfo(String topicName) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package edu.java.bot.configuration;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfiguration {

@Bean
public NewTopic topic(ApplicationConfig config) {
return TopicBuilder.name(config.kafkaConfigurationInfo().topicName() + "_dlq")
.partitions(1)
.replicas(1)
.build();
}
}
20 changes: 20 additions & 0 deletions bot/src/main/java/edu/java/bot/service/KafkaUpdatesListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package edu.java.bot.service;

import edu.java.bot.dto.request.LinkUpdate;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaUpdatesListener {
private final LinkUpdatesSenderService senderService;

@KafkaListener(topics = "${app.kafka-configuration-info.topic-name}", groupId = "bot")
@RetryableTopic(attempts = "1", dltStrategy = DltStrategy.FAIL_ON_ERROR, dltTopicSuffix = "_dlq")
public void listenUpdates(LinkUpdate linkUpdate) {
senderService.sendLinkUpdate(linkUpdate);
}
}
20 changes: 20 additions & 0 deletions bot/src/main/java/edu/java/bot/service/RateLimiterService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package edu.java.bot.service;

import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class RateLimiterService {

private final List<String> whitelist;

public RateLimiterService(@Value("${rate-limiter.whitelist}") List<String> whitelist) {
this.whitelist = whitelist;
}

public boolean isSkipped(String ip) {
return whitelist.contains(ip);
}

}
18 changes: 18 additions & 0 deletions bot/src/main/java/edu/java/bot/util/URLCreator.java
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

сомнительная польза

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что вы имеете ввиду? Довольно удобно было использовать, поэтому я оставил

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Имею в виду, что здесь отдельный класс с единственным методом, представляющим собой 1 строку.)
Ради выноса обработки исключения?
Можешь оставить.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package edu.java.bot.util;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;

public final class URLCreator {
private URLCreator() {
}

public static URL createURL(String link) {
try {
return URI.create(link).toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
}
29 changes: 26 additions & 3 deletions bot/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
app:
telegram-token: ${TELEGRAM_BOT_TOKEN}
telegram-token: ${TELEGRAM_API_KEY}
kafka-configuration-info:
topic-name: updates

spring:
application:
Expand All @@ -13,6 +15,23 @@ spring:
spec: maximumSize=100000,expireAfterAccess=3600s
cache-names:
- rate-limit-bucket
kafka:
consumer:
bootstrap-servers: localhost:29092
group-id: bot
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
spring.json.value.default.type: edu.java.bot.dto.request.LinkUpdate
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.add.type.headers: false
bootstrap-servers: localhost:29092
bootstrap-servers: localhost:29092

server:
port: 8090
Expand All @@ -31,10 +50,10 @@ retry-query:
retries:
- target: scrapper
type: exponential
max-attempts: 3
max-attempts: 5
min-delay: 1s
max-delay: 10s
codes: 429
codes: 500

bucket4j:
enabled: true
Expand All @@ -49,3 +68,7 @@ bucket4j:
unit: hours
refill-speed: interval
cache-key: getRemoteAddr()
skip-condition: '@rateLimiterService.isSkipped(getRemoteAddr())'

rate-limiter:
whitelist: ${WHITELISTED_IPS:localhost}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package edu.java.bot.kafka;

import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
public class KafkaIntegrationEnvironment {
public static KafkaContainer KAFKA;

static {
KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
KAFKA.start();
}

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
registry.add("spring.kafka.consumer.bootstrap-servers", KAFKA::getBootstrapServers);
registry.add("spring.kafka.producer.bootstrap-servers", KAFKA::getBootstrapServers);
}
}
81 changes: 81 additions & 0 deletions bot/src/test/java/edu/java/bot/kafka/KafkaUpdatesListenerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package edu.java.bot.kafka;


import edu.java.bot.configuration.ApplicationConfig;
import edu.java.bot.dto.request.LinkUpdate;
import edu.java.bot.service.LinkUpdatesSenderService;
import edu.java.bot.util.URLCreator;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.kafka.core.KafkaTemplate;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import static org.awaitility.Awaitility.await;

@SpringBootTest
public class KafkaUpdatesListenerTest extends KafkaIntegrationEnvironment {

@MockBean
private LinkUpdatesSenderService linkUpdatesSenderService;

@Autowired
private KafkaTemplate<String, LinkUpdate> kafkaTemplate;

@Autowired
private ApplicationConfig config;

@Autowired
private KafkaProperties kafkaProperties;

@Test
public void listenUpdateCorrectTest() {
LinkUpdate linkUpdate = new LinkUpdate(
1L,
URLCreator.createURL("https://github.com"),
"github",
List.of(1L),
Map.of()
);
kafkaTemplate.send("updates", linkUpdate);
await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> Mockito.verify(linkUpdatesSenderService, Mockito.times(1))
.sendLinkUpdate(linkUpdate));
}

@Test
public void listenUpdateInCorrectTest() {
LinkUpdate linkUpdate = new LinkUpdate(
1L,
URLCreator.createURL("https://github.com"),
"github",
List.of(1L),
Map.of()
);
Mockito.doThrow(RuntimeException.class).when(linkUpdatesSenderService).sendLinkUpdate(linkUpdate);
KafkaConsumer<String, LinkUpdate> dlqKafkaConsumer = new KafkaConsumer<>(
kafkaProperties.buildConsumerProperties(null)
);
dlqKafkaConsumer.subscribe(List.of(config.kafkaConfigurationInfo().topicName() + "_dlq"));
kafkaTemplate.send(config.kafkaConfigurationInfo().topicName(), linkUpdate);
await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
var values = dlqKafkaConsumer.poll(Duration.ofMillis(100));
Assertions.assertThat(values).hasSize(1);
Assertions.assertThat(values.iterator().next().value()).isEqualTo(linkUpdate);
Mockito.verify(linkUpdatesSenderService).sendLinkUpdate(linkUpdate);
});
}
}
36 changes: 36 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,38 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
volumes:
- zookeeper:/var/lib/zookeeper/data
- zookeeper:/var/lib/zookeeper/log

kafka1:
image: confluentinc/cp-kafka:7.6.0
hostname: kafka1
container_name: kafka1
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092, PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 1
BOOTSTRAP_SERVERS: kafka1:9092
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
volumes:
- kafka:/var/lib/kafka/data

postgresql:
image: postgres:16
ports:
Expand Down Expand Up @@ -30,6 +64,8 @@ services:

volumes:
postgresql: { }
kafka: { }
zookeeper: { }

networks:
backend: { }
16 changes: 0 additions & 16 deletions retry/src/main/java/edu/java/RetryElement.java

This file was deleted.

5 changes: 3 additions & 2 deletions retry/src/main/java/edu/java/RetryFactory.java
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

доработай, пожалуйста, по дз7, потом сделай git-rebase

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сделаю rebase после ответа по комментарию в дз 7

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
@UtilityClass
public class RetryFactory {

private static final Map<String, Function<RetryElement, Retry>> RETRY_BUILDERS = new HashMap<>();
private static final Map<String, Function<RetryQueryConfiguration.RetryElement, Retry>> RETRY_BUILDERS =
new HashMap<>();

static {
RETRY_BUILDERS.put("fixed", new FixedRetryBuilder());
Expand All @@ -36,6 +37,6 @@ public static ExchangeFilterFunction createFilter(Retry retry) {
public static Retry createRetry(RetryQueryConfiguration config, String target) {
return config.retries().stream().filter(element -> element.target().equals(target)).findFirst()
.map(element -> RETRY_BUILDERS.get(element.type()).apply(element))
.orElseThrow(() -> new RuntimeException("Unknown target " + target));
.orElseThrow(() -> new IllegalStateException("Unknown target " + target));
}
}
12 changes: 12 additions & 0 deletions retry/src/main/java/edu/java/RetryQueryConfiguration.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
package edu.java;

import java.time.Duration;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

@Validated
@ConfigurationProperties(prefix = "retry-query", ignoreUnknownFields = false)
public record RetryQueryConfiguration(List<RetryElement> retries) {
public record RetryElement(
@NotNull String target,
@NotNull String type,
int maxAttempts,
double factor,
Duration minDelay,
Duration maxDelay,
List<Integer> codes
) {
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package edu.java.builders;

import edu.java.ErrorFilterPredicate;
import edu.java.RetryElement;
import edu.java.RetryQueryConfiguration;
import java.util.function.Function;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class ExponentialRetryBuilder implements Function<RetryElement, Retry> {
public class ExponentialRetryBuilder implements Function<RetryQueryConfiguration.RetryElement, Retry> {
@Override
public Retry apply(RetryElement retryElement) {
public Retry apply(RetryQueryConfiguration.RetryElement retryElement) {
return RetryBackoffSpec.backoff(retryElement.maxAttempts(), retryElement.minDelay())
.maxBackoff(retryElement.maxDelay())
.filter(new ErrorFilterPredicate(retryElement.codes()));
Expand Down
6 changes: 3 additions & 3 deletions retry/src/main/java/edu/java/builders/FixedRetryBuilder.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package edu.java.builders;

import edu.java.ErrorFilterPredicate;
import edu.java.RetryElement;
import edu.java.RetryQueryConfiguration;
import java.util.function.Function;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public class FixedRetryBuilder implements Function<RetryElement, Retry> {
public class FixedRetryBuilder implements Function<RetryQueryConfiguration.RetryElement, Retry> {
@Override
public Retry apply(RetryElement retryElement) {
public Retry apply(RetryQueryConfiguration.RetryElement retryElement) {
return RetryBackoffSpec.fixedDelay(retryElement.maxAttempts(), retryElement.minDelay())
.filter(new ErrorFilterPredicate(retryElement.codes()));
}
Expand Down
Loading