Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Realm.asFlow() potentially missing an update when doing a write immediately after opening the Realm #1599

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Fixed
* Cache notification callback JNI references at startup to ensure that symbols can be resolved in core callbacks. (Issue [#1577](https://github.com/realm/realm-kotlin/issues/1577))
* Using `Realm.asFlow()` could miss an update if a write was started right after opening the Realm. (Issue [#1582](https://github.com/realm/realm-kotlin/issues/1582))

### Compatibility
* File format: Generates Realms with file format v23.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.launch
import kotlin.reflect.KClass

Expand All @@ -65,7 +67,7 @@ public class RealmImpl private constructor(
internal val realmScope =
CoroutineScope(SupervisorJob() + notificationScheduler.dispatcher)
private val notifierFlow: MutableSharedFlow<RealmChange<Realm>> =
MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
MutableSharedFlow(onBufferOverflow = BufferOverflow.DROP_OLDEST, replay = 1)

private val notifier = SuspendableNotifier(
owner = this,
Expand Down Expand Up @@ -208,7 +210,13 @@ public class RealmImpl private constructor(
}

override fun asFlow(): Flow<RealmChange<Realm>> = scopedFlow {
notifierFlow.onStart { emit(InitialRealmImpl(this@RealmImpl)) }
notifierFlow.withIndex()
.map { (index, change) ->
when (index) {
0 -> InitialRealmImpl(this)
else -> change
}
}
}

override fun writeCopyTo(configuration: Configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class SuspendableNotifier(
// see https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt#L78
private val _realmChanged = MutableSharedFlow<VersionId>(
onBufferOverflow = BufferOverflow.DROP_OLDEST,
extraBufferCapacity = 1
replay = 1
)

val dispatcher: CoroutineDispatcher = scheduler.dispatcher
Expand Down Expand Up @@ -89,7 +89,10 @@ internal class SuspendableNotifier(
// Touching realm will open the underlying realm and register change listeners, but must
// happen on the dispatcher as the realm can only be touched on the dispatcher's thread.
if (!realmInitializer.isInitialized()) {
withContext(dispatcher) { realm }
withContext(dispatcher) {
realm
_realmChanged.emit(realm.version())
}
}
return _realmChanged.asSharedFlow()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ class RealmDictionaryTests : EmbeddedObjectCollectionQueryTests {
val listener = async {
withTimeout(10.seconds) {
flow.collect { current ->
delay(30.milliseconds)
delay(100.milliseconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class RealmListTests : EmbeddedObjectCollectionQueryTests {
val listener = async {
withTimeout(10.seconds) {
flow.collect { current ->
delay(30.milliseconds)
delay(100.milliseconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ class RealmSetTests : CollectionQueryTests {
val listener = async {
withTimeout(10.seconds) {
flow.collect { current ->
delay(30.milliseconds)
delay(100.milliseconds)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ class VersionTrackingTests {
// Write that doesn't return objects does not trigger tracking additional versions
realm.write<Unit> { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(1, allTracked.size, toString())
assertTrue(1 >= allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(0, writer?.active?.size, toString())
}

// Until we actually query the object
realm.query<Sample>().find()
realm.activeVersions().run {
assertEquals(2, allTracked.size, toString())
assertTrue(2 >= allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(1, writer?.active?.size, toString())
}
Expand All @@ -129,7 +129,7 @@ class VersionTrackingTests {
// not assigned to a variable unless the generic return type is <Unit>)
realm.write { copyToRealm(Sample()) }
realm.activeVersions().run {
assertEquals(2, allTracked.size, toString())
assertTrue(2 >= allTracked.size, toString())
assertNotNull(writer, toString())
assertEquals(1, writer?.active?.size, toString())
}
Expand Down Expand Up @@ -219,8 +219,8 @@ class VersionTrackingTests {
@Suppress("invisible_member", "invisible_reference")
fun initialVersionDereferencedAfterFirstWrite() {
(realm as RealmImpl).let { realm ->
assertNotNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())
val intermediateVersions = realm.versionTracker.versions()
assertEquals(1, intermediateVersions.size, intermediateVersions.toString())

val realmUpdates = TestChannel<Unit>()

Expand All @@ -238,9 +238,11 @@ class VersionTrackingTests {

// Wait for the notifier to start
realmUpdates.receiveOrFail()

assertNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())
// Depending on the exact timing, the first version might or might not have been
// GC'ed. If GC'ed, there are no intermediate versions.
val trackedVersions = realm.versionTracker.versions()
assertTrue(1 >= trackedVersions.size, trackedVersions.toString())

deferred.cancel()
realmUpdates.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,45 @@ class RealmNotificationsTests : FlowableTests {
}
}

@Test
fun registerTwoFlows() = runBlocking {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was covered in cancelAsFlow, but I guess it is nice with an explicit test.

val c1 = TestChannel<RealmChange<Realm>>()
val c2 = TestChannel<RealmChange<Realm>>()
val startingVersion = realm.version()
val observer1 = async {
realm.asFlow().collect {
c1.send(it)
}
}
c1.receiveOrFail(message = "Failed to receive initial event on Channel 1").let { realmChange ->
assertIs<InitialRealm<Realm>>(realmChange)
assertEquals(startingVersion, realmChange.realm.version())
}

realm.write { /* Do nothing */ }
val nextVersion = realm.version()

val observer2 = async {
realm.asFlow().collect {
c2.send(it)
}
}

c1.receiveOrFail(message = "Failed to receive update event on Channel 1").let { realmChange ->
assertIs<UpdatedRealm<Realm>>(realmChange)
assertEquals(nextVersion, realmChange.realm.version())
}
c2.receiveOrFail(message = "Failed to receive initial event on Channel 2").let { realmChange ->
assertIs<InitialRealm<Realm>>(realmChange)
assertEquals(nextVersion, realmChange.realm.version())
}

observer1.cancel()
observer2.cancel()
c1.cancel()
c2.cancel()
}

@Test
override fun asFlow() {
runBlocking {
Expand Down