From c36c6e62e22f3e3dc8655a895f1c92db4689fa7d Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Mon, 16 Sep 2024 08:20:19 -0400 Subject: [PATCH] Add Flow.throttleLatest extension. --- .../banners/MediaRestoreProgressBanner.kt | 8 +- core-util-jvm/build.gradle.kts | 4 + .../org/signal/core/util/FlowExtensions.kt | 24 +++++ .../signal/core/util/FlowExtensionsTests.kt | 63 +++++++++++++ dependencies.gradle.kts | 3 + gradle/verification-metadata.xml | 88 +++++++++++++++++++ 6 files changed, 184 insertions(+), 6 deletions(-) create mode 100644 core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt create mode 100644 core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/banner/banners/MediaRestoreProgressBanner.kt b/app/src/main/java/org/thoughtcrime/securesms/banner/banners/MediaRestoreProgressBanner.kt index 0d0a75c74f..c1a967ca0a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/banner/banners/MediaRestoreProgressBanner.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/banner/banners/MediaRestoreProgressBanner.kt @@ -10,17 +10,14 @@ import androidx.compose.runtime.Composable import androidx.lifecycle.LifecycleOwner import androidx.lifecycle.flowWithLifecycle import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.withContext +import org.signal.core.util.throttleLatest import org.thoughtcrime.securesms.backup.v2.ui.status.BackupStatus import org.thoughtcrime.securesms.backup.v2.ui.status.BackupStatusData import org.thoughtcrime.securesms.banner.Banner @@ -67,8 +64,7 @@ class MediaRestoreProgressBanner(private val data: MediaRestoreEvent) : Banner() return flow .flowWithLifecycle(lifecycleOwner.lifecycle) - .buffer(1, BufferOverflow.DROP_OLDEST) - .onEach { delay(1.seconds) } + .throttleLatest(1.seconds) .map { MediaRestoreProgressBanner(loadData()) } .flowOn(Dispatchers.IO) } diff --git a/core-util-jvm/build.gradle.kts b/core-util-jvm/build.gradle.kts index f4711d14dd..1c6b2152f0 100644 --- a/core-util-jvm/build.gradle.kts +++ b/core-util-jvm/build.gradle.kts @@ -25,7 +25,11 @@ kotlin { dependencies { implementation(libs.kotlin.reflect) + implementation(libs.kotlinx.coroutines.core) + implementation(libs.kotlinx.coroutines.core.jvm) testImplementation(testLibs.junit.junit) testImplementation(testLibs.assertj.core) + testImplementation(testLibs.junit.junit) + testImplementation(testLibs.kotlinx.coroutines.test) } diff --git a/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt b/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt new file mode 100644 index 0000000000..45d252d67a --- /dev/null +++ b/core-util-jvm/src/main/java/org/signal/core/util/FlowExtensions.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.signal.core.util + +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.conflate +import kotlinx.coroutines.flow.onEach +import kotlin.time.Duration + +/** + * Throttles the flow so that at most one value is emitted every [timeout]. The latest value is always emitted. + * + * You can think of this like debouncing, but with "checkpoints" so that even if you have a constant stream of values, + * you'll still get an emission every [timeout] (unlike debouncing, which will only emit once the stream settles down). + */ +fun Flow.throttleLatest(timeout: Duration): Flow { + return this + .conflate() + .onEach { delay(timeout) } +} diff --git a/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt b/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt new file mode 100644 index 0000000000..43bd2f63e7 --- /dev/null +++ b/core-util-jvm/src/test/java/org/signal/core/util/FlowExtensionsTests.kt @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.signal.core.util + +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Test +import kotlin.time.Duration.Companion.milliseconds + +class FlowExtensionsTests { + + @Test + fun `throttleLatest - always emits first value`() = runTest { + val testFlow = flow { + delay(10) + emit(1) + } + + val output = testFlow + .throttleLatest(100.milliseconds) + .toList() + + assertEquals(listOf(1), output) + } + + @Test + fun `throttleLatest - always emits last value`() = runTest { + val testFlow = flow { + delay(10) + emit(1) + delay(30) + emit(2) + } + + val output = testFlow + .throttleLatest(20.milliseconds) + .toList() + + assertEquals(listOf(1, 2), output) + } + + @Test + fun `throttleLatest - skips intermediate values`() = runTest { + val testFlow = flow { + for (i in 1..30) { + emit(i) + delay(10) + } + } + + val output = testFlow + .throttleLatest(50.milliseconds) + .toList() + + assertEquals(listOf(1, 5, 10, 15, 20, 25, 30), output) + } +} diff --git a/dependencies.gradle.kts b/dependencies.gradle.kts index 8800c3ac2f..b93f922883 100644 --- a/dependencies.gradle.kts +++ b/dependencies.gradle.kts @@ -47,6 +47,8 @@ dependencyResolutionManagement { library("kotlin-reflect", "org.jetbrains.kotlin", "kotlin-reflect").versionRef("kotlin") library("kotlin-gradle-plugin", "org.jetbrains.kotlin", "kotlin-gradle-plugin").versionRef("kotlin") library("ktlint", "org.jlleitschuh.gradle:ktlint-gradle:12.1.1") + library("kotlinx-coroutines-core", "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0") + library("kotlinx-coroutines-core-jvm", "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.9.0") library("kotlinx-coroutines-play-services", "org.jetbrains.kotlinx:kotlinx-coroutines-play-services:1.8.1") library("kotlinx-coroutines-rx3", "org.jetbrains.kotlinx:kotlinx-coroutines-rx3:1.3.9") @@ -196,6 +198,7 @@ dependencyResolutionManagement { library("androidx-test-orchestrator", "androidx.test:orchestrator:1.4.1") library("androidx-test-runner", "androidx.test", "runner").versionRef("androidx-test") library("espresso-core", "androidx.test.espresso", "espresso-core").versionRef("espresso") + library("kotlinx-coroutines-test", "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.9.0") library("mockito-core", "org.mockito:mockito-inline:4.6.1") library("mockito-kotlin", "org.mockito.kotlin:mockito-kotlin:4.0.0") library("mockito-android", "org.mockito:mockito-android:4.6.1") diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml index 323d902b51..0aa29796b3 100644 --- a/gradle/verification-metadata.xml +++ b/gradle/verification-metadata.xml @@ -7968,6 +7968,17 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + + + + @@ -8043,6 +8054,11 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + @@ -8291,6 +8307,14 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + @@ -8346,6 +8370,14 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + @@ -8399,6 +8431,14 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + @@ -8407,6 +8447,14 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + @@ -8420,6 +8468,14 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + @@ -8433,6 +8489,14 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + @@ -8441,6 +8505,30 @@ https://docs.gradle.org/current/userguide/dependency_verification.html + + + + + + + + + + + + + + + + + + + + + + + +