Skip to content

Commit

Permalink
fix(user): paginate users by id as a cursor when deleting stale ones
Browse files Browse the repository at this point in the history
  • Loading branch information
ygrishajev committed Jan 3, 2025
1 parent 42d5873 commit a7fba51
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 37 deletions.
7 changes: 4 additions & 3 deletions apps/api/src/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ program
program
.command("top-up-deployments")
.description("Refill deployments with auto top up enabled")
.option("-c, --concurrency <number>", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-c, --concurrency <number>", "How many wallets are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-d, --dry-run", "Dry run the top up deployments", false)
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
Expand All @@ -54,7 +54,7 @@ program
program
.command("cleanup-provider-deployments")
.description("Close trial deployments for a provider")
.option("-c, --concurrency <number>", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-c, --concurrency <number>", "How many wallets are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-d, --dry-run", "Dry run the trial provider cleanup", false)
.option("-p, --provider <string>", "Provider address", value => z.string().parse(value))
.action(async (options, command) => {
Expand All @@ -67,10 +67,11 @@ const userConfig = container.resolve(UserConfigService);
program
.command("cleanup-stale-anonymous-users")
.description(`Remove users that have been inactive for ${userConfig.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS")} days`)
.option("-c, --concurrency <number>", "How many users are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-d, --dry-run", "Dry run the clean up stale anonymous users", false)
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
await container.resolve(UserController).cleanUpStaleAnonymousUsers({ dryRun: options.dryRun });
await container.resolve(UserController).cleanUpStaleAnonymousUsers(options);
});
});

Expand Down
25 changes: 22 additions & 3 deletions apps/api/src/user/repositories/user/user.repository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import subDays from "date-fns/subDays";
import { and, eq, isNull, lte, sql } from "drizzle-orm";
import { and, desc, eq, isNull, lt, lte, sql } from "drizzle-orm";
import first from "lodash/first";
import last from "lodash/last";
import { singleton } from "tsyringe";

import { ApiPgDatabase, ApiPgTables, InjectPg, InjectPgTable } from "@src/core/providers";
Expand Down Expand Up @@ -43,7 +44,25 @@ export class UserRepository extends BaseRepository<ApiPgTables["Users"], UserInp
.where(eq(this.table.id, id));
}

async paginateStaleAnonymousUsers({ inactivityInDays, ...params }: { inactivityInDays: number; limit?: number }, cb: (page: UserOutput[]) => Promise<void>) {
await this.paginateRaw({ where: and(isNull(this.table.userId), lte(this.table.lastActiveAt, subDays(new Date(), inactivityInDays))), ...params }, cb);
async paginateStaleAnonymousUsers(
{ inactivityInDays, limit = 100 }: { inactivityInDays: number; limit?: number },
cb: (page: UserOutput[]) => Promise<void>
) {
let lastId: string | undefined;

do {
const clauses = [isNull(this.table.userId), lte(this.table.lastActiveAt, subDays(new Date(), inactivityInDays))];

if (lastId) {
clauses.push(lt(this.table.id, lastId));
}

const items = this.toOutputList(await this.cursor.query.Users.findMany({ where: and(...clauses), limit, orderBy: [desc(this.table.id)] }));
lastId = last(items)?.id;

if (items.length) {
await cb(items);
}
} while (lastId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { LoggerService } from "@akashnetwork/logging";
import { PromisePool } from "@supercharge/promise-pool";
import difference from "lodash/difference";
import { singleton } from "tsyringe";

Expand All @@ -12,7 +11,9 @@ import { UserRepository } from "@src/user/repositories";
import { StaleAnonymousUsersCleanerSummarizer } from "@src/user/services/stale-anonymous-users-cleaner-summarizer/stale-anonymous-users-cleaner-summarizer.service";
import { UserConfigService } from "@src/user/services/user-config/user-config.service";

export interface StaleAnonymousUsersCleanerOptions extends DryRunOptions {}
export interface StaleAnonymousUsersCleanerOptions extends DryRunOptions {
concurrency?: number;
}

@singleton()
export class StaleAnonymousUsersCleanerService {
Expand All @@ -30,38 +31,45 @@ export class StaleAnonymousUsersCleanerService {
) {}

async cleanUpStaleAnonymousUsers(options: StaleAnonymousUsersCleanerOptions) {
const concurrency = options.concurrency || this.CONCURRENCY;
const summary = new StaleAnonymousUsersCleanerSummarizer();
await this.userRepository.paginateStaleAnonymousUsers(
{ inactivityInDays: this.config.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS"), limit: this.CONCURRENCY },
{ inactivityInDays: this.config.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS"), limit: concurrency },
async users => {
const userIds = users.map(user => user.id);
const wallets = await this.userWalletRepository.findByUserId(users.map(user => user.id));
const { errors } = await PromisePool.withConcurrency(this.CONCURRENCY)
.for(wallets)
.process(async wallet => {
const wallets = await this.userWalletRepository.findByUserId(userIds);
const userIdsWithWallets: string[] = [];

const revokeAll = wallets.map(async wallet => {
userIdsWithWallets.push(wallet.userId);
try {
const result = await this.managedUserWalletService.revokeAll(wallet.address, "USER_INACTIVITY", options);
if (result.feeAllowance) {
summary.inc("feeAllowanceRevokeCount");
}
if (result.deploymentGrant) {
summary.inc("deploymentGrantRevokeCount");
}
});
const erroredUserIds = errors.map(({ item }) => item.userId);
const userIdsToRemove = difference(userIds, erroredUserIds);
return wallet.userId;
} catch (error) {
summary.inc("revokeErrorCount", 1);
this.logger.debug({ event: "STALE_ANONYMOUS_USERS_REVOKE_ERROR", error });
this.sentry.captureEvent(this.sentryEventService.toEvent(error));
}
});
const userIdsToRemove = (await Promise.all(revokeAll)).filter(Boolean);
const usersWithoutWallets = difference(userIds, userIdsWithWallets);
userIdsToRemove.push(...usersWithoutWallets);

if (errors.length) {
summary.inc("revokeErrorCount", errors.length);
this.logger.debug({ event: "STALE_ANONYMOUS_USERS_REVOKE_ERROR", errors });
this.sentry.captureEvent(this.sentryEventService.toEvent(errors));
if (!userIdsToRemove.length) {
return;
}

if (userIdsToRemove.length) {
if (!options.dryRun) {
await this.userRepository.deleteById(userIdsToRemove);
}
summary.inc("usersDroppedCount", userIdsToRemove.length);
if (!options.dryRun) {
await this.userRepository.deleteById(userIdsToRemove);
}

summary.inc("usersDroppedCount", userIdsToRemove.length);
}
);

Expand Down
28 changes: 16 additions & 12 deletions apps/api/test/functional/stale-anonymous-users-cleanup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { UserRepository } from "@src/user/repositories";
import { DbTestingService } from "@test/services/db-testing.service";
import { WalletTestingService } from "@test/services/wallet-testing.service";

jest.setTimeout(50000);
jest.setTimeout(100000);

describe("Users", () => {
const dbService = container.resolve(DbTestingService);
Expand All @@ -33,18 +33,18 @@ describe("Users", () => {

describe("stale anonymous users cleanup", () => {
it("should remove anonymous users inactive for defined period", async () => {
const [stale, reactivated, recent, invalidAddress, staleNoWallet, recentNoWallet] = await Promise.all([
walletService.createUserAndWallet(),
const [reactivated, recent, invalidAddress, staleNoWallet, recentNoWallet, ...staleUsers] = await Promise.all([
walletService.createUserAndWallet(),
walletService.createUserAndWallet(),
walletService.createUserAndWallet(),
walletService.createUser(),
walletService.createUser()
walletService.createUser(),
...Array.from({ length: 10 }).map(() => walletService.createUserAndWallet())
]);

const staleParams = { lastActiveAt: subDays(new Date(), 91) };
await Promise.all([
userRepository.updateById(stale.user.id, staleParams),
...staleUsers.map(user => userRepository.updateById(user.user.id, staleParams)),
userRepository.updateById(staleNoWallet.user.id, staleParams),
userRepository.updateById(reactivated.user.id, staleParams),
userRepository.updateById(invalidAddress.user.id, staleParams),
Expand All @@ -54,7 +54,7 @@ describe("Users", () => {
const reactivate = walletService.getWalletByUserId(reactivated.user.id, reactivated.token);
await reactivate;

await controller.cleanUpStaleAnonymousUsers({ dryRun: false });
await controller.cleanUpStaleAnonymousUsers({ dryRun: false, concurrency: 4 });

const [users, wallets] = await Promise.all([userRepository.find(), userWalletRepository.find()]);

Expand All @@ -71,14 +71,18 @@ describe("Users", () => {
);

await Promise.all([
expect(authzHttpService.hasValidFeeAllowance(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasValidDepositDeploymentGrant(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasFeeAllowance(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasDepositDeploymentGrant(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),

expect(authzHttpService.hasValidFeeAllowance(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasValidDepositDeploymentGrant(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(),

expect(authzHttpService.hasValidFeeAllowance(stale.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasValidDepositDeploymentGrant(stale.wallet.address, masterAddress)).resolves.toBeFalsy()
expect(authzHttpService.hasDepositDeploymentGrant(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(),

...staleUsers
.map(user => [
expect(authzHttpService.hasFeeAllowance(user.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasDepositDeploymentGrant(user.wallet.address, masterAddress)).resolves.toBeFalsy()
])
.flat()
]);
});
});
Expand Down

0 comments on commit a7fba51

Please sign in to comment.