Fix job deletion bug, add performance tests.

This commit is contained in:
Greyson Parrelli 2024-07-18 10:12:33 -04:00 committed by Nicholas Tinsley
parent 86cf8200b5
commit 36dface175
6 changed files with 264 additions and 7 deletions

View file

@ -0,0 +1,117 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.jobs
import android.app.Application
import androidx.test.ext.junit.runners.AndroidJUnit4
import org.junit.Ignore
import org.junit.Test
import org.junit.runner.RunWith
import org.signal.core.util.EventTimer
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.JobDatabase.Companion.getInstance
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JobManager
import org.thoughtcrime.securesms.jobmanager.JobMigrator
import org.thoughtcrime.securesms.jobmanager.JobTracker
import org.thoughtcrime.securesms.util.TextSecurePreferences
import java.util.concurrent.CountDownLatch
import kotlin.random.Random
@Ignore("This is just for testing performance, not correctness, and they can therefore take a long time. Run them manually when you need to.")
@RunWith(AndroidJUnit4::class)
class JobManagerPerformanceTests {
companion object {
val TAG = Log.tag(JobManagerPerformanceTests::class.java)
}
@Test
fun testPerformance_singleQueue() {
runTest(2000) { TestJob(queue = "queue") }
}
@Test
fun testPerformance_fourQueues() {
runTest(2000) { TestJob(queue = "queue-${Random.nextInt(1, 5)}") }
}
@Test
fun testPerformance_noQueues() {
runTest(2000) { TestJob(queue = null) }
}
private fun runTest(count: Int, jobCreator: () -> TestJob) {
val context = AppDependencies.application
val jobManager = testJobManager(context)
jobManager.beginJobLoop()
val eventTimer = EventTimer()
val latch = CountDownLatch(count)
var seenStart = false
jobManager.addListener({ it.factoryKey == TestJob.KEY }) { _, state ->
if (!seenStart && state == JobTracker.JobState.RUNNING) {
// Adding the jobs can take a while (and runs on a background thread), so we want to reset the timer the first time we see a job run so the first job
// doesn't have a skewed time
eventTimer.reset()
seenStart = true
}
if (state.isComplete) {
eventTimer.emit("job")
latch.countDown()
if (latch.count % 100 == 0L) {
Log.d(TAG, "Finished ${count - latch.count}/$count jobs")
}
}
}
Log.i(TAG, "Adding jobs...")
jobManager.addAll((1..count).map { jobCreator() })
Log.i(TAG, "Waiting for jobs to complete...")
latch.await()
Log.i(TAG, "Jobs complete!")
Log.i(TAG, eventTimer.stop().summary)
}
private fun testJobManager(context: Application): JobManager {
val config = JobManager.Configuration.Builder()
.setJobFactories(
JobManagerFactories.getJobFactories(context) + mapOf(
TestJob.KEY to TestJob.Factory()
)
)
.setConstraintFactories(JobManagerFactories.getConstraintFactories(context))
.setConstraintObservers(JobManagerFactories.getConstraintObservers(context))
.setJobStorage(FastJobStorage(getInstance(context)))
.setJobMigrator(JobMigrator(TextSecurePreferences.getJobManagerVersion(context), JobManager.CURRENT_VERSION, JobManagerFactories.getJobMigrations(context)))
.build()
return JobManager(context, config)
}
private class TestJob(params: Parameters) : Job(params) {
companion object {
const val KEY = "test"
}
constructor(queue: String?) : this(Parameters.Builder().setQueue(queue).build())
override fun serialize(): ByteArray? = null
override fun getFactoryKey(): String = KEY
override fun run(): Result = Result.success()
override fun onFailure() = Unit
class Factory : Job.Factory<TestJob> {
override fun create(parameters: Parameters, serializedData: ByteArray?): TestJob {
return TestJob(parameters)
}
}
}
}

View file

@ -197,6 +197,40 @@ class JobDatabase(
.readToList { it.toJobSpec() }
}
@Synchronized
fun getJobSpecsByKeys(keys: Collection<String>): List<JobSpec> {
if (keys.isEmpty()) {
return emptyList()
}
val output: MutableList<JobSpec> = ArrayList(keys.size)
for (query in SqlUtil.buildCollectionQuery(Jobs.JOB_SPEC_ID, keys)) {
readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.where(query.where, query.whereArgs)
.run()
.forEach {
output += it.toJobSpec()
}
}
return output
}
@Synchronized
fun getMostEligibleJobInQueue(queue: String): JobSpec? {
return readableDatabase
.select()
.from(Jobs.TABLE_NAME)
.where("${Jobs.QUEUE_KEY} = ?", queue)
.orderBy("${Jobs.PRIORITY} DESC, ${Jobs.CREATE_TIME} ASC, ${Jobs.ID} ASC")
.limit(1)
.run()
.readToSingleObject { it.toJobSpec() }
}
@Synchronized
fun getAllMatchingFilter(predicate: Predicate<JobSpec>): List<JobSpec> {
val output: MutableList<JobSpec> = mutableListOf()

View file

@ -1,6 +1,8 @@
package org.thoughtcrime.securesms.jobs
import androidx.annotation.VisibleForTesting
import org.signal.core.util.Stopwatch
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.JobDatabase
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.persistence.ConstraintSpec
@ -15,7 +17,9 @@ import java.util.function.Predicate
class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
companion object {
private val TAG = Log.tag(FastJobStorage::class)
private const val JOB_CACHE_LIMIT = 1000
private const val DEBUG = false
}
/** We keep a trimmed down version of every job in memory. */
@ -38,7 +42,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
private val dependenciesByJobId: MutableMap<String, MutableList<DependencySpec>> = hashMapOf()
/** The list of jobs eligible to be returned from [getPendingJobsWithNoDependenciesInCreatedOrder], kept sorted in the appropriate order. */
private val eligibleJobs: TreeSet<MinimalJobSpec> = TreeSet(EligibleJobComparator)
private val eligibleJobs: TreeSet<MinimalJobSpec> = TreeSet(EligibleMinJobComparator)
/** All migration-related jobs, kept in the appropriate order. */
private val migrationJobs: TreeSet<MinimalJobSpec> = TreeSet(compareBy { it.createTime })
@ -48,7 +52,9 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun init() {
val stopwatch = Stopwatch("init", decimalPlaces = 2)
minimalJobs += jobDatabase.getAllMinimalJobSpecs()
stopwatch.split("fetch-min-jobs")
for (job in minimalJobs) {
if (job.queueKey == Job.Parameters.MIGRATION_QUEUE_KEY) {
@ -57,29 +63,37 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
placeJobInEligibleList(job)
}
}
stopwatch.split("sort-min-jobs")
jobDatabase.getJobSpecs(JOB_CACHE_LIMIT).forEach {
jobSpecCache[it.id] = it
}
stopwatch.split("fetch-full-jobs")
for (constraintSpec in jobDatabase.getConstraintSpecsForJobs(jobSpecCache.keys)) {
val jobConstraints: MutableList<ConstraintSpec> = constraintsByJobId.getOrPut(constraintSpec.jobSpecId) { mutableListOf() }
jobConstraints += constraintSpec
}
stopwatch.split("fetch-constraints")
for (dependencySpec in jobDatabase.getAllDependencySpecs().filterNot { it.hasCircularDependency() }) {
val jobDependencies: MutableList<DependencySpec> = dependenciesByJobId.getOrPut(dependencySpec.jobId) { mutableListOf() }
jobDependencies += dependencySpec
}
stopwatch.split("fetch-dependencies")
stopwatch.stop(TAG)
}
@Synchronized
override fun insertJobs(fullSpecs: List<FullSpec>) {
val stopwatch = debugStopwatch("insert")
val durable: List<FullSpec> = fullSpecs.filterNot { it.isMemoryOnly }
if (durable.isNotEmpty()) {
jobDatabase.insertJobs(durable)
}
stopwatch?.split("db")
for (fullSpec in fullSpecs) {
val minimalJobSpec = fullSpec.jobSpec.toMinimalJobSpec()
@ -95,6 +109,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
constraintsByJobId[fullSpec.jobSpec.id] = fullSpec.constraintSpecs.toMutableList()
dependenciesByJobId[fullSpec.jobSpec.id] = fullSpec.dependencySpecs.toMutableList()
}
stopwatch?.split("cache")
stopwatch?.stop(TAG)
}
@Synchronized
@ -109,6 +125,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
@Synchronized
override fun getPendingJobsWithNoDependenciesInCreatedOrder(currentTime: Long): List<JobSpec> {
val stopwatch = debugStopwatch("get-pending")
val migrationJob: MinimalJobSpec? = migrationJobs.firstOrNull()
return if (migrationJob != null && !migrationJob.isRunning && migrationJob.hasEligibleRunTime(currentTime)) {
@ -116,7 +133,7 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
} else if (migrationJob != null) {
emptyList()
} else {
eligibleJobs
val minJobs: List<MinimalJobSpec> = eligibleJobs
.asSequence()
.filter { job ->
// Filter out all jobs with unmet dependencies
@ -124,8 +141,11 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
.filterNot { it.isRunning }
.filter { job -> job.hasEligibleRunTime(currentTime) }
.map { it.toJobSpec() }
.toList()
getFullJobs(minJobs)
}.also {
stopwatch?.stop(TAG)
}
}
@ -282,6 +302,8 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
.map { it.toMinimalJobSpec() }
.toSet()
val affectedQueues: Set<String> = minimalJobsToDelete.mapNotNull { it.queueKey }.toSet()
if (durableJobIdsToDelete.isNotEmpty()) {
jobDatabase.deleteJobs(durableJobIdsToDelete)
}
@ -292,6 +314,15 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
eligibleJobs.removeAll(minimalJobsToDelete)
migrationJobs.removeAll(minimalJobsToDelete)
mostEligibleJobForQueue.keys.removeAll(affectedQueues)
for (queue in affectedQueues) {
jobDatabase.getMostEligibleJobInQueue(queue)?.let {
jobSpecCache[it.id] = it
placeJobInEligibleList(it.toMinimalJobSpec())
}
}
for (jobId in ids) {
constraintsByJobId.remove(jobId)
dependenciesByJobId.remove(jobId)
@ -490,7 +521,22 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
private object EligibleJobComparator : Comparator<MinimalJobSpec> {
private fun getFullJobs(minJobs: Collection<MinimalJobSpec>): List<JobSpec> {
val requestedKeys = minJobs.map { it.id }.toSet()
val cachedKeys = jobSpecCache.keys.intersect(requestedKeys)
val uncachedKeys = requestedKeys.subtract(cachedKeys)
val cachedJobs = cachedKeys.map { jobSpecCache[it]!! }
val fetchedJobs = jobDatabase.getJobSpecsByKeys(uncachedKeys)
val sorted = TreeSet(EligibleFullJobComparator).apply {
addAll(cachedJobs)
addAll(fetchedJobs)
}
return sorted.toList()
}
private object EligibleMinJobComparator : Comparator<MinimalJobSpec> {
override fun compare(o1: MinimalJobSpec, o2: MinimalJobSpec): Int {
// We want to sort by priority descending, then createTime ascending
@ -506,6 +552,25 @@ class FastJobStorage(private val jobDatabase: JobDatabase) : JobStorage {
}
}
}
/**
* Identical to [EligibleMinJobComparator], but for full jobs.
*/
private object EligibleFullJobComparator : Comparator<JobSpec> {
override fun compare(o1: JobSpec, o2: JobSpec): Int {
return when {
o1.priority > o2.priority -> -1
o1.priority < o2.priority -> 1
o1.createTime < o2.createTime -> -1
o1.createTime > o2.createTime -> 1
else -> o1.id.compareTo(o2.id)
}
}
}
private fun debugStopwatch(label: String): Stopwatch? {
return if (DEBUG) Stopwatch(label, decimalPlaces = 2) else null
}
}
/**

View file

@ -533,6 +533,33 @@ class FastJobStorageTest {
jobs.contains(DataSet1.JOB_1) assertIs false
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after deleted, next item in queue is eligible`() {
// Two jobs in the same queue but with different create times
val firstJob = DataSet1.JOB_1
val secondJob = DataSet1.JOB_1.copy(id = "id2", createTime = 2)
val subject = FastJobStorage(
mockDatabase(
fullSpecs = listOf(
FullSpec(jobSpec = firstJob, constraintSpecs = emptyList(), dependencySpecs = emptyList()),
FullSpec(jobSpec = secondJob, constraintSpecs = emptyList(), dependencySpecs = emptyList())
)
)
)
subject.init()
var jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.size assertIs 1
jobs.contains(firstJob) assertIs true
subject.deleteJobs(listOf("id1"))
jobs = subject.getPendingJobsWithNoDependenciesInCreatedOrder(100)
jobs.size assertIs 1
jobs.contains(firstJob) assertIs false
jobs.contains(secondJob) assertIs true
}
@Test
fun `getPendingJobsWithNoDependenciesInCreatedOrder - after marked running, no longer is in eligible list`() {
val subject = FastJobStorage(mockDatabase(DataSet1.FULL_SPECS))
@ -798,6 +825,10 @@ class FastJobStorageTest {
every { mock.getAllDependencySpecs() } returns dependencies
every { mock.getConstraintSpecsForJobs(any()) } returns constraints
every { mock.getJobSpec(any()) } answers { jobs.first { it.id == firstArg() } }
every { mock.getJobSpecsByKeys(any()) } answers {
val ids: Collection<String> = firstArg()
jobs.filter { ids.contains(it.id) }
}
every { mock.insertJobs(any()) } answers {
val inserts: List<FullSpec> = firstArg()
for (insert in inserts) {
@ -863,6 +894,12 @@ class FastJobStorageTest {
}
}
}
every { mock.getMostEligibleJobInQueue(any()) } answers {
jobs
.filter { it.queueKey == firstArg() }
.sortedByDescending { it.priority }
.minByOrNull { it.createTime }
}
return mock
}

View file

@ -8,7 +8,6 @@ package org.signal.core.util
import org.signal.core.util.logging.Log
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit
import kotlin.time.ExperimentalTime
import kotlin.time.measureTimedValue
/**
@ -83,7 +82,6 @@ class Stopwatch @JvmOverloads constructor(private val title: String, private val
/**
* Logs how long it takes to perform the operation.
*/
@OptIn(ExperimentalTime::class)
inline fun <T> logTime(tag: String, label: String, decimalPlaces: Int = 0, block: () -> T): T {
val result = measureTimedValue(block)
Log.d(tag, "$label: ${result.duration.toDouble(DurationUnit.MILLISECONDS).roundedString(decimalPlaces)}")

View file

@ -23,9 +23,15 @@ class EventTimer {
private val durationsByGroup: MutableMap<String, MutableList<Long>> = mutableMapOf()
private val startTime = System.nanoTime()
private var startTime = System.nanoTime()
private var lastTimeNanos: Long = startTime
fun reset() {
startTime = System.nanoTime()
lastTimeNanos = startTime
durationsByGroup.clear()
}
/**
* Indicates an event in the specified group has finished.
*/