RssCrawlingJobConfig.java
@@ -7,6 +7,7 @@
 import com.techfork.domain.source.batch.RssFeedReader;
 import com.techfork.domain.source.batch.RssToPostProcessor;
 import com.techfork.domain.source.dto.RssFeedItem;
+import com.techfork.domain.source.listener.RssCrawlingJobListener;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.batch.core.Job;
@@ -58,9 +59,12 @@ public class RssCrawlingJobConfig {
     private final PostEmbeddingProcessor postEmbeddingProcessor;
     private final PostEmbeddingWriter postEmbeddingWriter;
 
+    private final RssCrawlingJobListener rssCrawlingJobListener;
+
     @Bean
     public Job rssCrawlingJob() {
         return new JobBuilder("rssCrawlingJob", jobRepository)
+                .listener(rssCrawlingJobListener)
                 .start(fetchAndSaveRssStep())
                 .next(extractSummaryStep())
                 .next(embedAndIndexStep())
src/main/java/com/techfork/domain/source/controller/BatchController.java
@@ -1,48 +1,28 @@
 package com.techfork.domain.source.controller;
 
+import com.techfork.domain.source.service.CrawlingService;
 import com.techfork.global.common.code.SuccessCode;
 import com.techfork.global.response.BaseResponse;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.batch.core.Job;
-import org.springframework.batch.core.JobParameters;
-import org.springframework.batch.core.JobParametersBuilder;
-import org.springframework.batch.core.launch.JobLauncher;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.time.LocalDateTime;
-
 @Tag(name = "Batch", description = "Batch job API")
 @Slf4j
 @RestController
 @RequestMapping("/api/v1/batch")
 @RequiredArgsConstructor
 public class BatchController {
 
-    private final JobLauncher jobLauncher;
-    private final Job rssCrawlingJob;
+    private final CrawlingService crawlingService;
 
     @Operation(summary = "Run RSS crawling", description = "Crawls every tech blog's RSS feed and saves the posts to the DB.")
     @PostMapping("/crawl-rss")
     public ResponseEntity<BaseResponse<String>> crawlRss() {
-        try {
-            JobParameters jobParameters = new JobParametersBuilder()
-                    .addString("requestTime", LocalDateTime.now().toString())
-                    .toJobParameters();
-
-            jobLauncher.run(rssCrawlingJob, jobParameters);
-
-            log.info("RSS crawling job completed");
-            return BaseResponse.of(SuccessCode.OK, "RSS crawling started successfully.");
-
-        } catch (Exception e) {
-            log.error("Failed to run RSS crawling job", e);
-            return BaseResponse.of(SuccessCode.OK, "Error while running RSS crawling: " + e.getMessage());
-        }
+        crawlingService.executeCrawling();
+        return BaseResponse.of(SuccessCode.OK, "RSS crawling started successfully.");
     }
 }
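
A note on the controller change: with Spring Batch's default launcher configuration (a synchronous TaskExecutor), crawlingService.executeCrawling() blocks until the whole job finishes, so the "started successfully" response is actually sent only after the crawl completes. If fire-and-forget semantics are wanted, one option is a launcher backed by an asynchronous TaskExecutor. A minimal sketch, not part of this PR, assuming Spring Batch 5 (TaskExecutorJobLauncher; Spring Batch 4 would use SimpleJobLauncher instead):

import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class AsyncJobLauncherConfig {

    // Hypothetical bean, not in the PR: runs jobs on a separate thread so
    // callers of jobLauncher.run(...) return as soon as the job is launched.
    @Bean
    public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
        TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor("rss-batch-"));
        launcher.afterPropertiesSet(); // validates the configuration
        return launcher;
    }
}

CrawlingService would then receive this launcher through constructor injection in place of the default one.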
src/main/java/com/techfork/domain/source/listener/RssCrawlingJobListener.java (new file)
@@ -0,0 +1,95 @@
+package com.techfork.domain.source.listener;
+
+import com.techfork.domain.source.entity.CrawlingHistory;
+import com.techfork.domain.source.repository.CrawlingHistoryRepository;
+import com.techfork.domain.source.service.WebhookNotificationService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.batch.core.BatchStatus;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobExecutionListener;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Execution listener for the RSS crawling job.
+ * - On job start: creates a CrawlingHistory record and logs the start
+ * - On job end: handles success/failure, logs statistics, sends webhook notifications
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class RssCrawlingJobListener implements JobExecutionListener {
+
+    private final CrawlingHistoryRepository crawlingHistoryRepository;
+    private final WebhookNotificationService webhookNotificationService;
+
+    @Override
+    @Transactional
+    public void beforeJob(JobExecution jobExecution) {
+        Long jobExecutionId = jobExecution.getId();
+        log.info("RSS crawling job started: jobExecutionId={}", jobExecutionId);
+
+        CrawlingHistory history = CrawlingHistory.createStarted(jobExecutionId);
+        crawlingHistoryRepository.save(history);
+    }
+
+    @Override
+    @Transactional
+    public void afterJob(JobExecution jobExecution) {
+        Long jobExecutionId = jobExecution.getId();
+        BatchStatus batchStatus = jobExecution.getStatus();
+
+        CrawlingHistory history = crawlingHistoryRepository
+                .findByJobExecutionId(jobExecutionId)
+                .orElseThrow(() -> new IllegalStateException(
+                        "CrawlingHistory not found for jobExecutionId: " + jobExecutionId));
+
+        if (batchStatus == BatchStatus.COMPLETED) {
+            handleJobSuccess(history, jobExecution);
+        } else {
+            handleJobFailure(history, jobExecution, "Job failed with status: " + batchStatus);
+        }
+    }
+
+    /**
+     * Handles a successful job run.
+     */
+    private void handleJobSuccess(CrawlingHistory history, JobExecution jobExecution) {
+        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
+
+        int readCount = (int) stepExecution.getReadCount();
+        int writeCount = (int) stepExecution.getWriteCount();
+        int skipCount = (int) stepExecution.getSkipCount();
+
+        history.complete(readCount, writeCount, skipCount);
+        crawlingHistoryRepository.save(history);
+
+        log.info("RSS crawling completed successfully: " +
+                "total={}, success={}, failed={}",
+                readCount, writeCount, skipCount);
+    }
+
+    /**
+     * Handles a failed job run.
+     */
+    private void handleJobFailure(CrawlingHistory history, JobExecution jobExecution, String errorMessage) {
+        history.fail(errorMessage);
+        crawlingHistoryRepository.save(history);
+
+        log.error("RSS crawling failed: {}", errorMessage);
+
+        // Send failure notification
+        Map<String, Object> context = new HashMap<>();
+        context.put("errorMessage", errorMessage);
+        context.put("timestamp", LocalDateTime.now());
+        context.put("jobExecutionId", jobExecution.getId());
+
+        webhookNotificationService.sendCrawlingFailureNotification(context);
+    }
+}
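
Two details of the new listener are worth flagging. First, handleJobSuccess reads counters from only the first StepExecution (the RSS fetch step), so the summary-extraction and embedding steps are never reflected in the history row; summing over jobExecution.getStepExecutions() would cover all three. Second, the CrawlingHistory entity itself does not appear in the diff, only its createStarted/complete/fail calls. A minimal sketch of the API the listener assumes, where the field names and the ECrawlingStatus values (STARTED/COMPLETED/FAILED) are guesses rather than the project's actual code, assuming Spring Boot 3 / jakarta.persistence:

import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class CrawlingHistory {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long jobExecutionId;

    @Enumerated(EnumType.STRING)
    private ECrawlingStatus status;   // assumed values: STARTED, COMPLETED, FAILED

    private int readCount;
    private int writeCount;
    private int skipCount;
    private String errorMessage;

    private LocalDateTime startedAt;
    private LocalDateTime finishedAt;

    // Factory used by beforeJob: a new row in STARTED state.
    public static CrawlingHistory createStarted(Long jobExecutionId) {
        CrawlingHistory history = new CrawlingHistory();
        history.jobExecutionId = jobExecutionId;
        history.status = ECrawlingStatus.STARTED;
        history.startedAt = LocalDateTime.now();
        return history;
    }

    // Called on success with the step's read/write/skip counters.
    public void complete(int readCount, int writeCount, int skipCount) {
        this.status = ECrawlingStatus.COMPLETED;
        this.readCount = readCount;
        this.writeCount = writeCount;
        this.skipCount = skipCount;
        this.finishedAt = LocalDateTime.now();
    }

    // Called on failure with a human-readable error message.
    public void fail(String errorMessage) {
        this.status = ECrawlingStatus.FAILED;
        this.errorMessage = errorMessage;
        this.finishedAt = LocalDateTime.now();
    }
}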
src/main/java/com/techfork/domain/source/repository/CrawlingHistoryRepository.java
@@ -13,4 +13,6 @@
 public interface CrawlingHistoryRepository extends JpaRepository<CrawlingHistory, Long> {
 
     List<CrawlingHistory> findByStatusAndStartedAtBefore(ECrawlingStatus status, LocalDateTime dateTime);
+
+    Optional<CrawlingHistory> findByJobExecutionId(Long jobExecutionId);
 }
src/main/java/com/techfork/domain/source/service/CrawlingService.java (3 additions, 97 deletions)
@@ -1,7 +1,5 @@
 package com.techfork.domain.source.service;
 
-import com.techfork.domain.source.entity.CrawlingHistory;
-import com.techfork.domain.source.repository.CrawlingHistoryRepository;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.batch.core.*;
@@ -10,14 +8,11 @@
 import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
 import org.springframework.batch.core.repository.JobRestartException;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * RSS crawling launch service
+ * - Responsible only for launching the job
+ * - Job lifecycle events (start/end) are handled by RssCrawlingJobListener
  */
 @Slf4j
 @Service
@@ -26,115 +21,26 @@ public class CrawlingService {
 
     private final JobLauncher jobLauncher;
     private final Job rssCrawlingJob;
-    private final CrawlingHistoryRepository crawlingHistoryRepository;
-    private final WebhookNotificationService webhookNotificationService;
 
     public void executeCrawling() {
         log.info("Starting RSS crawling job");
 
-        CrawlingHistory history = null;
-        JobExecution jobExecution = null;
-
         try {
             // Create job parameters (a different parameter set for every run)
             JobParameters jobParameters = new JobParametersBuilder()
                     .addLong("timestamp", System.currentTimeMillis())
                     .toJobParameters();
 
-            jobExecution = jobLauncher.run(rssCrawlingJob, jobParameters);
-            Long jobExecutionId = jobExecution.getId();
-
-            history = CrawlingHistory.createStarted(jobExecutionId);
-            crawlingHistoryRepository.save(history);
-
-            log.info("RSS crawling job started: jobExecutionId={}", jobExecutionId);
-
-            // Wait for the job to complete and handle the result
-            waitForJobCompletion(jobExecution, history);
+            jobLauncher.run(rssCrawlingJob, jobParameters);
 
         } catch (JobExecutionAlreadyRunningException e) {
            log.warn("Job is already running", e);
-            if (history != null) {
-                history.fail("Job is already running");
-                crawlingHistoryRepository.save(history);
-            }
         } catch (JobRestartException e) {
             log.error("Job restart failed", e);
-            handleJobFailure(history, jobExecution, "Job restart failed: " + e.getMessage());
         } catch (JobInstanceAlreadyCompleteException e) {
             log.error("Job instance already complete", e);
-            handleJobFailure(history, jobExecution, "Job instance already complete: " + e.getMessage());
         } catch (JobParametersInvalidException e) {
             log.error("Invalid job parameters", e);
-            handleJobFailure(history, jobExecution, "Invalid job parameters: " + e.getMessage());
         } catch (Exception e) {
             log.error("Unexpected error during crawling", e);
-            handleJobFailure(history, jobExecution, "Unexpected error: " + e.getMessage());
         }
     }
 
-    private void waitForJobCompletion(JobExecution jobExecution, CrawlingHistory history) {
-        BatchStatus batchStatus = jobExecution.getStatus();
-
-        while (batchStatus.isRunning()) {
-            try {
-                Thread.sleep(1000);
-                batchStatus = jobExecution.getStatus();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.error("Job execution monitoring interrupted", e);
-                break;
-            }
-        }
-
-        // Handle the job execution result
-        if (batchStatus == BatchStatus.COMPLETED) {
-            handleJobSuccess(history, jobExecution);
-        } else {
-            handleJobFailure(history, jobExecution,
-                    "Job failed with status: " + batchStatus);
-        }
-    }
-
-    /**
-     * Handles job success.
-     */
-    @Transactional
-    public void handleJobSuccess(CrawlingHistory history, JobExecution jobExecution) {
-        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
-
-        int readCount = (int) stepExecution.getReadCount();
-        int writeCount = (int) stepExecution.getWriteCount();
-        int skipCount = (int) stepExecution.getSkipCount();
-
-        history.complete(readCount, writeCount, skipCount);
-        crawlingHistoryRepository.save(history);
-
-        log.info("RSS crawling completed successfully: " +
-                "total={}, success={}, failed={}",
-                readCount, writeCount, skipCount);
-    }
-
-    /**
-     * Handles job failure.
-     */
-    @Transactional
-    public void handleJobFailure(CrawlingHistory history, JobExecution jobExecution, String errorMessage) {
-        if (history != null) {
-            history.fail(errorMessage);
-            crawlingHistoryRepository.save(history);
-        }
-
-        log.error("RSS crawling failed: {}", errorMessage);
-
-        // Send failure notification
-        Map<String, Object> context = new HashMap<>();
-        context.put("errorMessage", errorMessage);
-        context.put("timestamp", LocalDateTime.now());
-        if (jobExecution != null) {
-            context.put("jobExecutionId", jobExecution.getId());
-        }
-
-        webhookNotificationService.sendCrawlingFailureNotification(context);
-    }
-}
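
The net effect of the refactor: the polling loop (waitForJobCompletion) is gone because JobExecutionListener callbacks run inside the job execution itself, before the first step and after the last, regardless of which thread the launcher uses. A side benefit is that the lifecycle logic is now unit-testable in isolation. A hypothetical test, not part of this PR, assuming JUnit 5 and Mockito are available:

import com.techfork.domain.source.entity.CrawlingHistory;
import com.techfork.domain.source.listener.RssCrawlingJobListener;
import com.techfork.domain.source.repository.CrawlingHistoryRepository;
import com.techfork.domain.source.service.WebhookNotificationService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;

import java.util.Optional;

import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class RssCrawlingJobListenerTest {

    @Mock
    private CrawlingHistoryRepository crawlingHistoryRepository;

    @Mock
    private WebhookNotificationService webhookNotificationService;

    @InjectMocks
    private RssCrawlingJobListener listener;

    @Test
    void afterJob_notifiesWebhook_whenJobDidNotComplete() {
        // Given: a job execution that ended in FAILED state
        JobExecution jobExecution = mock(JobExecution.class);
        when(jobExecution.getId()).thenReturn(42L);
        when(jobExecution.getStatus()).thenReturn(BatchStatus.FAILED);

        CrawlingHistory history = CrawlingHistory.createStarted(42L);
        when(crawlingHistoryRepository.findByJobExecutionId(42L))
                .thenReturn(Optional.of(history));

        // When
        listener.afterJob(jobExecution);

        // Then: the history row is persisted and a failure notification goes out
        verify(crawlingHistoryRepository).save(history);
        verify(webhookNotificationService).sendCrawlingFailureNotification(anyMap());
    }
}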