From 224c4ffd298d53e201a55b37cc885b7cb726288f Mon Sep 17 00:00:00 2001 From: Carlos Ernesto Alvarez Berumen Date: Sat, 7 Mar 2026 14:46:32 -0600 Subject: [PATCH 1/3] v1 --- .../scala/org/apache/texera/service/util/S3StorageClient.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index f3d252d413b..d692c574b8e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -38,7 +38,8 @@ import scala.jdk.CollectionConverters._ object S3StorageClient { val MINIMUM_NUM_OF_MULTIPART_S3_PART: Long = 5L * 1024 * 1024 // 5 MiB val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000 - val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6 + // Keep in sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180 + val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24 // Initialize MinIO-compatible S3 Client private lazy val s3Client: S3Client = { From bf6130653b560a9139067dfaa7708e2bcf426897 Mon Sep 17 00:00:00 2001 From: Carlos Ernesto Alvarez Berumen Date: Sat, 7 Mar 2026 14:54:21 -0600 Subject: [PATCH 2/3] v2 --- .../apache/texera/service/resource/DatasetResourceSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 24253c3a92d..0256fbf4937 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -905,7 +905,7 @@ class DatasetResourceSpec uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 
fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be "" - // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6) + // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS) val expireHrs = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS getDSLContext From c858100d4f187540ffc46d2b20a3852810f899f9 Mon Sep 17 00:00:00 2001 From: Carlos Ernesto Alvarez Berumen Date: Mon, 16 Mar 2026 02:12:51 -0600 Subject: [PATCH 3/3] v10 --- .../resource/DatasetResourceSpec.scala | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 0256fbf4937..a71861491a6 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -670,6 +670,20 @@ class DatasetResourceSpec sessionRecord.getUploadId } + private def expireUploadSession(uploadId: String): Unit = { + val expiredHoursAgo = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS + 1 + getDSLContext + .update(DATASET_UPLOAD_SESSION) + .set( + DATASET_UPLOAD_SESSION.CREATED_AT, + DSL + .field(s"current_timestamp - interval '${expiredHoursAgo} hours'") + .cast(classOf[java.time.OffsetDateTime]) + ) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId)) + .execute() + } + private def assertPlaceholdersCreated(uploadId: String, expectedParts: Int): Unit = { val rows = fetchPartRows(uploadId).sortBy(_.getPartNumber) rows.size shouldEqual expectedParts @@ -717,17 +731,9 @@ class DatasetResourceSpec initUpload(fpB, numParts = 2).getStatus shouldEqual 200 initUpload(fpA, numParts = 2).getStatus shouldEqual 200 - // Expire fpB by pushing created_at back > 6 hours. 
+ // Expire fpB by pushing created_at back beyond the real session expiration window. val uploadIdB = fetchUploadIdOrFail(fpB) - val tableName = DATASET_UPLOAD_SESSION.getName // typically "dataset_upload_session" - getDSLContext - .update(DATASET_UPLOAD_SESSION) - .set( - DATASET_UPLOAD_SESSION.CREATED_AT, - DSL.field("current_timestamp - interval '7 hours'").cast(classOf[java.time.OffsetDateTime]) - ) - .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadIdB)) - .execute() + expireUploadSession(uploadIdB) val listed = listUploads() listed shouldEqual listed.sorted @@ -906,18 +912,7 @@ class DatasetResourceSpec fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be "" // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS) - val expireHrs = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS - - getDSLContext - .update(DATASET_UPLOAD_SESSION) - .set( - DATASET_UPLOAD_SESSION.CREATED_AT, - DSL - .field(s"current_timestamp - interval '${expireHrs + 1} hours'") - .cast(classOf[java.time.OffsetDateTime]) - ) - .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(oldUploadId)) - .execute() + expireUploadSession(oldUploadId) // Same init config again -> should restart because it's expired val r2 = initUpload(filePath, numParts = 2, lastPartBytes = 123)