Skip to content

Commit

Permalink
Always use SQL based ID allocation (#1899)
Browse files Browse the repository at this point in the history
We've been using it in production for three weeks now. Everything seems
to be working fine. Removing the code related to checking the migration
state and using the override.
  • Loading branch information
jianglai authored Jan 10, 2023
1 parent 9dab1e8 commit 99a3142
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package google.registry.beam.common;

import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Objects;
Expand Down Expand Up @@ -57,17 +56,6 @@ public interface RegistryPipelineOptions extends GcpOptions {

void setSqlWriteBatchSize(int sqlWriteBatchSize);

@DeleteAfterMigration
@Description(
"Whether to use self allocated primary IDs when building entities. This should only be used"
+ " when the IDs are not significant and the resulting entities are not persisted back to"
+ " the database. Use with caution as self allocated IDs are not unique across workers,"
+ " and persisting entities with these IDs can be dangerous.")
@Default.Boolean(false)
boolean getUseSelfAllocatedId();

void setUseSelfAllocatedId(boolean useSelfAllocatedId);

static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
return DaggerRegistryPipelineComponent.builder()
.isolationOverride(options.getIsolationOverride())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import dagger.Lazy;
import google.registry.config.RegistryEnvironment;
import google.registry.config.SystemPropertySetter;
import google.registry.model.IdService;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
Expand Down Expand Up @@ -63,15 +62,5 @@ public void beforeProcessing(PipelineOptions options) {
}
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
// Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from
// multiple workers, which can also collide with existing IDs in the database. So they cannot be
// dependent upon for comparison or anything significant. The resulting entities can never be
// persisted back into the database. This is a stop-gap measure that should only be used when
// you need to create Buildables in Beam, but do not have control over how the IDs are
// allocated, and you don't care about the generated IDs as long
// as you can build the entities.
if (registryOptions.getUseSelfAllocatedId()) {
IdService.setForceUseSelfAllocatedId();
}
}
}
7 changes: 0 additions & 7 deletions core/src/main/java/google/registry/beam/rde/RdePipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,6 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
RdePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);

// We need to self allocate the IDs because the pipeline creates EPP resources from history
// entries and projects them to watermark. These buildable entities would otherwise request an
// ID from datastore, which Beam does not have access to. The IDs are not included in the
// deposits or are these entities persisted back to the database, so it is OK to use a self
// allocated ID to get around the limitations of beam.
options.setUseSelfAllocatedId(true);

RegistryPipelineOptions.validateRegistryPipelineOptions(options);
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
Expand Down
75 changes: 5 additions & 70 deletions core/src/main/java/google/registry/model/IdService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,25 @@
//
package google.registry.model;

import static com.google.common.base.Preconditions.checkState;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static org.joda.time.DateTimeZone.UTC;

import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.config.RegistryEnvironment;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.joda.time.DateTime;

/**
* Allocates a {@link long} to use as a {@code @Id}, (part) of the primary SQL key for an entity.
*/
public final class IdService {

private static final FluentLogger logger = FluentLogger.forEnclosingClass();

// TODO(ptkach): remove once the Cloud SQL sequence-based method is live in production
private static boolean forceUseSelfAllocateId = false;

public static void setForceUseSelfAllocatedId() {
checkState(
"true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")),
"Can only set ID supplier in a Beam pipeline");
logger.atWarning().log("Using ID supplier override!");
IdService.forceUseSelfAllocateId = true;
}

private static class SelfAllocatedIdSupplier implements Supplier<Long> {

private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();

/** Counts of used ids for self allocating IDs. */
private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero

private static SelfAllocatedIdSupplier getInstance() {
return INSTANCE;
}
private IdService() {}

@Override
public Long get() {
return nextSelfAllocatedId.getAndIncrement();
}
}
/**
* A SQL Sequence based ID allocator that generates an ID from a monotonically increasing atomic
* {@link long}
* A SQL Sequence based ID allocator that generates an ID from a monotonically increasing {@link
* AtomicLong}
*
* <p>The generated IDs are project-wide unique
* <p>The generated IDs are project-wide unique.
*/
private static Long getSequenceBasedId() {
public static long allocateId() {
return tm().transact(
() ->
(BigInteger)
Expand All @@ -78,32 +41,4 @@ private static Long getSequenceBasedId() {
.getSingleResult())
.longValue();
}

// TODO(ptkach): Remove once all instances switch to sequenceBasedId
/**
* A Datastore based ID allocator that generates an ID from a monotonically increasing atomic
* {@link long}
*
* <p>The generated IDs are project-wide unique
*/
private static Long getDatastoreBasedId() {
return DatastoreServiceFactory.getDatastoreService()
.allocateIds("common", 1)
.iterator()
.next()
.getId();
}

private IdService() {}

public static long allocateId() {
if (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC))
.equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID)
|| RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())) {
return getSequenceBasedId();
} else if (IdService.forceUseSelfAllocateId) {
return SelfAllocatedIdSupplier.getInstance().get();
}
return getDatastoreBasedId();
}
}

0 comments on commit 99a3142

Please sign in to comment.