Skip to content

Commit

Permalink
RCORE-2129 Do not send empty upload messages if there are no download…
Browse files Browse the repository at this point in the history
…s to ack (#7859)

* Do not send empty upload message if there is no download message to ack

* Add integration test

* Check if other messages need to be sent when uploads are not allowed
  • Loading branch information
danieltabacaru authored Jul 19, 2024
1 parent 45f1e85 commit a59aeae
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

### Fixed
* When a public name is defined on a property, calling `realm::Results::sort()` or `realm::Results::distinct()` with the internal name could throw an error like `Cannot sort on key path 'NAME': property 'PersonObject.NAME' does not exist`. ([realm/realm-js#6779](https://github.com/realm/realm-js/issues/6779), since v12.12.0)
* Fixed unnecessary server roundtrips when there is no download to acknowledge ([#2129](https://jira.mongodb.org/browse/RCORE-2129), since v14.8.0).

### Breaking changes
* None.
Expand Down
19 changes: 12 additions & 7 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2040,17 +2040,21 @@ void Session::send_upload_message()
target_upload_version = m_pending_flx_sub_set->snapshot_version;
}

bool server_version_to_ack =
m_upload_progress.last_integrated_server_version < m_download_progress.server_version;

std::vector<UploadChangeset> uploadable_changesets;
version_type locked_server_version = 0;
get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
locked_server_version); // Throws

if (uploadable_changesets.empty()) {
// Nothing more to upload right now
// If we need to limit upload up to some version other than the last client version available and there are no
// changes to upload, then there is no need to send an empty message.
if (m_pending_flx_sub_set) {
logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
// Nothing more to upload right now if:
// 1. We need to limit upload up to some version other than the last client version
// available and there are no changes to upload
// 2. There are no changes to upload and no server version(s) to acknowledge
if (m_pending_flx_sub_set || !server_version_to_ack) {
logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
// Other messages may be waiting to be sent
return enlist_to_send(); // Throws
Expand All @@ -2066,11 +2070,12 @@ void Session::send_upload_message()
version_type progress_server_version = m_upload_progress.last_integrated_server_version;

if (!upload_messages_allowed()) {
logger.debug("UPLOAD not allowed: upload progress(progress_client_version=%1, progress_server_version=%2, "
logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
"locked_server_version=%3, num_changesets=%4)",
progress_client_version, progress_server_version, locked_server_version,
uploadable_changesets.size()); // Throws
return;
// Other messages may be waiting to be sent
return enlist_to_send(); // Throws
}

logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
Expand Down
48 changes: 48 additions & 0 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4972,6 +4972,54 @@ TEST_CASE("flx: nested collections in mixed", "[sync][flx][baas]") {
CHECK(nested_list.get_any(1) == "foo");
}

TEST_CASE("flx: no upload during bootstraps", "[sync][flx][bootstrap][baas]") {
FLXSyncTestHarness harness("flx_bootstrap_no_upload", {g_large_array_schema, {"queryable_int_field"}});
fill_large_array_schema(harness);
auto config = harness.make_test_file();
enum class TestState { Start, BootstrapInProgress, BootstrapProcessed, BootstrapAck };
TestingStateMachine<TestState> state(TestState::Start);
config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession>, const SyncClientHookData& data) {
if (data.query_version == 0) {
return SyncClientHookAction::NoAction;
}
state.transition_with([&](TestState cur_state) -> std::optional<TestState> {
// Check no upload messages are sent during bootstrap.
if (data.event == SyncClientHookEvent::BootstrapMessageProcessed) {
CHECK((cur_state == TestState::Start || cur_state == TestState::BootstrapInProgress));
return TestState::BootstrapInProgress;
}
else if (data.event == SyncClientHookEvent::DownloadMessageIntegrated &&
data.batch_state == sync::DownloadBatchState::LastInBatch) {
CHECK(cur_state == TestState::BootstrapInProgress);
return TestState::BootstrapProcessed;
}
else if (data.event == SyncClientHookEvent::UploadMessageSent) {
// Uploads are allowed before a bootstrap starts.
if (cur_state == TestState::Start) {
return std::nullopt; // Don't transition
}
CHECK(cur_state == TestState::BootstrapProcessed);
return TestState::BootstrapAck;
}
return std::nullopt;
});
return SyncClientHookAction::NoAction;
};

auto realm = Realm::get_shared_realm(config);
auto table = realm->read_group().get_table("class_TopLevel");
auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
new_subs.insert_or_assign(Query(table));
new_subs.commit();
state.wait_for(TestState::BootstrapAck);

// Commiting an empty changeset does not upload a message.
realm->begin_transaction();
realm->commit_transaction();
// Give the sync client the chance to send an upload after mark.
wait_for_download(*realm);
}

} // namespace realm::app

#endif // REALM_ENABLE_AUTH_TESTS

0 comments on commit a59aeae

Please sign in to comment.