From a7fba51e898db1d6935c6b7fb45862a4b3af874f Mon Sep 17 00:00:00 2001 From: Iaroslav Gryshaiev Date: Fri, 3 Jan 2025 16:24:09 +0100 Subject: [PATCH] fix(user): paginate users by id as a cursor when deleting stale ones --- apps/api/src/console.ts | 7 +-- .../user/repositories/user/user.repository.ts | 25 ++++++++-- .../stale-anonymous-users-cleaner.service.ts | 46 +++++++++++-------- .../stale-anonymous-users-cleanup.spec.ts | 28 ++++++----- 4 files changed, 69 insertions(+), 37 deletions(-) diff --git a/apps/api/src/console.ts b/apps/api/src/console.ts index 98437f963..25a6b27d7 100644 --- a/apps/api/src/console.ts +++ b/apps/api/src/console.ts @@ -33,7 +33,7 @@ program program .command("top-up-deployments") .description("Refill deployments with auto top up enabled") - .option("-c, --concurrency ", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value)) + .option("-c, --concurrency ", "How many wallets are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value)) .option("-d, --dry-run", "Dry run the top up deployments", false) .action(async (options, command) => { await executeCliHandler(command.name(), async () => { @@ -54,7 +54,7 @@ program program .command("cleanup-provider-deployments") .description("Close trial deployments for a provider") - .option("-c, --concurrency ", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value)) + .option("-c, --concurrency ", "How many wallets are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value)) .option("-d, --dry-run", "Dry run the trial provider cleanup", false) .option("-p, --provider ", "Provider address", value => z.string().parse(value)) .action(async (options, command) => { @@ -67,10 +67,11 @@ const userConfig = container.resolve(UserConfigService); program .command("cleanup-stale-anonymous-users") .description(`Remove users that have been inactive for ${userConfig.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS")} days`) + .option("-c, --concurrency ", "How many users are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value)) .option("-d, --dry-run", "Dry run the clean up stale anonymous users", false) .action(async (options, command) => { await executeCliHandler(command.name(), async () => { - await container.resolve(UserController).cleanUpStaleAnonymousUsers({ dryRun: options.dryRun }); + await container.resolve(UserController).cleanUpStaleAnonymousUsers(options); }); }); diff --git a/apps/api/src/user/repositories/user/user.repository.ts b/apps/api/src/user/repositories/user/user.repository.ts index d2bcbb632..5cbd9b2dc 100644 --- a/apps/api/src/user/repositories/user/user.repository.ts +++ b/apps/api/src/user/repositories/user/user.repository.ts @@ -1,6 +1,7 @@ import subDays from "date-fns/subDays"; -import { and, eq, isNull, lte, sql } from "drizzle-orm"; +import { and, desc, eq, isNull, lt, lte, sql } from "drizzle-orm"; import first from "lodash/first"; +import last from "lodash/last"; import { singleton } from "tsyringe"; import { ApiPgDatabase, ApiPgTables, InjectPg, InjectPgTable } from "@src/core/providers"; @@ -43,7 +44,25 @@ export class UserRepository extends BaseRepository Promise) { - await this.paginateRaw({ where: and(isNull(this.table.userId), lte(this.table.lastActiveAt, subDays(new Date(), inactivityInDays))), ...params }, cb); + async paginateStaleAnonymousUsers( + { inactivityInDays, limit = 100 }: { inactivityInDays: number; limit?: number }, + cb: (page: UserOutput[]) => Promise + ) { + let lastId: string | undefined; + + do { + const clauses = [isNull(this.table.userId), lte(this.table.lastActiveAt, subDays(new Date(), inactivityInDays))]; + + if (lastId) { + clauses.push(lt(this.table.id, lastId)); + } + + const items = this.toOutputList(await this.cursor.query.Users.findMany({ where: and(...clauses), limit, orderBy: [desc(this.table.id)] })); + lastId = last(items)?.id; + + if (items.length) { + await cb(items); + } + } while (lastId); } } diff --git a/apps/api/src/user/services/stale-anonymous-users-cleaner/stale-anonymous-users-cleaner.service.ts b/apps/api/src/user/services/stale-anonymous-users-cleaner/stale-anonymous-users-cleaner.service.ts index b8f858f62..e0d675767 100644 --- a/apps/api/src/user/services/stale-anonymous-users-cleaner/stale-anonymous-users-cleaner.service.ts +++ b/apps/api/src/user/services/stale-anonymous-users-cleaner/stale-anonymous-users-cleaner.service.ts @@ -1,5 +1,4 @@ import { LoggerService } from "@akashnetwork/logging"; -import { PromisePool } from "@supercharge/promise-pool"; import difference from "lodash/difference"; import { singleton } from "tsyringe"; @@ -12,7 +11,9 @@ import { UserRepository } from "@src/user/repositories"; import { StaleAnonymousUsersCleanerSummarizer } from "@src/user/services/stale-anonymous-users-cleaner-summarizer/stale-anonymous-users-cleaner-summarizer.service"; import { UserConfigService } from "@src/user/services/user-config/user-config.service"; -export interface StaleAnonymousUsersCleanerOptions extends DryRunOptions {} +export interface StaleAnonymousUsersCleanerOptions extends DryRunOptions { + concurrency?: number; +} @singleton() export class StaleAnonymousUsersCleanerService { @@ -30,15 +31,18 @@ export class StaleAnonymousUsersCleanerService { ) {} async cleanUpStaleAnonymousUsers(options: StaleAnonymousUsersCleanerOptions) { + const concurrency = options.concurrency || this.CONCURRENCY; const summary = new StaleAnonymousUsersCleanerSummarizer(); await this.userRepository.paginateStaleAnonymousUsers( - { inactivityInDays: this.config.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS"), limit: this.CONCURRENCY }, + { inactivityInDays: this.config.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS"), limit: concurrency }, async users => { const userIds = users.map(user => user.id); - const wallets = await this.userWalletRepository.findByUserId(users.map(user => user.id)); - const { errors } = await PromisePool.withConcurrency(this.CONCURRENCY) - .for(wallets) - .process(async wallet => { + const wallets = await this.userWalletRepository.findByUserId(userIds); + const userIdsWithWallets: string[] = []; + + const revokeAll = wallets.map(async wallet => { + userIdsWithWallets.push(wallet.userId); + try { const result = await this.managedUserWalletService.revokeAll(wallet.address, "USER_INACTIVITY", options); if (result.feeAllowance) { summary.inc("feeAllowanceRevokeCount"); @@ -46,22 +50,26 @@ export class StaleAnonymousUsersCleanerService { if (result.deploymentGrant) { summary.inc("deploymentGrantRevokeCount"); } - }); - const erroredUserIds = errors.map(({ item }) => item.userId); - const userIdsToRemove = difference(userIds, erroredUserIds); + return wallet.userId; + } catch (error) { + summary.inc("revokeErrorCount", 1); + this.logger.debug({ event: "STALE_ANONYMOUS_USERS_REVOKE_ERROR", error }); + this.sentry.captureEvent(this.sentryEventService.toEvent(error)); + } + }); + const userIdsToRemove = (await Promise.all(revokeAll)).filter(Boolean); + const usersWithoutWallets = difference(userIds, userIdsWithWallets); + userIdsToRemove.push(...usersWithoutWallets); - if (errors.length) { - summary.inc("revokeErrorCount", errors.length); - this.logger.debug({ event: "STALE_ANONYMOUS_USERS_REVOKE_ERROR", errors }); - this.sentry.captureEvent(this.sentryEventService.toEvent(errors)); + if (!userIdsToRemove.length) { + return; } - if (userIdsToRemove.length) { - if (!options.dryRun) { - await this.userRepository.deleteById(userIdsToRemove); - } - summary.inc("usersDroppedCount", userIdsToRemove.length); + if (!options.dryRun) { + await this.userRepository.deleteById(userIdsToRemove); } + + summary.inc("usersDroppedCount", userIdsToRemove.length); } ); diff --git a/apps/api/test/functional/stale-anonymous-users-cleanup.spec.ts b/apps/api/test/functional/stale-anonymous-users-cleanup.spec.ts index 278107eb3..185d102bc 100644 --- a/apps/api/test/functional/stale-anonymous-users-cleanup.spec.ts +++ b/apps/api/test/functional/stale-anonymous-users-cleanup.spec.ts @@ -11,7 +11,7 @@ import { UserRepository } from "@src/user/repositories"; import { DbTestingService } from "@test/services/db-testing.service"; import { WalletTestingService } from "@test/services/wallet-testing.service"; -jest.setTimeout(50000); +jest.setTimeout(100000); describe("Users", () => { const dbService = container.resolve(DbTestingService); @@ -33,18 +33,18 @@ describe("Users", () => { describe("stale anonymous users cleanup", () => { it("should remove anonymous users inactive for defined period", async () => { - const [stale, reactivated, recent, invalidAddress, staleNoWallet, recentNoWallet] = await Promise.all([ - walletService.createUserAndWallet(), + const [reactivated, recent, invalidAddress, staleNoWallet, recentNoWallet, ...staleUsers] = await Promise.all([ walletService.createUserAndWallet(), walletService.createUserAndWallet(), walletService.createUserAndWallet(), walletService.createUser(), - walletService.createUser() + walletService.createUser(), + ...Array.from({ length: 10 }).map(() => walletService.createUserAndWallet()) ]); const staleParams = { lastActiveAt: subDays(new Date(), 91) }; await Promise.all([ - userRepository.updateById(stale.user.id, staleParams), + ...staleUsers.map(user => userRepository.updateById(user.user.id, staleParams)), userRepository.updateById(staleNoWallet.user.id, staleParams), userRepository.updateById(reactivated.user.id, staleParams), userRepository.updateById(invalidAddress.user.id, staleParams), @@ -54,7 +54,7 @@ describe("Users", () => { const reactivate = walletService.getWalletByUserId(reactivated.user.id, reactivated.token); await reactivate; - await controller.cleanUpStaleAnonymousUsers({ dryRun: false }); + await controller.cleanUpStaleAnonymousUsers({ dryRun: false, concurrency: 4 }); const [users, wallets] = await Promise.all([userRepository.find(), userWalletRepository.find()]); @@ -71,14 +71,18 @@ describe("Users", () => { ); await Promise.all([ - expect(authzHttpService.hasValidFeeAllowance(recent.wallet.address, masterAddress)).resolves.toBeFalsy(), - expect(authzHttpService.hasValidDepositDeploymentGrant(recent.wallet.address, masterAddress)).resolves.toBeFalsy(), + expect(authzHttpService.hasFeeAllowance(recent.wallet.address, masterAddress)).resolves.toBeFalsy(), + expect(authzHttpService.hasDepositDeploymentGrant(recent.wallet.address, masterAddress)).resolves.toBeFalsy(), expect(authzHttpService.hasValidFeeAllowance(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(), - expect(authzHttpService.hasValidDepositDeploymentGrant(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(), - - expect(authzHttpService.hasValidFeeAllowance(stale.wallet.address, masterAddress)).resolves.toBeFalsy(), - expect(authzHttpService.hasValidDepositDeploymentGrant(stale.wallet.address, masterAddress)).resolves.toBeFalsy() + expect(authzHttpService.hasDepositDeploymentGrant(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(), + + ...staleUsers + .map(user => [ + expect(authzHttpService.hasFeeAllowance(user.wallet.address, masterAddress)).resolves.toBeFalsy(), + expect(authzHttpService.hasDepositDeploymentGrant(user.wallet.address, masterAddress)).resolves.toBeFalsy() + ]) + .flat() ]); }); });