diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/config/CleanupTaskProperties.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/config/CleanupTaskProperties.java new file mode 100644 index 00000000..276c36ea --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/config/CleanupTaskProperties.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.validation.annotation.Validated; + +/** + * Configuration for the outbox cleanup-task worker, bound from the rag.knowledge.cleanup prefix. NOTE(review): no constraint annotations are declared on the fields, so @Validated has nothing to check here; confirm whether null or non-positive values should be rejected. + */ +@Data +@Validated +@Configuration +@ConfigurationProperties(prefix = "rag.knowledge.cleanup") +public class CleanupTaskProperties { + + /** Worker scan interval, in milliseconds. */ + private Long scanDelayMs = 15000L; + + /** Batch size of each scan. */ + private Integer batchSize = 50; + + /** Maximum retry count; once exceeded the task is set to failed and an alert is raised. */ + private Integer maxRetry = 5; + + /** Retry backoff base in seconds; actual backoff = baseBackoffSeconds * 2^retryCount. */ + private Long baseBackoffSeconds = 30L; +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/entity/FileCleanupTaskDO.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/entity/FileCleanupTaskDO.java new file mode 100644 index 00000000..419b58ca --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/entity/FileCleanupTaskDO.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.dao.entity; + +import com.baomidou.mybatisplus.annotation.FieldFill; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * File cleanup task (outbox), mapped to table t_file_cleanup_task. + * Written inside the main delete transaction; CleanupTaskWorker then performs the file-storage deletion asynchronously to reach eventual consistency. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@TableName("t_file_cleanup_task") +public class FileCleanupTaskDO { + + @TableId(type = IdType.ASSIGN_ID) + private String id; + + /** File storage address (fileUrl). */ + private String fileUrl; + + /** Task status: pending/running/success/failed, see CleanupTaskStatus. */ + private String status; + + /** Number of retries so far. */ + private Integer retryCount; + + /** Earliest time the task may run again (retry backoff). */ + private Date nextRetryTime; + + /** Current claimer of the task. */ + private String lockOwner; + + /** Expiry time of the claim lease. */ + private Date lockUntil; + + /** Most recent error message. */ + private String errorMessage; + + @TableField(fill = FieldFill.INSERT) + private Date createTime; + + @TableField(fill = FieldFill.INSERT_UPDATE) + private Date updateTime; +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/entity/VectorCleanupTaskDO.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/entity/VectorCleanupTaskDO.java new file mode 100644 index 00000000..b55a8348 --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/entity/VectorCleanupTaskDO.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nageoffer.ai.ragent.knowledge.dao.entity; + +import com.baomidou.mybatisplus.annotation.FieldFill; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * Vector cleanup task (outbox), mapped to table t_vector_cleanup_task. + * Written inside the main delete transaction; CleanupTaskWorker then performs the PgVector deletion asynchronously to reach eventual consistency. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +@TableName("t_vector_cleanup_task") +public class VectorCleanupTaskDO { + + @TableId(type = IdType.ASSIGN_ID) + private String id; + + /** Document ID. */ + private String docId; + + /** Vector collection name (collection_name). */ + private String collectionName; + + /** Task status: pending/running/success/failed, see CleanupTaskStatus. */ + private String status; + + /** Number of retries so far. */ + private Integer retryCount; + + /** Earliest time the task may run again (retry backoff). */ + private Date nextRetryTime; + + /** Current claimer of the task. */ + private String lockOwner; + + /** Expiry time of the claim lease. */ + private Date lockUntil; + + /** Most recent error message. */ + private String errorMessage; + + @TableField(fill = FieldFill.INSERT) + private Date createTime; + + @TableField(fill = FieldFill.INSERT_UPDATE) + private Date updateTime; +} diff --git 
a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/FileCleanupTaskMapper.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/FileCleanupTaskMapper.java new file mode 100644 index 00000000..a954a08c --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/FileCleanupTaskMapper.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.dao.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.nageoffer.ai.ragent.knowledge.dao.entity.FileCleanupTaskDO; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; + +import java.util.Date; + +/** Mapper for t_file_cleanup_task; every state transition below is a conditional UPDATE whose int result is the affected row count. */ public interface FileCleanupTaskMapper extends BaseMapper { + + /** Claims one task: sets status to the running code and stores lockOwner and lockUntil, but only when the row is still in the pending code and next_retry_time is null or not later than now. A result of 0 means the claim did not apply. */ @Update("UPDATE t_file_cleanup_task " + + "SET status = #{running}, lock_owner = #{lockOwner}, lock_until = #{lockUntil}, update_time = NOW() " + + "WHERE id = #{id} AND status = #{pending} " + + "AND (next_retry_time IS NULL OR next_retry_time <= #{now})") + int claimProcessing(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("lockUntil") Date lockUntil, @Param("now") Date now, + @Param("running") String runningCode, @Param("pending") String pendingCode); + + /** Marks the task with the success code and clears lock_owner, lock_until and error_message; applies only while the row is in the running code and held by lockOwner. */ @Update("UPDATE t_file_cleanup_task " + + "SET status = #{success}, lock_owner = NULL, lock_until = NULL, error_message = NULL, update_time = NOW() " + + "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}") + int markSuccessIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("success") String successCode, @Param("running") String runningCode); + + /** Puts the task back to the pending code with the given retryCount, nextRetryTime and errorMessage and clears the lock; applies only while the row is in the running code and held by lockOwner. */ @Update("UPDATE t_file_cleanup_task " + + "SET status = #{pending}, retry_count = #{retryCount}, next_retry_time = #{nextRetryTime}, " + + "error_message = #{errorMessage}, lock_owner = NULL, lock_until = NULL, update_time = NOW() " + + "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}") + int markRetryIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("pending") String pendingCode, @Param("running") String runningCode, + @Param("retryCount") int retryCount, @Param("nextRetryTime") Date nextRetryTime, + @Param("errorMessage") String errorMessage); + + /** Marks the task with the failed code, storing retryCount and errorMessage and clearing the lock; applies only while the row is in the running code and held by lockOwner. */ @Update("UPDATE t_file_cleanup_task " + + "SET status = #{failed}, retry_count = #{retryCount}, error_message = #{errorMessage}, 
" + + "lock_owner = NULL, lock_until = NULL, update_time = NOW() " + + "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}") + int markFailedIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("failed") String failedCode, @Param("running") String runningCode, + @Param("retryCount") int retryCount, @Param("errorMessage") String errorMessage); + + /** Resets every row in the running code whose lock_until is at or before now to the pending code and clears the lock; retry_count and next_retry_time are left untouched. */ @Update("UPDATE t_file_cleanup_task " + + "SET status = #{pending}, lock_owner = NULL, lock_until = NULL, update_time = NOW() " + + "WHERE status = #{running} AND lock_until <= #{now}") + int recoverExpiredProcessing(@Param("now") Date now, + @Param("running") String runningCode, @Param("pending") String pendingCode); +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/KnowledgeDocumentMapper.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/KnowledgeDocumentMapper.java index 5c22d6bc..50ce7a36 100644 --- a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/KnowledgeDocumentMapper.java +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/KnowledgeDocumentMapper.java @@ -19,6 +19,35 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.nageoffer.ai.ragent.knowledge.dao.entity.KnowledgeDocumentDO; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +import java.util.List; public interface KnowledgeDocumentMapper extends BaseMapper { + + /** + * CAS claim of the document status: atomically sets it to toStatus only when the current status is one of fromStatuses and the document is not deleted. NOTE(review): the SQL text of the annotation below is empty in this view, so the statement itself could not be checked; confirm against the real file. + * + * @return number of affected rows; 0 means a concurrent conflict (the status was already rewritten by another flow) + */ + @Update("") + int casStatus(@Param("docId") String docId, + @Param("fromStatuses") List fromStatuses, + @Param("toStatus") String toStatus); + + /** + * Reads only the current status of the document (does not go through the logical-delete filter; used for the second check in the middle of chunking). + * + * @return the status string; null when the document does not exist + */ + @Select("SELECT status FROM t_knowledge_document WHERE id = #{docId}") + String 
selectStatusById(@Param("docId") String docId); } diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/VectorCleanupTaskMapper.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/VectorCleanupTaskMapper.java new file mode 100644 index 00000000..7d03cad5 --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/dao/mapper/VectorCleanupTaskMapper.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.dao.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.nageoffer.ai.ragent.knowledge.dao.entity.VectorCleanupTaskDO; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; + +import java.util.Date; + +/** Mapper for t_vector_cleanup_task; every state transition below is a conditional UPDATE whose int result is the affected row count. */ public interface VectorCleanupTaskMapper extends BaseMapper { + + /** Claims one task: sets status to the running code and stores lockOwner and lockUntil, but only when the row is still in the pending code and next_retry_time is null or not later than now. A result of 0 means the claim did not apply. */ @Update("UPDATE t_vector_cleanup_task " + + "SET status = #{running}, lock_owner = #{lockOwner}, lock_until = #{lockUntil}, update_time = NOW() " + + "WHERE id = #{id} AND status = #{pending} " + + "AND (next_retry_time IS NULL OR next_retry_time <= #{now})") + int claimProcessing(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("lockUntil") Date lockUntil, @Param("now") Date now, + @Param("running") String runningCode, @Param("pending") String pendingCode); + + /** Marks the task with the success code and clears lock_owner, lock_until and error_message; applies only while the row is in the running code and held by lockOwner. */ @Update("UPDATE t_vector_cleanup_task " + + "SET status = #{success}, lock_owner = NULL, lock_until = NULL, error_message = NULL, update_time = NOW() " + + "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}") + int markSuccessIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("success") String successCode, @Param("running") String runningCode); + + /** Puts the task back to the pending code with the given retryCount, nextRetryTime and errorMessage and clears the lock; applies only while the row is in the running code and held by lockOwner. */ @Update("UPDATE t_vector_cleanup_task " + + "SET status = #{pending}, retry_count = #{retryCount}, next_retry_time = #{nextRetryTime}, " + + "error_message = #{errorMessage}, lock_owner = NULL, lock_until = NULL, update_time = NOW() " + + "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}") + int markRetryIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("pending") String pendingCode, @Param("running") String runningCode, + @Param("retryCount") int retryCount, @Param("nextRetryTime") Date nextRetryTime, + @Param("errorMessage") String errorMessage); + + /** Marks the task with the failed code, storing retryCount and errorMessage and clearing the lock; applies only while the row is in the running code and held by lockOwner. */ @Update("UPDATE t_vector_cleanup_task " + + "SET status = #{failed}, retry_count = #{retryCount}, error_message = 
#{errorMessage}, " + + "lock_owner = NULL, lock_until = NULL, update_time = NOW() " + + "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}") + int markFailedIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner, + @Param("failed") String failedCode, @Param("running") String runningCode, + @Param("retryCount") int retryCount, @Param("errorMessage") String errorMessage); + + /** Resets every row in the running code whose lock_until is at or before now to the pending code and clears the lock; retry_count and next_retry_time are left untouched. */ @Update("UPDATE t_vector_cleanup_task " + + "SET status = #{pending}, lock_owner = NULL, lock_until = NULL, update_time = NOW() " + + "WHERE status = #{running} AND lock_until <= #{now}") + int recoverExpiredProcessing(@Param("now") Date now, + @Param("running") String runningCode, @Param("pending") String pendingCode); +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/CleanupTaskStatus.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/CleanupTaskStatus.java new file mode 100644 index 00000000..22c806bc --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/CleanupTaskStatus.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Status of an outbox cleanup task; each constant carries the string code that is stored in the status column. + */ +@Getter +@RequiredArgsConstructor +public enum CleanupTaskStatus { + + /** + * Waiting to be executed. + */ + PENDING("pending"), + + /** + * Claimed by a worker and currently executing. + */ + RUNNING("running"), + + /** + * Executed successfully. + */ + SUCCESS("success"), + + /** + * Retries exhausted; dead-lettered, manual intervention required. + */ + FAILED("failed"); + + /** Same literal as the code of PENDING, repeated as a plain String constant; keep the two in sync. */ public static final String PENDING_CODE = "pending"; + /** Same literal as the code of RUNNING, repeated as a plain String constant; keep the two in sync. */ public static final String RUNNING_CODE = "running"; + + private final String code; +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/DocumentStatus.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/DocumentStatus.java index df131e7b..ba679ed6 100644 --- a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/DocumentStatus.java +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/enums/DocumentStatus.java @@ -47,7 +47,12 @@ public enum DocumentStatus { /** * 文档处理成功 */ - SUCCESS("success"); + SUCCESS("success"), + + /** + * Document is being deleted (the delete flow has claimed it; chunking must not start or write back any more). + */ + DELETING("deleting"); /** * 状态码 diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/CleanupTaskWorker.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/CleanupTaskWorker.java new file mode 100644 index 00000000..17a1975f --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/CleanupTaskWorker.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nageoffer.ai.ragent.knowledge.schedule; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.nageoffer.ai.ragent.knowledge.config.CleanupTaskProperties; +import com.nageoffer.ai.ragent.knowledge.dao.entity.FileCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.entity.VectorCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.FileCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.VectorCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.enums.CleanupTaskStatus; +import com.nageoffer.ai.ragent.rag.core.vector.VectorStoreService; +import com.nageoffer.ai.ragent.rag.service.FileStorageService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.UUID; + +/** + * Outbox cleanup-task worker: asynchronously performs the PgVector / file-storage deletions, retries failures with backoff and raises an alert once the threshold is exceeded. + * After a task is claimed, every status write-back carries the lockOwner, so an expired worker cannot overwrite the result of a newer owner. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class CleanupTaskWorker { + + private final VectorCleanupTaskMapper vectorMapper; + private final FileCleanupTaskMapper fileMapper; + private final VectorStoreService vectorStoreService; + private final FileStorageService fileStorageService; + private final CleanupTaskProperties properties; + + /** Scheduled entry point: recovers expired running tasks, then scans vector tasks, then file tasks. The three steps run one after another with no try/catch in this method, so an exception escaping an earlier step skips the later ones for that run. */ @Scheduled(fixedDelayString = "${rag.knowledge.cleanup.scan-delay-ms:15000}") 
+ public void scan() { + recoverExpiredProcessingTasks(); + scanVectorTasks(); + scanFileTasks(); + } + + /** Asks both mappers to move running tasks whose lease has expired (compared with the current time) back to pending. */ private void recoverExpiredProcessingTasks() { + Date now = new Date(); + vectorMapper.recoverExpiredProcessing(now, CleanupTaskStatus.RUNNING_CODE, CleanupTaskStatus.PENDING_CODE); + fileMapper.recoverExpiredProcessing(now, CleanupTaskStatus.RUNNING_CODE, CleanupTaskStatus.PENDING_CODE); + } + + /** Selects pending vector tasks that are due (nextRetryTime null or not later than now), limited to max(batchSize, 1) rows, claims each one with a fresh lock owner and deletes the vectors of its document. A task whose claim updates 0 rows is skipped; any exception from the deletion or the success write-back goes to handleVectorFailure. */ private void scanVectorTasks() { + Date now = new Date(); + LambdaQueryWrapper qw = Wrappers.lambdaQuery(VectorCleanupTaskDO.class) + .eq(VectorCleanupTaskDO::getStatus, CleanupTaskStatus.PENDING.getCode()) + .and(w -> w.isNull(VectorCleanupTaskDO::getNextRetryTime) + .or().le(VectorCleanupTaskDO::getNextRetryTime, now)) + .last("LIMIT " + Math.max(properties.getBatchSize(), 1)); + List tasks = vectorMapper.selectList(qw); + if (tasks == null || tasks.isEmpty()) { + return; + } + for (VectorCleanupTaskDO task : tasks) { + String lockOwner = nextLockOwner(); + if (vectorMapper.claimProcessing(task.getId(), lockOwner, computeProcessingLeaseUntil(), now, + CleanupTaskStatus.RUNNING_CODE, CleanupTaskStatus.PENDING_CODE) == 0) { + continue; + } + try { + vectorStoreService.deleteDocumentVectors(task.getCollectionName(), task.getDocId()); + markVectorSuccess(task, lockOwner); + } catch (Exception e) { + handleVectorFailure(task, lockOwner, e); + } + } + } + + /** Writes the success status back under the given lock owner; logs a warning when no row was updated. */ private void markVectorSuccess(VectorCleanupTaskDO task, String lockOwner) { + int updated = vectorMapper.markSuccessIfOwned( + task.getId(), + lockOwner, + CleanupTaskStatus.SUCCESS.getCode(), + CleanupTaskStatus.RUNNING.getCode() + ); + if (updated == 0) { + log.warn("向量清理任务完成但锁已失效,跳过状态写回: taskId={}, docId={}", task.getId(), task.getDocId()); + } + } + + /** Failure path for a vector task: the new retry count is the stored retryCount (null counts as 0) plus one. Above maxRetry the task is marked failed and an error is logged; otherwise it is re-queued as pending with the computed next retry time. The stored error text is the truncated exception message. A warning is logged when the write-back updated no row. */ private void handleVectorFailure(VectorCleanupTaskDO task, String lockOwner, Exception e) { + int nextRetry = (task.getRetryCount() == null ? 
0 : task.getRetryCount()) + 1; + int updated; + if (nextRetry > properties.getMaxRetry()) { + updated = vectorMapper.markFailedIfOwned( + task.getId(), + lockOwner, + CleanupTaskStatus.FAILED.getCode(), + CleanupTaskStatus.RUNNING.getCode(), + nextRetry, + truncate(e.getMessage()) + ); + log.error("向量清理任务重试耗尽,转入死信需人工介入: taskId={}, docId={}, collection={}", + task.getId(), task.getDocId(), task.getCollectionName(), e); + } else { + updated = vectorMapper.markRetryIfOwned( + task.getId(), + lockOwner, + CleanupTaskStatus.PENDING.getCode(), + CleanupTaskStatus.RUNNING.getCode(), + nextRetry, + computeNextRetry(nextRetry), + truncate(e.getMessage()) + ); + log.warn("向量清理任务失败,第 {} 次重试: taskId={}, docId={}", nextRetry, task.getId(), task.getDocId(), e); + } + if (updated == 0) { + log.warn("向量清理任务失败但锁已失效,跳过状态写回: taskId={}, docId={}", task.getId(), task.getDocId()); + } + } + + /** File-task counterpart of scanVectorTasks: selects due pending file tasks limited to max(batchSize, 1) rows, claims each one with a fresh lock owner, deletes the file by its URL and writes success back under that owner. A task whose claim updates 0 rows is skipped; any exception goes to handleFileFailure. */ private void scanFileTasks() { + Date now = new Date(); + LambdaQueryWrapper qw = Wrappers.lambdaQuery(FileCleanupTaskDO.class) + .eq(FileCleanupTaskDO::getStatus, CleanupTaskStatus.PENDING.getCode()) + .and(w -> w.isNull(FileCleanupTaskDO::getNextRetryTime) + .or().le(FileCleanupTaskDO::getNextRetryTime, now)) + .last("LIMIT " + Math.max(properties.getBatchSize(), 1)); + List tasks = fileMapper.selectList(qw); + if (tasks == null || tasks.isEmpty()) { + return; + } + for (FileCleanupTaskDO task : tasks) { + String lockOwner = nextLockOwner(); + if (fileMapper.claimProcessing(task.getId(), lockOwner, computeProcessingLeaseUntil(), now, + CleanupTaskStatus.RUNNING_CODE, CleanupTaskStatus.PENDING_CODE) == 0) { + continue; + } + try { + fileStorageService.deleteByUrl(task.getFileUrl()); + int updated = fileMapper.markSuccessIfOwned( + task.getId(), + lockOwner, + CleanupTaskStatus.SUCCESS.getCode(), + CleanupTaskStatus.RUNNING.getCode() + ); + if (updated == 0) { + log.warn("文件清理任务完成但锁已失效,跳过状态写回: taskId={}, fileUrl={}", + task.getId(), task.getFileUrl()); + } + } catch (Exception e) { + 
handleFileFailure(task, lockOwner, e); + } + } + } + + /** Failure path for a file task: the new retry count is the stored retryCount (null counts as 0) plus one. Above maxRetry the task is marked failed and an error is logged; otherwise it is re-queued as pending with the computed next retry time. The stored error text is the truncated exception message. A warning is logged when the write-back updated no row. */ private void handleFileFailure(FileCleanupTaskDO task, String lockOwner, Exception e) { + int nextRetry = (task.getRetryCount() == null ? 0 : task.getRetryCount()) + 1; + int updated; + if (nextRetry > properties.getMaxRetry()) { + updated = fileMapper.markFailedIfOwned( + task.getId(), + lockOwner, + CleanupTaskStatus.FAILED.getCode(), + CleanupTaskStatus.RUNNING.getCode(), + nextRetry, + truncate(e.getMessage()) + ); + log.error("文件清理任务重试耗尽,转入死信需人工介入: taskId={}, fileUrl={}", + task.getId(), task.getFileUrl(), e); + } else { + updated = fileMapper.markRetryIfOwned( + task.getId(), + lockOwner, + CleanupTaskStatus.PENDING.getCode(), + CleanupTaskStatus.RUNNING.getCode(), + nextRetry, + computeNextRetry(nextRetry), + truncate(e.getMessage()) + ); + log.warn("文件清理任务失败,第 {} 次重试: taskId={}, fileUrl={}", nextRetry, task.getId(), task.getFileUrl(), e); + } + if (updated == 0) { + log.warn("文件清理任务失败但锁已失效,跳过状态写回: taskId={}, fileUrl={}", task.getId(), task.getFileUrl()); + } + } + + /** Next retry time: current time plus baseBackoffSeconds multiplied by 2 to the power of min(retryCount, 10), in seconds. */ private Date computeNextRetry(int retryCount) { + long backoffSeconds = properties.getBaseBackoffSeconds() * (1L << Math.min(retryCount, 10)); + return new Date(System.currentTimeMillis() + backoffSeconds * 1000L); + } + + /** Lease expiry for a freshly claimed task: current time plus the larger of four scan delays and 60 000 ms. */ private Date computeProcessingLeaseUntil() { + long leaseMillis = Math.max(properties.getScanDelayMs() * 4, 60_000L); + return new Date(System.currentTimeMillis() + leaseMillis); + } + + /** Builds a new lock-owner token from a fixed prefix and a random UUID; called once per claim attempt. */ private String nextLockOwner() { + return "cleanup-" + UUID.randomUUID(); + } + + /** Limits the message to its first 1000 characters; a null message stays null. */ private String truncate(String message) { + if (message == null) { + return null; + } + return message.length() > 1000 ? 
message.substring(0, 1000) : message; + } +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/DocumentStatusHelper.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/DocumentStatusHelper.java index d97787b2..441269b7 100644 --- a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/DocumentStatusHelper.java +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/schedule/DocumentStatusHelper.java @@ -49,7 +49,10 @@ public boolean tryMarkRunning(String docId) { .eq(KnowledgeDocumentDO::getId, docId) .eq(KnowledgeDocumentDO::getDeleted, 0) .eq(KnowledgeDocumentDO::getEnabled, 1) - .ne(KnowledgeDocumentDO::getStatus, DocumentStatus.RUNNING.getCode()) + .in(KnowledgeDocumentDO::getStatus, + DocumentStatus.PENDING.getCode(), + DocumentStatus.FAILED.getCode(), + DocumentStatus.SUCCESS.getCode()) ) > 0; } @@ -112,4 +115,57 @@ public StuckRecoveryResult recoverStuckRunning(long timeoutMinutes) { public record StuckRecoveryResult(List stuckDocIds, int actualRecovered) { } + + /** + * Delete entry CAS: pending/success/failed -> deleting. + * + * @return true when the claim succeeded; false means chunking is in progress or another delete flow has already claimed the document + */ + public boolean tryMarkDeleting(String docId) { + return documentMapper.casStatus( + docId, + List.of(DocumentStatus.PENDING.getCode(), + DocumentStatus.SUCCESS.getCode(), + DocumentStatus.FAILED.getCode()), + DocumentStatus.DELETING.getCode() + ) > 0; + } + + /** + * Chunking entry CAS: pending/failed -> running. + * + * @return true when the claim succeeded; false means the document is not idle (running / deleting / completed) + */ + public boolean tryStartChunk(String docId) { + return documentMapper.casStatus( + docId, + List.of(DocumentStatus.PENDING.getCode(), + DocumentStatus.FAILED.getCode()), + DocumentStatus.RUNNING.getCode() + ) > 0; + } + + /** + * Chunking completion CAS: running -> success. + * + * @return true when the status was written back; false means the right to run was taken over by the delete flow or another flow + */ + public boolean tryMarkSuccess(String docId) { + return documentMapper.casStatus( + docId, + List.of(DocumentStatus.RUNNING.getCode()), + 
DocumentStatus.SUCCESS.getCode() + ) > 0; + } + + /** + * Chunking failure CAS: running -> failed; returns true when a row was updated. + */ + public boolean tryMarkFailed(String docId) { + return documentMapper.casStatus( + docId, + List.of(DocumentStatus.RUNNING.getCode()), + DocumentStatus.FAILED.getCode() + ) > 0; + } } diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/CleanupTaskService.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/CleanupTaskService.java new file mode 100644 index 00000000..72c86551 --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/CleanupTaskService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.service; + +/** + * Enqueue service for outbox cleanup tasks. + * Must be called inside the main delete transaction, so that the business delete and the cleanup intent are persisted atomically. + */ +public interface CleanupTaskService { + + /** + * Enqueues a vector cleanup task for the given document and vector collection. + */ + void enqueueVectorCleanup(String docId, String collectionName); + + /** + * Enqueues a file cleanup task; skipped when fileUrl is blank. + */ + void enqueueFileCleanup(String fileUrl); +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/CleanupTaskServiceImpl.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/CleanupTaskServiceImpl.java new file mode 100644 index 00000000..07dad4c9 --- /dev/null +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/CleanupTaskServiceImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.service.impl; + +import com.nageoffer.ai.ragent.knowledge.dao.entity.FileCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.entity.VectorCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.FileCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.VectorCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.enums.CleanupTaskStatus; +import com.nageoffer.ai.ragent.knowledge.service.CleanupTaskService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +/** CleanupTaskService implementation that inserts pending task rows through the vector and file cleanup mappers. NOTE(review): no transaction annotation is declared in this class, so it presumably relies on the transaction of the caller; confirm. */ @Service +@RequiredArgsConstructor +public class CleanupTaskServiceImpl implements CleanupTaskService { + + private final VectorCleanupTaskMapper vectorMapper; + private final FileCleanupTaskMapper fileMapper; + + /** Inserts a vector cleanup task for docId and collectionName with pending status and retryCount 0; the arguments are not validated here. */ @Override + public void enqueueVectorCleanup(String docId, String collectionName) { + VectorCleanupTaskDO task = VectorCleanupTaskDO.builder() + .docId(docId) + .collectionName(collectionName) + .status(CleanupTaskStatus.PENDING.getCode()) + .retryCount(0) + .build(); + vectorMapper.insert(task); + } + + /** Inserts a file cleanup task for fileUrl with pending status and retryCount 0; returns without inserting when fileUrl has no text. */ @Override + public void enqueueFileCleanup(String fileUrl) { + if (!StringUtils.hasText(fileUrl)) { + return; + } + FileCleanupTaskDO task = FileCleanupTaskDO.builder() + .fileUrl(fileUrl) + .status(CleanupTaskStatus.PENDING.getCode()) + .retryCount(0) + .build(); + fileMapper.insert(task); + } +} diff --git a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImpl.java b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImpl.java index 2c90b373..22b12a12 100644 --- a/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImpl.java +++ b/bootstrap/src/main/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImpl.java @@ -72,6 +72,8 @@ import 
com.nageoffer.ai.ragent.knowledge.handler.RemoteFileFetcher; import com.nageoffer.ai.ragent.knowledge.mq.event.KnowledgeDocumentChunkEvent; import com.nageoffer.ai.ragent.knowledge.schedule.CronScheduleHelper; +import com.nageoffer.ai.ragent.knowledge.schedule.DocumentStatusHelper; +import com.nageoffer.ai.ragent.knowledge.service.CleanupTaskService; import com.nageoffer.ai.ragent.knowledge.service.KnowledgeChunkService; import com.nageoffer.ai.ragent.knowledge.service.KnowledgeDocumentScheduleService; import com.nageoffer.ai.ragent.knowledge.service.KnowledgeDocumentService; @@ -122,6 +124,8 @@ public class KnowledgeDocumentServiceImpl implements KnowledgeDocumentService { private final MessageQueueProducer messageQueueProducer; private final KnowledgeScheduleProperties scheduleProperties; private final RemoteFileFetcher remoteFileFetcher; + private final DocumentStatusHelper documentStatusHelper; + private final CleanupTaskService cleanupTaskService; @Value("knowledge-document-chunk_topic${unique-name:}") private String chunkTopic; @@ -179,16 +183,7 @@ public void startChunk(String docId) { "文档分块", event, arg -> { - // Wrapper 更新不触发 updateTime 自动填充, 显式刷新, 使卡死恢复以分块开始时刻为基准 - int updated = documentMapper.update( - new LambdaUpdateWrapper() - .set(KnowledgeDocumentDO::getStatus, DocumentStatus.RUNNING.getCode()) - .set(KnowledgeDocumentDO::getUpdatedBy, event.getOperator()) - .set(KnowledgeDocumentDO::getUpdateTime, new Date()) - .eq(KnowledgeDocumentDO::getId, docId) - .ne(KnowledgeDocumentDO::getStatus, DocumentStatus.RUNNING.getCode()) - ); - if (updated == 0) { + if (!documentStatusHelper.tryStartChunk(docId)) { KnowledgeDocumentDO documentDO = documentMapper.selectById(docId); Assert.notNull(documentDO, () -> new ClientException("文档不存在")); throw new ClientException("文档分块操作正在进行中,请稍后再试"); @@ -273,14 +268,24 @@ private int persistChunksAndVectorsAtomically(String collectionName, String docI }) .toList(); transactionOperations.executeWithoutResult(status -> { + 
String latestStatus = documentMapper.selectStatusById(docId); + if (!DocumentStatus.RUNNING.getCode().equals(latestStatus)) { + log.warn("文档分块写入前状态已变更,放弃写入: docId={}, status={}", docId, latestStatus); + status.setRollbackOnly(); + throw new IllegalStateException("document status changed: " + latestStatus); + } knowledgeChunkService.deleteByDocId(docId); knowledgeChunkService.batchCreate(docId, chunks); vectorStoreService.deleteDocumentVectors(collectionName, docId); vectorStoreService.indexDocumentChunks(collectionName, docId, chunkResults); + if (!documentStatusHelper.tryMarkSuccess(docId)) { + log.warn("文档分块完成状态 CAS 失败,回滚本次 chunk 写入: docId={}", docId); + status.setRollbackOnly(); + throw new IllegalStateException("mark success failed"); + } KnowledgeDocumentDO updateDocumentDO = KnowledgeDocumentDO.builder() .id(docId) .chunkCount(chunks.size()) - .status(DocumentStatus.SUCCESS.getCode()) .updatedBy(UserContext.getUsername()) .build(); documentMapper.updateById(updateDocumentDO); @@ -443,11 +448,9 @@ public void chunkDocument(KnowledgeDocumentDO documentDO) { private void markChunkFailed(String docId) { transactionOperations.executeWithoutResult(status -> { - KnowledgeDocumentDO update = new KnowledgeDocumentDO(); - update.setId(docId); - update.setStatus(DocumentStatus.FAILED.getCode()); - update.setUpdatedBy(UserContext.getUsername()); - documentMapper.updateById(update); + if (!documentStatusHelper.tryMarkFailed(docId)) { + log.warn("分块失败状态 CAS 未命中,跳过 failed 写回: docId={}", docId); + } }); } @@ -457,9 +460,11 @@ public void delete(String docId) { KnowledgeDocumentDO documentDO = documentMapper.selectById(docId); Assert.notNull(documentDO, () -> new ClientException("文档不存在")); - // 禁止在文档分块运行时删除 - if (DocumentStatus.RUNNING.getCode().equals(documentDO.getStatus())) { - throw new ClientException("文档正在分块中,无法删除"); + // CAS 抢占删除权:pending/success/failed -> deleting。 + // 抢占失败说明文档正在分块(running)或已被其它删除流程处理,整段事务回滚。 + boolean claimed = 
documentStatusHelper.tryMarkDeleting(docId); + if (!claimed) { + throw new ClientException("文档正在分块或已被处理,无法删除"); } knowledgeChunkService.deleteByDocId(docId); @@ -471,9 +476,11 @@ public void delete(String docId) { documentDO.setUpdatedBy(UserContext.getUsername()); documentMapper.deleteById(documentDO); + // 外部资源(PgVector / 文件存储)不在主库事务内,改为 outbox 入队, + // 与上面的主库删除原子提交,提交后由 CleanupTaskWorker 异步执行并保证最终一致。 String collectionName = resolveCollectionName(documentDO.getKbId()); - vectorStoreService.deleteDocumentVectors(collectionName, docId); - deleteStoredFileQuietly(documentDO); + cleanupTaskService.enqueueVectorCleanup(docId, collectionName); + cleanupTaskService.enqueueFileCleanup(documentDO.getFileUrl()); } @Override @@ -907,15 +914,4 @@ public String preview(String docId) { throw new ClientException("读取文档内容失败: " + e.getMessage()); } } - - private void deleteStoredFileQuietly(KnowledgeDocumentDO documentDO) { - if (documentDO == null || !StringUtils.hasText(documentDO.getFileUrl())) { - return; - } - try { - fileStorageService.deleteByUrl(documentDO.getFileUrl()); - } catch (Exception e) { - log.warn("删除文档存储文件失败, docId={}, fileUrl={}", documentDO.getId(), documentDO.getFileUrl(), e); - } - } } diff --git a/bootstrap/src/main/resources/application.yaml b/bootstrap/src/main/resources/application.yaml index 62f39b1b..07cabb59 100644 --- a/bootstrap/src/main/resources/application.yaml +++ b/bootstrap/src/main/resources/application.yaml @@ -85,6 +85,11 @@ rag: lock-seconds: 900 batch-size: 20 min-interval-seconds: 60 + cleanup: + scan-delay-ms: 15000 + batch-size: 50 + max-retry: 5 + base-backoff-seconds: 30 mcp: servers: diff --git a/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/schedule/CleanupTaskWorkerTest.java b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/schedule/CleanupTaskWorkerTest.java new file mode 100644 index 00000000..03e13236 --- /dev/null +++ 
b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/schedule/CleanupTaskWorkerTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nageoffer.ai.ragent.knowledge.schedule; + +import com.nageoffer.ai.ragent.knowledge.config.CleanupTaskProperties; +import com.nageoffer.ai.ragent.knowledge.dao.entity.FileCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.entity.VectorCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.FileCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.VectorCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.enums.CleanupTaskStatus; +import com.nageoffer.ai.ragent.rag.core.vector.VectorStoreService; +import com.nageoffer.ai.ragent.rag.service.FileStorageService; +import org.apache.ibatis.annotations.Update; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Locale; + +import static 
org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CleanupTaskWorkerTest { + + @Mock private VectorCleanupTaskMapper vectorMapper; + @Mock private FileCleanupTaskMapper fileMapper; + @Mock private VectorStoreService vectorStoreService; + @Mock private FileStorageService fileStorageService; + + private CleanupTaskWorker worker; + + @BeforeEach + void setUp() { + CleanupTaskProperties props = new CleanupTaskProperties(); + props.setBatchSize(50); + props.setMaxRetry(3); + props.setBaseBackoffSeconds(30L); + worker = new CleanupTaskWorker(vectorMapper, fileMapper, vectorStoreService, fileStorageService, props); + } + + @Test + void claimProcessingSql_rechecksRetryTimeWhenClaiming() { + assertClaimProcessingRechecksRetryTime(VectorCleanupTaskMapper.class); + assertClaimProcessingRechecksRetryTime(FileCleanupTaskMapper.class); + } + + @Test + void vectorTask_success_marksSuccess() { + VectorCleanupTaskDO task = VectorCleanupTaskDO.builder() + .id("t1").docId("doc-1").collectionName("kb").status("pending").retryCount(0).build(); + when(vectorMapper.selectList(any())).thenReturn(List.of(task)); + when(vectorMapper.claimProcessing(eq("t1"), anyString(), any(Date.class), any(Date.class), anyString(), anyString())) + .thenReturn(1); + when(vectorMapper.markSuccessIfOwned(eq("t1"), anyString(), anyString(), anyString())).thenReturn(1); + when(fileMapper.selectList(any())).thenReturn(List.of()); + + worker.scan(); + + verify(vectorStoreService).deleteDocumentVectors("kb", "doc-1"); + ArgumentCaptor claimOwner = ArgumentCaptor.forClass(String.class); + ArgumentCaptor successOwner = ArgumentCaptor.forClass(String.class); + 
verify(vectorMapper).claimProcessing(eq("t1"), claimOwner.capture(), any(Date.class), any(Date.class), + eq(CleanupTaskStatus.RUNNING.getCode()), eq(CleanupTaskStatus.PENDING.getCode())); + verify(vectorMapper).markSuccessIfOwned(eq("t1"), successOwner.capture(), + eq(CleanupTaskStatus.SUCCESS.getCode()), eq(CleanupTaskStatus.RUNNING.getCode())); + assertThat(successOwner.getValue()).isEqualTo(claimOwner.getValue()); + assertThat(successOwner.getValue()).isNotBlank(); + } + + @Test + void vectorTask_failure_incrementsRetryAndBackoff() { + VectorCleanupTaskDO task = VectorCleanupTaskDO.builder() + .id("t1").docId("doc-1").collectionName("kb").status("pending").retryCount(0).build(); + when(vectorMapper.selectList(any())).thenReturn(List.of(task)); + when(vectorMapper.claimProcessing(eq("t1"), anyString(), any(Date.class), any(Date.class), anyString(), anyString())) + .thenReturn(1); + when(vectorMapper.markRetryIfOwned(eq("t1"), anyString(), anyString(), anyString(), + eq(1), any(Date.class), anyString())).thenReturn(1); + when(fileMapper.selectList(any())).thenReturn(List.of()); + doThrow(new RuntimeException("pgvector down")) + .when(vectorStoreService).deleteDocumentVectors(anyString(), anyString()); + + worker.scan(); + + verify(vectorMapper).markRetryIfOwned(eq("t1"), anyString(), + eq(CleanupTaskStatus.PENDING.getCode()), eq(CleanupTaskStatus.RUNNING.getCode()), + eq(1), any(Date.class), eq("pgvector down")); + } + + @Test + void vectorTask_exhaustedRetry_marksFailed() { + VectorCleanupTaskDO task = VectorCleanupTaskDO.builder() + .id("t1").docId("doc-1").collectionName("kb").status("pending").retryCount(3).build(); + when(vectorMapper.selectList(any())).thenReturn(List.of(task)); + when(vectorMapper.claimProcessing(eq("t1"), anyString(), any(Date.class), any(Date.class), anyString(), anyString())) + .thenReturn(1); + when(vectorMapper.markFailedIfOwned(eq("t1"), anyString(), anyString(), anyString(), + eq(4), anyString())).thenReturn(1); + 
when(fileMapper.selectList(any())).thenReturn(List.of()); + doThrow(new RuntimeException("still down")) + .when(vectorStoreService).deleteDocumentVectors(anyString(), anyString()); + + worker.scan(); + + verify(vectorMapper).markFailedIfOwned(eq("t1"), anyString(), + eq(CleanupTaskStatus.FAILED.getCode()), eq(CleanupTaskStatus.RUNNING.getCode()), + eq(4), eq("still down")); + } + + @Test + void fileTask_success_marksSuccess() { + FileCleanupTaskDO task = FileCleanupTaskDO.builder() + .id("f1").fileUrl("s3://b/x.pdf").status("pending").retryCount(0).build(); + when(vectorMapper.selectList(any())).thenReturn(List.of()); + when(fileMapper.selectList(any())).thenReturn(List.of(task)); + when(fileMapper.claimProcessing(eq("f1"), anyString(), any(Date.class), any(Date.class), anyString(), anyString())) + .thenReturn(1); + when(fileMapper.markSuccessIfOwned(eq("f1"), anyString(), anyString(), anyString())).thenReturn(1); + + worker.scan(); + + verify(fileStorageService).deleteByUrl("s3://b/x.pdf"); + ArgumentCaptor claimOwner = ArgumentCaptor.forClass(String.class); + ArgumentCaptor successOwner = ArgumentCaptor.forClass(String.class); + verify(fileMapper).claimProcessing(eq("f1"), claimOwner.capture(), any(Date.class), any(Date.class), + eq(CleanupTaskStatus.RUNNING.getCode()), eq(CleanupTaskStatus.PENDING.getCode())); + verify(fileMapper).markSuccessIfOwned(eq("f1"), successOwner.capture(), + eq(CleanupTaskStatus.SUCCESS.getCode()), eq(CleanupTaskStatus.RUNNING.getCode())); + assertThat(successOwner.getValue()).isEqualTo(claimOwner.getValue()); + assertThat(successOwner.getValue()).isNotBlank(); + } + + @Test + void vectorTask_claimLost_skipsExternalDelete() { + VectorCleanupTaskDO task = VectorCleanupTaskDO.builder() + .id("t1").docId("doc-1").collectionName("kb").status("pending").retryCount(0).build(); + when(vectorMapper.selectList(any())).thenReturn(List.of(task)); + when(vectorMapper.claimProcessing(eq("t1"), anyString(), any(Date.class), any(Date.class), 
anyString(), anyString())) + .thenReturn(0); + when(fileMapper.selectList(any())).thenReturn(List.of()); + + worker.scan(); + + verify(vectorStoreService, never()).deleteDocumentVectors(anyString(), anyString()); + } + + @Test + void fileTask_claimLost_skipsExternalDelete() { + FileCleanupTaskDO task = FileCleanupTaskDO.builder() + .id("f1").fileUrl("s3://b/x.pdf").status("pending").retryCount(0).build(); + when(vectorMapper.selectList(any())).thenReturn(List.of()); + when(fileMapper.selectList(any())).thenReturn(List.of(task)); + when(fileMapper.claimProcessing(eq("f1"), anyString(), any(Date.class), any(Date.class), anyString(), anyString())) + .thenReturn(0); + + worker.scan(); + + verify(fileStorageService, never()).deleteByUrl(anyString()); + } + + private void assertClaimProcessingRechecksRetryTime(Class mapperType) { + Method claimMethod = Arrays.stream(mapperType.getDeclaredMethods()) + .filter(method -> "claimProcessing".equals(method.getName())) + .findFirst() + .orElseThrow(); + assertThat(claimMethod.getParameterCount()).isEqualTo(6); + + Update update = claimMethod.getAnnotation(Update.class); + assertThat(update).isNotNull(); + String sql = String.join(" ", update.value()).toLowerCase(Locale.ROOT).replaceAll("\\s+", " "); + assertThat(sql).contains("next_retry_time"); + assertThat(sql).contains("next_retry_time is null"); + assertThat(sql).contains("next_retry_time <= #{now}"); + } +} diff --git a/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/CleanupTaskServiceImplTest.java b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/CleanupTaskServiceImplTest.java new file mode 100644 index 00000000..ceab7a65 --- /dev/null +++ b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/CleanupTaskServiceImplTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nageoffer.ai.ragent.knowledge.service; + +import com.nageoffer.ai.ragent.knowledge.dao.entity.FileCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.entity.VectorCleanupTaskDO; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.FileCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.VectorCleanupTaskMapper; +import com.nageoffer.ai.ragent.knowledge.enums.CleanupTaskStatus; +import com.nageoffer.ai.ragent.knowledge.service.impl.CleanupTaskServiceImpl; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class CleanupTaskServiceImplTest { + + @Mock + private VectorCleanupTaskMapper vectorMapper; + @Mock + private FileCleanupTaskMapper fileMapper; + @InjectMocks + private CleanupTaskServiceImpl service; + + @Test + void enqueueVectorCleanup_insertsPendingTask() { + service.enqueueVectorCleanup("doc-1", "kb_collection"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(VectorCleanupTaskDO.class); + 
verify(vectorMapper).insert(captor.capture()); + VectorCleanupTaskDO task = captor.getValue(); + assertThat(task.getDocId()).isEqualTo("doc-1"); + assertThat(task.getCollectionName()).isEqualTo("kb_collection"); + assertThat(task.getStatus()).isEqualTo(CleanupTaskStatus.PENDING.getCode()); + assertThat(task.getRetryCount()).isZero(); + } + + @Test + void enqueueFileCleanup_skipsBlankUrl() { + service.enqueueFileCleanup(" "); + verify(fileMapper, org.mockito.Mockito.never()) + .insert(org.mockito.ArgumentMatchers.any(FileCleanupTaskDO.class)); + } + + @Test + void enqueueFileCleanup_insertsPendingTask() { + service.enqueueFileCleanup("s3://bucket/doc.pdf"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(FileCleanupTaskDO.class); + verify(fileMapper).insert(captor.capture()); + assertThat(captor.getValue().getFileUrl()).isEqualTo("s3://bucket/doc.pdf"); + assertThat(captor.getValue().getStatus()).isEqualTo(CleanupTaskStatus.PENDING.getCode()); + } +} diff --git a/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/DocumentDeleteRaceTest.java b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/DocumentDeleteRaceTest.java new file mode 100644 index 00000000..47589766 --- /dev/null +++ b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/DocumentDeleteRaceTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nageoffer.ai.ragent.knowledge.service; + +import com.nageoffer.ai.ragent.knowledge.dao.mapper.KnowledgeDocumentMapper; +import com.nageoffer.ai.ragent.knowledge.enums.DocumentStatus; +import com.nageoffer.ai.ragent.knowledge.schedule.DocumentStatusHelper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DocumentDeleteRaceTest { + + @Mock + private KnowledgeDocumentMapper documentMapper; + @InjectMocks + private DocumentStatusHelper documentStatusHelper; + + @Test + void tryMarkDeleting_succeedsWhenIdle() { + when(documentMapper.casStatus(eq("doc-1"), anyList(), eq(DocumentStatus.DELETING.getCode()))) + .thenReturn(1); + assertThat(documentStatusHelper.tryMarkDeleting("doc-1")).isTrue(); + } + + @Test + void tryMarkDeleting_failsWhenRunning() { + // 模拟分块已抢占(CAS 命中 0 行) + when(documentMapper.casStatus(eq("doc-1"), anyList(), eq(DocumentStatus.DELETING.getCode()))) + .thenReturn(0); + assertThat(documentStatusHelper.tryMarkDeleting("doc-1")).isFalse(); + } + + @Test + void tryStartChunk_failsWhenDeleting() { + when(documentMapper.casStatus(eq("doc-1"), anyList(), eq(DocumentStatus.RUNNING.getCode()))) + .thenReturn(0); + 
assertThat(documentStatusHelper.tryStartChunk("doc-1")).isFalse(); + } +} diff --git a/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImplRaceTest.java b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImplRaceTest.java new file mode 100644 index 00000000..8bcbe221 --- /dev/null +++ b/bootstrap/src/test/java/com/nageoffer/ai/ragent/knowledge/service/impl/KnowledgeDocumentServiceImplRaceTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nageoffer.ai.ragent.knowledge.service.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.nageoffer.ai.ragent.core.chunk.ChunkEmbeddingService; +import com.nageoffer.ai.ragent.core.chunk.ChunkingStrategyFactory; +import com.nageoffer.ai.ragent.core.chunk.VectorChunk; +import com.nageoffer.ai.ragent.core.parser.DocumentParserSelector; +import com.nageoffer.ai.ragent.framework.mq.producer.MessageQueueProducer; +import com.nageoffer.ai.ragent.ingestion.dao.mapper.IngestionPipelineMapper; +import com.nageoffer.ai.ragent.ingestion.engine.IngestionEngine; +import com.nageoffer.ai.ragent.ingestion.service.IngestionPipelineService; +import com.nageoffer.ai.ragent.knowledge.config.KnowledgeScheduleProperties; +import com.nageoffer.ai.ragent.knowledge.dao.entity.KnowledgeDocumentDO; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.KnowledgeBaseMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.KnowledgeChunkMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.KnowledgeDocumentChunkLogMapper; +import com.nageoffer.ai.ragent.knowledge.dao.mapper.KnowledgeDocumentMapper; +import com.nageoffer.ai.ragent.knowledge.enums.DocumentStatus; +import com.nageoffer.ai.ragent.knowledge.handler.RemoteFileFetcher; +import com.nageoffer.ai.ragent.knowledge.schedule.DocumentStatusHelper; +import com.nageoffer.ai.ragent.knowledge.service.CleanupTaskService; +import com.nageoffer.ai.ragent.knowledge.service.KnowledgeChunkService; +import com.nageoffer.ai.ragent.knowledge.service.KnowledgeDocumentScheduleService; +import com.nageoffer.ai.ragent.rag.core.vector.VectorStoreService; +import com.nageoffer.ai.ragent.rag.service.FileStorageService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; 
+import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionOperations; + +import java.util.List; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class KnowledgeDocumentServiceImplRaceTest { + + @Mock private KnowledgeBaseMapper knowledgeBaseMapper; + @Mock private KnowledgeDocumentMapper documentMapper; + @Mock private DocumentParserSelector parserSelector; + @Mock private ChunkingStrategyFactory chunkingStrategyFactory; + @Mock private FileStorageService fileStorageService; + @Mock private VectorStoreService vectorStoreService; + @Mock private KnowledgeChunkService knowledgeChunkService; + @Mock private ObjectMapper objectMapper; + @Mock private KnowledgeDocumentScheduleService scheduleService; + @Mock private IngestionPipelineService ingestionPipelineService; + @Mock private IngestionPipelineMapper ingestionPipelineMapper; + @Mock private IngestionEngine ingestionEngine; + @Mock private ChunkEmbeddingService chunkEmbeddingService; + @Mock private KnowledgeDocumentChunkLogMapper chunkLogMapper; + @Mock private KnowledgeChunkMapper chunkMapper; + @Mock private TransactionOperations transactionOperations; + @Mock private MessageQueueProducer messageQueueProducer; + @Mock private KnowledgeScheduleProperties scheduleProperties; + @Mock private RemoteFileFetcher remoteFileFetcher; + @Mock private DocumentStatusHelper documentStatusHelper; + @Mock private CleanupTaskService cleanupTaskService; + @Mock private TransactionStatus 
transactionStatus; + + @InjectMocks + private KnowledgeDocumentServiceImpl service; + + @Test + void persistChunksAndVectorsAtomically_writesChunksAndReturnsCount() { + runTransactionCallback(); + when(documentMapper.selectStatusById("doc-1")).thenReturn(DocumentStatus.RUNNING.getCode()); + when(documentStatusHelper.tryMarkSuccess("doc-1")).thenReturn(true); + + Integer saved = invokePersist(); + + assertThat(saved).isEqualTo(1); + verify(documentStatusHelper).tryMarkSuccess("doc-1"); + ArgumentCaptor captor = ArgumentCaptor.forClass(KnowledgeDocumentDO.class); + verify(documentMapper).updateById(captor.capture()); + assertThat(captor.getValue().getChunkCount()).isEqualTo(1); + } + + @Test + void persistChunksAndVectorsAtomically_whenDocumentNoLongerRunning_rollsBackAndSkipsWrites() { + runTransactionCallback(); + when(documentMapper.selectStatusById("doc-1")).thenReturn(DocumentStatus.DELETING.getCode()); + + assertThatThrownBy(this::invokePersist) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("document status changed"); + + verify(transactionStatus).setRollbackOnly(); + verifyNoInteractions(knowledgeChunkService, vectorStoreService); + verify(documentMapper, never()).updateById(any(KnowledgeDocumentDO.class)); + } + + @Test + void persistChunksAndVectorsAtomically_whenSuccessCasFailsAfterVectorWrite_doesNotEnqueueVectorCleanup() { + runTransactionCallback(); + when(documentMapper.selectStatusById("doc-1")).thenReturn(DocumentStatus.RUNNING.getCode()); + when(documentStatusHelper.tryMarkSuccess("doc-1")).thenReturn(false); + + assertThatThrownBy(this::invokePersist) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("mark success failed"); + + verify(transactionStatus).setRollbackOnly(); + verify(vectorStoreService).deleteDocumentVectors("collection", "doc-1"); + verify(vectorStoreService).indexDocumentChunks(eq("collection"), eq("doc-1"), any()); + verifyNoInteractions(cleanupTaskService); + verify(documentMapper, 
never()).updateById(any(KnowledgeDocumentDO.class)); + } + + private void runTransactionCallback() { + doAnswer(invocation -> { + Consumer callback = invocation.getArgument(0); + callback.accept(transactionStatus); + return null; + }).when(transactionOperations).executeWithoutResult(any()); + } + + private Integer invokePersist() { + List chunks = List.of(VectorChunk.builder() + .chunkId("chunk-1") + .index(0) + .content("content") + .build()); + return (Integer) ReflectionTestUtils.invokeMethod( + service, + "persistChunksAndVectorsAtomically", + "collection", + "doc-1", + chunks + ); + } +} diff --git a/resources/database/schema_pg.sql b/resources/database/schema_pg.sql index 23a4416e..649fcc66 100644 --- a/resources/database/schema_pg.sql +++ b/resources/database/schema_pg.sql @@ -415,6 +415,59 @@ CREATE INDEX idx_ingestion_task_node_pipeline ON t_ingestion_task_node (pipeline CREATE INDEX idx_ingestion_task_node_status ON t_ingestion_task_node (status); COMMENT ON TABLE t_ingestion_task_node IS '摄取任务节点表'; +-- ============================================ +-- Cleanup Task Tables (outbox for external resource deletion) +-- ============================================ + +CREATE TABLE t_vector_cleanup_task ( + id VARCHAR(20) NOT NULL PRIMARY KEY, + doc_id VARCHAR(20) NOT NULL, + collection_name VARCHAR(64) NOT NULL, + status VARCHAR(16) NOT NULL DEFAULT 'pending', + retry_count INTEGER NOT NULL DEFAULT 0, + next_retry_time TIMESTAMP, + lock_owner VARCHAR(128), + lock_until TIMESTAMP, + error_message TEXT, + create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +CREATE INDEX idx_vct_status_next ON t_vector_cleanup_task (status, next_retry_time); +CREATE INDEX idx_vct_status_lock ON t_vector_cleanup_task (status, lock_until); +CREATE INDEX idx_vct_doc_id ON t_vector_cleanup_task (doc_id); +COMMENT ON TABLE t_vector_cleanup_task IS '向量清理任务表(outbox)'; +COMMENT ON COLUMN t_vector_cleanup_task.doc_id IS 
-- ragent v1.2 -> v1.3 upgrade script
-- issue #42: fix the race between concurrent document deletion and chunking
-- 1) t_knowledge_document.status gains a 'deleting' value
--    (no DDL required: it fits the existing VARCHAR(16); listed for reference only)
-- 2) adds two outbox cleanup-task tables that back eventual-consistency deletion
--    from PgVector and file storage
--    status: pending/running/success/failed
-- The COMMENT ON statements mirror schema_pg.sql so that upgraded databases carry
-- the same table/column metadata as freshly created ones.

CREATE TABLE t_vector_cleanup_task (
    id              VARCHAR(20)  NOT NULL PRIMARY KEY,
    doc_id          VARCHAR(20)  NOT NULL,
    collection_name VARCHAR(64)  NOT NULL,
    status          VARCHAR(16)  NOT NULL DEFAULT 'pending',
    retry_count     INTEGER      NOT NULL DEFAULT 0,
    next_retry_time TIMESTAMP,
    lock_owner      VARCHAR(128),
    lock_until      TIMESTAMP,
    error_message   TEXT,
    create_time     TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time     TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_vct_status_next ON t_vector_cleanup_task (status, next_retry_time);
CREATE INDEX idx_vct_status_lock ON t_vector_cleanup_task (status, lock_until);
CREATE INDEX idx_vct_doc_id ON t_vector_cleanup_task (doc_id);
COMMENT ON TABLE t_vector_cleanup_task IS '向量清理任务表(outbox)';
COMMENT ON COLUMN t_vector_cleanup_task.doc_id IS '文档ID';
COMMENT ON COLUMN t_vector_cleanup_task.collection_name IS '向量集合名';
COMMENT ON COLUMN t_vector_cleanup_task.status IS '状态:pending/running/success/failed';
COMMENT ON COLUMN t_vector_cleanup_task.retry_count IS '已重试次数';
COMMENT ON COLUMN t_vector_cleanup_task.next_retry_time IS '下次执行时间';
COMMENT ON COLUMN t_vector_cleanup_task.lock_owner IS '当前领取者';
COMMENT ON COLUMN t_vector_cleanup_task.lock_until IS '领取租约到期时间';
COMMENT ON COLUMN t_vector_cleanup_task.error_message IS '错误信息';

CREATE TABLE t_file_cleanup_task (
    id              VARCHAR(20)   NOT NULL PRIMARY KEY,
    file_url        VARCHAR(1024) NOT NULL,
    status          VARCHAR(16)   NOT NULL DEFAULT 'pending',
    retry_count     INTEGER       NOT NULL DEFAULT 0,
    next_retry_time TIMESTAMP,
    lock_owner      VARCHAR(128),
    lock_until      TIMESTAMP,
    error_message   TEXT,
    create_time     TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time     TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_fct_status_next ON t_file_cleanup_task (status, next_retry_time);
CREATE INDEX idx_fct_status_lock ON t_file_cleanup_task (status, lock_until);
COMMENT ON TABLE t_file_cleanup_task IS '文件清理任务表(outbox)';
COMMENT ON COLUMN t_file_cleanup_task.file_url IS '文件存储地址';
COMMENT ON COLUMN t_file_cleanup_task.status IS '状态:pending/running/success/failed';
COMMENT ON COLUMN t_file_cleanup_task.retry_count IS '已重试次数';
COMMENT ON COLUMN t_file_cleanup_task.next_retry_time IS '下次执行时间';
COMMENT ON COLUMN t_file_cleanup_task.lock_owner IS '当前领取者';
COMMENT ON COLUMN t_file_cleanup_task.lock_until IS '领取租约到期时间';
COMMENT ON COLUMN t_file_cleanup_task.error_message IS '错误信息';