diff --git a/Application/src/main/java/Application/Models/Entities/BankAccount.java b/Application/src/main/java/Application/Models/Entities/BankAccount.java index 84a2a27..deaf09e 100644 --- a/Application/src/main/java/Application/Models/Entities/BankAccount.java +++ b/Application/src/main/java/Application/Models/Entities/BankAccount.java @@ -1,5 +1,6 @@ package Application.Models.Entities; +import com.fasterxml.jackson.annotation.JsonIgnore; import jakarta.persistence.*; import lombok.*; @@ -18,11 +19,13 @@ public class BankAccount { private Double balance; @Column(name = "userlogin", nullable = false) + @Setter private String userLogin; @ManyToOne @Setter @JoinColumn(name = "user_id", nullable = false) + @JsonIgnore private User user; public BankAccount(User user) { diff --git a/Application/src/main/java/Application/Models/Entities/User.java b/Application/src/main/java/Application/Models/Entities/User.java index 7d23605..a22e3c3 100644 --- a/Application/src/main/java/Application/Models/Entities/User.java +++ b/Application/src/main/java/Application/Models/Entities/User.java @@ -1,5 +1,6 @@ package Application.Models.Entities; +import com.fasterxml.jackson.annotation.JsonManagedReference; import jakarta.persistence.*; import lombok.*; import Application.Models.Enums.HairColor; @@ -35,6 +36,7 @@ public class User { @OneToMany(mappedBy = "user", cascade = {CascadeType.PERSIST, CascadeType.MERGE, CascadeType.REMOVE} , orphanRemoval = true, fetch = FetchType.EAGER) + @JsonManagedReference private List bankAccounts = new ArrayList<>(); @ManyToMany(fetch = FetchType.EAGER) diff --git a/Presentation/pom.xml b/Presentation/pom.xml index 310d0e3..7272f96 100644 --- a/Presentation/pom.xml +++ b/Presentation/pom.xml @@ -21,8 +21,6 @@ - - org.junit.jupiter junit-jupiter @@ -30,8 +28,6 @@ test - - org.projectlombok lombok @@ -70,6 +66,23 @@ spring-boot-starter-security + + org.springframework.kafka + spring-kafka + 3.3.4 + + + + org.apache.kafka + kafka_2.13 + 4.0.0 + + + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + diff --git a/Presentation/src/main/java/Presentation/Configs/AppConfig.java b/Presentation/src/main/java/Presentation/Configs/AppConfig.java index c9d1f91..ba71f0a 100644 --- a/Presentation/src/main/java/Presentation/Configs/AppConfig.java +++ b/Presentation/src/main/java/Presentation/Configs/AppConfig.java @@ -7,6 +7,7 @@ import DataAccess.Services.Interfaces.IUserService; import Presentation.Interfaces.IBaseController; import Presentation.Controllers.BaseController; +import Presentation.Kafka.Services.KafkaProducerService; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -26,8 +27,8 @@ public BaseController baseController( IUserManager userManager, IUserService userService, IBankAccountService bankAccountService, - IOperationService operationService) { - return new BaseController(userManager, userService, bankAccountService, operationService); + IOperationService operationService, KafkaProducerService kafkaProducerService) { + return new BaseController(userManager, userService, bankAccountService, operationService, kafkaProducerService); } @Bean diff --git a/Presentation/src/main/java/Presentation/Controllers/BankAccountDTOController.java b/Presentation/src/main/java/Presentation/Controllers/BankAccountDTOController.java index be441f4..0a1ea61 100644 --- a/Presentation/src/main/java/Presentation/Controllers/BankAccountDTOController.java +++ b/Presentation/src/main/java/Presentation/Controllers/BankAccountDTOController.java @@ -1,8 +1,10 @@ package Presentation.Controllers; import Application.Models.Entities.BankAccount; +import Application.Models.Entities.User; import Application.ResultTypes.BankAccountResult; import Presentation.DTO.BankAccountDTO; +import Presentation.DTO.CreateBankAccountDTO; import Presentation.Interfaces.IBaseController; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -15,13 +17,13 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; - @RestController -@RequestMapping("/bankaccounts") +@RequestMapping(value = "/bankaccounts", produces = "application/json") public class BankAccountDTOController { private final IBaseController baseController; + @Autowired public BankAccountDTOController(IBaseController baseController) { this.baseController = baseController; @@ -32,7 +34,7 @@ public BankAccountDTOController(IBaseController baseController) { @ApiResponse(responseCode = "200", description = "Счет найден", content = @Content(mediaType = "application/json", schema = @Schema(implementation = BankAccountDTO.class))), @ApiResponse(responseCode = "404", description = "Счет не найден") }) - @GetMapping("/{id}") + @GetMapping(value = "/{id}") public ResponseEntity getBankAccountById(@Parameter(description = "ID счета") @PathVariable int id) { BankAccount account = baseController.GetBankAccountById(id); if (account == null) { @@ -47,12 +49,23 @@ public ResponseEntity getBankAccountById(@Parameter(description @ApiResponse(responseCode = "400", description = "Некорректные данные") }) @PostMapping(consumes = "application/json", produces = "application/json") - public ResponseEntity createBankAccount(@RequestBody BankAccount account) { + public ResponseEntity createBankAccount(@RequestBody CreateBankAccountDTO dto) { try { + BankAccount account = new BankAccount(); account.setId(null); - BankAccountResult result = baseController.AddBankAccount(account.getUser().getId(), account); + account.setBalance(dto.getBalance()); + + User user = baseController.GetUserById(dto.getUserId()); + if (user == null) { + return ResponseEntity.badRequest().body("User not found with ID: " + dto.getUserId()); + } + + account.setUser(user); + account.setUserLogin(user.getLogin()); + + BankAccountResult result = baseController.AddBankAccount(user.getId(), account); if (result instanceof BankAccountResult.Success) { - return ResponseEntity.status(HttpStatus.CREATED).body(result); + return ResponseEntity.status(HttpStatus.CREATED).body(new BankAccountDTO(account)); } else { return ResponseEntity.badRequest().body(result); } @@ -63,39 +76,37 @@ public ResponseEntity createBankAccount(@RequestBody BankAccount account) { } - @Operation(summary = "Обновить данные счета", description = "Обновляет данные о счете по ID") @ApiResponses(value = { @ApiResponse(responseCode = "200", description = "Счет обновлен", content = @Content(mediaType = "application/json", schema = @Schema(implementation = BankAccountDTO.class))), @ApiResponse(responseCode = "404", description = "Счет не найден"), @ApiResponse(responseCode = "400", description = "Некорректные данные") }) - @PutMapping("/{id}") + @PutMapping(value = "/{id}", consumes = "application/json", produces = "application/json") public ResponseEntity updateBankAccount(@Parameter(description = "ID счета") @PathVariable int id, - @RequestBody BankAccount bankAccount) { + @RequestBody BankAccount account) { BankAccount existingAccount = baseController.GetBankAccountById(id); if (existingAccount == null) { return ResponseEntity.notFound().build(); } - existingAccount.setBalance(bankAccount.getBalance()); - existingAccount.setUser(bankAccount.getUser()); + existingAccount.setBalance(account.getBalance()); + existingAccount.setUser(account.getUser()); BankAccountResult result = baseController.UpdateBankAccount(existingAccount); if (result instanceof BankAccountResult.Success) { return ResponseEntity.ok(new BankAccountDTO(existingAccount)); } else { - return ResponseEntity.badRequest().body(result); + return ResponseEntity.status(HttpStatus.CREATED).body(new BankAccountDTO(account)); } } - @Operation(summary = "Удалить счет", description = "Удаляет счет по ID") @ApiResponses(value = { @ApiResponse(responseCode = "204", description = "Счет удален"), @ApiResponse(responseCode = "404", description = "Счет не найден") }) - @DeleteMapping("/{id}") + @DeleteMapping(value = "/{id}") public ResponseEntity deleteBankAccount(@Parameter(description = "ID счета") @PathVariable int id) { BankAccount account = baseController.GetBankAccountById(id); if (account == null) { @@ -105,7 +116,7 @@ public ResponseEntity deleteBankAccount(@Parameter(description = "ID счет if (result instanceof BankAccountResult.Success) { return ResponseEntity.noContent().build(); } else { - return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result); + return ResponseEntity.status(HttpStatus.CREATED).body(new BankAccountDTO(account)); } } } diff --git a/Presentation/src/main/java/Presentation/Controllers/BaseController.java b/Presentation/src/main/java/Presentation/Controllers/BaseController.java index 06cc749..8799080 100644 --- a/Presentation/src/main/java/Presentation/Controllers/BaseController.java +++ b/Presentation/src/main/java/Presentation/Controllers/BaseController.java @@ -14,6 +14,7 @@ import DataAccess.Services.Interfaces.IBankAccountService; import DataAccess.Services.Interfaces.IOperationService; import DataAccess.Services.Interfaces.IUserService; +import Presentation.Kafka.Services.KafkaProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -26,28 +27,36 @@ public class BaseController implements IBaseController { private final IUserService userService; private final IBankAccountService bankAccountService; private final IOperationService operationService; + private final KafkaProducerService kafkaProducerService; @Autowired - public BaseController(IUserManager userManager, IUserService userService, IBankAccountService bankAccountService, IOperationService operationService) { + public BaseController(IUserManager userManager, IUserService userService, IBankAccountService bankAccountService, IOperationService operationService, KafkaProducerService kafkaProducerService) { this.userManager = userManager; this.userService = userService; this.bankAccountService = bankAccountService; this.operationService = operationService; + this.kafkaProducerService = kafkaProducerService; } @Override public UserResult CreateUser(User user) { - if (user == null || user.getLogin() == null || user.getName() == null) { - return new UserResult.UserCreationError("Некорректные данные пользователя"); + try { + if (user == null || user.getLogin() == null || user.getName() == null) { + return new UserResult.UserCreationError("Некорректные данные пользователя"); + } + userService.SaveUser(user); + kafkaProducerService.sendEvent("client-topic", String.valueOf(user.getId()), user); + return new UserResult.Success(); + } catch (Exception e) { + return new UserResult.UserCreationError(e.getMessage()); } - userService.SaveUser(user); - return new UserResult.Success(); } @Override public UserResult UpdateUser(User user) { try { userService.SaveUser(user); + kafkaProducerService.sendEvent("client-topic", String.valueOf(user.getId()), user); return new UserResult.Success(); } catch (Exception e) { return new UserResult.UserUpdateError(e.getMessage()); @@ -162,6 +171,7 @@ public BankAccountResult AddBankAccount(int userId, BankAccount bankAccount) { bankAccount.setId(null); userManager.AddBankAccount(user, bankAccount); userService.SaveUser(user); + kafkaProducerService.sendEvent("account-topic", String.valueOf(bankAccount.getId()), bankAccount); return new BankAccountResult.Success(); } catch (Exception e) { return new BankAccountResult.BankAccountCreationError(e.getMessage()); @@ -175,6 +185,7 @@ public BankAccountResult UpdateBankAccount(BankAccount bankAccount) { return new BankAccountResult.BankAccountUpdateError("Некорректные данные счета"); } bankAccountService.UpdateAccount(bankAccount); + kafkaProducerService.sendEvent("account-topic", String.valueOf(bankAccount.getId()), bankAccount); return new BankAccountResult.Success(); } catch (Exception e) { return new BankAccountResult.BankAccountUpdateError(e.getMessage()); diff --git a/Presentation/src/main/java/Presentation/Controllers/UserDTOController.java b/Presentation/src/main/java/Presentation/Controllers/UserDTOController.java index 0faeafc..2ed1a7a 100644 --- a/Presentation/src/main/java/Presentation/Controllers/UserDTOController.java +++ b/Presentation/src/main/java/Presentation/Controllers/UserDTOController.java @@ -16,7 +16,7 @@ import org.springframework.web.bind.annotation.*; @RestController -@RequestMapping("/users") +@RequestMapping(value = "/users", produces = "application/json") public class UserDTOController { private final IBaseController baseController; @@ -31,7 +31,7 @@ public UserDTOController(IBaseController baseController) { @ApiResponse(responseCode = "200", description = "Пользователь найден", content = @Content(mediaType = "application/json", schema = @Schema(implementation = UserDTO.class))), @ApiResponse(responseCode = "404", description = "Пользователь не найден") }) - @GetMapping("/{id}") + @GetMapping(value = "/{id}", produces = "application/json") public ResponseEntity getUserById(@Parameter(description = "ID пользователя") @PathVariable int id) { User user = baseController.GetUserById(id); if (user == null) { @@ -45,7 +45,7 @@ public ResponseEntity getUserById(@Parameter(description = "ID поль @ApiResponse(responseCode = "201", description = "Пользователь создан"), @ApiResponse(responseCode = "400", description = "Некорректные данные") }) - @PostMapping("/create") + @PostMapping(value = "/create", consumes = "application/json", produces = "application/json") public ResponseEntity createUser(@RequestBody User user) { UserResult result = baseController.CreateUser(user); if (result instanceof UserResult.Success) { @@ -61,7 +61,7 @@ public ResponseEntity createUser(@RequestBody User user) { @ApiResponse(responseCode = "404", description = "Пользователь не найден"), @ApiResponse(responseCode = "400", description = "Некорректные данные") }) - @PutMapping("/update/{id}") + @PutMapping(value = "/update/{id}", consumes = "application/json", produces = "application/json") public ResponseEntity updateUser(@Parameter(description = "ID пользователя") @PathVariable int id, @RequestBody User user) { User existingUser = baseController.GetUserById(id); if (existingUser == null) { @@ -80,7 +80,7 @@ public ResponseEntity updateUser(@Parameter(description = "ID пользов @ApiResponse(responseCode = "204", description = "Пользователь удален"), @ApiResponse(responseCode = "404", description = "Пользователь не найден") }) - @DeleteMapping("/delete/{id}") + @DeleteMapping(value = "/delete/{id}", produces = "application/json") public ResponseEntity deleteUser(@Parameter(description = "ID пользователя") @PathVariable int id) { User user = baseController.GetUserById(id); if (user == null) { @@ -93,4 +93,4 @@ public ResponseEntity deleteUser(@Parameter(description = "ID пользов return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result); } } -} \ No newline at end of file +} diff --git a/Presentation/src/main/java/Presentation/DTO/CreateBankAccountDTO.java b/Presentation/src/main/java/Presentation/DTO/CreateBankAccountDTO.java new file mode 100644 index 0000000..32b0d59 --- /dev/null +++ b/Presentation/src/main/java/Presentation/DTO/CreateBankAccountDTO.java @@ -0,0 +1,10 @@ +package Presentation.DTO; + +import lombok.Data; + +@Data +public class CreateBankAccountDTO { + private Integer id; + private double balance; + private Integer userId; +} diff --git a/Presentation/src/main/java/Presentation/Kafka/Configs/KafkaProducerConfig.java b/Presentation/src/main/java/Presentation/Kafka/Configs/KafkaProducerConfig.java new file mode 100644 index 0000000..4a6004a --- /dev/null +++ b/Presentation/src/main/java/Presentation/Kafka/Configs/KafkaProducerConfig.java @@ -0,0 +1,28 @@ +package Presentation.Kafka.Configs; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.*; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + @Bean + public ProducerFactory producerFactory() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(config); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/Presentation/src/main/java/Presentation/Kafka/Services/KafkaProducerService.java b/Presentation/src/main/java/Presentation/Kafka/Services/KafkaProducerService.java new file mode 100644 index 0000000..232266b --- /dev/null +++ b/Presentation/src/main/java/Presentation/Kafka/Services/KafkaProducerService.java @@ -0,0 +1,26 @@ +package Presentation.Kafka.Services; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper = new ObjectMapper(); + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void sendEvent(String topic, String key, T payload) { + try { + String json = objectMapper.writeValueAsString(payload); + kafkaTemplate.send(topic, key, json); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +} diff --git a/StorageService/pom.xml b/StorageService/pom.xml new file mode 100644 index 0000000..bec3162 --- /dev/null +++ b/StorageService/pom.xml @@ -0,0 +1,53 @@ + + + 4.0.0 + + com.example + lim0sha + 1.0-SNAPSHOT + + + StorageService + + + 21 + 21 + UTF-8 + + + + + org.projectlombok + lombok + 1.18.30 + provided + + + + org.springframework.kafka + spring-kafka + 3.3.4 + + + + org.apache.kafka + kafka_2.13 + 4.0.0 + + + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + + + io.swagger.core.v3 + swagger-annotations-jakarta + 2.2.7 + compile + + + + \ No newline at end of file diff --git a/StorageService/src/main/java/StorageService/Configs/KafkaConsumerConfig.java b/StorageService/src/main/java/StorageService/Configs/KafkaConsumerConfig.java new file mode 100644 index 0000000..cdaa310 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Configs/KafkaConsumerConfig.java @@ -0,0 +1,33 @@ +package StorageService.Configs; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaConsumerConfig { + private final String server = "localhost:9092"; + + @Bean + public ConsumerFactory consumerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); + configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); + configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(configProps); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/StorageService/src/main/java/StorageService/Console/Storage.java b/StorageService/src/main/java/StorageService/Console/Storage.java new file mode 100644 index 0000000..2efabe2 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Console/Storage.java @@ -0,0 +1,13 @@ +package StorageService.Console; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.EnableKafka; + +@SpringBootApplication +@EnableKafka +public class Storage { + public static void main(String[] args) { + SpringApplication.run(Storage.class, args); + } +} diff --git a/StorageService/src/main/java/StorageService/Controllers/EventController.java b/StorageService/src/main/java/StorageService/Controllers/EventController.java new file mode 100644 index 0000000..e86f172 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Controllers/EventController.java @@ -0,0 +1,83 @@ +package StorageService.Controllers; + +import StorageService.DTO.AccountEventDTO; +import StorageService.DTO.UserEventDTO; +import StorageService.Models.AccountEvent; +import StorageService.Models.UserEvent; +import StorageService.Repositories.AccountEventRepository; +import StorageService.Repositories.UserEventRepository; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.stream.Collectors; + +@RestController +@RequestMapping("/events") +@RequiredArgsConstructor +public class EventController { + + private final UserEventRepository userEventRepository; + private final AccountEventRepository accountEventRepository; + + @Operation(summary = "Получить все события пользователей") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "События получены", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = UserEventDTO.class))) + }) + @GetMapping("/users") + public ResponseEntity> getAllUserEvents() { + List result = userEventRepository.findAll() + .stream() + .map(UserEventDTO::new) + .collect(Collectors.toList()); + return ResponseEntity.ok(result); + } + + @Operation(summary = "Получить все события аккаунтов") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "События получены", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = AccountEventDTO.class))) + }) + @GetMapping("/accounts") + public ResponseEntity> getAllAccountEvents() { + List result = accountEventRepository.findAll() + .stream() + .map(AccountEventDTO::new) + .collect(Collectors.toList()); + return ResponseEntity.ok(result); + } + + @Operation(summary = "Получить событие пользователя по ID") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Событие найдено", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = UserEventDTO.class))), + @ApiResponse(responseCode = "404", description = "Событие не найдено") + }) + @GetMapping("/users/{id}") + public ResponseEntity getUserEventById(@Parameter(description = "ID пользователя") @PathVariable Long id) { + return userEventRepository.findById(id) + .map(event -> ResponseEntity.ok(new UserEventDTO(event))) + .orElse(ResponseEntity.notFound().build()); + } + + @Operation(summary = "Получить событие аккаунта по ID") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Событие найдено", + content = @Content(mediaType = "application/json", schema = @Schema(implementation = AccountEventDTO.class))), + @ApiResponse(responseCode = "404", description = "Событие не найдено") + }) + @GetMapping("/accounts/{id}") + public ResponseEntity getAccountEventById(@Parameter(description = "ID аккаунта") @PathVariable Long id) { + return accountEventRepository.findById(id) + .map(event -> ResponseEntity.ok(new AccountEventDTO(event))) + .orElse(ResponseEntity.notFound().build()); + } +} diff --git a/StorageService/src/main/java/StorageService/Controllers/HomepageController.java b/StorageService/src/main/java/StorageService/Controllers/HomepageController.java new file mode 100644 index 0000000..3cf4d29 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Controllers/HomepageController.java @@ -0,0 +1,12 @@ +package StorageService.Controllers; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class HomepageController { + @GetMapping("/") + public String home() { + return "StorageService is running!"; + } +} diff --git a/StorageService/src/main/java/StorageService/DTO/AccountEventDTO.java b/StorageService/src/main/java/StorageService/DTO/AccountEventDTO.java new file mode 100644 index 0000000..83bfd31 --- /dev/null +++ b/StorageService/src/main/java/StorageService/DTO/AccountEventDTO.java @@ -0,0 +1,19 @@ +package StorageService.DTO; + +import StorageService.Models.AccountEvent; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AccountEventDTO { + private Long accountId; + private String eventData; + + public AccountEventDTO(AccountEvent event) { + this.accountId = event.getAccountId(); + this.eventData = event.getEventData(); + } +} diff --git a/StorageService/src/main/java/StorageService/DTO/UserEventDTO.java b/StorageService/src/main/java/StorageService/DTO/UserEventDTO.java new file mode 100644 index 0000000..ccdaaa6 --- /dev/null +++ b/StorageService/src/main/java/StorageService/DTO/UserEventDTO.java @@ -0,0 +1,19 @@ +package StorageService.DTO; + +import StorageService.Models.UserEvent; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class UserEventDTO { + private Long userId; + private String eventData; + + public UserEventDTO(UserEvent event) { + this.userId = event.getUserId(); + this.eventData = event.getEventData(); + } +} diff --git a/StorageService/src/main/java/StorageService/Models/AccountEvent.java b/StorageService/src/main/java/StorageService/Models/AccountEvent.java new file mode 100644 index 0000000..830c6dc --- /dev/null +++ b/StorageService/src/main/java/StorageService/Models/AccountEvent.java @@ -0,0 +1,21 @@ +package StorageService.Models; + +import jakarta.persistence.*; +import lombok.*; + +@Entity +@Table(name = "account_event") +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class AccountEvent { + + @Id + @Column(name = "account_id", nullable = false, unique = true) + private Long accountId; + + @Lob + @Column(name = "event_data", nullable = false) + private String eventData; +} diff --git a/StorageService/src/main/java/StorageService/Models/UserEvent.java b/StorageService/src/main/java/StorageService/Models/UserEvent.java new file mode 100644 index 0000000..627f1c5 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Models/UserEvent.java @@ -0,0 +1,21 @@ +package StorageService.Models; + +import jakarta.persistence.*; +import lombok.*; + +@Entity +@Table(name = "user_event") +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class UserEvent { + + @Id + @Column(name = "user_id", nullable = false, unique = true) + private Long userId; + + @Lob + @Column(name = "event_data", nullable = false) + private String eventData; +} diff --git a/StorageService/src/main/java/StorageService/Repositories/AccountEventRepository.java b/StorageService/src/main/java/StorageService/Repositories/AccountEventRepository.java new file mode 100644 index 0000000..8d92de9 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Repositories/AccountEventRepository.java @@ -0,0 +1,9 @@ +package StorageService.Repositories; + +import StorageService.Models.AccountEvent; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface AccountEventRepository extends JpaRepository { +} \ No newline at end of file diff --git a/StorageService/src/main/java/StorageService/Repositories/UserEventRepository.java b/StorageService/src/main/java/StorageService/Repositories/UserEventRepository.java new file mode 100644 index 0000000..8430e1f --- /dev/null +++ b/StorageService/src/main/java/StorageService/Repositories/UserEventRepository.java @@ -0,0 +1,9 @@ +package StorageService.Repositories; + +import StorageService.Models.UserEvent; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface UserEventRepository extends JpaRepository { +} diff --git a/StorageService/src/main/java/StorageService/Services/ConsumerService.java b/StorageService/src/main/java/StorageService/Services/ConsumerService.java new file mode 100644 index 0000000..b8720c1 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Services/ConsumerService.java @@ -0,0 +1,44 @@ +package StorageService.Services; + +import StorageService.Models.AccountEvent; +import StorageService.Models.UserEvent; +import StorageService.Repositories.AccountEventRepository; +import StorageService.Repositories.UserEventRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class ConsumerService { + + private final UserEventRepository userEventRepository; + private final AccountEventRepository accountEventRepository; + + @Autowired + public ConsumerService(UserEventRepository userEventRepository, AccountEventRepository accountEventRepository) { + this.userEventRepository = userEventRepository; + this.accountEventRepository = accountEventRepository; + } + + public void ConsumeUserEvent(String message, org.apache.kafka.clients.consumer.ConsumerRecord record) { + try { + Long userId = Long.parseLong(record.key()); + UserEvent event = new UserEvent(userId, message); + userEventRepository.save(event); + System.out.println("[UserEvent] Saved: " + event); + } catch (NumberFormatException e) { + System.err.println("Invalid userId key: " + record.key()); + } + } + + public void ConsumeAccountEvent(String message, org.apache.kafka.clients.consumer.ConsumerRecord record) { + System.out.println("Received account event: " + message); + try { + Long accountId = Long.parseLong(record.key()); + AccountEvent event = new AccountEvent(accountId, message); + accountEventRepository.save(event); + System.out.println("[AccountEvent] Saved: " + event); + } catch (NumberFormatException e) { + System.err.println("Invalid accountId key: " + record.key()); + } + } +} diff --git a/StorageService/src/main/java/StorageService/Services/EventConsumer.java b/StorageService/src/main/java/StorageService/Services/EventConsumer.java new file mode 100644 index 0000000..43fac99 --- /dev/null +++ b/StorageService/src/main/java/StorageService/Services/EventConsumer.java @@ -0,0 +1,26 @@ +package StorageService.Services; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class EventConsumer { + + private final ConsumerService consumerService; + + @Autowired + public EventConsumer(ConsumerService consumerService) { + this.consumerService = consumerService; + } + + @KafkaListener(topics = "client-topic", groupId = "my-group") + public void consumeUserEvent(String message, org.apache.kafka.clients.consumer.ConsumerRecord record) { + consumerService.ConsumeUserEvent(message, record); + } + + @KafkaListener(topics = "account-topic", groupId = "my-group") + public void consumeAccountEvent(String message, org.apache.kafka.clients.consumer.ConsumerRecord record) { + consumerService.ConsumeAccountEvent(message, record); + } +} diff --git a/StorageService/src/main/resources/application.yml b/StorageService/src/main/resources/application.yml new file mode 100644 index 0000000..7c67ee7 --- /dev/null +++ b/StorageService/src/main/resources/application.yml @@ -0,0 +1,41 @@ +server: + port: 8082 + +spring: + application: + name: storage-service + flyway: + enabled: false + + + datasource: + url: jdbc:postgresql://localhost:5432/JavaLabsEventsDb + username: postgres + password: limosha + driver-class-name: org.postgresql.Driver + + jpa: + hibernate: + ddl-auto: update + show-sql: true + properties: + hibernate: + dialect: org.hibernate.dialect.PostgreSQLDialect + format_sql: true + + kafka: + bootstrap-servers: localhost:9092 + consumer: + group-id: storage-consumer-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + missing-topics-fatal: false + +logging: + level: + org: + springframework: + kafka: DEBUG + root: INFO diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..cddd602 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,16 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.3.0 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:7.3.0 + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 diff --git a/pom.xml b/pom.xml index b7f3d90..2f229d9 100644 --- a/pom.xml +++ b/pom.xml @@ -98,6 +98,7 @@ Presentation ApiGateway ApiGateway/DataAccessApiGateway + StorageService