Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions admin-app/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,7 @@ drill {
transformationBufferSize = ${?DRILL_ETL_TRANSFORMATION_BUFFER_SIZE}
loggingFrequency = 10
loggingFrequency = ${?DRILL_ETL_LOGGING_FREQUENCY}
consistencyWindow = 0
consistencyWindow = ${?DRILL_ETL_CONSISTENCY_WINDOW}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ open class EtlOrchestratorImpl(
override val name: String,
open val pipelines: List<EtlPipeline<*, *>>,
open val metadataRepository: EtlMetadataRepository,
open val consistencyWindow: Long = 0,
) : EtlOrchestrator {
private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -101,7 +102,14 @@ open class EtlOrchestratorImpl(
val metadata = metadataRepository.getAllMetadataByExtractor(groupId, pipeline.name, pipeline.extractor.name)
.associateBy { it.loaderName }
val loaderNames = pipeline.loaders.map { it.second.name }.toSet()
val timestampPerLoader = loaderNames.associateWith { (metadata[it]?.lastProcessedAt ?: initTimestamp) }
val timestampPerLoader = loaderNames.associateWith {
val lastProcessedAt = metadata[it]?.lastProcessedAt
if (lastProcessedAt != null) {
lastProcessedAt.minusSeconds(consistencyWindow)
} else {
initTimestamp
}
}

try {
for (loader in loaderNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,36 @@ class ETLSimpleTest {
assertTrue(result2.first().status == EtlStatus.SUCCESS)
assertEquals(3, result2.first().rowsProcessed)
}

/**
 * Runs the orchestrator twice with a 60-second consistency window and checks that the
 * second run reports both the rows added before the first run and the rows added after it.
 */
@Test
fun `given consistencyWindow, ETL orchestrator should re-process records within the lookback window`() = runBlocking {
    val lookbackPipeline = EtlPipelineImpl.singleLoader(
        "simple-pipeline",
        extractor = SimpleExtractor(),
        loader = SimpleLoader()
    )
    val etl = EtlOrchestratorImpl(
        name = "lookback-etl",
        pipelines = listOf(lookbackPipeline),
        metadataRepository = SimpleMetadataRepository(),
        consistencyWindow = 60
    )
    val group = "test-group"

    // Seed the source with the initial batch; the first run is expected to pick up all of it.
    addNewRecords(5)
    val firstRun = etl.run(group).first()
    assertTrue(firstRun.status == EtlStatus.SUCCESS)
    assertEquals(5, firstRun.rowsProcessed)

    // Append a second batch after the first run has recorded its last-processed timestamp.
    addNewRecords(3)
    // With a 60s lookback the second run is expected to report all 8 rows (5 original + 3 new),
    // since every record was created within the last 60 seconds.
    val secondRun = etl.run(group).first()
    assertTrue(secondRun.status == EtlStatus.SUCCESS)
    assertEquals(8, secondRun.rowsProcessed)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ class EtlConfig(private val config: ApplicationConfig) {
*/
val loggingFrequency : Int
get() = config.propertyOrNull("loggingFrequency")?.getString()?.toIntOrNull() ?: 10

/**
 * Number of seconds to subtract from the last processed timestamp when starting an ETL run.
 * This allows re-processing records that arrived in the source database with a delay
 * or were inconsistent at the time of the previous ETL run.
 *
 * Read from the `consistencyWindow` config property. Falls back to `0` (no lookback) when the
 * property is absent or cannot be parsed as a [Long]. Negative values are clamped to `0`:
 * subtracting a negative number of seconds would shift the start timestamp forward and
 * skip records instead of re-processing them.
 */
val consistencyWindow : Long
    get() = config.propertyOrNull("consistencyWindow")
        ?.getString()
        ?.toLongOrNull()
        ?.coerceAtLeast(0L) // a lookback window can never be negative
        ?: 0L
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ val metricsDIModule
testLaunchesPipeline, testDefinitionsPipeline, testSessionsPipeline,
coveragePipeline, testSessionBuildsPipeline
),
metadataRepository = instance()
metadataRepository = instance(),
consistencyWindow = etlConfig.consistencyWindow
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,10 @@ class MetricsServiceImpl(
page: Int?,
pageSize: Int?
): PagedList<TestView> = transaction {
val baselineBuildId = build.id.takeIf { metricsRepository.buildExists(it) }
val targetBuildId = build.id.takeIf { metricsRepository.buildExists(it) }
?: throw BuildNotFound("Target build info not found for ${build.id}")

val targetBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) }
val baselineBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) }
?: throw BuildNotFound("Baseline build info not found for ${baselineBuild.id}")

// Map response field names to database column names
Expand Down Expand Up @@ -549,10 +549,10 @@ class MetricsServiceImpl(
page: Int?,
pageSize: Int?
): PagedList<MethodView> = transaction {
val baselineBuildId = build.id.takeIf { metricsRepository.buildExists(it) }
val targetBuildId = build.id.takeIf { metricsRepository.buildExists(it) }
?: throw BuildNotFound("Target build info not found for ${build.id}")

val targetBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) }
val baselineBuildId = baselineBuild.id.takeIf { metricsRepository.buildExists(it) }
?: throw BuildNotFound("Baseline build info not found for ${baselineBuild.id}")

// Map response field names to database column names
Expand Down
Loading