diff --git a/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml b/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml index 6700c2420c3..80349713d36 100644 --- a/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml +++ b/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml @@ -65,7 +65,6 @@ - marksdb 1/m diff --git a/core/src/main/java/google/registry/tmch/NordnUploadAction.java b/core/src/main/java/google/registry/tmch/NordnUploadAction.java index 32fa98793c4..994e94d8731 100644 --- a/core/src/main/java/google/registry/tmch/NordnUploadAction.java +++ b/core/src/main/java/google/registry/tmch/NordnUploadAction.java @@ -15,7 +15,6 @@ package google.registry.tmch; import static com.google.appengine.api.taskqueue.QueueFactory.getQueue; -import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.net.HttpHeaders.LOCATION; @@ -31,25 +30,28 @@ import com.google.appengine.api.taskqueue.LeaseOptions; import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; import com.google.appengine.api.taskqueue.TransientFailureException; import com.google.apphosting.api.DeadlineExceededException; +import com.google.cloud.tasks.v2.Task; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.flogger.FluentLogger; import google.registry.config.RegistryConfig.Config; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.Parameter; import google.registry.request.RequestParameters; import google.registry.request.UrlConnectionService; import google.registry.request.UrlConnectionUtils; import google.registry.request.auth.Auth; import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; import google.registry.util.Retrier; -import google.registry.util.TaskQueueUtils; import google.registry.util.UrlConnectionException; import java.io.IOException; import java.net.HttpURLConnection; @@ -81,6 +83,7 @@ public final class NordnUploadAction implements Runnable { static final String PATH = "/_dr/task/nordnUpload"; static final String LORDN_PHASE_PARAM = "lordn-phase"; + private static final int QUEUE_BATCH_SIZE = 1000; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final Duration LEASE_PERIOD = Duration.standardHours(1); @@ -100,7 +103,9 @@ public final class NordnUploadAction implements Runnable { @Inject @Config("tmchMarksdbUrl") String tmchMarksdbUrl; @Inject @Parameter(LORDN_PHASE_PARAM) String phase; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; - @Inject TaskQueueUtils taskQueueUtils; + + @Inject CloudTasksUtils cloudTasksUtils; + @Inject NordnUploadAction() {} /** @@ -108,6 +113,7 @@ public final class NordnUploadAction implements Runnable { * changed on our end. */ private static final String PARAM_LORDN_PHASE_SUNRISE = "sunrise"; + private static final String PARAM_LORDN_PHASE_CLAIMS = "claims"; /** How long to wait before attempting to verify an upload by fetching the log. */ @@ -127,7 +133,7 @@ public void run() { * delimited String. */ static String convertTasksToCsv(List tasks, DateTime now, String columns) { - // Use a Set for deduping purposes so we can be idempotent in case tasks happened to be + // Use a Set for deduping purposes, so we can be idempotent in case tasks happened to be // enqueued multiple times for a given domain create. ImmutableSortedSet.Builder builder = new ImmutableSortedSet.Builder<>(Ordering.natural()); @@ -152,7 +158,7 @@ List loadAllTasks(Queue queue, String tld) { queue.leaseTasks( LeaseOptions.Builder.withTag(tld) .leasePeriod(LEASE_PERIOD.getMillis(), TimeUnit.MILLISECONDS) - .countLimit(TaskQueueUtils.getBatchSize())), + .countLimit(QUEUE_BATCH_SIZE)), TransientFailureException.class, DeadlineExceededException.class); if (tasks.isEmpty()) { @@ -163,9 +169,10 @@ List loadAllTasks(Queue queue, String tld) { } private void processLordnTasks() throws IOException, GeneralSecurityException { - checkArgument(phase.equals(PARAM_LORDN_PHASE_SUNRISE) - || phase.equals(PARAM_LORDN_PHASE_CLAIMS), - "Invalid phase specified to Nordn servlet: %s.", phase); + checkArgument( + phase.equals(PARAM_LORDN_PHASE_SUNRISE) || phase.equals(PARAM_LORDN_PHASE_CLAIMS), + "Invalid phase specified to Nordn servlet: %s.", + phase); DateTime now = clock.nowUtc(); Queue queue = getQueue( @@ -182,7 +189,11 @@ private void processLordnTasks() throws IOException, GeneralSecurityException { if (!tasks.isEmpty()) { String csvData = convertTasksToCsv(tasks, now, columns); uploadCsvToLordn(String.format("/LORDN/%s/%s", tld, phase), csvData); - taskQueueUtils.deleteTasks(queue, tasks); + Lists.partition(tasks, QUEUE_BATCH_SIZE) + .forEach( + batch -> + retrier.callWithRetry( + () -> queue.deleteTask(batch), TransientFailureException.class)); } } @@ -231,18 +242,22 @@ private void uploadCsvToLordn(String urlPath, String csvData) actionLogId), connection); } - getQueue(NordnVerifyAction.QUEUE).add(makeVerifyTask(new URL(location))); + cloudTasksUtils.enqueue(NordnVerifyAction.QUEUE, makeVerifyTask(new URL(location))); } catch (IOException e) { throw new IOException(String.format("Error connecting to MarksDB at URL %s", url), e); } } - private TaskOptions makeVerifyTask(URL url) { + private Task makeVerifyTask(URL url) { // The actionLogId is used to uniquely associate the verify task back to the upload task. - return withUrl(NordnVerifyAction.PATH) - .header(NordnVerifyAction.URL_HEADER, url.toString()) - .header(NordnVerifyAction.HEADER_ACTION_LOG_ID, actionLogId) - .param(RequestParameters.PARAM_TLD, tld) - .countdownMillis(VERIFY_DELAY.getMillis()); + return cloudTasksUtils.createPostTaskWithDelay( + NordnVerifyAction.PATH, + Service.BACKEND.toString(), + ImmutableMultimap.builder() + .put(NordnVerifyAction.NORDN_URL_PARAM, url.toString()) + .put(NordnVerifyAction.NORDN_LOG_ID_PARAM, actionLogId) + .put(RequestParameters.PARAM_TLD, tld) + .build(), + Duration.millis(VERIFY_DELAY.getMillis())); } } diff --git a/core/src/main/java/google/registry/tmch/NordnVerifyAction.java b/core/src/main/java/google/registry/tmch/NordnVerifyAction.java index a958d2e09a3..5257637acfb 100644 --- a/core/src/main/java/google/registry/tmch/NordnVerifyAction.java +++ b/core/src/main/java/google/registry/tmch/NordnVerifyAction.java @@ -23,7 +23,6 @@ import com.google.common.flogger.FluentLogger; import com.google.common.io.ByteSource; import google.registry.request.Action; -import google.registry.request.Header; import google.registry.request.HttpException.ConflictException; import google.registry.request.Parameter; import google.registry.request.RequestParameters; @@ -60,8 +59,8 @@ public final class NordnVerifyAction implements Runnable { static final String PATH = "/_dr/task/nordnVerify"; static final String QUEUE = "marksdb"; - static final String URL_HEADER = "X-DomainRegistry-Nordn-Url"; - static final String HEADER_ACTION_LOG_ID = "X-DomainRegistry-ActionLogId"; + static final String NORDN_URL_PARAM = "nordnUrl"; + static final String NORDN_LOG_ID_PARAM = "nordnLogId"; private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -69,10 +68,20 @@ public final class NordnVerifyAction implements Runnable { @Inject Response response; @Inject UrlConnectionService urlConnectionService; - @Inject @Header(URL_HEADER) URL url; - @Inject @Header(HEADER_ACTION_LOG_ID) String actionLogId; - @Inject @Parameter(RequestParameters.PARAM_TLD) String tld; - @Inject NordnVerifyAction() {} + @Inject + @Parameter(NORDN_URL_PARAM) + URL url; + + @Inject + @Parameter(NORDN_LOG_ID_PARAM) + String actionLogId; + + @Inject + @Parameter(RequestParameters.PARAM_TLD) + String tld; + + @Inject + NordnVerifyAction() {} @Override public void run() { diff --git a/core/src/main/java/google/registry/tmch/TmchModule.java b/core/src/main/java/google/registry/tmch/TmchModule.java index daab90ca9b8..ba2c106b988 100644 --- a/core/src/main/java/google/registry/tmch/TmchModule.java +++ b/core/src/main/java/google/registry/tmch/TmchModule.java @@ -16,13 +16,11 @@ import static com.google.common.io.Resources.asByteSource; import static com.google.common.io.Resources.getResource; -import static google.registry.request.RequestParameters.extractRequiredHeader; import static google.registry.request.RequestParameters.extractRequiredParameter; import dagger.Module; import dagger.Provides; import google.registry.keyring.api.KeyModule.Key; -import google.registry.request.Header; import google.registry.request.HttpException.BadRequestException; import google.registry.request.Parameter; import java.net.MalformedURLException; @@ -34,8 +32,10 @@ @Module public final class TmchModule { - private static final PGPPublicKey MARKSDB_PUBLIC_KEY = TmchData - .loadPublicKey(asByteSource(getResource(TmchModule.class, "marksdb-public-key.asc"))); + private static final PGPPublicKey MARKSDB_PUBLIC_KEY = + TmchData.loadPublicKey(asByteSource(getResource(TmchModule.class, "marksdb-public-key.asc"))); + + private TmchModule() {} @Provides @Key("marksdbPublicKey") @@ -50,18 +50,18 @@ static String provideLordnPhase(HttpServletRequest req) { } @Provides - @Header(NordnVerifyAction.URL_HEADER) - static URL provideUrl(HttpServletRequest req) { + @Parameter(NordnVerifyAction.NORDN_URL_PARAM) + static URL provideNordnUrl(HttpServletRequest req) { try { - return new URL(extractRequiredHeader(req, NordnVerifyAction.URL_HEADER)); + return new URL(extractRequiredParameter(req, NordnVerifyAction.NORDN_URL_PARAM)); } catch (MalformedURLException e) { - throw new BadRequestException("Bad URL: " + NordnVerifyAction.URL_HEADER); + throw new BadRequestException("Bad URL: " + NordnVerifyAction.NORDN_URL_PARAM); } } @Provides - @Header(NordnVerifyAction.HEADER_ACTION_LOG_ID) - static String provideActionLogId(HttpServletRequest req) { - return extractRequiredHeader(req, NordnVerifyAction.HEADER_ACTION_LOG_ID); + @Parameter(NordnVerifyAction.NORDN_LOG_ID_PARAM) + static String provideNordnLogId(HttpServletRequest req) { + return extractRequiredParameter(req, NordnVerifyAction.NORDN_LOG_ID_PARAM); } } diff --git a/core/src/test/java/google/registry/tmch/NordnUploadActionTest.java b/core/src/test/java/google/registry/tmch/NordnUploadActionTest.java index b3f1b74d791..a3e15035430 100644 --- a/core/src/test/java/google/registry/tmch/NordnUploadActionTest.java +++ b/core/src/test/java/google/registry/tmch/NordnUploadActionTest.java @@ -23,7 +23,6 @@ import static google.registry.testing.DatabaseHelper.loadRegistrar; import static google.registry.testing.DatabaseHelper.persistDomainAndEnqueueLordn; import static google.registry.testing.DatabaseHelper.persistResource; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static java.nio.charset.StandardCharsets.UTF_8; import static javax.servlet.http.HttpServletResponse.SC_ACCEPTED; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; @@ -51,14 +50,15 @@ import google.registry.model.tld.Registry; import google.registry.persistence.transaction.JpaTestExtensions; import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.testing.DatabaseHelper; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; import google.registry.testing.FakeUrlConnectionService; import google.registry.testing.TaskQueueExtension; -import google.registry.testing.TaskQueueHelper.TaskMatcher; +import google.registry.util.CloudTasksUtils; import google.registry.util.Retrier; -import google.registry.util.TaskQueueUtils; import google.registry.util.UrlConnectionException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -90,6 +90,8 @@ class NordnUploadActionTest { private static final String LOCATION_URL = "http://trololol"; private final FakeClock clock = new FakeClock(DateTime.parse("2010-05-01T10:11:12Z")); + private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(clock); + private final CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); @RegisterExtension final JpaIntegrationTestExtension jpa = @@ -117,10 +119,10 @@ void beforeEach() throws Exception { createTld("tld"); persistResource(Registry.get("tld").asBuilder().setLordnUsername("lolcat").build()); action.clock = clock; + action.cloudTasksUtils = cloudTasksUtils; action.urlConnectionService = urlConnectionService; action.lordnRequestInitializer = lordnRequestInitializer; action.phase = "claims"; - action.taskQueueUtils = new TaskQueueUtils(new Retrier(new FakeSleeper(clock), 3)); action.tld = "tld"; action.tmchMarksdbUrl = "http://127.0.0.1"; action.random = new SecureRandom(); @@ -235,11 +237,11 @@ void testRun_claimsMode_payloadMatchesClaimsCsv() { void testRun_claimsMode_verifyTaskGetsEnqueuedWithClaimsCsv() { persistClaimsModeDomain(); action.run(); - assertTasksEnqueued( + cloudTasksHelper.assertTasksEnqueued( NordnVerifyAction.QUEUE, new TaskMatcher() .url(NordnVerifyAction.PATH) - .header(NordnVerifyAction.URL_HEADER, LOCATION_URL) + .param(NordnVerifyAction.NORDN_URL_PARAM, LOCATION_URL) .header(CONTENT_TYPE, FORM_DATA.toString())); } @@ -263,11 +265,11 @@ void test_noResponseContent_stillWorksNormally() throws Exception { void testRun_sunriseMode_verifyTaskGetsEnqueuedWithSunriseCsv() { persistSunriseModeDomain(); action.run(); - assertTasksEnqueued( + cloudTasksHelper.assertTasksEnqueued( NordnVerifyAction.QUEUE, new TaskMatcher() .url(NordnVerifyAction.PATH) - .header(NordnVerifyAction.URL_HEADER, LOCATION_URL) + .param(NordnVerifyAction.NORDN_URL_PARAM, LOCATION_URL) .header(CONTENT_TYPE, FORM_DATA.toString())); } diff --git a/core/src/test/java/google/registry/util/TaskQueueUtilsTest.java b/core/src/test/java/google/registry/util/TaskQueueUtilsTest.java deleted file mode 100644 index 1d0a009b471..00000000000 --- a/core/src/test/java/google/registry/util/TaskQueueUtilsTest.java +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2018 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.util; - -import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; -import static com.google.common.truth.Truth.assertThat; -import static google.registry.testing.TaskQueueHelper.getQueueInfo; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.appengine.api.taskqueue.Queue; -import com.google.appengine.api.taskqueue.QueueFactory; -import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TransientFailureException; -import com.google.common.collect.ImmutableList; -import google.registry.testing.FakeClock; -import google.registry.testing.FakeSleeper; -import google.registry.testing.TaskQueueExtension; -import org.joda.time.DateTime; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -/** Unit tests for {@link TaskQueueUtils}. */ -public final class TaskQueueUtilsTest { - - private static final int MAX_RETRIES = 3; - - @RegisterExtension final TaskQueueExtension taskQueue = new TaskQueueExtension(); - - private int origBatchSize; - - private final FakeClock clock = new FakeClock(DateTime.parse("2000-01-01TZ")); - private final FakeSleeper sleeper = new FakeSleeper(clock); - private final TaskQueueUtils taskQueueUtils = - new TaskQueueUtils(new Retrier(sleeper, MAX_RETRIES)); - private final Queue queue = mock(Queue.class); - private final TaskOptions task = withUrl("url").taskName("name"); - private final TaskHandle handle = new TaskHandle(task, "handle"); - - @BeforeEach - void beforeEach() { - origBatchSize = TaskQueueUtils.BATCH_SIZE; - TaskQueueUtils.BATCH_SIZE = 2; - } - - @AfterEach - void afterEach() { - TaskQueueUtils.BATCH_SIZE = origBatchSize; - } - - @Test - void testEnqueue_worksOnFirstTry_doesntSleep() { - when(queue.add(ImmutableList.of(task))).thenReturn(ImmutableList.of(handle)); - assertThat(taskQueueUtils.enqueue(queue, task)).isSameInstanceAs(handle); - verify(queue).add(ImmutableList.of(task)); - assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01TZ")); - } - - @Test - void testEnqueue_twoTransientErrorsThenSuccess_stillWorksAfterSleeping() { - when(queue.add(ImmutableList.of(task))) - .thenThrow(new TransientFailureException("")) - .thenThrow(new TransientFailureException("")) - .thenReturn(ImmutableList.of(handle)); - assertThat(taskQueueUtils.enqueue(queue, task)).isSameInstanceAs(handle); - verify(queue, times(3)).add(ImmutableList.of(task)); - assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01T00:00:00.6Z")); // 200 + 400ms - } - - @Test - void testEnqueue_multiple() { - TaskOptions taskA = withUrl("a").taskName("a"); - TaskOptions taskB = withUrl("b").taskName("b"); - ImmutableList handles = - ImmutableList.of(new TaskHandle(taskA, "a"), new TaskHandle(taskB, "b")); - when(queue.add(ImmutableList.of(taskA, taskB))).thenReturn(handles); - assertThat(taskQueueUtils.enqueue(queue, ImmutableList.of(taskA, taskB))) - .isSameInstanceAs(handles); - assertThat(clock.nowUtc()).isEqualTo(DateTime.parse("2000-01-01TZ")); - } - - @Test - void testEnqueue_maxRetries_givesUp() { - when(queue.add(ImmutableList.of(task))) - .thenThrow(new TransientFailureException("one")) - .thenThrow(new TransientFailureException("two")) - .thenThrow(new TransientFailureException("three")) - .thenThrow(new TransientFailureException("four")); - TransientFailureException thrown = - assertThrows(TransientFailureException.class, () -> taskQueueUtils.enqueue(queue, task)); - assertThat(thrown).hasMessageThat().contains("three"); - } - - @Test - void testEnqueue_transientErrorThenInterrupt_throwsTransientError() { - when(queue.add(ImmutableList.of(task))).thenThrow(new TransientFailureException("")); - try { - Thread.currentThread().interrupt(); - assertThrows(TransientFailureException.class, () -> taskQueueUtils.enqueue(queue, task)); - } finally { - Thread.interrupted(); // Clear interrupt state so it doesn't pwn other tests. - } - } - - @Test - void testDeleteTasks_usesMultipleBatches() { - Queue defaultQ = QueueFactory.getQueue("default"); - TaskOptions taskOptA = withUrl("/a").taskName("a"); - TaskOptions taskOptB = withUrl("/b").taskName("b"); - TaskOptions taskOptC = withUrl("/c").taskName("c"); - taskQueueUtils.enqueue(defaultQ, ImmutableList.of(taskOptA, taskOptB, taskOptC)); - assertThat(getQueueInfo("default").getTaskInfo()).hasSize(3); - - taskQueueUtils.deleteTasks( - defaultQ, - ImmutableList.of( - new TaskHandle(taskOptA, "default"), - new TaskHandle(taskOptB, "default"), - new TaskHandle(taskOptC, "default"))); - assertThat(getQueueInfo("default").getTaskInfo()).hasSize(0); - } -} diff --git a/util/src/main/java/google/registry/util/TaskQueueUtils.java b/util/src/main/java/google/registry/util/TaskQueueUtils.java deleted file mode 100644 index 52f1cf90814..00000000000 --- a/util/src/main/java/google/registry/util/TaskQueueUtils.java +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2018 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.util; - -import com.google.appengine.api.taskqueue.Queue; -import com.google.appengine.api.taskqueue.TaskHandle; -import com.google.appengine.api.taskqueue.TaskOptions; -import com.google.appengine.api.taskqueue.TransientFailureException; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.flogger.FluentLogger; -import java.io.Serializable; -import java.util.List; -import javax.inject.Inject; - -/** - * Utilities for dealing with App Engine task queues. - * - *

Use {@link CloudTasksUtils} to interact with push queues (Cloud Task queues). Pull queues will - * be implemented separately in SQL and you can continue using this class for that for now. - */ -@Deprecated -public class TaskQueueUtils implements Serializable { - - private static final long serialVersionUID = 7893211200220508362L; - - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - - private final Retrier retrier; - - @Inject - public TaskQueueUtils(Retrier retrier) { - this.retrier = retrier; - } - - @NonFinalForTesting - @VisibleForTesting - static int BATCH_SIZE = 1000; - - /** - * The batch size to use for App Engine task queue operations. - * - *

Note that 1,000 is currently the maximum allowable batch size in App Engine. - */ - public static int getBatchSize() { - return BATCH_SIZE; - } - - /** - * Adds a task to a App Engine task queue in a reliable manner. - * - *

This is the same as {@link Queue#add(TaskOptions)} except it'll automatically retry with - * exponential backoff if {@link TransientFailureException} is thrown. - * - * @throws TransientFailureException if retrying failed for the maximum period of time, or an - * {@link InterruptedException} told us to stop trying - * @return successfully enqueued task - */ - public TaskHandle enqueue(Queue queue, TaskOptions task) { - return enqueue(queue, ImmutableList.of(task)).get(0); - } - - /** - * Adds tasks to an App Engine task queue in a reliable manner. - * - *

This is the same as {@link Queue#add(Iterable)} except it'll automatically retry with - * exponential backoff if {@link TransientFailureException} is thrown. - * - * @throws TransientFailureException if retrying failed for the maximum period of time, or an - * {@link InterruptedException} told us to stop trying - * @return successfully enqueued tasks - */ - public List enqueue(final Queue queue, final Iterable tasks) { - return retrier.callWithRetry( - () -> { - for (TaskOptions task : tasks) { - logger.atInfo().log( - "Enqueuing queue='%s' endpoint='%s'.", queue.getQueueName(), task.getUrl()); - } - return queue.add(tasks); - }, - TransientFailureException.class); - } - - /** Deletes the specified tasks from the queue in batches, with retrying. */ - public void deleteTasks(Queue queue, List tasks) { - Lists.partition(tasks, BATCH_SIZE) - .stream() - .forEach( - batch -> - retrier.callWithRetry( - () -> queue.deleteTask(batch), TransientFailureException.class)); - } -}