From 66dcd9b25af9904774cd929bdb35c94f79486eeb Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Thu, 26 Feb 2026 10:56:17 -0700 Subject: [PATCH] feat: Type safety for one time fire instance id Use a newtype so that ensure that we are passing around the right thing. Also make some methods more private. This is built on top of #133 --- .../lucidchart/piezo/JobHistoryModel.scala | 32 +++++++++++-------- .../lucidchart/piezo/WorkerJobListener.scala | 2 +- .../com/lucidchart/piezo/ModelTest.scala | 4 +-- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index 72a15389..39ce5838 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -18,25 +18,31 @@ case class JobRecord( fire_instance_id: String, ) +class OneTimeFireId private[piezo] (val id: String) extends AnyVal + class JobHistoryModel(getConnection: () => Connection) { val logger: Logger = LoggerFactory.getLogger(this.getClass) // Trigger Group for records that aren't deletable private final val oneTimeJobTriggerGroup = "ONE_TIME_JOB" - final def oneTimeTriggerKey(fireInstanceId: String): TriggerKey = - TriggerKey(fireInstanceId, oneTimeJobTriggerGroup) + private final def oneTimeTriggerKey(fireInstanceId: OneTimeFireId): TriggerKey = + TriggerKey(fireInstanceId.id, oneTimeJobTriggerGroup) // Makes the job id unique per job, instead of globally unique - final def getFireInstanceIdFromOneTimeJobId(group: String, name: String, oneTimeJobId: Long): String = - s"${group}_${name}_$oneTimeJobId" + private[piezo] final def getFireInstanceIdFromOneTimeJobId( + group: String, + name: String, + oneTimeJobId: Long, + ): OneTimeFireId = + new OneTimeFireId(s"${group}_${name}_$oneTimeJobId") // Methods to store the one-time-job id in a job-data-map final val jobDataMapOneTimeJobKey = "OneTimeJobId" - final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[String] = Option( + final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[OneTimeFireId] = Option( jobDataMap.getString(jobDataMapOneTimeJobKey), - ) - final def createJobDataMapForOneTimeJob(fireInstanceId: String): JobDataMap = new JobDataMap( - java.util.Map.of(jobDataMapOneTimeJobKey, fireInstanceId), + ).map(s => new OneTimeFireId(s)) + final def createJobDataMapForOneTimeJob(fireInstanceId: OneTimeFireId): JobDataMap = new JobDataMap( + java.util.Map.of(jobDataMapOneTimeJobKey, fireInstanceId.id), ) def addJob( @@ -218,11 +224,11 @@ class JobHistoryModel(getConnection: () => Connection) { * with the same instance id is an idempotent operation. If the one-time job has not been triggered, the same * transaction is used to add the one-time-job to the database, to avoid race conditions */ - def addOneTimeJobIfNotExists(jobKey: JobKey, oneTimeJobId: Long): Option[String] = { + def addOneTimeJobIfNotExists(jobKey: JobKey, oneTimeJobId: Long): Option[OneTimeFireId] = { val connection = getConnection() // Make the job id unique for that job, instead of globally unique - val fireInstanceId: String = getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, oneTimeJobId) + val fireInstanceId: OneTimeFireId = getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, oneTimeJobId) // Use a trigger key that the database won't clean up in "JobHistoryCleanup" val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId) @@ -245,7 +251,7 @@ class JobHistoryModel(getConnection: () => Connection) { VALUES(?, ?, ?, ?, ?, ?, ?) """.stripMargin, ) - prepared.setString(1, fireInstanceId) + prepared.setString(1, fireInstanceId.id) prepared.setString(2, jobKey.getName) prepared.setString(3, jobKey.getGroup) prepared.setString(4, triggerKey.getName) @@ -264,7 +270,7 @@ class JobHistoryModel(getConnection: () => Connection) { } def completeOneTimeJob( - fireInstanceId: String, + fireInstanceId: OneTimeFireId, fireTime: Instant, instanceDurationInMillis: Long, success: Boolean, @@ -284,7 +290,7 @@ class JobHistoryModel(getConnection: () => Connection) { prepared.setBoolean(1, success) prepared.setObject(2, fireTime) prepared.setObject(3, fireTime.plusMillis(instanceDurationInMillis)) - prepared.setString(4, fireInstanceId) + prepared.setString(4, fireInstanceId.id) prepared.executeUpdate() } catch { case e: Exception => logger.error("error in recording completion of one-time-job", e) diff --git a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala index 3c0a3480..2f4ee842 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/WorkerJobListener.scala @@ -23,7 +23,7 @@ class WorkerJobListener(getConnection: () => Connection, statsd: StatsDClient, u def jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException): Unit = { try { val success = jobException == null - val oneTimeJobIdOption: Option[String] = jobHistoryModel.getOneTimeJobIdFromDataMap(context.getMergedJobDataMap) + val oneTimeJobIdOption = jobHistoryModel.getOneTimeJobIdFromDataMap(context.getMergedJobDataMap) oneTimeJobIdOption match { case Some(oneTimeJobId) => // Update the existing record from the job_history table jobHistoryModel.completeOneTimeJob( diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index 2d7d4d09..f546e424 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -156,7 +156,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val scheduledStart = java.time.Instant.now() val temporaryFireInstanceId = "FireInstanceId" val permanentFireInstanceIdLong = 123456789 - val permanentFireInstanceIdString = jobHistoryModel.getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, permanentFireInstanceIdLong) + val permanentFireInstanceIdString = jobHistoryModel.getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, permanentFireInstanceIdLong).id jobHistoryModel.addJob(temporaryFireInstanceId, jobKey, temporaryTriggerKey, scheduledStart, 1, true) jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceIdLong) @@ -222,7 +222,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { .map(record => (record.fire_instance_id, record.finish.map(_.getEpochSecond))) must containTheSameElementsAs( List( ( - fireInstanceId, + fireInstanceId.id, Some(expectedFinishSeconds), ), ),