diff --git a/src/realm/group.cpp b/src/realm/group.cpp index b66d49e1801..aac7beb7a9a 100644 --- a/src/realm/group.cpp +++ b/src/realm/group.cpp @@ -798,27 +798,27 @@ void Group::recycle_table_accessor(Table* to_be_recycled) g_table_recycler_1.push_back(to_be_recycled); } -void Group::remove_table(StringData name) +void Group::remove_table(StringData name, bool ignore_backlinks) { check_attached(); size_t table_ndx = m_table_names.find_first(name); if (table_ndx == not_found) throw NoSuchTable(); auto key = ndx2key(table_ndx); - remove_table(table_ndx, key); // Throws + remove_table(table_ndx, key, ignore_backlinks); // Throws } -void Group::remove_table(TableKey key) +void Group::remove_table(TableKey key, bool ignore_backlinks) { check_attached(); size_t table_ndx = key2ndx_checked(key); - remove_table(table_ndx, key); + remove_table(table_ndx, key, ignore_backlinks); } -void Group::remove_table(size_t table_ndx, TableKey key) +void Group::remove_table(size_t table_ndx, TableKey key, bool ignore_backlinks) { if (!m_is_writable) throw LogicError(ErrorCodes::ReadOnlyDB, "Database not writable"); @@ -832,7 +832,7 @@ void Group::remove_table(size_t table_ndx, TableKey key) // tables. Such a behaviour is deemed too obscure, and we shall therefore // require that a removed table does not contain foreign origin backlink // columns. - if (table->is_cross_table_link_target()) + if (!ignore_backlinks && table->is_cross_table_link_target()) throw CrossTableLinkTarget(table->get_name()); { diff --git a/src/realm/group.hpp b/src/realm/group.hpp index 9659f550254..ff663b723e3 100644 --- a/src/realm/group.hpp +++ b/src/realm/group.hpp @@ -321,8 +321,10 @@ class Group : public ArrayParent { TableRef get_or_add_table_with_primary_key(StringData name, DataType pk_type, StringData pk_name, bool nullable = false, Table::Type table_type = Table::Type::TopLevel); - void remove_table(TableKey key); - void remove_table(StringData name); + // Use 'ignore_backlinks' with caution. ignore_backlinks=true will leave things in an invalid state + // if the target table (or column) is not removed as well. + void remove_table(TableKey key, bool ignore_backlinks = false); + void remove_table(StringData name, bool ignore_backlinks = false); void rename_table(TableKey key, StringData new_name, bool require_unique_name = true); void rename_table(StringData name, StringData new_name, bool require_unique_name = true); @@ -631,7 +633,7 @@ class Group : public ArrayParent { void attach_shared(ref_type new_top_ref, size_t new_file_size, bool writable, VersionID version); void create_empty_group(); - void remove_table(size_t table_ndx, TableKey key); + void remove_table(size_t table_ndx, TableKey key, bool ignore_backlinks); void reset_free_space_tracking(); diff --git a/src/realm/object-store/object_store.cpp b/src/realm/object-store/object_store.cpp index 45b05f47362..e986bf801e1 100644 --- a/src/realm/object-store/object_store.cpp +++ b/src/realm/object-store/object_store.cpp @@ -867,8 +867,7 @@ static const char* schema_mode_to_string(SchemaMode mode) void ObjectStore::apply_schema_changes(Transaction& transaction, uint64_t schema_version, Schema& target_schema, uint64_t target_schema_version, SchemaMode mode, std::vector const& changes, bool handle_automatically_backlinks, - std::function migration_function, - bool set_schema_version_on_version_decrease) + std::function migration_function) { using namespace std::chrono; auto t1 = steady_clock::now(); @@ -889,16 +888,11 @@ void ObjectStore::apply_schema_changes(Transaction& transaction, uint64_t schema create_metadata_tables(transaction); if (mode == SchemaMode::AdditiveDiscovered || mode == SchemaMode::AdditiveExplicit) { - bool set_schema = (schema_version < target_schema_version || schema_version == ObjectStore::NotVersioned || - set_schema_version_on_version_decrease); - // With sync v2.x, indexes are no longer synced, so there's no reason to avoid creating them. bool update_indexes = true; apply_additive_changes(transaction, changes, update_indexes); - if (set_schema) - set_schema_version(transaction, target_schema_version); - + set_schema_version(transaction, target_schema_version); set_schema_keys(transaction, target_schema); return; } diff --git a/src/realm/object-store/object_store.hpp b/src/realm/object-store/object_store.hpp index 3278b3eb737..743ea0a7358 100644 --- a/src/realm/object-store/object_store.hpp +++ b/src/realm/object-store/object_store.hpp @@ -86,8 +86,7 @@ class ObjectStore { static void apply_schema_changes(Transaction& group, uint64_t schema_version, Schema& target_schema, uint64_t target_schema_version, SchemaMode mode, std::vector const& changes, bool handle_automatically_backlinks, - std::function migration_function = {}, - bool save_schema_version_on_version_decrease = false); + std::function migration_function = {}); static void apply_additive_changes(Group&, std::vector const&, bool update_indexes); diff --git a/src/realm/object-store/schema.hpp b/src/realm/object-store/schema.hpp index 9998347090a..3a91e5e119e 100644 --- a/src/realm/object-store/schema.hpp +++ b/src/realm/object-store/schema.hpp @@ -87,10 +87,6 @@ enum class SchemaMode : uint8_t { // The only changes allowed are to add new tables, add columns to // existing tables, and to add or remove indexes from existing // columns. Extra tables not present in the schema are ignored. - // Indexes are only added to or removed from existing columns if the - // schema version is greater than the existing one (and unlike other - // modes, the schema version is allowed to be less than the existing - // one). // The migration function is not used. // This should be used when including discovered user classes. // Previously called Additive. diff --git a/src/realm/object-store/shared_realm.cpp b/src/realm/object-store/shared_realm.cpp index 40d23526ebf..2b00e80ae09 100644 --- a/src/realm/object-store/shared_realm.cpp +++ b/src/realm/object-store/shared_realm.cpp @@ -497,12 +497,6 @@ void Realm::update_schema(Schema schema, uint64_t version, MigrationFunction mig schema.copy_keys_from(actual_schema, m_config.schema_subset_mode); - bool save_schema_version_on_version_decrease = false; -#if REALM_ENABLE_SYNC - if (m_config.sync_config && m_config.sync_config->flx_sync_requested) - save_schema_version_on_version_decrease = true; -#endif - uint64_t old_schema_version = m_schema_version; bool additive = m_config.schema_mode == SchemaMode::AdditiveDiscovered || m_config.schema_mode == SchemaMode::AdditiveExplicit || @@ -532,12 +526,11 @@ void Realm::update_schema(Schema schema, uint64_t version, MigrationFunction mig ObjectStore::apply_schema_changes(transaction(), version, m_schema, m_schema_version, m_config.schema_mode, required_changes, m_config.automatically_handle_backlinks_in_migrations, - wrapper, save_schema_version_on_version_decrease); + wrapper); } else { ObjectStore::apply_schema_changes(transaction(), m_schema_version, schema, version, m_config.schema_mode, - required_changes, m_config.automatically_handle_backlinks_in_migrations, - nullptr, save_schema_version_on_version_decrease); + required_changes, m_config.automatically_handle_backlinks_in_migrations); REALM_ASSERT_DEBUG(additive || (required_changes = ObjectStore::schema_from_group(read_group()).compare(schema)).empty()); } diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index da7568c212f..56fbcc28196 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -192,20 +191,10 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback, return; } - // Sync schema migrations require setting a subscription initializer callback to bootstrap the data. The - // subscriptions in the current realm file may not be compatible with the new schema so cannot rely on them. - auto config = coordinator->get_config(); - if (!config.sync_config->subscription_initializer) { - status = Status(ErrorCodes::SyncSchemaMigrationError, - "Sync schema migrations must provide a subscription initializer callback in the sync config"); - async_open_complete(std::move(callback), coordinator, status); - return; - } - // Migrate the schema. // * First upload the changes at the old schema version - // * Then, delete the realm, reopen it, and bootstrap at new schema version - // The lifetime of the task is extended until bootstrap completes. + // * Then, pause the session, delete all tables, re-initialize the metadata, and finally restart the session. + // The lifetime of the task is extended until the bootstrap completes. std::shared_ptr self(shared_from_this()); session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self, this](Status status) mutable { @@ -220,38 +209,11 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback, return; } - auto future = SyncSession::Internal::pause_async(*session); - // Wait until the SessionWrapper is done using the DBRef. - std::move(future).get_async([callback = std::move(callback), coordinator, self, this](Status status) mutable { - { - util::CheckedLockGuard lock(m_mutex); - if (!m_session) - return; // Swallow all events if the task has been cancelled. - } - - if (!status.is_ok()) { - self->async_open_complete(std::move(callback), coordinator, status); - return; - } - - // Delete the realm file and reopen it. - try { - util::CheckedLockGuard lock(m_mutex); - auto config = coordinator->get_config(); - m_session = nullptr; - coordinator->close(); - coordinator = nullptr; - util::File::remove(config.path); - coordinator = _impl::RealmCoordinator::get_coordinator(config); - m_session = coordinator->sync_session(); - } - catch (...) { - async_open_complete(std::move(callback), coordinator, exception_to_status()); - return; - } - + auto migration_completed_callback = [callback = std::move(callback), coordinator = std::move(coordinator), + self](Status status) mutable { self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status); - }); + }; + SyncSession::Internal::migrate_schema(*session, std::move(migration_completed_callback)); }); } diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index c20ec940633..b6b370f5d8f 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -395,7 +395,6 @@ SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr db, co , m_migration_store{sync::MigrationStore::create(m_db)} , m_client(client) , m_sync_manager(sync_manager) - , m_previous_schema_version(_impl::sync_schema_migration::has_pending_migration(*m_db->start_read())) { REALM_ASSERT(m_config.sync_config); // we don't want the following configs enabled during a client reset @@ -657,12 +656,12 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status, revive_if_needed(); } -util::Future SyncSession::Internal::pause_async(SyncSession& session) +util::Future SyncSession::pause_async() { { - util::CheckedUniqueLock lock(session.m_state_mutex); + util::CheckedUniqueLock lock(m_state_mutex); // Nothing to wait for if the session is already paused or inactive. - if (session.m_state == SyncSession::State::Paused || session.m_state == SyncSession::State::Inactive) { + if (m_state == SyncSession::State::Paused || m_state == SyncSession::State::Inactive) { return util::Future::make_ready(); } } @@ -671,8 +670,8 @@ util::Future SyncSession::Internal::pause_async(SyncSession& session) // must have been destroyed upon return. This allows the caller to follow up with a call to // sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works // so long as this SyncSession object remains in the `paused` state after the invocation of shutdown(). - session.pause(); - return session.m_client.notify_session_terminated(); + pause(); + return m_client.notify_session_terminated(); } void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error) @@ -682,7 +681,7 @@ void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::Sessi util::Future SyncSession::OnlyForTesting::pause_async(SyncSession& session) { - return SyncSession::Internal::pause_async(session); + return session.pause_async(); } // This method should only be called from within the error handler callback registered upon the underlying @@ -1418,10 +1417,10 @@ void SyncSession::update_subscription_store(bool flx_sync_requested, std::option // waiters auto subscription_store = std::move(m_flx_subscription_store); lock.unlock(); - subscription_store->terminate(); auto tr = m_db->start_write(); + subscription_store->reset(*tr); history.set_write_validator_factory(nullptr); - tr->rollback(); + tr->commit(); } return; } @@ -1688,3 +1687,59 @@ util::Future SyncSession::send_test_command(std::string body) return m_session->send_test_command(std::move(body)); } + +void SyncSession::migrate_schema(util::UniqueFunction&& callback) +{ + util::CheckedUniqueLock lock(m_state_mutex); + // If the schema migration is already in progress, just wait to complete. + if (m_schema_migration_in_progress) { + add_completion_callback(std::move(callback), ProgressDirection::download); + return; + } + m_schema_migration_in_progress = true; + + // Perform the migration: + // 1. Pause the sync session + // 2. Once the sync client releases the realm file: + // a. Delete all tables (private and public) + // b. Reset the subscription store + // d. Empty the sync history and adjust cursors + // e. Reset file ident (the server flags the old ident as in the case of a client reset) + // 3. Resume the session (the client asks for a new file ident) + // See `sync_schema_migration::perform_schema_migration` for more details. + + CompletionCallbacks callbacks; + std::swap(m_completion_callbacks, callbacks); + auto guard = util::make_scope_exit([&]() noexcept { + util::CheckedUniqueLock lock(m_state_mutex); + if (m_completion_callbacks.empty()) + std::swap(callbacks, m_completion_callbacks); + else + m_completion_callbacks.merge(std::move(callbacks)); + }); + m_state_mutex.unlock(lock); + + auto future = pause_async(); + std::move(future).get_async( + [callback = std::move(callback), weak_session = weak_from_this()](Status status) mutable { + if (!status.is_ok()) + return callback(status); + + auto session = weak_session.lock(); + if (!session) { + status = Status(ErrorCodes::InvalidSession, "Sync session was destroyed during schema migration"); + return callback(status); + } + sync_schema_migration::perform_schema_migration(*session->m_db); + { + util::CheckedUniqueLock lock(session->m_state_mutex); + session->m_previous_schema_version.reset(); + session->m_schema_migration_in_progress = false; + session->m_subscription_store_base.reset(); + session->m_flx_subscription_store.reset(); + } + session->update_subscription_store(true, {}); + session->wait_for_download_completion(std::move(callback)); + session->resume(); + }); +} \ No newline at end of file diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index ceed4dac480..18211ad7179 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -297,7 +297,10 @@ class SyncSession : public std::enable_shared_from_this { return session.m_db; } - static util::Future pause_async(SyncSession& session); + static void migrate_schema(SyncSession& session, util::UniqueFunction&& callback) + { + session.migrate_schema(std::move(callback)); + } }; // Expose some internal functionality to testing code. @@ -410,6 +413,7 @@ class SyncSession : public std::enable_shared_from_this { REQUIRES(!m_connection_state_mutex); void become_paused(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex); void become_waiting_for_access_token() REQUIRES(m_state_mutex); + util::Future pause_async() REQUIRES(!m_state_mutex, !m_connection_state_mutex); // do restart session restarts the session without freeing any of the waiters void do_restart_session(util::CheckedUniqueLock) @@ -431,6 +435,9 @@ class SyncSession : public std::enable_shared_from_this { util::Future send_test_command(std::string body) REQUIRES(!m_state_mutex); + void migrate_schema(util::UniqueFunction&& callback) + REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex); + std::function m_sync_transact_callback GUARDED_BY(m_state_mutex); template @@ -497,6 +504,7 @@ class SyncSession : public std::enable_shared_from_this { // Set if ProtocolError::schema_version_changed error is received from the server. std::optional m_previous_schema_version GUARDED_BY(m_state_mutex); + bool m_schema_migration_in_progress GUARDED_BY(m_state_mutex) = false; }; } // namespace realm diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 1060f30a10f..489203a72b5 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -38,7 +38,7 @@ namespace realm::sync { -void ClientHistory::set_client_reset_adjustments( +void ClientHistory::set_history_adjustments( util::Logger& logger, version_type current_version, SaltedFileIdent client_file_ident, SaltedVersion server_version, const std::vector<_impl::client_reset::RecoveredChange>& recovered_changesets) { @@ -53,7 +53,7 @@ void ClientHistory::set_client_reset_adjustments( size_t uploadable_bytes = 0; if (recovered_changesets.empty()) { // Either we had nothing to upload or we're discarding the unsynced changes - logger.debug("Client reset adjustments: discarding %1 history entries", sync_history_size()); + logger.debug("History adjustments: discarding %1 history entries", sync_history_size()); do_trim_sync_history(sync_history_size()); // Throws } else { @@ -66,7 +66,7 @@ void ClientHistory::set_client_reset_adjustments( do_trim_sync_history(discard_count); if (logger.would_log(util::Logger::Level::debug)) { - logger.debug("Client reset adjustments: trimming %1 history entries and updating the remaining history " + logger.debug("History adjustments: trimming %1 history entries and updating the remaining history " "entries (%2)", discard_count, sync_history_size()); for (size_t i = 0, size = m_arrays->changesets.size(); i < size; ++i) { @@ -94,7 +94,7 @@ void ClientHistory::set_client_reset_adjustments( m_arrays->changesets.get(i).size(), server_version.version); } } - logger.debug("New uploadable bytes after client reset adjustment: %1", uploadable_bytes); + logger.debug("New uploadable bytes after history adjustment: %1", uploadable_bytes); // Client progress versions are set to 0 to signal to the server that we've // reset our versioning. If we send the actual values, the server would diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index af319cf3954..2b33e91c1de 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -101,13 +101,13 @@ class ClientHistory final : public _impl::History, public TransformHistory { std::unique_ptr buffer; }; - /// set_client_reset_adjustments() is used by client reset to adjust the + /// set_history_adjustments() is used by client reset to adjust the /// content of the history compartment. The DB associated with /// this history object must be in a write transaction when this function /// is called. - void set_client_reset_adjustments(util::Logger& logger, version_type current_version, - SaltedFileIdent client_file_ident, SaltedVersion server_version, - const std::vector<_impl::client_reset::RecoveredChange>&); + void set_history_adjustments(util::Logger& logger, version_type current_version, + SaltedFileIdent client_file_ident, SaltedVersion server_version, + const std::vector<_impl::client_reset::RecoveredChange>&); struct LocalChange { version_type version; diff --git a/src/realm/sync/noinst/client_reset.cpp b/src/realm/sync/noinst/client_reset.cpp index 0ee83fa4ee6..8a243ef2a84 100644 --- a/src/realm/sync/noinst/client_reset.cpp +++ b/src/realm/sync/noinst/client_reset.cpp @@ -616,8 +616,8 @@ bool perform_client_reset_diff(DB& db_local, DB& db_remote, sync::SaltedFileIden // now that the state of the fresh and local Realms are identical, // reset the local sync history and steal the fresh Realm's ident - history_local.set_client_reset_adjustments(logger, wt_local->get_version(), client_file_ident, - fresh_server_version, recovered); + history_local.set_history_adjustments(logger, wt_local->get_version(), client_file_ident, fresh_server_version, + recovered); int64_t subscription_version = 0; if (sub_store) { diff --git a/src/realm/sync/noinst/client_reset.hpp b/src/realm/sync/noinst/client_reset.hpp index a4d7e9b35d1..afd8307a297 100644 --- a/src/realm/sync/noinst/client_reset.hpp +++ b/src/realm/sync/noinst/client_reset.hpp @@ -61,8 +61,6 @@ struct ClientResetFailed : public std::runtime_error { void transfer_group(const Transaction& tr_src, Transaction& tr_dst, util::Logger& logger, bool allow_schema_additions); -void remove_all_tables(Transaction& tr_dst, util::Logger& logger); - struct PendingReset { ClientResyncMode type; Timestamp time; diff --git a/src/realm/sync/noinst/pending_bootstrap_store.cpp b/src/realm/sync/noinst/pending_bootstrap_store.cpp index 6a7d0aad2a2..03fb7f97231 100644 --- a/src/realm/sync/noinst/pending_bootstrap_store.cpp +++ b/src/realm/sync/noinst/pending_bootstrap_store.cpp @@ -199,10 +199,15 @@ bool PendingBootstrapStore::has_pending() void PendingBootstrapStore::clear() { auto tr = m_db->start_write(); - auto bootstrap_table = tr->get_table(m_table); + clear(*tr); + tr->commit(); +} + +void PendingBootstrapStore::clear(Transaction& wt) +{ + auto bootstrap_table = wt.get_table(m_table); bootstrap_table->clear(); m_has_pending = false; - tr->commit(); } PendingBootstrapStore::PendingBatch PendingBootstrapStore::peek_pending(size_t limit_in_bytes) diff --git a/src/realm/sync/noinst/pending_bootstrap_store.hpp b/src/realm/sync/noinst/pending_bootstrap_store.hpp index 0fae7dc3b8a..e2e363c29ad 100644 --- a/src/realm/sync/noinst/pending_bootstrap_store.hpp +++ b/src/realm/sync/noinst/pending_bootstrap_store.hpp @@ -82,6 +82,7 @@ class PendingBootstrapStore { const std::vector& changesets, bool* created_new_batch); void clear(); + void clear(Transaction& wt); private: diff --git a/src/realm/sync/noinst/sync_schema_migration.cpp b/src/realm/sync/noinst/sync_schema_migration.cpp index 423c609a5bc..a89c385b57c 100644 --- a/src/realm/sync/noinst/sync_schema_migration.cpp +++ b/src/realm/sync/noinst/sync_schema_migration.cpp @@ -17,8 +17,8 @@ **************************************************************************/ #include - -#include +#include +#include #include @@ -106,4 +106,32 @@ void track_sync_schema_migration(Transaction& wt, uint64_t previous_schema_versi } } +void perform_schema_migration(DB& db) +{ + // Everything is performed in one single write transaction. + auto tr = db.start_write(); + + // Disable sync replication. + auto& repl = dynamic_cast(*db.get_replication()); + sync::TempShortCircuitReplication sync_history_guard(repl); + repl.set_write_validator_factory(nullptr); + + // Delete all tables (and their columns). + const bool ignore_backlinks = true; + for (const auto& tk : tr->get_table_keys()) { + tr->remove_table(tk, ignore_backlinks); + } + + // Clear sync history, reset the file ident and the server version in the download and upload progress. + + auto& history = repl.get_history(); + sync::SaltedFileIdent reset_file_ident{0, 0}; + sync::SaltedVersion reset_server_version{0, 0}; + std::vector<_impl::client_reset::RecoveredChange> changes{}; + history.set_history_adjustments(*db.get_logger(), tr->get_version(), reset_file_ident, reset_server_version, + changes); + + tr->commit(); +} + } // namespace realm::_impl::sync_schema_migration \ No newline at end of file diff --git a/src/realm/sync/noinst/sync_schema_migration.hpp b/src/realm/sync/noinst/sync_schema_migration.hpp index 8ae60e385b4..bed44dbed17 100644 --- a/src/realm/sync/noinst/sync_schema_migration.hpp +++ b/src/realm/sync/noinst/sync_schema_migration.hpp @@ -18,6 +18,7 @@ #pragma once +#include #include #include @@ -29,5 +30,7 @@ std::optional has_pending_migration(const Transaction& rt); void track_sync_schema_migration(Transaction& wt, uint64_t previous_schema_version); +void perform_schema_migration(DB& db); + } // namespace _impl::sync_schema_migration } // namespace realm \ No newline at end of file diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp index de236c520dd..9498d9418ca 100644 --- a/src/realm/sync/subscriptions.cpp +++ b/src/realm/sync/subscriptions.cpp @@ -648,26 +648,29 @@ SubscriptionStore::SubscriptionStore(Private, DBRef db) } // Make sure the subscription set table is properly initialized - initialize_subscriptions_table(std::move(tr), false); + initialize_subscriptions_table(std::move(tr)); } -void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool clear_table) +void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr) { - if (auto sub_sets = tr->get_table(m_sub_set_table); clear_table || sub_sets->is_empty()) { + if (auto sub_sets = tr->get_table(m_sub_set_table); sub_sets->is_empty()) { tr->promote_to_write(); - // If erase_table is true, clear out the sub_sets table - if (clear_table) { - sub_sets->clear(); - } - // There should always be at least one subscription set so that the user can always wait - // for synchronizationon on the result of get_latest(). - auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)}); - zero_sub.set(m_sub_set_state, static_cast(SubscriptionSet::State::Pending)); - zero_sub.set(m_sub_set_snapshot_version, tr->get_version()); + clear(*tr); tr->commit(); } } +void SubscriptionStore::clear(Transaction& wt) +{ + auto sub_sets = wt.get_table(m_sub_set_table); + sub_sets->clear(); + // There should always be at least one subscription set so that the user can always wait + // for synchronizationon on the result of get_latest(). + auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)}); + zero_sub.set(m_sub_set_state, static_cast(SubscriptionSet::State::Pending)); + zero_sub.set(m_sub_set_snapshot_version, wt.get_version()); +} + SubscriptionSet SubscriptionStore::get_latest() { auto tr = m_db->start_frozen(); @@ -792,10 +795,10 @@ void SubscriptionStore::notify_all_state_change_notifications(Status status) } } -void SubscriptionStore::terminate() +void SubscriptionStore::reset(Transaction& wt) { // Clear out and initialize the subscription store - initialize_subscriptions_table(m_db->start_read(), true); + clear(wt); util::CheckedUniqueLock lk(m_pending_notifications_mutex); auto to_finish = std::move(m_pending_notifications); diff --git a/src/realm/sync/subscriptions.hpp b/src/realm/sync/subscriptions.hpp index 91127f03f3c..751ec84e457 100644 --- a/src/realm/sync/subscriptions.hpp +++ b/src/realm/sync/subscriptions.hpp @@ -361,7 +361,7 @@ class SubscriptionStore : public std::enable_shared_from_this // Reset SubscriptionStore and erase all current subscriptions and supersede any pending // subscriptions. Must be called from the event loop thread to prevent data race issues // with the subscription store. - void terminate() REQUIRES(!m_pending_notifications_mutex); + void reset(Transaction& wt) REQUIRES(!m_pending_notifications_mutex); // Recreate the active subscription set, marking any newer pending ones as // superseded. This is a no-op if there are no pending subscription sets. @@ -394,9 +394,10 @@ class SubscriptionStore : public std::enable_shared_from_this SubscriptionSet get_refreshed(ObjKey, int64_t flx_version, std::optional version = util::none); MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set); - // Ensure the subscriptions table is properly initialized - // If clear_table is true, the subscriptions table will be cleared before initialization - void initialize_subscriptions_table(TransactionRef&& tr, bool clear_table); + // Ensure the subscriptions table is properly initialized. No-op if already initialized. + void initialize_subscriptions_table(TransactionRef&& tr); + // Clear the table and reinitialize it. + void clear(Transaction& wt); friend class MutableSubscriptionSet; friend class Subscription; diff --git a/test/object-store/migrations.cpp b/test/object-store/migrations.cpp index f254aa4952f..2c47a1acce4 100644 --- a/test/object-store/migrations.cpp +++ b/test/object-store/migrations.cpp @@ -2279,7 +2279,7 @@ TEST_CASE("migration: Additive", "[migration]") { REQUIRE_NOTHROW(realm->update_schema(schema, 1)); REQUIRE(realm->schema_version() == 1); REQUIRE_NOTHROW(realm->update_schema(schema, 0)); - REQUIRE(realm->schema_version() == 1); + REQUIRE(realm->schema_version() == 0); } SECTION("migration function is not used") { diff --git a/test/object-store/sync/client_reset.cpp b/test/object-store/sync/client_reset.cpp index b44720796b5..7d5f80617f4 100644 --- a/test/object-store/sync/client_reset.cpp +++ b/test/object-store/sync/client_reset.cpp @@ -618,7 +618,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { partition_prop, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); }) ->make_local_changes([&](SharedRealm local) { local->update_schema( @@ -646,7 +646,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { remotely_added_property, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); create_object(*local, new_table_name, {pk1}, partition); create_object(*local, existing_table_name, {pk1}, partition); @@ -719,7 +719,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { {prop_name, PropertyType::Float}, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); }) ->make_remote_changes([&](SharedRealm remote) { remote->update_schema( @@ -1249,7 +1249,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { {"realm_id", PropertyType::String | PropertyType::Nullable}, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); create_object(*local, "object2", ObjectId::gen(), partition); create_object(*local, "object2", ObjectId::gen(), partition); }) @@ -1285,7 +1285,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { {"realm_id", PropertyType::String | PropertyType::Nullable}, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); auto table = ObjectStore::table_for_object_type(local->read_group(), "object"); table->begin()->set(table->get_column_key("value2"), 123); }) @@ -1321,7 +1321,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { {"realm_id", PropertyType::String | PropertyType::Nullable}, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); }) ->make_remote_changes([](SharedRealm remote) { remote->update_schema( @@ -1367,7 +1367,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { {"realm_id", PropertyType::String | PropertyType::Nullable}, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); }) ->make_remote_changes([](SharedRealm remote) { remote->update_schema( @@ -1409,7 +1409,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { {"realm_id", PropertyType::String | PropertyType::Nullable}, }}, }, - 0, nullptr, nullptr, true); + 1, nullptr, nullptr, true); }) ->make_remote_changes([](SharedRealm remote) { remote->update_schema( diff --git a/test/object-store/sync/flx_schema_migration.cpp b/test/object-store/sync/flx_schema_migration.cpp index e9412e2136d..7ece888a6b5 100644 --- a/test/object-store/sync/flx_schema_migration.cpp +++ b/test/object-store/sync/flx_schema_migration.cpp @@ -272,7 +272,7 @@ TEST_CASE("Sync schema migrations don't work with sync open", "[sync][flx][flx s } SECTION("Breaking change detected by server") { - // Remove table 'TopLevel2'. + // Remove table 'TopLevel3'. schema_v1.pop_back(); config.schema = schema_v1; create_schema(app_session, *config.schema, config.schema_version); @@ -294,8 +294,8 @@ TEST_CASE("Sync schema migrations don't work with sync open", "[sync][flx][flx s wait_for_download(*realm); wait_for_upload(*realm); - auto table = realm->read_group().get_table("class_TopLevel2"); - // Migration did not succeed because table 'TopLevel2' still exists (but there is no error). + auto table = realm->read_group().get_table("class_TopLevel3"); + // Migration did not succeed because table 'TopLevel3' still exists (but there is no error). CHECK(table); check_realm_schema(config.path, schema_v0, 1); } @@ -435,9 +435,12 @@ TEST_CASE("Schema version mismatch between client and server", "[sync][flx][flx REQUIRE_FALSE(realm); REQUIRE(error); REQUIRE_THROWS_CONTAINING(std::rethrow_exception(error), - "Synchronization no longer possible for client-side file"); + "The following changes cannot be made in additive-only schema mode"); REQUIRE(schema_migration_required); - check_realm_schema(config.path, schema_v0, 1); + // Applying the new schema (and version) fails, therefore the schema is unversioned (the metadata table is removed + // during migration). There is a schema though because the server schema is already applied by the time the client + // applies the mismatch schema. + check_realm_schema(config.path, schema_v1, ObjectStore::NotVersioned); wait_for_sessions_to_close(harness.session()); } @@ -503,12 +506,13 @@ TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][ {"queryable_int_field", static_cast(15)}, {"non_queryable_field2", "non queryable 33"s}})); realm->commit_transaction(); - // This server drops this object because the client is querying on a removed field. + // The server filters out this object because the schema version the client migrates to removes the queryable + // field. realm->begin_transaction(); Object::create( c, realm, "TopLevel3", std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); - + realm->commit_transaction(); realm->close(); } @@ -548,7 +552,6 @@ TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][ config.schema_version = 1; config.schema = schema_v1; config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); - config.sync_config->on_sync_client_event_hook = nullptr; auto [realm, error] = async_open_realm(config); REQUIRE(realm); REQUIRE_FALSE(error); @@ -556,6 +559,8 @@ TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][ auto table = realm->read_group().get_table("class_TopLevel"); CHECK(table->size() == 3); + table = realm->read_group().get_table("class_TopLevel2"); + CHECK(!table); table = realm->read_group().get_table("class_TopLevel3"); CHECK(table->size() == 1); CHECK(table->get_object_with_primary_key(obj3_id)); @@ -586,6 +591,8 @@ TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][ auto table = realm->read_group().get_table("class_TopLevel"); CHECK(table->size() == 4); + table = realm->read_group().get_table("class_TopLevel2"); + CHECK(!table); table = realm->read_group().get_table("class_TopLevel3"); CHECK(table->size() == 1); CHECK(table->get_object_with_primary_key(obj3_id)); @@ -604,6 +611,8 @@ TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][ auto table = realm->read_group().get_table("class_TopLevel"); CHECK(table->size() == 4); + table = realm->read_group().get_table("class_TopLevel2"); + CHECK(!table); table = realm->read_group().get_table("class_TopLevel3"); CHECK(table->size() == 1); CHECK(table->get_object_with_primary_key(obj3_id)); @@ -627,8 +636,9 @@ TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][ auto table3 = realm->read_group().get_table("class_TopLevel3"); CHECK(table3->is_empty()); - // The existing subscription for 'TopLevel3' is on a removed field (in version 1), so data cannot be sync'd. - // Update subscription so data can be sync'd. + // The subscription for 'TopLevel3' is on a removed field (i.e, the field does not exist in the previous + // schema version used), so data cannot be synced. + // Update subscription so data can be synced. auto subs = realm->get_latest_subscription_set().make_mutable_copy(); CHECK(subs.erase_by_class_name("TopLevel3")); subs.insert_or_assign(Query(table3)); @@ -758,6 +768,8 @@ TEST_CASE("Client reset during schema migration", "[sync][flx][flx schema migrat {"queryable_str_field", "foo"s}, {"queryable_int_field", static_cast(15)}, {"non_queryable_field2", "non queryable 11"s}})); + // The server filters out this object because the schema version the client migrates to removes the queryable + // field. Object::create( c, realm, "TopLevel3", std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); @@ -931,7 +943,7 @@ TEST_CASE("Send schema version zero if no schema is used to open the realm", REQUIRE(realm); REQUIRE_FALSE(error); // The schema is received from the server, but it is unversioned. - check_realm_schema(config.path, schema_v0, -1); + check_realm_schema(config.path, schema_v0, ObjectStore::NotVersioned); } TEST_CASE("Allow resetting the schema version to zero after bad schema version error", @@ -974,6 +986,226 @@ TEST_CASE("Allow resetting the schema version to zero after bad schema version e check_realm_schema(config.path, schema_v0, 0); } +TEST_CASE("Client reset and schema migration", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + auto config = harness.make_test_file(); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(config.path, schema_v0, 0); + + realm->sync_session()->pause(); + + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "foo"s}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field2", "non queryable 11"s}})); + Object::create( + c, realm, "TopLevel3", + std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); + realm->commit_transaction(); + + // Trigger a client reset. + reset_utils::trigger_client_reset(harness.session().app_session(), *realm->sync_session()); + } + _impl::RealmCoordinator::assert_no_open_realms(); + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, schema_v1, 1); + + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + config.sync_config->client_resync_mode = ClientResyncMode::Recover; + config.sync_config->on_sync_client_event_hook = [](std::weak_ptr, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code == sync::ProtocolError::initial_sync_not_completed) { + return SyncClientHookAction::NoAction; + } + CHECK((error_code == sync::ProtocolError::schema_version_changed || + error_code == sync::ProtocolError::bad_client_file_ident)); + return SyncClientHookAction::NoAction; + }; + size_t before_reset_count = 0; + size_t after_reset_count = 0; + config.sync_config->notify_before_client_reset = [&before_reset_count](SharedRealm) { + ++before_reset_count; + }; + config.sync_config->notify_after_client_reset = [&after_reset_count](SharedRealm, ThreadSafeReference, bool) { + ++after_reset_count; + }; + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + REQUIRE(before_reset_count == 0); + REQUIRE(after_reset_count == 0); + check_realm_schema(config.path, schema_v1, 1); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 1); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->is_empty()); +} + +TEST_CASE("Multiple async open tasks trigger a schema migration", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + auto config = harness.make_test_file(); + config.sync_config->rerun_init_subscription_on_open = true; + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(config.path, schema_v0, 0); + + realm->sync_session()->pause(); + + // Subscription to recover when upgrading the schema. + auto subs = realm->get_latest_subscription_set().make_mutable_copy(); + CHECK(subs.erase_by_class_name("TopLevel2")); + auto table = realm->read_group().get_table("class_TopLevel2"); + auto col_key = table->get_column_key("queryable_int_field"); + auto query = Query(table).greater_equal(col_key, int64_t(0)); + subs.insert_or_assign(query); + subs.commit(); + + // Object to recover when upgrading the schema. + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "biz"s}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field2", "non queryable 33"s}})); + realm->commit_transaction(); + // The server filters out this object because the schema version the client migrates to removes the queryable + // field. + realm->begin_transaction(); + Object::create( + c, realm, "TopLevel3", + std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); + realm->commit_transaction(); + realm->close(); + } + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, schema_v1, 1); + + // Upgrade the schema version + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + + auto task1 = Realm::get_synchronized_realm(config); + auto task2 = Realm::get_synchronized_realm(config); + + auto open_task1_pf = util::make_promise_future(); + auto open_task2_pf = util::make_promise_future(); + auto open_callback1 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task1_pf.promise))]( + ThreadSafeReference ref, std::exception_ptr err) mutable { + REQUIRE_FALSE(err); + auto realm = Realm::get_shared_realm(std::move(ref)); + REQUIRE(realm); + promise_holder.get_promise().emplace_value(realm); + }; + auto open_callback2 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task2_pf.promise))]( + ThreadSafeReference ref, std::exception_ptr err) mutable { + REQUIRE_FALSE(err); + auto realm = Realm::get_shared_realm(std::move(ref)); + REQUIRE(realm); + promise_holder.get_promise().emplace_value(realm); + }; + + task1->start(open_callback1); + task2->start(open_callback2); + + auto realm1 = open_task1_pf.future.get(); + auto realm2 = open_task2_pf.future.get(); + + auto verify_realm = [&](SharedRealm realm) { + check_realm_schema(config.path, schema_v1, 1); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 1); + table = realm->read_group().get_table("class_TopLevel2"); + CHECK(!table); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->is_empty()); + }; + + verify_realm(realm1); + verify_realm(realm2); +} + +TEST_CASE("Upgrade schema version with no subscription initializer", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + auto config = harness.make_test_file(); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(config.path, schema_v0, 0); + + realm->sync_session()->pause(); + + // Object to recover when upgrading the schema. + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "biz"s}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field2", "non queryable 33"s}})); + realm->commit_transaction(); + realm->close(); + } + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, schema_v1, 1); + + { + // Upgrade the schema version + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = nullptr; + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(config.path, schema_v1, 1); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->is_empty()); + table = realm->read_group().get_table("class_TopLevel2"); + CHECK(!table); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->is_empty()); + } +} + } // namespace realm::app #endif // REALM_ENABLE_AUTH_TESTS diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp index dc528d5a84f..78e18f170d7 100644 --- a/test/test_sync_subscriptions.cpp +++ b/test/test_sync_subscriptions.cpp @@ -646,7 +646,9 @@ TEST(Sync_SubscriptionStoreTerminate) CHECK_EQUAL(store->get_latest().version(), 3); CHECK_EQUAL(store->get_pending_subscriptions().size(), 3); - store->terminate(); // notifications are called on this thread + auto write_tr = fixture.db->start_write(); + store->reset(*write_tr); // notifications are called on this thread + write_tr->commit(); CHECK_EQUAL(hit_count, 3); CHECK_EQUAL(store->get_latest().version(), 0);