Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,31 @@ case class JobRecord(
fire_instance_id: String,
)

class OneTimeFireId private[piezo] (val id: String) extends AnyVal

class JobHistoryModel(getConnection: () => Connection) {
val logger: Logger = LoggerFactory.getLogger(this.getClass)

// Trigger Group for records that aren't deletable
private final val oneTimeJobTriggerGroup = "ONE_TIME_JOB"
final def oneTimeTriggerKey(fireInstanceId: String): TriggerKey =
TriggerKey(fireInstanceId, oneTimeJobTriggerGroup)
private final def oneTimeTriggerKey(fireInstanceId: OneTimeFireId): TriggerKey =
TriggerKey(fireInstanceId.id, oneTimeJobTriggerGroup)

// Makes the job id unique per job, instead of globally unique
final def getFireInstanceIdFromOneTimeJobId(group: String, name: String, oneTimeJobId: Long): String =
s"${group}_${name}_$oneTimeJobId"
private[piezo] final def getFireInstanceIdFromOneTimeJobId(
group: String,
name: String,
oneTimeJobId: Long,
): OneTimeFireId =
new OneTimeFireId(s"${group}_${name}_$oneTimeJobId")

// Methods to store the one-time-job id in a job-data-map
final val jobDataMapOneTimeJobKey = "OneTimeJobId"
final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[String] = Option(
final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[OneTimeFireId] = Option(
jobDataMap.getString(jobDataMapOneTimeJobKey),
)
final def createJobDataMapForOneTimeJob(fireInstanceId: String): JobDataMap = new JobDataMap(
java.util.Map.of(jobDataMapOneTimeJobKey, fireInstanceId),
).map(s => new OneTimeFireId(s))
final def createJobDataMapForOneTimeJob(fireInstanceId: OneTimeFireId): JobDataMap = new JobDataMap(
java.util.Map.of(jobDataMapOneTimeJobKey, fireInstanceId.id),
)

def addJob(
Expand Down Expand Up @@ -218,11 +224,11 @@ class JobHistoryModel(getConnection: () => Connection) {
* with the same instance id is an idempotent operation. If the one-time job has not been triggered, the same
* transaction is used to add the one-time-job to the database, to avoid race conditions
*/
def addOneTimeJobIfNotExists(jobKey: JobKey, oneTimeJobId: Long): Option[String] = {
def addOneTimeJobIfNotExists(jobKey: JobKey, oneTimeJobId: Long): Option[OneTimeFireId] = {
val connection = getConnection()

// Make the job id unique for that job, instead of globally unique
val fireInstanceId: String = getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, oneTimeJobId)
val fireInstanceId: OneTimeFireId = getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, oneTimeJobId)
// Use a trigger key that the database won't clean up in "JobHistoryCleanup"
val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId)

Expand All @@ -245,7 +251,7 @@ class JobHistoryModel(getConnection: () => Connection) {
VALUES(?, ?, ?, ?, ?, ?, ?)
""".stripMargin,
)
prepared.setString(1, fireInstanceId)
prepared.setString(1, fireInstanceId.id)
prepared.setString(2, jobKey.getName)
prepared.setString(3, jobKey.getGroup)
prepared.setString(4, triggerKey.getName)
Expand All @@ -264,7 +270,7 @@ class JobHistoryModel(getConnection: () => Connection) {
}

def completeOneTimeJob(
fireInstanceId: String,
fireInstanceId: OneTimeFireId,
fireTime: Instant,
instanceDurationInMillis: Long,
success: Boolean,
Expand All @@ -284,7 +290,7 @@ class JobHistoryModel(getConnection: () => Connection) {
prepared.setBoolean(1, success)
prepared.setObject(2, fireTime)
prepared.setObject(3, fireTime.plusMillis(instanceDurationInMillis))
prepared.setString(4, fireInstanceId)
prepared.setString(4, fireInstanceId.id)
prepared.executeUpdate()
} catch {
case e: Exception => logger.error("error in recording completion of one-time-job", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class WorkerJobListener(getConnection: () => Connection, statsd: StatsDClient, u
def jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException): Unit = {
try {
val success = jobException == null
val oneTimeJobIdOption: Option[String] = jobHistoryModel.getOneTimeJobIdFromDataMap(context.getMergedJobDataMap)
val oneTimeJobIdOption = jobHistoryModel.getOneTimeJobIdFromDataMap(context.getMergedJobDataMap)
oneTimeJobIdOption match {
case Some(oneTimeJobId) => // Update the existing record from the job_history table
jobHistoryModel.completeOneTimeJob(
Expand Down
4 changes: 2 additions & 2 deletions worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll {
val scheduledStart = java.time.Instant.now()
val temporaryFireInstanceId = "FireInstanceId"
val permanentFireInstanceIdLong = 123456789
val permanentFireInstanceIdString = jobHistoryModel.getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, permanentFireInstanceIdLong)
val permanentFireInstanceIdString = jobHistoryModel.getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, permanentFireInstanceIdLong).id
jobHistoryModel.addJob(temporaryFireInstanceId, jobKey, temporaryTriggerKey, scheduledStart, 1, true)
jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceIdLong)

Expand Down Expand Up @@ -222,7 +222,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll {
.map(record => (record.fire_instance_id, record.finish.map(_.getEpochSecond))) must containTheSameElementsAs(
List(
(
fireInstanceId,
fireInstanceId.id,
Some(expectedFinishSeconds),
),
),
Expand Down