diff --git a/admin-app/src/main/resources/application.conf b/admin-app/src/main/resources/application.conf index dd7146f5e..18b46d3fe 100644 --- a/admin-app/src/main/resources/application.conf +++ b/admin-app/src/main/resources/application.conf @@ -104,5 +104,7 @@ drill { transformationBufferSize = ${?DRILL_ETL_TRANSFORMATION_BUFFER_SIZE} loggingFrequency = 10 loggingFrequency = ${?DRILL_ETL_LOGGING_FREQUENCY} + consistencyWindow = 0 + consistencyWindow = ${?DRILL_ETL_CONSISTENCY_WINDOW} } } diff --git a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt index a1a2df82e..5beda0ec8 100644 --- a/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt +++ b/admin-etl/src/main/kotlin/com/epam/drill/admin/etl/impl/EtlOrchestratorImpl.kt @@ -43,6 +43,7 @@ open class EtlOrchestratorImpl( override val name: String, open val pipelines: List>, open val metadataRepository: EtlMetadataRepository, + open val consistencyWindow: Long = 0, ) : EtlOrchestrator { private val logger = KotlinLogging.logger {} @@ -101,7 +102,14 @@ open class EtlOrchestratorImpl( val metadata = metadataRepository.getAllMetadataByExtractor(groupId, pipeline.name, pipeline.extractor.name) .associateBy { it.loaderName } val loaderNames = pipeline.loaders.map { it.second.name }.toSet() - val timestampPerLoader = loaderNames.associateWith { (metadata[it]?.lastProcessedAt ?: initTimestamp) } + val timestampPerLoader = loaderNames.associateWith { + val lastProcessedAt = metadata[it]?.lastProcessedAt + if (lastProcessedAt != null) { + lastProcessedAt.minusSeconds(consistencyWindow) + } else { + initTimestamp + } + } try { for (loader in loaderNames) { diff --git a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt index 86b5dd53f..a7136cc8b 100644 --- a/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt +++ b/admin-etl/src/test/kotlin/com/epam/drill/admin/etl/ETLSimpleTest.kt @@ -304,4 +304,36 @@ class ETLSimpleTest { assertTrue(result2.first().status == EtlStatus.SUCCESS) assertEquals(3, result2.first().rowsProcessed) } + + @Test + fun `given consistencyWindow, ETL orchestrator should re-process records within the lookback window`() = runBlocking { + val groupId = "test-group" + val orchestrator = EtlOrchestratorImpl( + name = "lookback-etl", + pipelines = listOf( + EtlPipelineImpl.singleLoader( + "simple-pipeline", + extractor = SimpleExtractor(), + loader = SimpleLoader() + ) + ), + metadataRepository = SimpleMetadataRepository(), + consistencyWindow = 60 + ) + + // Add initial data + addNewRecords(5) + // First run ETL — should process all initial data + val result1 = orchestrator.run(groupId) + assertTrue(result1.first().status == EtlStatus.SUCCESS) + assertEquals(5, result1.first().rowsProcessed) + + // Add new data after last processed timestamp + addNewRecords(3) + // Second run ETL — lookback of 60s should re-process all 8 records (5 original + 3 new) + // because all records were created within the last 60 seconds + val result2 = orchestrator.run(groupId) + assertTrue(result2.first().status == EtlStatus.SUCCESS) + assertEquals(8, result2.first().rowsProcessed) + } } diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt index 2d7e5ac3f..71a80b1c8 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/EtlConfig.kt @@ -56,4 +56,12 @@ class EtlConfig(private val config: ApplicationConfig) { */ val loggingFrequency : Int get() = config.propertyOrNull("loggingFrequency")?.getString()?.toIntOrNull() ?: 10 + + /** + * Number of seconds to subtract from the last processed timestamp when starting an ETL run. + * This allows re-processing records that arrived in the source database with a delay + * or were inconsistent at the time of the previous ETL run. + */ + val consistencyWindow : Long + get() = config.propertyOrNull("consistencyWindow")?.getString()?.toLongOrNull() ?: 0L } \ No newline at end of file diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt index 3a5d73f8b..cabd13c71 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/config/MetricsModule.kt @@ -65,7 +65,8 @@ val metricsDIModule testLaunchesPipeline, testDefinitionsPipeline, testSessionsPipeline, coveragePipeline, testSessionBuildsPipeline ), - metadataRepository = instance() + metadataRepository = instance(), + consistencyWindow = etlConfig.consistencyWindow ) } } diff --git a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt index e75807373..73628de96 100644 --- a/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt +++ b/admin-metrics/src/main/kotlin/com/epam/drill/admin/metrics/service/impl/MetricsServiceImpl.kt @@ -484,10 +484,10 @@ class MetricsServiceImpl( page: Int?, pageSize: Int? ): PagedList = transaction { - val baselineBuildId = build.id.takeIf { metricsRepository.buildExists(it) } + val targetBuildId = build.id.takeIf { metricsRepository.buildExists(it) } ?: throw BuildNotFound("Target build info not found for ${build.id}") - val targetBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) } + val baselineBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) } ?: throw BuildNotFound("Baseline build info not found for ${baselineBuild.id}") // Map response field names to database column names @@ -549,10 +549,10 @@ class MetricsServiceImpl( page: Int?, pageSize: Int? ): PagedList = transaction { - val baselineBuildId = build.id.takeIf { metricsRepository.buildExists(it) } + val targetBuildId = build.id.takeIf { metricsRepository.buildExists(it) } ?: throw BuildNotFound("Target build info not found for ${build.id}") - val targetBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) } + val baselineBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) } ?: throw BuildNotFound("Baseline build info not found for ${baselineBuild.id}") // Map response field names to database column names