Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,34 +269,48 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
)
engineExecutionContext.getProperties
.put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, errorIndex.toString)
// jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的
var newIndex = index
var newErrorIndex = errorIndex
if (adjustErrorIndexForSetScenarios(engineConnTask)) {
newIndex = index - 1
newErrorIndex = errorIndex + 1
}
// 重试的时候如果执行过则跳过执行
if (retryEnable && errorIndex > 0 && index < newErrorIndex) {
val code = codes(index).trim.toUpperCase()
val shouldSkip = !isContextStatement(code)
val props: util.Map[String, String] = engineCreationContext.getOptions
val taskRetry: String =
props.getOrDefault("linkis.task.retry.switch", "false").toString
if (java.lang.Boolean.parseBoolean(taskRetry)) {
// jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的
var newIndex = index
var newErrorIndex = errorIndex
if (adjustErrorIndexForSetScenarios(engineConnTask)) {
newIndex = index - 1
newErrorIndex = errorIndex + 1
}
// 重试的时候如果执行过则跳过执行
if (retryEnable && errorIndex > 0 && index < newErrorIndex) {
val code = codes(index).trim.toUpperCase()
val shouldSkip = !isContextStatement(code)

if (shouldSkip) {
engineExecutionContext.appendStdout(
LogUtils.generateInfo(
s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip."
)
)
executeFlag = false
} else {
if (newIndex >= 0) {
if (shouldSkip) {
engineExecutionContext.appendStdout(
LogUtils.generateInfo(
s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute."
s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip."
)
)
executeFlag = false
} else {
if (newIndex >= 0) {
engineExecutionContext.appendStdout(
LogUtils.generateInfo(
s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute."
)
)
}
}
}
} else {
if (retryEnable && errorIndex > 0 && index < errorIndex) {
engineExecutionContext.appendStdout(
LogUtils.generateInfo(
s"aisql retry with errorIndex: ${errorIndex}, current sql index: ${index} will skip."
)
)
executeFlag = false
}
}
if (executeFlag) {
val code = codes(index)
Expand Down Expand Up @@ -461,17 +475,31 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
def getProgressInfo(taskID: String): Array[JobProgressInfo]

/**
* 调整错误索引:直接匹配三种SET语句场景 因为SET语句会被解析器视为第一条SQL
* 检测是否为需要调整错误索引的JDBC SET语句场景
*/
protected def adjustErrorIndexForSetScenarios(engineConnTask: EngineConnTask): Boolean = {
val executionCode = engineConnTask.getCode
val engineTypeLabel = engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get
val engineType = engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType
var result = false
if (executionCode != null && engineType.equals(EngineType.JDBC.toString)) {
val upperCode = executionCode.toUpperCase().trim
val jdbcSetPrefixes = ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",")
result = jdbcSetPrefixes.exists(upperCode.startsWith)
Utils.tryAndWarn {
val executionCode = engineConnTask.getCode
if (StringUtils.isEmpty(executionCode)) {
return result
}

val engineTypeLabel = engineConnTask.getLables.collectFirst { case label: EngineTypeLabel =>
label
}

result = engineTypeLabel.exists { label =>
val engineType = label.getEngineType
if (engineType.equals(EngineType.JDBC.toString)) {
val upperCode = executionCode.toUpperCase().trim
val jdbcSetPrefixes =
ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",")
jdbcSetPrefixes.exists(upperCode.startsWith)
} else {
false
}
}
}
result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.ByteTimeUtils;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
import org.apache.linkis.entrance.exception.EntranceErrorException;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.log.LogHandler;
Expand Down Expand Up @@ -159,7 +160,8 @@ public ExecuteRequest jobToExecuteRequest() throws EntranceErrorException {
if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) {
// 修复:任务重试背景下,10:59分提交任务执行,重试时时间变成11:00,重试任务会重新生成结果目录,导致查询结果集时,重试之前执行的结果集丢失
// 新增判断:生成结果目录之前,判断任务之前是否生成结果集,生成过就复用
if (org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) {
if (((Boolean) EntranceConfiguration.TASK_RETRY_SWITCH().getValue())
&& org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) {
resultSetPathRoot = jobRequest.getResultLocation();
} else {
String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ public boolean onJobFailed(
AtomicBoolean canRetry = new AtomicBoolean(false);
String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key();

if (engineType.equals(EngineType.JDBC().toString()) && StringUtils.isNotBlank(errorDescRegex)) {
if (((Boolean) EntranceConfiguration.TASK_RETRY_SWITCH().getValue())
&& engineType.equals(EngineType.JDBC().toString())
&& StringUtils.isNotBlank(errorDescRegex)) {
// JDBC执行正则匹配
for (String regex : errorDescRegex.split(",")) {
String trimmedRegex = regex.trim();
Expand Down
Loading