Skip to content

Commit

Permalink
feat(deployment): clean up trial deployments for a provider
Browse files Browse the repository at this point in the history
  • Loading branch information
baktun14 committed Nov 27, 2024
1 parent ec70124 commit 57ed601
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { LoggerService } from "@akashnetwork/logging";
import { singleton } from "tsyringe";

import { BillingConfig, InjectBillingConfig } from "@src/billing/providers";
import { UserWalletOutput, UserWalletRepository } from "@src/billing/repositories";
import { ManagedUserWalletService, RpcMessageService } from "@src/billing/services";
import { ErrorService } from "@src/core/services/error/error.service";
import { DeploymentRepository } from "@src/deployment/repositories/deployment/deployment.repository";
import { TxSignerService } from "../tx-signer/tx-signer.service";

export interface ProviderCleanupParams {
concurrency: number;
providerAddress: string;
}

@singleton()
export class ProviderCleanupService {
private readonly logger = LoggerService.forContext(ProviderCleanupService.name);

constructor(
@InjectBillingConfig() private readonly config: BillingConfig,
private readonly userWalletRepository: UserWalletRepository,
private readonly managedUserWalletService: ManagedUserWalletService,
private readonly txSignerService: TxSignerService,
private readonly deploymentRepository: DeploymentRepository,
private readonly rpcMessageService: RpcMessageService,
private readonly errorService: ErrorService
) {}

async cleanup(options: ProviderCleanupParams) {
await this.userWalletRepository.paginate({ query: { isTrialing: true }, limit: options.concurrency || 10 }, async wallets => {
const cleanUpAllWallets = wallets.map(async wallet => {
await this.errorService.execWithErrorHandler(
{
wallet,
event: "PROVIDER_CLEAN_UP_ERROR",
context: ProviderCleanupService.name
},
() => this.cleanUpForWallet(wallet, options)
);
});

await Promise.all(cleanUpAllWallets);
});
}

private async cleanUpForWallet(wallet: UserWalletOutput, options: ProviderCleanupParams) {
const client = await this.txSignerService.getClientForAddressIndex(wallet.id);
const deployments = await this.deploymentRepository.findDeploymentsForProvider({
owner: wallet.address,
provider: options.providerAddress
});

const closeAllWalletStaleDeployments = deployments.map(async deployment => {
const message = this.rpcMessageService.getCloseDeploymentMsg(wallet.address, deployment.dseq);
this.logger.info({ event: "PROVIDER_CLEAN_UP", params: { owner: wallet.address, dseq: deployment.dseq } });

try {
await client.signAndBroadcast([message]);
this.logger.info({ event: "PROVIDER_CLEAN_UP_SUCCESS" });
} catch (error) {
if (error.message.includes("not allowed to pay fees")) {
await this.managedUserWalletService.authorizeSpending({
address: wallet.address,
limits: {
fees: this.config.FEE_ALLOWANCE_REFILL_AMOUNT
}
});

await client.signAndBroadcast([message]);
this.logger.info({ event: "PROVIDER_CLEAN_UP_SUCCESS" });
} else {
throw error;
}
}
});

await Promise.all(closeAllWalletStaleDeployments);
}
}
14 changes: 13 additions & 1 deletion apps/api/src/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { chainDb } from "@src/db/dbConnection";
import { TopUpDeploymentsController } from "@src/deployment/controllers/deployment/deployment.controller";
import { UserController } from "@src/user/controllers/user/user.controller";
import { UserConfigService } from "@src/user/services/user-config/user-config.service";
import { ProviderController } from "./deployment/controllers/provider/provider.controller";

const program = new Command();

Expand Down Expand Up @@ -42,13 +43,24 @@ program
program
.command("cleanup-stale-deployments")
.description("Close deployments without leases created at least 10min ago")
.option("-c, --concurrency <number>", "How much wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-c, --concurrency <number>", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
await container.resolve(TopUpDeploymentsController).cleanUpStaleDeployment(options);
});
});

program
.command("cleanup-provider-deployments")
.description("Close trial deployments for a provider")
.option("-c, --concurrency <number>", "How many wallets is processed concurrently", value => z.number({ coerce: true }).optional().default(10).parse(value))
.option("-p, --provider <string>", "Provider address", value => z.string().parse(value))
.action(async (options, command) => {
await executeCliHandler(command.name(), async () => {
await container.resolve(ProviderController).cleanupProviderDeployments(options);
});
});

const userConfig = container.resolve(UserConfigService);
program
.command("cleanup-stale-anonymous-users")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import { singleton } from "tsyringe";

import { TrialProvidersService } from "@src/deployment/services/trial-providers/trial-providers.service";
import { ProviderCleanupParams, ProviderCleanupService } from "@src/billing/services/provider-cleanup/provider-cleanup.service";

@singleton()
export class ProviderController {
constructor(private readonly trialProvidersService: TrialProvidersService) {}
constructor(
private readonly trialProvidersService: TrialProvidersService,
private readonly providerCleanupService: ProviderCleanupService
) {}

async getTrialProviders(): Promise<string[]> {
return await this.trialProvidersService.getTrialProviders();
}

async cleanupProviderDeployments(options: ProviderCleanupParams) {
return await this.providerCleanupService.cleanup(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ export interface StaleDeploymentsOptions {
owner: string;
}

export interface ProviderCleanupOptions {
owner: string;
provider: string;
}

export interface StaleDeploymentsOutput {
dseq: number;
}
Expand Down Expand Up @@ -37,4 +42,26 @@ export class DeploymentRepository {

return deployments ? (deployments as unknown as StaleDeploymentsOutput[]) : [];
}

async findDeploymentsForProvider(options: ProviderCleanupOptions): Promise<StaleDeploymentsOutput[]> {
const deployments = await Deployment.findAll({
attributes: ["dseq"],
where: {
owner: options.owner
},
include: [
{
model: Lease,
attributes: [],
required: true,
where: {
provider: options.provider
}
}
],
raw: true
});

return deployments ? (deployments as unknown as StaleDeploymentsOutput[]) : [];
}
}

0 comments on commit 57ed601

Please sign in to comment.