Skip to content

Commit

Permalink
Delete TaskQueueUtils (#1908)
Browse files Browse the repository at this point in the history
For push queues, use CloudTasksUtils. Pull queues for now directly calls
the GAE task queue APIs. The usage of pull queues will be soon replaced.
  • Loading branch information
jianglai authored Jan 19, 2023
1 parent 925c9ba commit 630ae1f
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
</queue>

<!-- Queue for tasks that communicate with TMCH MarksDB webserver. -->
<!-- TODO(b/17623181): Delete this once the queue implementation is live and working. -->
<queue>
<name>marksdb</name>
<rate>1/m</rate>
Expand Down
49 changes: 32 additions & 17 deletions core/src/main/java/google/registry/tmch/NordnUploadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package google.registry.tmch;

import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.net.HttpHeaders.LOCATION;
Expand All @@ -31,25 +30,28 @@
import com.google.appengine.api.taskqueue.LeaseOptions;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.TaskHandle;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TransientFailureException;
import com.google.apphosting.api.DeadlineExceededException;
import com.google.cloud.tasks.v2.Task;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.flogger.FluentLogger;
import google.registry.config.RegistryConfig.Config;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.UrlConnectionService;
import google.registry.request.UrlConnectionUtils;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.CloudTasksUtils;
import google.registry.util.Retrier;
import google.registry.util.TaskQueueUtils;
import google.registry.util.UrlConnectionException;
import java.io.IOException;
import java.net.HttpURLConnection;
Expand Down Expand Up @@ -81,6 +83,7 @@ public final class NordnUploadAction implements Runnable {
static final String PATH = "/_dr/task/nordnUpload";
static final String LORDN_PHASE_PARAM = "lordn-phase";

private static final int QUEUE_BATCH_SIZE = 1000;
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final Duration LEASE_PERIOD = Duration.standardHours(1);

Expand All @@ -100,14 +103,17 @@ public final class NordnUploadAction implements Runnable {
@Inject @Config("tmchMarksdbUrl") String tmchMarksdbUrl;
@Inject @Parameter(LORDN_PHASE_PARAM) String phase;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject TaskQueueUtils taskQueueUtils;

@Inject CloudTasksUtils cloudTasksUtils;

@Inject NordnUploadAction() {}

/**
* These LORDN parameter names correspond to the relative paths in LORDN URLs and cannot be
* changed on our end.
*/
private static final String PARAM_LORDN_PHASE_SUNRISE = "sunrise";

private static final String PARAM_LORDN_PHASE_CLAIMS = "claims";

/** How long to wait before attempting to verify an upload by fetching the log. */
Expand All @@ -127,7 +133,7 @@ public void run() {
* delimited String.
*/
static String convertTasksToCsv(List<TaskHandle> tasks, DateTime now, String columns) {
// Use a Set for deduping purposes so we can be idempotent in case tasks happened to be
// Use a Set for deduping purposes, so we can be idempotent in case tasks happened to be
// enqueued multiple times for a given domain create.
ImmutableSortedSet.Builder<String> builder =
new ImmutableSortedSet.Builder<>(Ordering.natural());
Expand All @@ -152,7 +158,7 @@ List<TaskHandle> loadAllTasks(Queue queue, String tld) {
queue.leaseTasks(
LeaseOptions.Builder.withTag(tld)
.leasePeriod(LEASE_PERIOD.getMillis(), TimeUnit.MILLISECONDS)
.countLimit(TaskQueueUtils.getBatchSize())),
.countLimit(QUEUE_BATCH_SIZE)),
TransientFailureException.class,
DeadlineExceededException.class);
if (tasks.isEmpty()) {
Expand All @@ -163,9 +169,10 @@ List<TaskHandle> loadAllTasks(Queue queue, String tld) {
}

private void processLordnTasks() throws IOException, GeneralSecurityException {
checkArgument(phase.equals(PARAM_LORDN_PHASE_SUNRISE)
|| phase.equals(PARAM_LORDN_PHASE_CLAIMS),
"Invalid phase specified to Nordn servlet: %s.", phase);
checkArgument(
phase.equals(PARAM_LORDN_PHASE_SUNRISE) || phase.equals(PARAM_LORDN_PHASE_CLAIMS),
"Invalid phase specified to Nordn servlet: %s.",
phase);
DateTime now = clock.nowUtc();
Queue queue =
getQueue(
Expand All @@ -182,7 +189,11 @@ private void processLordnTasks() throws IOException, GeneralSecurityException {
if (!tasks.isEmpty()) {
String csvData = convertTasksToCsv(tasks, now, columns);
uploadCsvToLordn(String.format("/LORDN/%s/%s", tld, phase), csvData);
taskQueueUtils.deleteTasks(queue, tasks);
Lists.partition(tasks, QUEUE_BATCH_SIZE)
.forEach(
batch ->
retrier.callWithRetry(
() -> queue.deleteTask(batch), TransientFailureException.class));
}
}

Expand Down Expand Up @@ -231,18 +242,22 @@ private void uploadCsvToLordn(String urlPath, String csvData)
actionLogId),
connection);
}
getQueue(NordnVerifyAction.QUEUE).add(makeVerifyTask(new URL(location)));
cloudTasksUtils.enqueue(NordnVerifyAction.QUEUE, makeVerifyTask(new URL(location)));
} catch (IOException e) {
throw new IOException(String.format("Error connecting to MarksDB at URL %s", url), e);
}
}

private TaskOptions makeVerifyTask(URL url) {
private Task makeVerifyTask(URL url) {
// The actionLogId is used to uniquely associate the verify task back to the upload task.
return withUrl(NordnVerifyAction.PATH)
.header(NordnVerifyAction.URL_HEADER, url.toString())
.header(NordnVerifyAction.HEADER_ACTION_LOG_ID, actionLogId)
.param(RequestParameters.PARAM_TLD, tld)
.countdownMillis(VERIFY_DELAY.getMillis());
return cloudTasksUtils.createPostTaskWithDelay(
NordnVerifyAction.PATH,
Service.BACKEND.toString(),
ImmutableMultimap.<String, String>builder()
.put(NordnVerifyAction.NORDN_URL_PARAM, url.toString())
.put(NordnVerifyAction.NORDN_LOG_ID_PARAM, actionLogId)
.put(RequestParameters.PARAM_TLD, tld)
.build(),
Duration.millis(VERIFY_DELAY.getMillis()));
}
}
23 changes: 16 additions & 7 deletions core/src/main/java/google/registry/tmch/NordnVerifyAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.flogger.FluentLogger;
import com.google.common.io.ByteSource;
import google.registry.request.Action;
import google.registry.request.Header;
import google.registry.request.HttpException.ConflictException;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
Expand Down Expand Up @@ -60,19 +59,29 @@ public final class NordnVerifyAction implements Runnable {

static final String PATH = "/_dr/task/nordnVerify";
static final String QUEUE = "marksdb";
static final String URL_HEADER = "X-DomainRegistry-Nordn-Url";
static final String HEADER_ACTION_LOG_ID = "X-DomainRegistry-ActionLogId";
static final String NORDN_URL_PARAM = "nordnUrl";
static final String NORDN_LOG_ID_PARAM = "nordnLogId";

private static final FluentLogger logger = FluentLogger.forEnclosingClass();

@Inject LordnRequestInitializer lordnRequestInitializer;
@Inject Response response;
@Inject UrlConnectionService urlConnectionService;

@Inject @Header(URL_HEADER) URL url;
@Inject @Header(HEADER_ACTION_LOG_ID) String actionLogId;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject NordnVerifyAction() {}
@Inject
@Parameter(NORDN_URL_PARAM)
URL url;

@Inject
@Parameter(NORDN_LOG_ID_PARAM)
String actionLogId;

@Inject
@Parameter(RequestParameters.PARAM_TLD)
String tld;

@Inject
NordnVerifyAction() {}

@Override
public void run() {
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/java/google/registry/tmch/TmchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

import static com.google.common.io.Resources.asByteSource;
import static com.google.common.io.Resources.getResource;
import static google.registry.request.RequestParameters.extractRequiredHeader;
import static google.registry.request.RequestParameters.extractRequiredParameter;

import dagger.Module;
import dagger.Provides;
import google.registry.keyring.api.KeyModule.Key;
import google.registry.request.Header;
import google.registry.request.HttpException.BadRequestException;
import google.registry.request.Parameter;
import java.net.MalformedURLException;
Expand All @@ -34,8 +32,10 @@
@Module
public final class TmchModule {

private static final PGPPublicKey MARKSDB_PUBLIC_KEY = TmchData
.loadPublicKey(asByteSource(getResource(TmchModule.class, "marksdb-public-key.asc")));
private static final PGPPublicKey MARKSDB_PUBLIC_KEY =
TmchData.loadPublicKey(asByteSource(getResource(TmchModule.class, "marksdb-public-key.asc")));

private TmchModule() {}

@Provides
@Key("marksdbPublicKey")
Expand All @@ -50,18 +50,18 @@ static String provideLordnPhase(HttpServletRequest req) {
}

@Provides
@Header(NordnVerifyAction.URL_HEADER)
static URL provideUrl(HttpServletRequest req) {
@Parameter(NordnVerifyAction.NORDN_URL_PARAM)
static URL provideNordnUrl(HttpServletRequest req) {
try {
return new URL(extractRequiredHeader(req, NordnVerifyAction.URL_HEADER));
return new URL(extractRequiredParameter(req, NordnVerifyAction.NORDN_URL_PARAM));
} catch (MalformedURLException e) {
throw new BadRequestException("Bad URL: " + NordnVerifyAction.URL_HEADER);
throw new BadRequestException("Bad URL: " + NordnVerifyAction.NORDN_URL_PARAM);
}
}

@Provides
@Header(NordnVerifyAction.HEADER_ACTION_LOG_ID)
static String provideActionLogId(HttpServletRequest req) {
return extractRequiredHeader(req, NordnVerifyAction.HEADER_ACTION_LOG_ID);
@Parameter(NordnVerifyAction.NORDN_LOG_ID_PARAM)
static String provideNordnLogId(HttpServletRequest req) {
return extractRequiredParameter(req, NordnVerifyAction.NORDN_LOG_ID_PARAM);
}
}
18 changes: 10 additions & 8 deletions core/src/test/java/google/registry/tmch/NordnUploadActionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static google.registry.testing.DatabaseHelper.loadRegistrar;
import static google.registry.testing.DatabaseHelper.persistDomainAndEnqueueLordn;
import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static java.nio.charset.StandardCharsets.UTF_8;
import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
Expand Down Expand Up @@ -51,14 +50,15 @@
import google.registry.model.tld.Registry;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.DatabaseHelper;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper;
import google.registry.testing.FakeUrlConnectionService;
import google.registry.testing.TaskQueueExtension;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import google.registry.util.CloudTasksUtils;
import google.registry.util.Retrier;
import google.registry.util.TaskQueueUtils;
import google.registry.util.UrlConnectionException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -90,6 +90,8 @@ class NordnUploadActionTest {
private static final String LOCATION_URL = "http://trololol";

private final FakeClock clock = new FakeClock(DateTime.parse("2010-05-01T10:11:12Z"));
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(clock);
private final CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();

@RegisterExtension
final JpaIntegrationTestExtension jpa =
Expand Down Expand Up @@ -117,10 +119,10 @@ void beforeEach() throws Exception {
createTld("tld");
persistResource(Registry.get("tld").asBuilder().setLordnUsername("lolcat").build());
action.clock = clock;
action.cloudTasksUtils = cloudTasksUtils;
action.urlConnectionService = urlConnectionService;
action.lordnRequestInitializer = lordnRequestInitializer;
action.phase = "claims";
action.taskQueueUtils = new TaskQueueUtils(new Retrier(new FakeSleeper(clock), 3));
action.tld = "tld";
action.tmchMarksdbUrl = "http://127.0.0.1";
action.random = new SecureRandom();
Expand Down Expand Up @@ -235,11 +237,11 @@ void testRun_claimsMode_payloadMatchesClaimsCsv() {
void testRun_claimsMode_verifyTaskGetsEnqueuedWithClaimsCsv() {
persistClaimsModeDomain();
action.run();
assertTasksEnqueued(
cloudTasksHelper.assertTasksEnqueued(
NordnVerifyAction.QUEUE,
new TaskMatcher()
.url(NordnVerifyAction.PATH)
.header(NordnVerifyAction.URL_HEADER, LOCATION_URL)
.param(NordnVerifyAction.NORDN_URL_PARAM, LOCATION_URL)
.header(CONTENT_TYPE, FORM_DATA.toString()));
}

Expand All @@ -263,11 +265,11 @@ void test_noResponseContent_stillWorksNormally() throws Exception {
void testRun_sunriseMode_verifyTaskGetsEnqueuedWithSunriseCsv() {
persistSunriseModeDomain();
action.run();
assertTasksEnqueued(
cloudTasksHelper.assertTasksEnqueued(
NordnVerifyAction.QUEUE,
new TaskMatcher()
.url(NordnVerifyAction.PATH)
.header(NordnVerifyAction.URL_HEADER, LOCATION_URL)
.param(NordnVerifyAction.NORDN_URL_PARAM, LOCATION_URL)
.header(CONTENT_TYPE, FORM_DATA.toString()));
}

Expand Down
Loading

0 comments on commit 630ae1f

Please sign in to comment.