Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nageoffer.ai.ragent.knowledge.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;

/**
 * Configuration for the outbox cleanup-task worker.
 * Bound from properties under the {@code rag.knowledge.cleanup} prefix; each field
 * initializer below is the value used when the corresponding property is not set.
 *
 * NOTE(review): the class is annotated with {@code @Validated}, but no field carries a
 * constraint annotation, so no value range is enforced here — confirm whether that is intended.
 */
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "rag.knowledge.cleanup")
public class CleanupTaskProperties {

/** Interval between worker scans, in milliseconds (default 15000). */
private Long scanDelayMs = 15000L;

/** Batch size of each scan (default 50). */
private Integer batchSize = 50;

/** Maximum number of retries; once exceeded the task is set to failed and an alert is raised (default 5). */
private Integer maxRetry = 5;

/** Retry backoff base in seconds; actual backoff = baseBackoffSeconds * 2^retryCount (default 30). */
private Long baseBackoffSeconds = 30L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nageoffer.ai.ragent.knowledge.dao.entity;

import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

/**
 * File cleanup task (outbox), mapped to table {@code t_file_cleanup_task}.
 * The row is written inside the main delete transaction; CleanupTaskWorker then performs
 * the file-storage deletion asynchronously so that the two sides become eventually consistent.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@TableName("t_file_cleanup_task")
public class FileCleanupTaskDO {

/** Primary key, generated with the MyBatis-Plus {@code ASSIGN_ID} strategy. */
@TableId(type = IdType.ASSIGN_ID)
private String id;

/** Storage address of the file to delete (fileUrl). */
private String fileUrl;

/**
 * Task status: pending/success/failed, see CleanupTaskStatus.
 * NOTE(review): the mapper for this table also writes a caller-supplied "running" code
 * while a task is claimed, which this list does not mention — confirm against CleanupTaskStatus.
 */
private String status;

/** Number of retries performed so far. */
private Integer retryCount;

/** Earliest time at which the task may run again (retry backoff). */
private Date nextRetryTime;

/** Identifier of the worker that currently holds the task. */
private String lockOwner;

/** Time at which the current claim lease expires. */
private Date lockUntil;

/** Message of the most recent error. */
private String errorMessage;

/** Creation time; declared with {@code FieldFill.INSERT}, i.e. filled on insert. */
@TableField(fill = FieldFill.INSERT)
private Date createTime;

/** Last update time; declared with {@code FieldFill.INSERT_UPDATE}, i.e. filled on insert and on update. */
@TableField(fill = FieldFill.INSERT_UPDATE)
private Date updateTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nageoffer.ai.ragent.knowledge.dao.entity;

import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

/**
 * Vector cleanup task (outbox), mapped to table {@code t_vector_cleanup_task}.
 * The row is written inside the main delete transaction; CleanupTaskWorker then performs
 * the PgVector deletion asynchronously so that the two sides become eventually consistent.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@TableName("t_vector_cleanup_task")
public class VectorCleanupTaskDO {

/** Primary key, generated with the MyBatis-Plus {@code ASSIGN_ID} strategy. */
@TableId(type = IdType.ASSIGN_ID)
private String id;

/** Document ID whose vectors are to be removed. */
private String docId;

/** Name of the vector collection (collection_name). */
private String collectionName;

/**
 * Task status: pending/success/failed, see CleanupTaskStatus.
 * NOTE(review): the mapper for this table also writes a caller-supplied "running" code
 * while a task is claimed, which this list does not mention — confirm against CleanupTaskStatus.
 */
private String status;

/** Number of retries performed so far. */
private Integer retryCount;

/** Earliest time at which the task may run again (retry backoff). */
private Date nextRetryTime;

/** Identifier of the worker that currently holds the task. */
private String lockOwner;

/** Time at which the current claim lease expires. */
private Date lockUntil;

/** Message of the most recent error. */
private String errorMessage;

/** Creation time; declared with {@code FieldFill.INSERT}, i.e. filled on insert. */
@TableField(fill = FieldFill.INSERT)
private Date createTime;

/** Last update time; declared with {@code FieldFill.INSERT_UPDATE}, i.e. filled on insert and on update. */
@TableField(fill = FieldFill.INSERT_UPDATE)
private Date updateTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nageoffer.ai.ragent.knowledge.dao.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.nageoffer.ai.ragent.knowledge.dao.entity.FileCleanupTaskDO;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

import java.util.Date;

/**
 * Mapper for {@code t_file_cleanup_task}.
 * Besides the inherited BaseMapper CRUD, it declares conditional UPDATE statements that move a
 * task between states. Every statement returns the affected row count, so a result of 0 means
 * the WHERE guard did not match. Status codes are passed in by the caller rather than
 * hard-coded in the SQL.
 */
public interface FileCleanupTaskMapper extends BaseMapper<FileCleanupTaskDO> {

/**
 * Claims one task for processing: sets the status to {@code runningCode} and records the
 * lock owner and lease expiry. The row is only updated when its status equals
 * {@code pendingCode} and its next_retry_time is NULL or not later than {@code now}.
 * Note that update_time is taken from the database NOW(), while the eligibility check uses
 * the caller-supplied {@code now}.
 *
 * @param id          task id
 * @param lockOwner   identifier stored in lock_owner
 * @param lockUntil   lease expiry stored in lock_until
 * @param now         reference time compared against next_retry_time
 * @param runningCode status code written on a successful claim
 * @param pendingCode status code the row must currently have
 * @return affected row count; 0 means the task was not claimed
 */
@Update("UPDATE t_file_cleanup_task "
+ "SET status = #{running}, lock_owner = #{lockOwner}, lock_until = #{lockUntil}, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{pending} "
+ "AND (next_retry_time IS NULL OR next_retry_time <= #{now})")
int claimProcessing(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("lockUntil") Date lockUntil, @Param("now") Date now,
@Param("running") String runningCode, @Param("pending") String pendingCode);

/**
 * Marks a task as succeeded and clears lock_owner, lock_until and error_message.
 * Only applies when the row is in {@code runningCode} and lock_owner equals {@code lockOwner}.
 *
 * @param id          task id
 * @param lockOwner   owner that must currently hold the task
 * @param successCode status code written on success
 * @param runningCode status code the row must currently have
 * @return affected row count; 0 means the row was not running under this owner
 */
@Update("UPDATE t_file_cleanup_task "
+ "SET status = #{success}, lock_owner = NULL, lock_until = NULL, error_message = NULL, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}")
int markSuccessIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("success") String successCode, @Param("running") String runningCode);

/**
 * Puts a task back to {@code pendingCode} for a later retry: stores the given retry count,
 * next retry time and error message, and clears the lock columns.
 * The retry count is written as supplied; the SQL does not increment it.
 * Only applies when the row is in {@code runningCode} and lock_owner equals {@code lockOwner}.
 *
 * @param id            task id
 * @param lockOwner     owner that must currently hold the task
 * @param pendingCode   status code written by this update
 * @param runningCode   status code the row must currently have
 * @param retryCount    value stored in retry_count
 * @param nextRetryTime value stored in next_retry_time
 * @param errorMessage  value stored in error_message
 * @return affected row count; 0 means the row was not running under this owner
 */
@Update("UPDATE t_file_cleanup_task "
+ "SET status = #{pending}, retry_count = #{retryCount}, next_retry_time = #{nextRetryTime}, "
+ "error_message = #{errorMessage}, lock_owner = NULL, lock_until = NULL, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}")
int markRetryIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("pending") String pendingCode, @Param("running") String runningCode,
@Param("retryCount") int retryCount, @Param("nextRetryTime") Date nextRetryTime,
@Param("errorMessage") String errorMessage);

/**
 * Marks a task as {@code failedCode}: stores the given retry count and error message and
 * clears the lock columns. next_retry_time is left unchanged by this statement.
 * Only applies when the row is in {@code runningCode} and lock_owner equals {@code lockOwner}.
 *
 * @param id           task id
 * @param lockOwner    owner that must currently hold the task
 * @param failedCode   status code written by this update
 * @param runningCode  status code the row must currently have
 * @param retryCount   value stored in retry_count
 * @param errorMessage value stored in error_message
 * @return affected row count; 0 means the row was not running under this owner
 */
@Update("UPDATE t_file_cleanup_task "
+ "SET status = #{failed}, retry_count = #{retryCount}, error_message = #{errorMessage}, "
+ "lock_owner = NULL, lock_until = NULL, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}")
int markFailedIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("failed") String failedCode, @Param("running") String runningCode,
@Param("retryCount") int retryCount, @Param("errorMessage") String errorMessage);

/**
 * Bulk-releases expired claims: every row in {@code runningCode} whose lock_until is not
 * later than {@code now} is reset to {@code pendingCode} with the lock columns cleared.
 * retry_count and next_retry_time are not modified. Rows whose lock_until is NULL do not
 * satisfy the comparison and are therefore not touched.
 *
 * @param now         reference time compared against lock_until
 * @param runningCode status code of rows to recover
 * @param pendingCode status code written to recovered rows
 * @return number of rows recovered
 */
@Update("UPDATE t_file_cleanup_task "
+ "SET status = #{pending}, lock_owner = NULL, lock_until = NULL, update_time = NOW() "
+ "WHERE status = #{running} AND lock_until <= #{now}")
int recoverExpiredProcessing(@Param("now") Date now,
@Param("running") String runningCode, @Param("pending") String pendingCode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,35 @@

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.nageoffer.ai.ragent.knowledge.dao.entity.KnowledgeDocumentDO;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

import java.util.List;

/**
 * Mapper for {@code t_knowledge_document}, adding a status compare-and-set update and a
 * raw status lookup on top of the inherited BaseMapper CRUD.
 */
public interface KnowledgeDocumentMapper extends BaseMapper<KnowledgeDocumentDO> {

    /**
     * Compare-and-set on the document status: the row is moved to {@code toStatus} in a single
     * UPDATE only when it is not deleted ({@code deleted = 0}) and its current status is one of
     * {@code fromStatuses}.
     *
     * <p>An empty or null {@code fromStatuses} matches no row and yields 0. Without the
     * {@code <choose>} guard an empty list would render {@code status IN ()}, which is a SQL
     * syntax error, instead of the "no status matched" result callers already handle.
     *
     * @param docId        document id
     * @param fromStatuses statuses the document is allowed to be in before the transition
     * @param toStatus     status written when the guard matches
     * @return affected row count; 0 means a concurrent conflict (status already rewritten by
     *         another flow), a deleted/missing document, or an empty {@code fromStatuses}
     */
    @Update("<script>"
            + "UPDATE t_knowledge_document "
            + "SET status = #{toStatus}, update_time = NOW() "
            + "WHERE id = #{docId} AND deleted = 0 "
            + "<choose>"
            + "<when test='fromStatuses != null and !fromStatuses.isEmpty()'>"
            + "AND status IN "
            + "<foreach collection='fromStatuses' item='s' open='(' close=')' separator=','>#{s}</foreach>"
            + "</when>"
            // No allowed source status: force a predicate that can never match.
            + "<otherwise>AND 1 = 0</otherwise>"
            + "</choose>"
            + "</script>")
    int casStatus(@Param("docId") String docId,
                  @Param("fromStatuses") List<String> fromStatuses,
                  @Param("toStatus") String toStatus);

    /**
     * Reads only the current status of a document. The hand-written SQL carries no
     * {@code deleted} filter, so logically deleted rows are returned as well; intended for the
     * second status check performed in the middle of chunking.
     *
     * @param docId document id
     * @return the status string, or {@code null} when no row with that id exists
     */
    @Select("SELECT status FROM t_knowledge_document WHERE id = #{docId}")
    String selectStatusById(@Param("docId") String docId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nageoffer.ai.ragent.knowledge.dao.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.nageoffer.ai.ragent.knowledge.dao.entity.VectorCleanupTaskDO;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

import java.util.Date;

/**
 * Mapper for {@code t_vector_cleanup_task}.
 * Besides the inherited BaseMapper CRUD, it declares conditional UPDATE statements that move a
 * task between states. Every statement returns the affected row count, so a result of 0 means
 * the WHERE guard did not match. Status codes are passed in by the caller rather than
 * hard-coded in the SQL.
 */
public interface VectorCleanupTaskMapper extends BaseMapper<VectorCleanupTaskDO> {

/**
 * Claims one task for processing: sets the status to {@code runningCode} and records the
 * lock owner and lease expiry. The row is only updated when its status equals
 * {@code pendingCode} and its next_retry_time is NULL or not later than {@code now}.
 * Note that update_time is taken from the database NOW(), while the eligibility check uses
 * the caller-supplied {@code now}.
 *
 * @param id          task id
 * @param lockOwner   identifier stored in lock_owner
 * @param lockUntil   lease expiry stored in lock_until
 * @param now         reference time compared against next_retry_time
 * @param runningCode status code written on a successful claim
 * @param pendingCode status code the row must currently have
 * @return affected row count; 0 means the task was not claimed
 */
@Update("UPDATE t_vector_cleanup_task "
+ "SET status = #{running}, lock_owner = #{lockOwner}, lock_until = #{lockUntil}, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{pending} "
+ "AND (next_retry_time IS NULL OR next_retry_time <= #{now})")
int claimProcessing(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("lockUntil") Date lockUntil, @Param("now") Date now,
@Param("running") String runningCode, @Param("pending") String pendingCode);

/**
 * Marks a task as succeeded and clears lock_owner, lock_until and error_message.
 * Only applies when the row is in {@code runningCode} and lock_owner equals {@code lockOwner}.
 *
 * @param id          task id
 * @param lockOwner   owner that must currently hold the task
 * @param successCode status code written on success
 * @param runningCode status code the row must currently have
 * @return affected row count; 0 means the row was not running under this owner
 */
@Update("UPDATE t_vector_cleanup_task "
+ "SET status = #{success}, lock_owner = NULL, lock_until = NULL, error_message = NULL, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}")
int markSuccessIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("success") String successCode, @Param("running") String runningCode);

/**
 * Puts a task back to {@code pendingCode} for a later retry: stores the given retry count,
 * next retry time and error message, and clears the lock columns.
 * The retry count is written as supplied; the SQL does not increment it.
 * Only applies when the row is in {@code runningCode} and lock_owner equals {@code lockOwner}.
 *
 * @param id            task id
 * @param lockOwner     owner that must currently hold the task
 * @param pendingCode   status code written by this update
 * @param runningCode   status code the row must currently have
 * @param retryCount    value stored in retry_count
 * @param nextRetryTime value stored in next_retry_time
 * @param errorMessage  value stored in error_message
 * @return affected row count; 0 means the row was not running under this owner
 */
@Update("UPDATE t_vector_cleanup_task "
+ "SET status = #{pending}, retry_count = #{retryCount}, next_retry_time = #{nextRetryTime}, "
+ "error_message = #{errorMessage}, lock_owner = NULL, lock_until = NULL, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}")
int markRetryIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("pending") String pendingCode, @Param("running") String runningCode,
@Param("retryCount") int retryCount, @Param("nextRetryTime") Date nextRetryTime,
@Param("errorMessage") String errorMessage);

/**
 * Marks a task as {@code failedCode}: stores the given retry count and error message and
 * clears the lock columns. next_retry_time is left unchanged by this statement.
 * Only applies when the row is in {@code runningCode} and lock_owner equals {@code lockOwner}.
 *
 * @param id           task id
 * @param lockOwner    owner that must currently hold the task
 * @param failedCode   status code written by this update
 * @param runningCode  status code the row must currently have
 * @param retryCount   value stored in retry_count
 * @param errorMessage value stored in error_message
 * @return affected row count; 0 means the row was not running under this owner
 */
@Update("UPDATE t_vector_cleanup_task "
+ "SET status = #{failed}, retry_count = #{retryCount}, error_message = #{errorMessage}, "
+ "lock_owner = NULL, lock_until = NULL, update_time = NOW() "
+ "WHERE id = #{id} AND status = #{running} AND lock_owner = #{lockOwner}")
int markFailedIfOwned(@Param("id") String id, @Param("lockOwner") String lockOwner,
@Param("failed") String failedCode, @Param("running") String runningCode,
@Param("retryCount") int retryCount, @Param("errorMessage") String errorMessage);

/**
 * Bulk-releases expired claims: every row in {@code runningCode} whose lock_until is not
 * later than {@code now} is reset to {@code pendingCode} with the lock columns cleared.
 * retry_count and next_retry_time are not modified. Rows whose lock_until is NULL do not
 * satisfy the comparison and are therefore not touched.
 *
 * @param now         reference time compared against lock_until
 * @param runningCode status code of rows to recover
 * @param pendingCode status code written to recovered rows
 * @return number of rows recovered
 */
@Update("UPDATE t_vector_cleanup_task "
+ "SET status = #{pending}, lock_owner = NULL, lock_until = NULL, update_time = NOW() "
+ "WHERE status = #{running} AND lock_until <= #{now}")
int recoverExpiredProcessing(@Param("now") Date now,
@Param("running") String runningCode, @Param("pending") String pendingCode);
}
Loading