diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index b33258fa31..44c79f8d38 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -269,34 +269,48 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) ) engineExecutionContext.getProperties .put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, errorIndex.toString) - // jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的 - var newIndex = index - var newErrorIndex = errorIndex - if (adjustErrorIndexForSetScenarios(engineConnTask)) { - newIndex = index - 1 - newErrorIndex = errorIndex + 1 - } - // 重试的时候如果执行过则跳过执行 - if (retryEnable && errorIndex > 0 && index < newErrorIndex) { - val code = codes(index).trim.toUpperCase() - val shouldSkip = !isContextStatement(code) + val props: util.Map[String, String] = engineCreationContext.getOptions + val taskRetry: String = + props.getOrDefault("linkis.task.retry.switch", "false").toString + if (java.lang.Boolean.parseBoolean(taskRetry)) { + // jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的 + var newIndex = index + var newErrorIndex = errorIndex + if (adjustErrorIndexForSetScenarios(engineConnTask)) { + newIndex = index - 1 + newErrorIndex = errorIndex + 1 + } + // 重试的时候如果执行过则跳过执行 + if (retryEnable && errorIndex > 0 && index < newErrorIndex) { + val code = codes(index).trim.toUpperCase() + val shouldSkip = !isContextStatement(code) - if (shouldSkip) { - engineExecutionContext.appendStdout( - LogUtils.generateInfo( - s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip." - ) - ) - executeFlag = false - } else { - if (newIndex >= 0) { + if (shouldSkip) { engineExecutionContext.appendStdout( LogUtils.generateInfo( - s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute." + s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip." ) ) + executeFlag = false + } else { + if (newIndex >= 0) { + engineExecutionContext.appendStdout( + LogUtils.generateInfo( + s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute." + ) + ) + } } } + } else { + if (retryEnable && errorIndex > 0 && index < errorIndex) { + engineExecutionContext.appendStdout( + LogUtils.generateInfo( + s"aisql retry with errorIndex: ${errorIndex}, current sql index: ${index} will skip." + ) + ) + executeFlag = false + } } if (executeFlag) { val code = codes(index) @@ -461,17 +475,31 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) def getProgressInfo(taskID: String): Array[JobProgressInfo] /** - * 调整错误索引:直接匹配三种SET语句场景 因为SET语句会被解析器视为第一条SQL + * 检测是否为需要调整错误索引的JDBC SET语句场景 */ protected def adjustErrorIndexForSetScenarios(engineConnTask: EngineConnTask): Boolean = { - val executionCode = engineConnTask.getCode - val engineTypeLabel = engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get - val engineType = engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType var result = false - if (executionCode != null && engineType.equals(EngineType.JDBC.toString)) { - val upperCode = executionCode.toUpperCase().trim - val jdbcSetPrefixes = ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") - result = jdbcSetPrefixes.exists(upperCode.startsWith) + Utils.tryAndWarn { + val executionCode = engineConnTask.getCode + if (StringUtils.isEmpty(executionCode)) { + return result + } + + val engineTypeLabel = engineConnTask.getLables.collectFirst { case label: EngineTypeLabel => + label + } + + result = engineTypeLabel.exists { label => + val engineType = label.getEngineType + if (engineType.equals(EngineType.JDBC.toString)) { + val upperCode = executionCode.toUpperCase().trim + val jdbcSetPrefixes = + ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") + jdbcSetPrefixes.exists(upperCode.startsWith) + } else { + false + } + } } result } diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java index 4c18b23b62..1ebf15dc9c 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java @@ -19,6 +19,7 @@ import org.apache.linkis.common.log.LogUtils; import org.apache.linkis.common.utils.ByteTimeUtils; +import org.apache.linkis.entrance.conf.EntranceConfiguration; import org.apache.linkis.entrance.exception.EntranceErrorException; import org.apache.linkis.entrance.execute.EntranceJob; import org.apache.linkis.entrance.log.LogHandler; @@ -159,7 +160,8 @@ public ExecuteRequest jobToExecuteRequest() throws EntranceErrorException { if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) { // 修复:任务重试背景下,10:59分提交任务执行,重试时时间变成11:00,重试任务会重新生成结果目录,导致查询结果集时,重试之前执行的结果集丢失 // 新增判断:生成结果目录之前,判断任务之前是否生成结果集,生成过就复用 - if (org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) { + if (((Boolean) EntranceConfiguration.TASK_RETRY_SWITCH().getValue()) + && org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) { resultSetPathRoot = jobRequest.getResultLocation(); } else { String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest); diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java index 39b3f58c71..883922e32d 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java @@ -167,7 +167,9 @@ public boolean onJobFailed( AtomicBoolean canRetry = new AtomicBoolean(false); String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key(); - if (engineType.equals(EngineType.JDBC().toString()) && StringUtils.isNotBlank(errorDescRegex)) { + if (((Boolean) EntranceConfiguration.TASK_RETRY_SWITCH().getValue()) + && engineType.equals(EngineType.JDBC().toString()) + && StringUtils.isNotBlank(errorDescRegex)) { // JDBC执行正则匹配 for (String regex : errorDescRegex.split(",")) { String trimmedRegex = regex.trim();