diff --git a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepository.java b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepository.java index 96a05ef..a650483 100644 --- a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepository.java +++ b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepository.java @@ -7,6 +7,7 @@ import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import java.time.LocalDateTime; import java.util.List; import java.util.Optional; @@ -32,6 +33,4 @@ public interface ChatRoomRepository extends JpaRepository, ChatR WHERE cr.id = :roomId """) Optional findByIdForUpdate(@Param("roomId") Long roomId); - - } diff --git a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustom.java b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustom.java index 4e044db..08b4a57 100644 --- a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustom.java +++ b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustom.java @@ -6,6 +6,9 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import java.time.LocalDateTime; +import java.util.List; + public interface ChatRoomRepositoryCustom { /** @@ -13,4 +16,23 @@ public interface ChatRoomRepositoryCustom { * 전체 채팅방 조회 */ Page findAllWithFilter(ChatRoomStatus status, Pageable pageable); + + /** + * 자동 종료 대상 채팅방 조회 + * + * 조건: + * 1. lastMessageAt < cutoff (마지막 메시지 이후 일정 시간 경과) + * 2. WAITING 또는 IN_PROGRESS 상태 (이미 종료된 상태) + * + * COMPLETED는 activeFlag=null이라서 조건 2로 자동 제외됨 + */ + List findInactiveRooms(LocalDateTime cutoff, Long lastId, int batchSize); + + /** + * 배치 자동 종료 - Bulk Update + * + * 개별 autoCloseRoom() N번 호출 대신 IN 쿼리로 한 번에 상태 변경 + * DB 커넥션 N번 -> 1번으로 감소 + */ + void bulkCompleteRooms(List roomIds); } diff --git a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustomImpl.java b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustomImpl.java index d7446c0..35f392b 100644 --- a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustomImpl.java +++ b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/repository/ChatRoomRepositoryCustomImpl.java @@ -4,6 +4,7 @@ import com.querydsl.core.types.Projections; import com.querydsl.core.types.dsl.BooleanExpression; import com.querydsl.jpa.impl.JPAQueryFactory; +import jakarta.persistence.EntityManager; import jpa.basic.alldayprojectcommerce.domain.chat.dto.response.ChatRoomResponse; import jpa.basic.alldayprojectcommerce.domain.chat.entity.ChatRoom; import jpa.basic.alldayprojectcommerce.domain.chat.entity.ChatRoomStatus; @@ -13,6 +14,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Repository; +import java.time.LocalDateTime; import java.util.List; import static jpa.basic.alldayprojectcommerce.domain.chat.entity.QChatRoom.chatRoom; @@ -22,6 +24,7 @@ public class ChatRoomRepositoryCustomImpl implements ChatRoomRepositoryCustom { private final JPAQueryFactory queryFactory; + private final EntityManager em; /** * 전체 채팅방 조회 - QueryDSL Offset 페이징 @@ -58,7 +61,44 @@ public Page findAllWithFilter(ChatRoomStatus status, Pageable return new PageImpl<>(content, pageable, count == null ? 0L : count); } + @Override + public List findInactiveRooms(LocalDateTime cutoff, Long lastId, int batchSize) { + return queryFactory + .selectFrom(chatRoom) + .where( + gtRoomId(lastId), + chatRoom.chatRoomStatus.in( + ChatRoomStatus.WAITING, + ChatRoomStatus.IN_PROGRESS + ), + chatRoom.lastMessageAt.before(cutoff) + ) + .orderBy(chatRoom.id.asc()) // ID 오름차순 - 커서 기반 페이징 + .limit(batchSize) + .fetch(); + } + + @Override + public void bulkCompleteRooms(List roomIds) { + queryFactory + .update(chatRoom) + .set(chatRoom.chatRoomStatus, ChatRoomStatus.COMPLETED) + .setNull(chatRoom.activeFlag) + .where(chatRoom.id.in(roomIds)) + .execute(); + + /** + * bulk update를 진행하면서 1차 캐시에 남아있는 데이터를 초기화해서 + * 다음 조회 시 DB에서 최신 데이터를 가져오도록 강제 + */ + em.clear(); + } + private BooleanExpression statusEq(ChatRoomStatus status) { return status != null ? chatRoom.chatRoomStatus.eq(status) : null; } + + private BooleanExpression gtRoomId(Long lastId) { + return lastId != null ? chatRoom.id.gt(lastId) : null; + } } diff --git a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/scheduler/ChatInactivityScheduler.java b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/scheduler/ChatInactivityScheduler.java new file mode 100644 index 0000000..88d5b1c --- /dev/null +++ b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/scheduler/ChatInactivityScheduler.java @@ -0,0 +1,140 @@ +package jpa.basic.alldayprojectcommerce.domain.chat.scheduler; + +import jpa.basic.alldayprojectcommerce.common.lock.repository.RedisLockRepository; +import jpa.basic.alldayprojectcommerce.domain.chat.dto.response.ChatMessageResponse; +import jpa.basic.alldayprojectcommerce.domain.chat.entity.ChatMessage; +import jpa.basic.alldayprojectcommerce.domain.chat.entity.ChatRoom; +import jpa.basic.alldayprojectcommerce.domain.chat.entity.MessageType; +import jpa.basic.alldayprojectcommerce.domain.chat.entity.SenderType; +import jpa.basic.alldayprojectcommerce.domain.chat.redis.ChatRedisPublisher; +import jpa.basic.alldayprojectcommerce.domain.chat.repository.ChatMessageRepository; +import jpa.basic.alldayprojectcommerce.domain.chat.repository.ChatRoomRepository; +import jpa.basic.alldayprojectcommerce.domain.chat.service.ChatRoomCommandService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ChatInactivityScheduler { + + @Value("${chat.inactivity-timeout-minutes:10}") + private int inactivityTimeoutMinutes; + + private final ChatRoomRepository chatRoomRepository; + private final ChatRoomCommandService chatRoomCommandService; + private final ChatRedisPublisher chatRedisPublisher; + private final RedisLockRepository redisLockRepository; + + private final ChatMessageRepository chatMessageRepository; + + private static final String INACTIVITY_LOCK_KEY = "lock:chat:inactivity"; + private static final long INACTIVITY_LOCK_TTL = 55L; // 55초 + private static final int BATCH_SIZE = 100; // 한 번에 처리할 최대 방 수 + + private String buildCloseMessage() { + return String.format("%d분간 응답이 없어 상담이 자동 종료되었습니다.", + inactivityTimeoutMinutes); + } + + /** + * 비활성 채팅방 자동 종료 스케쥴러 + * + * 실행 주기: 1분마다 + * 종료 기준: lastMessageAt 이후 inactivityTimeoutMinutes 경과 + * + * 처리 순서: + * 1. 분산락 획득 — 다른 서버 중복 실행 방지 + * 2. No-Offset 배치 조회 (BATCH_SIZE씩) — OOM 방지 + * 3. bulkCompleteRooms() — IN 쿼리로 상태 한 번에 변경 + * 4. saveAll() — 시스템 메시지 한 번에 저장 + * 5. Redis Pub/Sub — 방별 WebSocket 알림 발행 + * 6. Redis 발행 실패 시 DB는 이미 커밋됨 — 로그만 남기고 계속 진행 + */ + @Scheduled(fixedRate = 60_000) // 1분마다 실행 + public void closeInactiveRooms() { + String lockValue = UUID.randomUUID().toString(); + + if (!redisLockRepository.tryLock(INACTIVITY_LOCK_KEY, lockValue, INACTIVITY_LOCK_TTL)) { + log.info("[자동종료] 다른 서버 실행 중 - 스킵"); + return; + } + + try { + // 기준 시각: 현재 - 10분 + LocalDateTime cutOff = LocalDateTime.now().minusMinutes(inactivityTimeoutMinutes); + + Long lastId = null; + int totalProcessed = 0; + + while (true) { + List targets = chatRoomRepository.findInactiveRooms(cutOff, lastId, BATCH_SIZE); + + if (targets.isEmpty()) break; + + List roomIds = targets.stream() + .map(ChatRoom::getId) + .collect(Collectors.toList()); + + /** + * Bulk Update + * + * 분산락이 스케쥴러 진입 자체를 1대만 허용하므로 + * 비관적 락 없이 Bulk Update 사용 + */ + chatRoomRepository.bulkCompleteRooms(roomIds); + + List messages = roomIds.stream() + .map(id -> ChatMessage.systemMessage(id, buildCloseMessage())) + .collect(Collectors.toList()); + + chatMessageRepository.saveAll(messages); + + for (Long roomId : roomIds) { + try { + ChatMessageResponse notification = new ChatMessageResponse( + null, + null, + SenderType.SYSTEM, + MessageType.SYSTEM, + buildCloseMessage(), + LocalDateTime.now() + ); + chatRedisPublisher.publish(roomId, notification); + } catch (Exception e) { + /** + * Redis 발행 실패해도 DB는 이미 커밋됨 + * 클라이언트는 재연결 시 방 상태 조회로 2차 감지 + */ + log.error("[자동종료] 알림 실패 roomId: {}", roomId, e); + } + } + + totalProcessed += targets.size(); + + // 다음 배치 시작점 - 현재 배치의 마지막 ID + lastId = targets.get(targets.size() - 1).getId(); + + // 마지막 배치면 종료 + if (targets.size() < BATCH_SIZE) break; + } + + if (totalProcessed > 0) { + log.info("[자동종료] 총 {}건 처리 완료", totalProcessed); + } else { + log.debug("[자동종료] 처리 대상 없음"); + } + + } finally { + redisLockRepository.unlock(INACTIVITY_LOCK_KEY, lockValue); + } + } +} diff --git a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/service/ChatRoomCommandServiceImpl.java b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/service/ChatRoomCommandServiceImpl.java index 9dfdc9e..94f8957 100644 --- a/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/service/ChatRoomCommandServiceImpl.java +++ b/src/main/java/jpa/basic/alldayprojectcommerce/domain/chat/service/ChatRoomCommandServiceImpl.java @@ -1,7 +1,5 @@ package jpa.basic.alldayprojectcommerce.domain.chat.service; -import jakarta.validation.constraints.NotBlank; -import jakarta.validation.constraints.Size; import jpa.basic.alldayprojectcommerce.common.exception.CustomException; import jpa.basic.alldayprojectcommerce.common.exception.ErrorCode; import jpa.basic.alldayprojectcommerce.domain.chat.dto.request.CreateChatRoomRequest; diff --git a/src/main/java/jpa/basic/alldayprojectcommerce/domain/keyword/scheduler/KeywordScheduler.java b/src/main/java/jpa/basic/alldayprojectcommerce/domain/keyword/scheduler/KeywordScheduler.java index cf17546..f267480 100644 --- a/src/main/java/jpa/basic/alldayprojectcommerce/domain/keyword/scheduler/KeywordScheduler.java +++ b/src/main/java/jpa/basic/alldayprojectcommerce/domain/keyword/scheduler/KeywordScheduler.java @@ -1,5 +1,6 @@ package jpa.basic.alldayprojectcommerce.domain.keyword.scheduler; +import jpa.basic.alldayprojectcommerce.common.lock.repository.RedisLockRepository; import jpa.basic.alldayprojectcommerce.domain.keyword.service.KeywordCommandService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -7,6 +8,7 @@ import org.springframework.stereotype.Component; import java.time.LocalDate; +import java.util.UUID; @Slf4j @Component @@ -14,6 +16,12 @@ public class KeywordScheduler { private final KeywordCommandService keywordCommandService; + private final RedisLockRepository redisLockRepository; + + private static final String WRITE_BACK_LOCK_KEY = "lock:keyword:writeBack"; + private static final String MIDNIGHT_LOCK_KEY = "lock:keyword:midnightReset"; + private static final Long WRITE_BACK_LOCK_TTL = 55 * 60L; // 55분 + private static final Long MIDNIGHT_LOCK_TTL = 5 * 60L; // 5분 /** * Write-back 스케쥴러 @@ -21,15 +29,24 @@ public class KeywordScheduler { * 매 1시간마다 실행 * Redis ZSet -> SearchKeyword DB 동기화 */ - @Scheduled(fixedDelay = 60 * 60 * 1000, zone = "Asia/Seoul") + @Scheduled(cron = "0 0 * * * *", zone = "Asia/Seoul") public void writeBack() { log.info("[스케쥴러] Write-back 시작"); + String lockValue = UUID.randomUUID().toString(); + + if (!redisLockRepository.tryLock(WRITE_BACK_LOCK_KEY, lockValue, WRITE_BACK_LOCK_TTL)) { + log.info("[스케쥴러] Write-back 다른 서버 실행 중 - 스킵"); + return; + } + try { keywordCommandService.writeBack(); } catch (Exception e) { // 스케쥴러가 실패해도 Redis는 건들지 않고 다음 주기에 재시도 - log.error("[스케쥴러] Write-back 실패: {}", e.getMessage()); + log.error("[스케쥴러] Write-back 실패", e); + } finally { + redisLockRepository.unlock(WRITE_BACK_LOCK_KEY, lockValue); } } @@ -43,6 +60,13 @@ public void midnightReset() { log.info("[자정 초기화] 시작 - 기준 날짜: {}", yesterday); + String lockValue = UUID.randomUUID().toString(); + + if (!redisLockRepository.tryLock(MIDNIGHT_LOCK_KEY, lockValue, MIDNIGHT_LOCK_TTL)) { + log.info("[자정 초기화] 다른 서버 실행 중 - 스킵"); + return; + } + try { // 마지막 Write-back keywordCommandService.writeBack(yesterday); @@ -60,6 +84,8 @@ public void midnightReset() { } catch (Exception e) { // 실패 시 Redis 초기화 X log.error("[자정 초기화] 실패", e); + } finally { + redisLockRepository.unlock(MIDNIGHT_LOCK_KEY, lockValue); } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5dbbdde..31e7935 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -55,4 +55,7 @@ management: show-details: always websocket: - allowed-origins: "http://localhost:3000,http://localhost:8090" \ No newline at end of file + allowed-origins: "http://localhost:3000,http://localhost:8090" + +chat: + inactivity-timeout-minutes: 10 # 비활성 기준 시간 (분) \ No newline at end of file diff --git a/src/main/resources/static/test-chat.html b/src/main/resources/static/test-chat.html new file mode 100644 index 0000000..610c04a --- /dev/null +++ b/src/main/resources/static/test-chat.html @@ -0,0 +1,263 @@ + + + + + 채팅 테스트 + + + + + + + +

🔌 채팅 WebSocket 테스트

+DISCONNECTED + + +
+

1. 로그인 (JWT 토큰 발급)

+ + + +
+ JWT 토큰: + +
+
+ + +
+

2. WebSocket 연결

+ +
+ + +
+
+ + +
+

3. 채팅방 구독

+ + + +
+ + +
+

4. 메시지 전송

+ + + +
+ + +
+

📋 로그

+ +
+
+ + + + \ No newline at end of file