Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(user): paginate users by id as a cursor when deleting stale ones #617

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/api/src/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ program
program
.command("top-up-deployments")
.description("Refill deployments with auto top up enabled")
.option("-c, --concurrency <number>", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-c, --concurrency <number>", "How many wallets are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-d, --dry-run", "Dry run the top up deployments", false)
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
Expand All @@ -54,7 +54,7 @@ program
program
.command("cleanup-provider-deployments")
.description("Close trial deployments for a provider")
.option("-c, --concurrency <number>", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-c, --concurrency <number>", "How many wallets are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-d, --dry-run", "Dry run the trial provider cleanup", false)
.option("-p, --provider <string>", "Provider address", value => z.string().parse(value))
.action(async (options, command) => {
Expand All @@ -67,10 +67,11 @@ const userConfig = container.resolve(UserConfigService);
program
.command("cleanup-stale-anonymous-users")
.description(`Remove users that have been inactive for ${userConfig.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS")} days`)
.option("-c, --concurrency <number>", "How many users are processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-d, --dry-run", "Dry run the clean up stale anonymous users", false)
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
await container.resolve(UserController).cleanUpStaleAnonymousUsers({ dryRun: options.dryRun });
await container.resolve(UserController).cleanUpStaleAnonymousUsers(options);
});
});

Expand Down
25 changes: 22 additions & 3 deletions apps/api/src/user/repositories/user/user.repository.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import subDays from "date-fns/subDays";
import { and, eq, isNull, lte, sql } from "drizzle-orm";
import { and, desc, eq, isNull, lt, lte, sql } from "drizzle-orm";
import first from "lodash/first";
import last from "lodash/last";
import { singleton } from "tsyringe";

import { ApiPgDatabase, ApiPgTables, InjectPg, InjectPgTable } from "@src/core/providers";
Expand Down Expand Up @@ -43,7 +44,25 @@ export class UserRepository extends BaseRepository<ApiPgTables["Users"], UserInp
.where(eq(this.table.id, id));
}

async paginateStaleAnonymousUsers({ inactivityInDays, ...params }: { inactivityInDays: number; limit?: number }, cb: (page: UserOutput[]) => Promise<void>) {
await this.paginateRaw({ where: and(isNull(this.table.userId), lte(this.table.lastActiveAt, subDays(new Date(), inactivityInDays))), ...params }, cb);
async paginateStaleAnonymousUsers(
{ inactivityInDays, limit = 100 }: { inactivityInDays: number; limit?: number },
cb: (page: UserOutput[]) => Promise<void>
) {
let lastId: string | undefined;

do {
const clauses = [isNull(this.table.userId), lte(this.table.lastActiveAt, subDays(new Date(), inactivityInDays))];

if (lastId) {
clauses.push(lt(this.table.id, lastId));
}

const items = this.toOutputList(await this.cursor.query.Users.findMany({ where: and(...clauses), limit, orderBy: [desc(this.table.id)] }));
lastId = last(items)?.id;

if (items.length) {
await cb(items);
}
} while (lastId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { LoggerService } from "@akashnetwork/logging";
import { PromisePool } from "@supercharge/promise-pool";
import difference from "lodash/difference";
import { singleton } from "tsyringe";

Expand All @@ -12,7 +11,9 @@ import { UserRepository } from "@src/user/repositories";
import { StaleAnonymousUsersCleanerSummarizer } from "@src/user/services/stale-anonymous-users-cleaner-summarizer/stale-anonymous-users-cleaner-summarizer.service";
import { UserConfigService } from "@src/user/services/user-config/user-config.service";

export interface StaleAnonymousUsersCleanerOptions extends DryRunOptions {}
export interface StaleAnonymousUsersCleanerOptions extends DryRunOptions {
concurrency?: number;
}

@singleton()
export class StaleAnonymousUsersCleanerService {
Expand All @@ -30,38 +31,45 @@ export class StaleAnonymousUsersCleanerService {
) {}

async cleanUpStaleAnonymousUsers(options: StaleAnonymousUsersCleanerOptions) {
const concurrency = options.concurrency || this.CONCURRENCY;
const summary = new StaleAnonymousUsersCleanerSummarizer();
await this.userRepository.paginateStaleAnonymousUsers(
{ inactivityInDays: this.config.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS"), limit: this.CONCURRENCY },
{ inactivityInDays: this.config.get("STALE_ANONYMOUS_USERS_LIVE_IN_DAYS"), limit: concurrency },
async users => {
const userIds = users.map(user => user.id);
const wallets = await this.userWalletRepository.findByUserId(users.map(user => user.id));
const { errors } = await PromisePool.withConcurrency(this.CONCURRENCY)
.for(wallets)
.process(async wallet => {
const wallets = await this.userWalletRepository.findByUserId(userIds);
const userIdsWithWallets: string[] = [];

const revokeAll = wallets.map(async wallet => {
userIdsWithWallets.push(wallet.userId);
try {
const result = await this.managedUserWalletService.revokeAll(wallet.address, "USER_INACTIVITY", options);
if (result.feeAllowance) {
summary.inc("feeAllowanceRevokeCount");
}
if (result.deploymentGrant) {
summary.inc("deploymentGrantRevokeCount");
}
});
const erroredUserIds = errors.map(({ item }) => item.userId);
const userIdsToRemove = difference(userIds, erroredUserIds);
return wallet.userId;
} catch (error) {
summary.inc("revokeErrorCount", 1);
this.logger.debug({ event: "STALE_ANONYMOUS_USERS_REVOKE_ERROR", error });
this.sentry.captureEvent(this.sentryEventService.toEvent(error));
}
});
const userIdsToRemove = (await Promise.all(revokeAll)).filter(Boolean);
const usersWithoutWallets = difference(userIds, userIdsWithWallets);
userIdsToRemove.push(...usersWithoutWallets);

if (errors.length) {
summary.inc("revokeErrorCount", errors.length);
this.logger.debug({ event: "STALE_ANONYMOUS_USERS_REVOKE_ERROR", errors });
this.sentry.captureEvent(this.sentryEventService.toEvent(errors));
if (!userIdsToRemove.length) {
return;
}

if (userIdsToRemove.length) {
if (!options.dryRun) {
await this.userRepository.deleteById(userIdsToRemove);
}
summary.inc("usersDroppedCount", userIdsToRemove.length);
if (!options.dryRun) {
await this.userRepository.deleteById(userIdsToRemove);
}

summary.inc("usersDroppedCount", userIdsToRemove.length);
}
);

Expand Down
28 changes: 16 additions & 12 deletions apps/api/test/functional/stale-anonymous-users-cleanup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { UserRepository } from "@src/user/repositories";
import { DbTestingService } from "@test/services/db-testing.service";
import { WalletTestingService } from "@test/services/wallet-testing.service";

jest.setTimeout(50000);
jest.setTimeout(100000);

describe("Users", () => {
const dbService = container.resolve(DbTestingService);
Expand All @@ -33,18 +33,18 @@ describe("Users", () => {

describe("stale anonymous users cleanup", () => {
it("should remove anonymous users inactive for defined period", async () => {
const [stale, reactivated, recent, invalidAddress, staleNoWallet, recentNoWallet] = await Promise.all([
walletService.createUserAndWallet(),
const [reactivated, recent, invalidAddress, staleNoWallet, recentNoWallet, ...staleUsers] = await Promise.all([
walletService.createUserAndWallet(),
walletService.createUserAndWallet(),
walletService.createUserAndWallet(),
walletService.createUser(),
walletService.createUser()
walletService.createUser(),
...Array.from({ length: 10 }).map(() => walletService.createUserAndWallet())
]);

const staleParams = { lastActiveAt: subDays(new Date(), 91) };
await Promise.all([
userRepository.updateById(stale.user.id, staleParams),
...staleUsers.map(user => userRepository.updateById(user.user.id, staleParams)),
userRepository.updateById(staleNoWallet.user.id, staleParams),
userRepository.updateById(reactivated.user.id, staleParams),
userRepository.updateById(invalidAddress.user.id, staleParams),
Expand All @@ -54,7 +54,7 @@ describe("Users", () => {
const reactivate = walletService.getWalletByUserId(reactivated.user.id, reactivated.token);
await reactivate;

await controller.cleanUpStaleAnonymousUsers({ dryRun: false });
await controller.cleanUpStaleAnonymousUsers({ dryRun: false, concurrency: 4 });

const [users, wallets] = await Promise.all([userRepository.find(), userWalletRepository.find()]);

Expand All @@ -71,14 +71,18 @@ describe("Users", () => {
);

await Promise.all([
expect(authzHttpService.hasValidFeeAllowance(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasValidDepositDeploymentGrant(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasFeeAllowance(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasDepositDeploymentGrant(recent.wallet.address, masterAddress)).resolves.toBeFalsy(),

expect(authzHttpService.hasValidFeeAllowance(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasValidDepositDeploymentGrant(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(),

expect(authzHttpService.hasValidFeeAllowance(stale.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasValidDepositDeploymentGrant(stale.wallet.address, masterAddress)).resolves.toBeFalsy()
expect(authzHttpService.hasDepositDeploymentGrant(reactivated.wallet.address, masterAddress)).resolves.toBeFalsy(),

...staleUsers
.map(user => [
expect(authzHttpService.hasFeeAllowance(user.wallet.address, masterAddress)).resolves.toBeFalsy(),
expect(authzHttpService.hasDepositDeploymentGrant(user.wallet.address, masterAddress)).resolves.toBeFalsy()
])
.flat()
]);
});
});
Expand Down
Loading