Skip to content

Commit

Permalink
Add a beam pipeline to expand recurring billing event (#1881)
Browse files Browse the repository at this point in the history
This will replace the ExpandRecurringBillingEventsAction, which has a
couple of issues:

1) The action starts with too many Recurrings that are later filtered out
   because their expanded OneTimes are not actually in scope. This is due
   to the Recurrings not recording its latest expanded event time, and
   therefore many Recurrings that are not yet due for renewal get included
   in the initial query.

2) The action works in sequence, which exacerbated the issue in 1) and
   makes it very slow to run if the window of operation is wider than
   one day, which in turn makes it impossible to run any catch-up
   expansions with any significant gap to fill.

3) The action only expands the recurrence when the billing times because
   due, but most of its logic works on event time, which is 45 days
   before billing time, making the code hard to reason about and
   error-prone.  This has led to b/258822640 where a premature
   optimization intended to fix 1) caused some autorenwals to not be
   expanded correctly when subsequent manual renews within the autorenew
   grace period closed the original recurrece.

As a result, the new pipeline addresses the above issues in the
following way:

1) Update the recurrenceLastExpansion field on the Recurring when a new
   expansion occurs, and narrow down the Recurrings in scope for
   expansion by only looking for the ones that have not been expanded for
   more than a year.

2) Make it a Beam pipeline so expansions can happen in parallel. The
   Recurrings are grouped into batches in order to not overwhelm the
   database with writes for each expansion.

3) Create new expansions when the event time, as opposed to billing
   time, is within the operation window. This streamlines the logic and
   makes it clearer and easier to reason about. This also aligns with
   how other (cancelllable) operations for which there are accompanying
   grace periods are handled, when the corresponding data is always
   speculatively created at event time. Lastly, doing this negates the
   need to check if the expansion has finished running before generating
   the monthly invoices, because the billing events are now created not
   just-in-time, but 45 days in advance.

Note that this PR only adds the pipeline. It does not switch the default
behavior to using the pipeline, which is still done by
ExpandRecurringBillingEventsAction. We will first use this pipeline to
generate missing billing events and domain histories caused by
b/258822640. This also allows us to test it in production, as it
backfills data that will not affect ongoing invoice generation. If
anything goes wrong, we can always delete the generated billing events
and domain histories, based on the unique "reason" in them.

This pipeline can only run after we switch to use SQL sequence based ID
allocation, introduced in #1831.
  • Loading branch information
jianglai authored Jan 9, 2023
1 parent 60cbebd commit 9dab1e8
Show file tree
Hide file tree
Showing 29 changed files with 1,194 additions and 27 deletions.
7 changes: 6 additions & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,14 @@ if (environment == 'alpha') {
],
invoicing :
[
mainClass: 'google.registry.beam.invoicing.InvoicingPipeline',
mainClass: 'google.registry.beam.billing.InvoicingPipeline',
metaData : 'google/registry/beam/invoicing_pipeline_metadata.json'
],
expandBilling :
[
mainClass: 'google.registry.beam.billing.ExpandRecurringBillingEventsPipeline',
metaData : 'google/registry/beam/expand_recurring_billing_events_pipeline_metadata.json'
],
rde :
[
mainClass: 'google.registry.beam.rde.RdePipeline',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.invoicing;
package google.registry.beam.billing;

import com.google.auto.value.AutoValue;
import com.google.common.base.Joiner;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.billing;

import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

public interface ExpandRecurringBillingEventsPipelineOptions extends RegistryPipelineOptions {
@Description(
"The inclusive lower bound of on the range of event times that will be expanded, in ISO 8601"
+ " format")
String getStartTime();

void setStartTime(String startTime);

@Description(
"The exclusive upper bound of on the range of event times that will be expanded, in ISO 8601"
+ " format")
String getEndTime();

void setEndTime(String endTime);

@Description("If true, the expanded billing events and history entries will not be saved.")
@Default.Boolean(false)
boolean getIsDryRun();

void setIsDryRun(boolean isDryRun);

@Description(
"If true, set the RECURRING_BILLING global cursor to endTime after saving all expanded"
+ " billing events and history entries.")
@Default.Boolean(true)
boolean getAdvanceCursor();

void setAdvanceCursor(boolean advanceCursor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.invoicing;
package google.registry.beam.billing;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;

import com.google.common.flogger.FluentLogger;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.model.billing.BillingEvent.Flag;
import google.registry.model.billing.BillingEvent.OneTime;
import google.registry.model.registrar.Registrar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.invoicing;
package google.registry.beam.billing;

import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static <R, T> Read<R, T> read(
}

/**
* Returns a {@link Read} connector based on the given {@code jpql} query string.
* Returns a {@link Read} connector based on the given native or {@code jpql} query string.
*
* <p>User should take care to prevent sql-injection attacks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import javax.annotation.Nullable;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.TemporalType;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaQuery;
import org.joda.time.DateTime;

/** Interface for query instances used by {@link RegistryJpaIO.Read}. */
public interface RegistryQuery<T> extends Serializable {
Expand Down Expand Up @@ -57,7 +59,14 @@ static <T> RegistryQuery<T> createQuery(
Query query =
nativeQuery ? entityManager.createNativeQuery(sql) : entityManager.createQuery(sql);
if (parameters != null) {
parameters.forEach(query::setParameter);
parameters.forEach(
(key, value) -> {
if (value instanceof DateTime) {
query.setParameter(key, ((DateTime) value).toDate(), TemporalType.TIMESTAMP);
} else {
query.setParameter(key, value);
}
});
}
JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE);
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
* Flex Templates</a>
*/
@SuppressWarnings("ALL")
@Singleton
public class RdePipeline implements Serializable {

Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/google/registry/config/RegistryConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ public static int provideInitialWorkerCount(RegistryConfigSettings config) {
/**
* Returns the default job region to run Apache Beam (Cloud Dataflow) jobs in.
*
* @see google.registry.beam.invoicing.InvoicingPipeline
* @see google.registry.beam.billing.InvoicingPipeline
* @see google.registry.beam.spec11.Spec11Pipeline
* @see google.registry.beam.invoicing.InvoicingPipeline
* @see google.registry.beam.billing.InvoicingPipeline
*/
@Provides
@Config("defaultJobRegion")
Expand Down Expand Up @@ -655,7 +655,7 @@ public static String provideBillingBucket(@Config("projectId") String projectId)
/**
* Returns the URL of the GCS bucket we store invoices and detail reports in.
*
* @see google.registry.beam.invoicing.InvoicingPipeline
* @see google.registry.beam.billing.InvoicingPipeline
*/
@Provides
@Config("billingBucketUrl")
Expand Down Expand Up @@ -691,7 +691,7 @@ public static ImmutableList<InternetAddress> provideInvoiceEmailRecipients(
/**
* Returns the file prefix for the invoice CSV file.
*
* @see google.registry.beam.invoicing.InvoicingPipeline
* @see google.registry.beam.billing.InvoicingPipeline
* @see google.registry.reporting.billing.BillingEmailUtils
*/
@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@
<cron>
<url><![CDATA[/_dr/cron/fanout?queue=retryable-cron-tasks&endpoint=/_dr/task/generateInvoices?shouldPublish=true&runInEmpty]]></url>
<description>
Starts the beam/invoicing/InvoicingPipeline Dataflow template, which creates the overall invoice and
Starts the beam/billing/InvoicingPipeline Dataflow template, which creates the overall invoice and
detail report CSVs for last month, storing them in gs://[PROJECT-ID]-billing/invoices/yyyy-MM.
Upon success, sends an e-mail copy of the invoice to billing personnel, and copies detail
reports to the associated registrars' drive folders.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ static ImmutableSet<ForeignKeyedDesignatedContact> loadForeignKeyedDesignatedCon
* hasn't been reported yet and b) matches the predicate 3. Return the transactionRecords under
* the most recent HistoryEntry that fits the above criteria, with negated reportAmounts.
*/
static ImmutableSet<DomainTransactionRecord> createCancelingRecords(
public static ImmutableSet<DomainTransactionRecord> createCancelingRecords(
Domain domain,
final DateTime now,
Duration maxSearchPeriod,
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/google/registry/model/billing/BillingEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public OneTime build() {
@Index(columnList = "eventTime"),
@Index(columnList = "domainRepoId"),
@Index(columnList = "recurrenceEndTime"),
@Index(columnList = "recurrenceLastExpansion"),
@Index(columnList = "recurrence_time_of_year")
})
@AttributeOverride(name = "id", column = @Column(name = "billing_recurrence_id"))
Expand All @@ -481,6 +482,16 @@ public static class Recurring extends BillingEvent {
*/
DateTime recurrenceEndTime;

/**
* The most recent {@link DateTime} when this recurrence was expanded.
*
* <p>We only bother checking recurrences for potential expansion if this is at least one year
* in the past. If it's more recent than that, it means that the recurrence was already expanded
* too recently to need to be checked again (as domains autorenew each year).
*/
@Column(nullable = false)
DateTime recurrenceLastExpansion;

/**
* The eventTime recurs every year on this [month, day, time] between {@link #eventTime} and
* {@link #recurrenceEndTime}, inclusive of the start but not of the end.
Expand Down Expand Up @@ -519,6 +530,10 @@ public DateTime getRecurrenceEndTime() {
return recurrenceEndTime;
}

public DateTime getRecurrenceLastExpansion() {
return recurrenceLastExpansion;
}

public TimeOfYear getRecurrenceTimeOfYear() {
return recurrenceTimeOfYear;
}
Expand Down Expand Up @@ -559,6 +574,11 @@ public Builder setRecurrenceEndTime(DateTime recurrenceEndTime) {
return this;
}

public Builder setRecurrenceLastExpansion(DateTime recurrenceLastExpansion) {
getInstance().recurrenceLastExpansion = recurrenceLastExpansion;
return this;
}

public Builder setRenewalPriceBehavior(RenewalPriceBehavior renewalPriceBehavior) {
getInstance().renewalPriceBehavior = renewalPriceBehavior;
return this;
Expand All @@ -574,6 +594,12 @@ public Recurring build() {
Recurring instance = getInstance();
checkNotNull(instance.eventTime);
checkNotNull(instance.reason);
// Don't require recurrenceLastExpansion to be individually set on every new Recurrence.
// The correct default value if not otherwise set is the event time of the recurrence minus
// 1 year.
instance.recurrenceLastExpansion =
Optional.ofNullable(instance.recurrenceLastExpansion)
.orElse(instance.eventTime.minusYears(1));
checkArgument(
instance.renewalPriceBehavior == RenewalPriceBehavior.SPECIFIED
^ instance.renewalPrice == null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ private static class TransactionInfo {
DateTime transactionTime;

// The set of entity objects that have been either persisted (via insert()) or merged (via
// put()/update()). If the entity manager returns these as a result of a find() or query
// put()/update()). If the entity manager returns these as a result of a find() or query
// operation, we can not detach them -- detaching removes them from the transaction and causes
// them to not be saved to the database -- so we throw an exception instead.
Set<Object> objectsToSave = Collections.newSetFromMap(new IdentityHashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* Invokes the {@code InvoicingPipeline} beam template via the REST api, and enqueues the {@link
* PublishInvoicesAction} to publish the subsequent output.
*
* <p>This action runs the {@link google.registry.beam.invoicing.InvoicingPipeline} beam flex
* <p>This action runs the {@link google.registry.beam.billing.InvoicingPipeline} beam flex
* template. The pipeline then generates invoices for the month and stores them on GCS.
*/
@Action(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.joda.time.YearMonth;

/**
* Uploads the results of the {@link google.registry.beam.invoicing.InvoicingPipeline}.
* Uploads the results of the {@link google.registry.beam.billing.InvoicingPipeline}.
*
* <p>This relies on the retry semantics in {@code queue.xml} to ensure proper upload, in spite of
* fluctuations in generation timing.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"name": "Expand Recurring Billings Events for Implicit Auto-Renewals",
"description": "An Apache Beam batch pipeline that finds all auto-renewals that have implicitly occurred between the given window and creates the corresponding billing events and hisotry entries.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "startTime",
"label": "The inclusive lower bound of the operation window.",
"helpText": "The inclusive lower bound of the operation window, in ISO 8601 format.",
"is_optional": false
},
{
"name": "endTime",
"label": "The exclusive upper bound of the operation window.",
"helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.",
"is_optional": false
},
{
"name": "shard",
"label": "The exclusive upper bound of the operation window.",
"helpText": "The exclusive upper bound of the operation window, in ISO 8601 format.",
"is_optional": true
},
{
"name": "isDryRun",
"label": "Whether this job is a dry run.",
"helpText": "If true, no changes will be saved to the database.",
"is_optional": true,
"regexes": [
"^true|false$"
]
},
{
"name": "advanceCursor",
"label": "Whether the BILLING_TIME global cursor should be advanced.",
"helpText": "If true, after all expansions are persisted, the cursor will be changed from startTime to endTime.",
"is_optional": true,
"regexes": [
"^true|false$"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.invoicing;
package google.registry.beam.billing;

import static com.google.common.truth.Truth.assertThat;

import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey;
import google.registry.beam.billing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down
Loading

0 comments on commit 9dab1e8

Please sign in to comment.