Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2006 Reuse realm file for sync schema migrations #7487

Merged
merged 6 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/realm/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -798,27 +798,27 @@ void Group::recycle_table_accessor(Table* to_be_recycled)
g_table_recycler_1.push_back(to_be_recycled);
}

void Group::remove_table(StringData name)
void Group::remove_table(StringData name, bool ignore_backlinks)
{
check_attached();
size_t table_ndx = m_table_names.find_first(name);
if (table_ndx == not_found)
throw NoSuchTable();
auto key = ndx2key(table_ndx);
remove_table(table_ndx, key); // Throws
remove_table(table_ndx, key, ignore_backlinks); // Throws
}


void Group::remove_table(TableKey key)
void Group::remove_table(TableKey key, bool ignore_backlinks)
{
check_attached();

size_t table_ndx = key2ndx_checked(key);
remove_table(table_ndx, key);
remove_table(table_ndx, key, ignore_backlinks);
}


void Group::remove_table(size_t table_ndx, TableKey key)
void Group::remove_table(size_t table_ndx, TableKey key, bool ignore_backlinks)
{
if (!m_is_writable)
throw LogicError(ErrorCodes::ReadOnlyDB, "Database not writable");
Expand All @@ -832,7 +832,7 @@ void Group::remove_table(size_t table_ndx, TableKey key)
// tables. Such a behaviour is deemed too obscure, and we shall therefore
// require that a removed table does not contain foreign origin backlink
// columns.
if (table->is_cross_table_link_target())
if (!ignore_backlinks && table->is_cross_table_link_target())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this leave the db in a state where there are tables out there with invalid links? I guess in the place this is used right now that may not be important to consider because we remove all tables in a single WT, but what are the implications if an SDK started using this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed. we remove all tables so it should not cause any issue. I don't think we expose this to the SDKs (had a look in spec.yml and there is no reference). I could also have a private method and friend the class (or some other utility)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java was the only SDK which exposed table schema mutations directly and everything else only does it by setting the object store schema. Should probably add a comment in the declaration that ignore_backlinks=true will leave things in an invalid state just in case someone ends up using it in the future.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

throw CrossTableLinkTarget(table->get_name());

{
Expand Down
6 changes: 3 additions & 3 deletions src/realm/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ class Group : public ArrayParent {
TableRef get_or_add_table_with_primary_key(StringData name, DataType pk_type, StringData pk_name,
bool nullable = false, Table::Type table_type = Table::Type::TopLevel);

void remove_table(TableKey key);
void remove_table(StringData name);
void remove_table(TableKey key, bool ignore_backlinks = false);
void remove_table(StringData name, bool ignore_backlinks = false);

void rename_table(TableKey key, StringData new_name, bool require_unique_name = true);
void rename_table(StringData name, StringData new_name, bool require_unique_name = true);
Expand Down Expand Up @@ -631,7 +631,7 @@ class Group : public ArrayParent {
void attach_shared(ref_type new_top_ref, size_t new_file_size, bool writable, VersionID version);

void create_empty_group();
void remove_table(size_t table_ndx, TableKey key);
void remove_table(size_t table_ndx, TableKey key, bool ignore_backlinks);

void reset_free_space_tracking();

Expand Down
10 changes: 2 additions & 8 deletions src/realm/object-store/object_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,7 @@ static const char* schema_mode_to_string(SchemaMode mode)
void ObjectStore::apply_schema_changes(Transaction& transaction, uint64_t schema_version, Schema& target_schema,
uint64_t target_schema_version, SchemaMode mode,
std::vector<SchemaChange> const& changes, bool handle_automatically_backlinks,
std::function<void()> migration_function,
bool set_schema_version_on_version_decrease)
std::function<void()> migration_function)
{
using namespace std::chrono;
auto t1 = steady_clock::now();
Expand All @@ -889,16 +888,11 @@ void ObjectStore::apply_schema_changes(Transaction& transaction, uint64_t schema
create_metadata_tables(transaction);

if (mode == SchemaMode::AdditiveDiscovered || mode == SchemaMode::AdditiveExplicit) {
bool set_schema = (schema_version < target_schema_version || schema_version == ObjectStore::NotVersioned ||
tgoyne marked this conversation as resolved.
Show resolved Hide resolved
set_schema_version_on_version_decrease);

// With sync v2.x, indexes are no longer synced, so there's no reason to avoid creating them.
bool update_indexes = true;
apply_additive_changes(transaction, changes, update_indexes);

if (set_schema)
set_schema_version(transaction, target_schema_version);

set_schema_version(transaction, target_schema_version);
set_schema_keys(transaction, target_schema);
return;
}
Expand Down
3 changes: 1 addition & 2 deletions src/realm/object-store/object_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ class ObjectStore {
static void apply_schema_changes(Transaction& group, uint64_t schema_version, Schema& target_schema,
uint64_t target_schema_version, SchemaMode mode,
std::vector<SchemaChange> const& changes, bool handle_automatically_backlinks,
std::function<void()> migration_function = {},
bool save_schema_version_on_version_decrease = false);
std::function<void()> migration_function = {});

static void apply_additive_changes(Group&, std::vector<SchemaChange> const&, bool update_indexes);

Expand Down
11 changes: 2 additions & 9 deletions src/realm/object-store/shared_realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,6 @@ void Realm::update_schema(Schema schema, uint64_t version, MigrationFunction mig

schema.copy_keys_from(actual_schema, m_config.schema_subset_mode);

bool save_schema_version_on_version_decrease = false;
#if REALM_ENABLE_SYNC
if (m_config.sync_config && m_config.sync_config->flx_sync_requested)
save_schema_version_on_version_decrease = true;
#endif

uint64_t old_schema_version = m_schema_version;
bool additive = m_config.schema_mode == SchemaMode::AdditiveDiscovered ||
m_config.schema_mode == SchemaMode::AdditiveExplicit ||
Expand Down Expand Up @@ -532,12 +526,11 @@ void Realm::update_schema(Schema schema, uint64_t version, MigrationFunction mig

ObjectStore::apply_schema_changes(transaction(), version, m_schema, m_schema_version, m_config.schema_mode,
required_changes, m_config.automatically_handle_backlinks_in_migrations,
wrapper, save_schema_version_on_version_decrease);
wrapper);
}
else {
ObjectStore::apply_schema_changes(transaction(), m_schema_version, schema, version, m_config.schema_mode,
required_changes, m_config.automatically_handle_backlinks_in_migrations,
nullptr, save_schema_version_on_version_decrease);
required_changes, m_config.automatically_handle_backlinks_in_migrations);
REALM_ASSERT_DEBUG(additive ||
(required_changes = ObjectStore::schema_from_group(read_group()).compare(schema)).empty());
}
Expand Down
52 changes: 7 additions & 45 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <realm/sync/subscriptions.hpp>
#include <realm/sync/noinst/sync_schema_migration.hpp>
#include <realm/object-store/impl/realm_coordinator.hpp>
#include <realm/object-store/sync/sync_manager.hpp>
#include <realm/object-store/sync/sync_session.hpp>
#include <realm/object-store/thread_safe_reference.hpp>

Expand Down Expand Up @@ -192,20 +191,10 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
return;
}

// Sync schema migrations require setting a subscription initializer callback to bootstrap the data. The
// subscriptions in the current realm file may not be compatible with the new schema so cannot rely on them.
auto config = coordinator->get_config();
if (!config.sync_config->subscription_initializer) {
status = Status(ErrorCodes::SyncSchemaMigrationError,
"Sync schema migrations must provide a subscription initializer callback in the sync config");
async_open_complete(std::move(callback), coordinator, status);
return;
}

// Migrate the schema.
// * First upload the changes at the old schema version
// * Then, delete the realm, reopen it, and bootstrap at new schema version
// The lifetime of the task is extended until bootstrap completes.
// * Then, pause the session, delete all tables, re-initialize the metadata, and finally restart the session.
// The lifetime of the task is extended until the bootstrap completes.
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self,
this](Status status) mutable {
Expand All @@ -220,38 +209,11 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
return;
}

auto future = SyncSession::Internal::pause_async(*session);
// Wait until the SessionWrapper is done using the DBRef.
std::move(future).get_async([callback = std::move(callback), coordinator, self, this](Status status) mutable {
{
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return; // Swallow all events if the task has been cancelled.
}

if (!status.is_ok()) {
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

// Delete the realm file and reopen it.
try {
util::CheckedLockGuard lock(m_mutex);
auto config = coordinator->get_config();
m_session = nullptr;
coordinator->close();
coordinator = nullptr;
util::File::remove(config.path);
coordinator = _impl::RealmCoordinator::get_coordinator(config);
m_session = coordinator->sync_session();
}
catch (...) {
async_open_complete(std::move(callback), coordinator, exception_to_status());
return;
}

auto migration_completed_callback = [callback = std::move(callback), coordinator = std::move(coordinator),
self](Status status) mutable {
self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
});
};
SyncSession::Internal::migrate_schema(*session, std::move(migration_completed_callback));
});
}

Expand All @@ -270,4 +232,4 @@ void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
}
}

} // namespace realm
} // namespace realm
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing newline at end of file

71 changes: 62 additions & 9 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr<DB> db, co
, m_migration_store{sync::MigrationStore::create(m_db)}
, m_client(client)
, m_sync_manager(sync_manager)
, m_previous_schema_version(_impl::sync_schema_migration::has_pending_migration(*m_db->start_read()))
{
REALM_ASSERT(m_config.sync_config);
// we don't want the following configs enabled during a client reset
Expand Down Expand Up @@ -657,12 +656,12 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
revive_if_needed();
}

util::Future<void> SyncSession::Internal::pause_async(SyncSession& session)
util::Future<void> SyncSession::pause_async()
{
{
util::CheckedUniqueLock lock(session.m_state_mutex);
util::CheckedUniqueLock lock(m_state_mutex);
// Nothing to wait for if the session is already paused or inactive.
if (session.m_state == SyncSession::State::Paused || session.m_state == SyncSession::State::Inactive) {
if (m_state == SyncSession::State::Paused || m_state == SyncSession::State::Inactive) {
return util::Future<void>::make_ready();
}
}
Expand All @@ -671,8 +670,8 @@ util::Future<void> SyncSession::Internal::pause_async(SyncSession& session)
// must have been destroyed upon return. This allows the caller to follow up with a call to
// sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works
// so long as this SyncSession object remains in the `paused` state after the invocation of shutdown().
session.pause();
return session.m_client.notify_session_terminated();
pause();
return m_client.notify_session_terminated();
}

void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
Expand All @@ -682,7 +681,7 @@ void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::Sessi

util::Future<void> SyncSession::OnlyForTesting::pause_async(SyncSession& session)
{
return SyncSession::Internal::pause_async(session);
return session.pause_async();
}

// This method should only be called from within the error handler callback registered upon the underlying
Expand Down Expand Up @@ -1418,10 +1417,10 @@ void SyncSession::update_subscription_store(bool flx_sync_requested, std::option
// waiters
auto subscription_store = std::move(m_flx_subscription_store);
lock.unlock();
subscription_store->terminate();
auto tr = m_db->start_write();
subscription_store->reset(*tr);
history.set_write_validator_factory(nullptr);
tr->rollback();
tr->commit();
}
return;
}
Expand Down Expand Up @@ -1688,3 +1687,57 @@ util::Future<std::string> SyncSession::send_test_command(std::string body)

return m_session->send_test_command(std::move(body));
}

void SyncSession::migrate_schema(util::UniqueFunction<void(Status)>&& callback)
{
util::CheckedUniqueLock lock(m_state_mutex);
// If the schema migration is already in progress, just wait to complete.
if (m_schema_migration_in_progress) {
add_completion_callback(std::move(callback), ProgressDirection::download);
return;
}
m_schema_migration_in_progress = true;

// Perform the migration:
// 1. Pause the sync session
// 2. Once the sync client releases the realm file:
// a. Delete all public tables
// b. Reset the subscription store
// c. Clear the pending boostrap store
// d. Empty the sync history and adjust cursors
// e. Reset file ident (the server flags the old ident as in the case of a client reset)
// 3. Resume the session (the client asks for a new file ident)
// See `sync_schema_migration::perform_schema_migration` for more details.

CompletionCallbacks callbacks;
std::swap(m_completion_callbacks, callbacks);
auto guard = util::make_scope_exit([&]() noexcept {
util::CheckedUniqueLock lock(m_state_mutex);
if (m_completion_callbacks.empty())
std::swap(callbacks, m_completion_callbacks);
else
m_completion_callbacks.merge(std::move(callbacks));
});
m_state_mutex.unlock(lock);

auto future = pause_async();
std::move(future).get_async(
[callback = std::move(callback), weak_session = weak_from_this()](Status status) mutable {
if (!status.is_ok())
return callback(status);

auto session = weak_session.lock();
if (!session) {
status = Status(ErrorCodes::InvalidSession, "Sync session was destroyed during schema migration");
return callback(status);
}
sync_schema_migration::perform_schema_migration(session->m_db, *session->get_flx_subscription_store());
{
util::CheckedUniqueLock lock(session->m_state_mutex);
session->m_previous_schema_version.reset();
session->m_schema_migration_in_progress = false;
}
session->wait_for_download_completion(std::move(callback));
session->resume();
});
}
10 changes: 9 additions & 1 deletion src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,10 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.m_db;
}

static util::Future<void> pause_async(SyncSession& session);
static void migrate_schema(SyncSession& session, util::UniqueFunction<void(Status)>&& callback)
{
session.migrate_schema(std::move(callback));
}
};

// Expose some internal functionality to testing code.
Expand Down Expand Up @@ -410,6 +413,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
REQUIRES(!m_connection_state_mutex);
void become_paused(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_waiting_for_access_token() REQUIRES(m_state_mutex);
util::Future<void> pause_async() REQUIRES(!m_state_mutex, !m_connection_state_mutex);

// do restart session restarts the session without freeing any of the waiters
void do_restart_session(util::CheckedUniqueLock)
Expand All @@ -431,6 +435,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

util::Future<std::string> send_test_command(std::string body) REQUIRES(!m_state_mutex);

void migrate_schema(util::UniqueFunction<void(Status)>&& callback)
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);

std::function<TransactionCallback> m_sync_transact_callback GUARDED_BY(m_state_mutex);

template <typename Field>
Expand Down Expand Up @@ -497,6 +504,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

// Set if ProtocolError::schema_version_changed error is received from the server.
std::optional<uint64_t> m_previous_schema_version GUARDED_BY(m_state_mutex);
bool m_schema_migration_in_progress GUARDED_BY(m_state_mutex) = false;
};

} // namespace realm
Expand Down
8 changes: 4 additions & 4 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

namespace realm::sync {

void ClientHistory::set_client_reset_adjustments(
void ClientHistory::set_history_adjustments(
util::Logger& logger, version_type current_version, SaltedFileIdent client_file_ident,
SaltedVersion server_version, const std::vector<_impl::client_reset::RecoveredChange>& recovered_changesets)
{
Expand All @@ -53,7 +53,7 @@ void ClientHistory::set_client_reset_adjustments(
size_t uploadable_bytes = 0;
if (recovered_changesets.empty()) {
// Either we had nothing to upload or we're discarding the unsynced changes
logger.debug("Client reset adjustments: discarding %1 history entries", sync_history_size());
logger.debug("History adjustments: discarding %1 history entries", sync_history_size());
do_trim_sync_history(sync_history_size()); // Throws
}
else {
Expand All @@ -66,7 +66,7 @@ void ClientHistory::set_client_reset_adjustments(
do_trim_sync_history(discard_count);

if (logger.would_log(util::Logger::Level::debug)) {
logger.debug("Client reset adjustments: trimming %1 history entries and updating the remaining history "
logger.debug("History adjustments: trimming %1 history entries and updating the remaining history "
"entries (%2)",
discard_count, sync_history_size());
for (size_t i = 0, size = m_arrays->changesets.size(); i < size; ++i) {
Expand Down Expand Up @@ -94,7 +94,7 @@ void ClientHistory::set_client_reset_adjustments(
m_arrays->changesets.get(i).size(), server_version.version);
}
}
logger.debug("New uploadable bytes after client reset adjustment: %1", uploadable_bytes);
logger.debug("New uploadable bytes after history adjustment: %1", uploadable_bytes);

// Client progress versions are set to 0 to signal to the server that we've
// reset our versioning. If we send the actual values, the server would
Expand Down
Loading
Loading