Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ internal class ConsentAwareStorage(

val result = fileMover.delete(batchFile)
if (result) {
grantedOrchestrator.onFileDeleted(batchFile)
pendingOrchestrator.onFileDeleted(batchFile)
val numPendingFiles = grantedOrchestrator.decrementAndGetPendingFilesCount()
metricsDispatcher.sendBatchDeletedMetric(batchFile, reason, numPendingFiles)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,17 @@ internal interface FileOrchestrator {
* @return the number of pending files in the orchestrator, after decrementing by 1.
*/
fun decrementAndGetPendingFilesCount(): Int

/**
* Notifies the orchestrator that a file has been deleted.
*
* @param file the file that was deleted
*/
fun onFileDeleted(file: File)

/**
* Refreshes the internal file list from disk.
*/
@WorkerThread
fun refreshFilesFromDisk()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal class ConsentAwareFileMigrator(
newFileOrchestrator
)
operation.run()
newFileOrchestrator.refreshFilesFromDisk()
}

@WorkerThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ internal open class ConsentAwareFileOrchestrator(
return delegateOrchestrator.decrementAndGetPendingFilesCount()
}

override fun onFileDeleted(file: File) {
grantedOrchestrator.onFileDeleted(file)
pendingOrchestrator.onFileDeleted(file)
}

@WorkerThread
override fun refreshFilesFromDisk() {
grantedOrchestrator.refreshFilesFromDisk()
pendingOrchestrator.refreshFilesFromDisk()
}

// endregion

// region TrackingConsentProviderCallback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import com.datadog.android.core.internal.persistence.file.listFilesSafe
import com.datadog.android.core.internal.persistence.file.mkdirsSafe
import com.datadog.android.internal.time.TimeProvider
import java.io.File
import java.io.FileFilter
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.roundToLong

// TODO RUM-438 Improve this class: need to make it thread-safe and optimize work with file
// system in order to reduce the number of syscalls (which are expensive) for files already seen
@Suppress("TooManyFunctions")
internal class BatchFileOrchestrator(
private val rootDir: File,
Expand All @@ -38,8 +37,6 @@ internal class BatchFileOrchestrator(
private val pendingFiles: AtomicInteger = AtomicInteger(0)
) : FileOrchestrator {

private val fileFilter = BatchFileFilter()

// Offset the recent threshold for read and write to avoid conflicts
// Arbitrary offset as ±5% of the threshold
@Suppress("UnsafeThirdPartyFunctionCall") // rounded Double isn't NaN
Expand All @@ -48,11 +45,11 @@ internal class BatchFileOrchestrator(
@Suppress("UnsafeThirdPartyFunctionCall") // rounded Double isn't NaN
private val recentWriteDelayMs = (config.recentDelayMs * DECREASE_PERCENT).roundToLong()

// keep track of how many items were written in the last known file
private var previousFile: File? = null
private var previousFileItemCount: Long = 0
private var lastFileAccessTimestamp: Long = 0L
private var lastCleanupTimestamp: Long = 0L
private var currentBatchState = CurrentBatchState(null, 0L, 0L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please double-check access to this currentBatchState doesn't need to be done in synchronized block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirm this class runs on a persistenceExecutorService with pool size 1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirm this class runs on a persistenceExecutorService with pool size 1.

This may change at some point though.

private val lastCleanupTimestamp = AtomicLong(0L)
private val areKnownFilesInitialized = AtomicBoolean(false)

private val knownFiles: MutableSet<File> = mutableSetOf()

// region FileOrchestrator

Expand All @@ -66,7 +63,7 @@ internal class BatchFileOrchestrator(
var files = listBatchFiles()
files = deleteObsoleteFiles(files)
freeSpaceIfNeeded(files)
lastCleanupTimestamp = timeProvider.getDeviceTimestampMillis()
lastCleanupTimestamp.set(timeProvider.getDeviceTimestampMillis())
}

return getReusableWritableFile() ?: createNewFile()
Expand All @@ -78,14 +75,19 @@ internal class BatchFileOrchestrator(
return null
}

val files = listSortedBatchFiles().let {
deleteObsoleteFiles(it)
}
lastCleanupTimestamp = timeProvider.getDeviceTimestampMillis()
val files = deleteObsoleteFiles(listSortedBatchFiles())
lastCleanupTimestamp.set(timeProvider.getDeviceTimestampMillis())
pendingFiles.set(files.count())

return files.firstOrNull {
(it !in excludeFiles) && !isFileRecent(it, recentReadDelayMs)
when {
it in excludeFiles || isFileRecent(it, recentReadDelayMs) -> false
!it.existsSafe(internalLogger) -> {
onFileDeleted(it)
false
}
else -> true
}
}
}

Expand Down Expand Up @@ -138,112 +140,130 @@ internal class BatchFileOrchestrator(
}
}

// endregion

// region Internal

override fun decrementAndGetPendingFilesCount(): Int {
return pendingFiles.decrementAndGet()
}

@Suppress("LiftReturnOrAssignment", "ReturnCount")
override fun onFileDeleted(file: File) {
synchronized(knownFiles) {
knownFiles.remove(file)
}
if (currentBatchState.file == file) {
currentBatchState = CurrentBatchState(null, 0L, currentBatchState.lastAccessTimestamp)
}
}

@WorkerThread
override fun refreshFilesFromDisk() {
val files = rootDir.listFilesSafe(internalLogger)?.filter { it.isBatchFile }
if (files != null) {
synchronized(knownFiles) {
knownFiles.clear()
knownFiles.addAll(files)
}
}
}

// endregion

// region Private

@WorkerThread
@Suppress("ReturnCount")
private fun isRootDirValid(): Boolean {
if (rootDir.existsSafe(internalLogger)) {
if (rootDir.isDirectory) {
if (rootDir.canWriteSafe(internalLogger)) {
return true
} else {
val isValid = if (rootDir.existsSafe(internalLogger)) {
when {
!rootDir.isDirectory -> {
internalLogger.log(
InternalLogger.Level.ERROR,
listOf(
InternalLogger.Target.MAINTAINER,
InternalLogger.Target.TELEMETRY
),
{ ERROR_ROOT_NOT_WRITABLE.format(Locale.US, rootDir.path) }
listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
{ ERROR_ROOT_NOT_DIR.format(Locale.US, rootDir.path) }
)
return false
}
} else {
internalLogger.log(
InternalLogger.Level.ERROR,
listOf(
InternalLogger.Target.MAINTAINER,
InternalLogger.Target.TELEMETRY
),
{ ERROR_ROOT_NOT_DIR.format(Locale.US, rootDir.path) }
)
return false
}
} else {
synchronized(rootDir) {
// double check if directory was already created by some other thread
// entered this branch
if (rootDir.existsSafe(internalLogger)) {
return true
false
}

if (rootDir.mkdirsSafe(internalLogger)) {
return true
} else {
!rootDir.canWriteSafe(internalLogger) -> {
internalLogger.log(
InternalLogger.Level.ERROR,
listOf(
InternalLogger.Target.MAINTAINER,
InternalLogger.Target.TELEMETRY
),
{ ERROR_CANT_CREATE_ROOT.format(Locale.US, rootDir.path) }
listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
{ ERROR_ROOT_NOT_WRITABLE.format(Locale.US, rootDir.path) }
)
return false
false
}

else -> true
}
} else {
createRootDirectory()
}

if (isValid && areKnownFilesInitialized.compareAndSet(false, true)) refreshFilesFromDisk()
return isValid
}

private fun createRootDirectory(): Boolean = synchronized(rootDir) {
val created = rootDir.existsSafe(internalLogger) || rootDir.mkdirsSafe(internalLogger)
if (!created) {
internalLogger.log(
InternalLogger.Level.ERROR,
listOf(InternalLogger.Target.MAINTAINER, InternalLogger.Target.TELEMETRY),
{ ERROR_CANT_CREATE_ROOT.format(Locale.US, rootDir.path) }
)
}
created
}

private fun createNewFile(): File {
val newFileName = timeProvider.getDeviceTimestampMillis().toString()
val newFile = File(rootDir, newFileName)
val closedFile = previousFile
val closedFileLastAccessTimestamp = lastFileAccessTimestamp
if (closedFile != null) {
val state = currentBatchState
if (state.file != null) {
metricsDispatcher.sendBatchClosedMetric(
closedFile,
state.file,
BatchClosedMetadata(
lastTimeWasUsedInMs = closedFileLastAccessTimestamp,
eventsCount = previousFileItemCount
lastTimeWasUsedInMs = state.lastAccessTimestamp,
eventsCount = state.itemCount
)
)
}
previousFile = newFile
previousFileItemCount = 1
lastFileAccessTimestamp = timeProvider.getDeviceTimestampMillis()

pendingFiles.incrementAndGet()

val newFile = File(rootDir, timeProvider.getDeviceTimestampMillis().toString())
currentBatchState = CurrentBatchState(newFile, 1L, timeProvider.getDeviceTimestampMillis())
synchronized(knownFiles) {
knownFiles.add(newFile)
}
return newFile
}

@Suppress("ReturnCount")
private fun getReusableWritableFile(): File? {
val files = listBatchFiles()
val lastFile = files.latestBatchFile ?: return null
val latestFile = synchronized(knownFiles) { knownFiles.maxOrNull() } ?: return null

if (!latestFile.existsSafe(internalLogger)) {
onFileDeleted(latestFile)
return null
}

val lastKnownFile = previousFile
val lastKnownFileItemCount = previousFileItemCount
if (lastKnownFile != lastFile) {
if (currentBatchState.file != latestFile) {
// this situation can happen because:
// 1. `lastFile` is a file written during a previous session
// 2. `lastFile` was created by another system/process
// 3. `lastKnownFile` was deleted
// 1. `latestFile` is a file written during a previous session
// 2. `latestFile` was created by another system/process
// 3. `previousFile` was deleted
// In any case, we don't know the item count, so to be safe, we create a new file
return null
}

val isRecentEnough = isFileRecent(lastFile, recentWriteDelayMs)
val hasRoomForMore = lastFile.lengthSafe(internalLogger) < config.maxBatchSize
val isRecentEnough = isFileRecent(latestFile, recentWriteDelayMs)
val hasRoomForMore = latestFile.lengthSafe(internalLogger) < config.maxBatchSize
val lastKnownFileItemCount = currentBatchState.itemCount
val hasSlotForMore = (lastKnownFileItemCount < config.maxItemsPerBatch)

return if (isRecentEnough && hasRoomForMore && hasSlotForMore) {
previousFileItemCount = lastKnownFileItemCount + 1
lastFileAccessTimestamp = timeProvider.getDeviceTimestampMillis()
lastFile
currentBatchState = currentBatchState.copy(
itemCount = lastKnownFileItemCount + 1,
lastAccessTimestamp = timeProvider.getDeviceTimestampMillis()
)
latestFile
} else {
null
}
Expand All @@ -262,6 +282,7 @@ internal class BatchFileOrchestrator(
val isOldFile = (it.name.toLongOrNull() ?: 0) < threshold
if (isOldFile) {
if (it.deleteSafe(internalLogger)) {
onFileDeleted(it)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we need some sort of private method that calls onFileDeleted after deleteSafe(internalLogger). This pattern is present in the code in multiple places.

metricsDispatcher.sendBatchDeletedMetric(
batchFile = it,
removalReason = RemovalReason.Obsolete,
Expand Down Expand Up @@ -306,6 +327,10 @@ internal class BatchFileOrchestrator(
val size = file.lengthSafe(internalLogger)
val wasDeleted = file.deleteSafe(internalLogger)
return if (wasDeleted) {
onFileDeleted(file)
if (currentBatchState.file == file) {
currentBatchState = CurrentBatchState(null, 0L, currentBatchState.lastAccessTimestamp)
}
if (sendMetric) {
metricsDispatcher.sendBatchDeletedMetric(file, RemovalReason.Purged, pendingFiles.decrementAndGet())
}
Expand All @@ -316,7 +341,9 @@ internal class BatchFileOrchestrator(
}

private fun listBatchFiles(): List<File> {
return rootDir.listFilesSafe(fileFilter, internalLogger).orEmpty().toList()
return synchronized(knownFiles) {
knownFiles.toList()
}
}

private fun listSortedBatchFiles(): List<File> {
Expand All @@ -326,7 +353,7 @@ internal class BatchFileOrchestrator(
}

private fun canDoCleanup(): Boolean {
return timeProvider.getDeviceTimestampMillis() - lastCleanupTimestamp > config.cleanupFrequencyThreshold
return timeProvider.getDeviceTimestampMillis() - lastCleanupTimestamp.get() > config.cleanupFrequencyThreshold
}

private val File.metadata: File
Expand All @@ -335,23 +362,13 @@ internal class BatchFileOrchestrator(
private val File.isBatchFile: Boolean
get() = name.toLongOrNull() != null

private val List<File>.latestBatchFile: File?
get() = maxOrNull()

// endregion

// region FileFilter

internal inner class BatchFileFilter : FileFilter {
@Suppress("ReturnCount")
override fun accept(file: File?): Boolean {
if (file == null) return false

return file.isBatchFile
}
}

// endregion
private data class CurrentBatchState(
val file: File?,
val itemCount: Long,
val lastAccessTimestamp: Long
)

companion object {

Expand Down
Loading
Loading