From 2d31ae6247e4c7e855e57cbfb90b0e4156ea70d4 Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Fri, 27 Mar 2026 15:43:54 +0800 Subject: [PATCH 1/4] =?UTF-8?q?#AI=20commit#=20=E5=BC=80=E5=8F=91=E9=98=B6?= =?UTF-8?q?=E6=AE=B5=EF=BC=9A=20=E4=BF=AE=E5=A4=8Dsr=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AF=BC=E8=87=B4=E5=8A=A0=E8=BD=BDinit=5Fsq?= =?UTF-8?q?l=E5=BC=82=E5=B8=B8bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/ComputationExecutor.scala | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index b33258fa31..8a3887b298 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -461,19 +461,29 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) def getProgressInfo(taskID: String): Array[JobProgressInfo] /** - * 调整错误索引:直接匹配三种SET语句场景 因为SET语句会被解析器视为第一条SQL + * 检测是否为需要调整错误索引的JDBC SET语句场景 */ protected def adjustErrorIndexForSetScenarios(engineConnTask: EngineConnTask): Boolean = { val executionCode = engineConnTask.getCode - val engineTypeLabel = engineConnTask.getLables.find(_.isInstanceOf[EngineTypeLabel]).get - val engineType = engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType - var result = false - if (executionCode != null && engineType.equals(EngineType.JDBC.toString)) { - val upperCode = executionCode.toUpperCase().trim - val jdbcSetPrefixes = ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") - result = jdbcSetPrefixes.exists(upperCode.startsWith) + if (StringUtils.isEmpty(executionCode)) { + return false + } + + val engineTypeLabel = engineConnTask.getLables.collectFirst { case label: EngineTypeLabel => + label + } + + engineTypeLabel.exists { label => + val engineType = label.getEngineType + if (engineType == EngineType.JDBC.toString) { + val upperCode = executionCode.toUpperCase().trim + val jdbcSetPrefixes = + ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") + jdbcSetPrefixes.exists(upperCode.startsWith) + } else { + false + } } - result } protected def createEngineExecutionContext( From d9af781e3ff2fedd3b5ed7d8a8346cfc1317337b Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Fri, 27 Mar 2026 16:07:51 +0800 Subject: [PATCH 2/4] =?UTF-8?q?#AI=20commit#=20=E5=BC=80=E5=8F=91=E9=98=B6?= =?UTF-8?q?=E6=AE=B5=EF=BC=9A=20=E4=BF=AE=E5=A4=8Dsr=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AF=BC=E8=87=B4=E5=8A=A0=E8=BD=BDinit=5Fsq?= =?UTF-8?q?l=E5=BC=82=E5=B8=B8bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/ComputationExecutor.scala | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index 8a3887b298..b4d1060f5c 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -464,26 +464,30 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) * 检测是否为需要调整错误索引的JDBC SET语句场景 */ protected def adjustErrorIndexForSetScenarios(engineConnTask: EngineConnTask): Boolean = { - val executionCode = engineConnTask.getCode - if (StringUtils.isEmpty(executionCode)) { - return false - } + var result = false + Utils.tryAndWarn { + val executionCode = engineConnTask.getCode + if (StringUtils.isEmpty(executionCode)) { + return result + } - val engineTypeLabel = engineConnTask.getLables.collectFirst { case label: EngineTypeLabel => - label - } + val engineTypeLabel = engineConnTask.getLables.collectFirst { case label: EngineTypeLabel => + label + } - engineTypeLabel.exists { label => - val engineType = label.getEngineType - if (engineType == EngineType.JDBC.toString) { - val upperCode = executionCode.toUpperCase().trim - val jdbcSetPrefixes = - ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") - jdbcSetPrefixes.exists(upperCode.startsWith) - } else { - false + result = engineTypeLabel.exists { label => + val engineType = label.getEngineType + if (engineType == EngineType.JDBC.toString) { + val upperCode = executionCode.toUpperCase().trim + val jdbcSetPrefixes = + ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") + jdbcSetPrefixes.exists(upperCode.startsWith) + } else { + false + } } } + result } protected def createEngineExecutionContext( From 9cbc3341b3d2c935738c4fa2cf873ca4c276dcdd Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Fri, 27 Mar 2026 16:19:30 +0800 Subject: [PATCH 3/4] =?UTF-8?q?#AI=20commit#=20=E5=BC=80=E5=8F=91=E9=98=B6?= =?UTF-8?q?=E6=AE=B5=EF=BC=9A=20=E4=BF=AE=E5=A4=8Dsr=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AF=BC=E8=87=B4=E5=8A=A0=E8=BD=BDinit=5Fsq?= =?UTF-8?q?l=E5=BC=82=E5=B8=B8bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../computation/executor/execute/ComputationExecutor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index b4d1060f5c..4fa868378d 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -477,7 +477,7 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) result = engineTypeLabel.exists { label => val engineType = label.getEngineType - if (engineType == EngineType.JDBC.toString) { + if (engineType.equals(EngineType.JDBC.toString)) { val upperCode = executionCode.toUpperCase().trim val jdbcSetPrefixes = ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue.split(",") From 530c12488e78fb0729a9e85728756396c5316207 Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Mon, 30 Mar 2026 10:54:59 +0800 Subject: [PATCH 4/4] =?UTF-8?q?#AI=20commit#=20=E4=BF=AE=E5=A4=8D=EF=BC=9A?= =?UTF-8?q?=20*=20=E5=A2=9E=E5=8A=A0=E4=BB=BB=E5=8A=A1=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E5=BC=80=E5=85=B3=E8=A6=86=E7=9B=96=E8=8C=83=E5=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execute/ComputationExecutor.scala | 56 ++++++++++++------- .../entrance/job/EntranceExecutionJob.java | 4 +- .../persistence/QueryPersistenceManager.java | 4 +- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala index 4fa868378d..44c79f8d38 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala @@ -269,34 +269,48 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) ) engineExecutionContext.getProperties .put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, errorIndex.toString) - // jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的 - var newIndex = index - var newErrorIndex = errorIndex - if (adjustErrorIndexForSetScenarios(engineConnTask)) { - newIndex = index - 1 - newErrorIndex = errorIndex + 1 - } - // 重试的时候如果执行过则跳过执行 - if (retryEnable && errorIndex > 0 && index < newErrorIndex) { - val code = codes(index).trim.toUpperCase() - val shouldSkip = !isContextStatement(code) + val props: util.Map[String, String] = engineCreationContext.getOptions + val taskRetry: String = + props.getOrDefault("linkis.task.retry.switch", "false").toString + if (java.lang.Boolean.parseBoolean(taskRetry)) { + // jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的 + var newIndex = index + var newErrorIndex = errorIndex + if (adjustErrorIndexForSetScenarios(engineConnTask)) { + newIndex = index - 1 + newErrorIndex = errorIndex + 1 + } + // 重试的时候如果执行过则跳过执行 + if (retryEnable && errorIndex > 0 && index < newErrorIndex) { + val code = codes(index).trim.toUpperCase() + val shouldSkip = !isContextStatement(code) - if (shouldSkip) { - engineExecutionContext.appendStdout( - LogUtils.generateInfo( - s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip." - ) - ) - executeFlag = false - } else { - if (newIndex >= 0) { + if (shouldSkip) { engineExecutionContext.appendStdout( LogUtils.generateInfo( - s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute." + s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip." ) ) + executeFlag = false + } else { + if (newIndex >= 0) { + engineExecutionContext.appendStdout( + LogUtils.generateInfo( + s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute." + ) + ) + } } } + } else { + if (retryEnable && errorIndex > 0 && index < errorIndex) { + engineExecutionContext.appendStdout( + LogUtils.generateInfo( + s"aisql retry with errorIndex: ${errorIndex}, current sql index: ${index} will skip." + ) + ) + executeFlag = false + } } if (executeFlag) { val code = codes(index) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java index 4c18b23b62..1ebf15dc9c 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java @@ -19,6 +19,7 @@ import org.apache.linkis.common.log.LogUtils; import org.apache.linkis.common.utils.ByteTimeUtils; +import org.apache.linkis.entrance.conf.EntranceConfiguration; import org.apache.linkis.entrance.exception.EntranceErrorException; import org.apache.linkis.entrance.execute.EntranceJob; import org.apache.linkis.entrance.log.LogHandler; @@ -159,7 +160,8 @@ public ExecuteRequest jobToExecuteRequest() throws EntranceErrorException { if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) { // 修复:任务重试背景下,10:59分提交任务执行,重试时时间变成11:00,重试任务会重新生成结果目录,导致查询结果集时,重试之前执行的结果集丢失 // 新增判断:生成结果目录之前,判断任务之前是否生成结果集,生成过就复用 - if (org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) { + if (((Boolean) EntranceConfiguration.TASK_RETRY_SWITCH().getValue()) + && org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) { resultSetPathRoot = jobRequest.getResultLocation(); } else { String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest); diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java index 39b3f58c71..883922e32d 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java @@ -167,7 +167,9 @@ public boolean onJobFailed( AtomicBoolean canRetry = new AtomicBoolean(false); String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key(); - if (engineType.equals(EngineType.JDBC().toString()) && StringUtils.isNotBlank(errorDescRegex)) { + if (((Boolean) EntranceConfiguration.TASK_RETRY_SWITCH().getValue()) + && engineType.equals(EngineType.JDBC().toString()) + && StringUtils.isNotBlank(errorDescRegex)) { // JDBC执行正则匹配 for (String regex : errorDescRegex.split(",")) { String trimmedRegex = regex.trim();