Skip to content

Commit

Permalink
fix: going through missed days
Browse files Browse the repository at this point in the history
  • Loading branch information
sshanzel committed Dec 19, 2024
1 parent 8073118 commit 17dba79
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 142 deletions.
2 changes: 2 additions & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ export const ONE_DAY_IN_SECONDS = ONE_HOUR_IN_SECONDS * 24;
export const ONE_WEEK_IN_SECONDS = ONE_DAY_IN_SECONDS * 7;

export const MAX_FOLLOWERS_LIMIT = 5_000;

export const SUCCESSFUL_CIO_SYNC_DATE = 'successful_cio_sync_date';
46 changes: 27 additions & 19 deletions src/common/googleCloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { PropsParameters } from '../types';
import path from 'path';
import { BigQuery } from '@google-cloud/bigquery';
import { Query } from '@google-cloud/bigquery/build/src/bigquery';
import { subDays } from 'date-fns';

export const downloadFile = async ({
url,
Expand Down Expand Up @@ -32,32 +33,33 @@ export const downloadJsonFile = async <T>({
return JSON.parse(result);
};

// TODO: turn this into a get query function so we can pass the dates to supply the query
export const userActiveStateQuery = `
with d as (
select uss.primary_user_id,
min(last_app_timestamp) as last_app_timestamp,
min(registration_timestamp) as registration_timestamp,
min(
case when period_end between date('2024-12-07' - interval 6*7 day) and '2024-12-07' then '1. active_last_6w'
when period_end between date('2024-12-07' - interval 12*7 day) and date('2024-12-07' - interval 6*7 + 1 day) then '2. active_7w_12w'
when date(u.last_app_timestamp) < date('2024-12-07' - interval 12*7 day) then '3. active_12w+'
when date(u.registration_timestamp) < date('2024-12-07' - interval 12*7 day) then '3. active_12w+'
case when period_end between date(@run_date - interval 6*7 day) and @run_date then '1. active_last_6w'
when period_end between date(@run_date - interval 12*7 day) and date(@run_date - interval 6*7 + 1 day) then '2. active_7w_12w'
when date(u.last_app_timestamp) < date(@run_date - interval 12*7 day) then '3. active_12w+'
when date(u.registration_timestamp) < date(@run_date - interval 12*7 day) then '3. active_12w+'
else '4. never_active' end
) as previous_state,
min(
case when period_end between date('2024-12-08' - interval 6*7 day) and '2024-12-08' then '1. active_last_6w'
when period_end between date('2024-12-08' - interval 12*7 day) and date('2024-12-08' - interval 6*7 + 1 day) then '2. active_7w_12w'
when date(u.last_app_timestamp) < date('2024-12-08' - interval 12*7 day) then '3. active_12w+'
when date(u.registration_timestamp) < date('2024-12-08' - interval 12*7 day) then '3. active_12w+'
case when period_end between date(@previous_date - interval 6*7 day) and @previous_date then '1. active_last_6w'
when period_end between date(@previous_date - interval 12*7 day) and date(@previous_date - interval 6*7 + 1 day) then '2. active_7w_12w'
when date(u.last_app_timestamp) < date(@previous_date - interval 12*7 day) then '3. active_12w+'
when date(u.registration_timestamp) < date(@previous_date - interval 12*7 day) then '3. active_12w+'
else '4. never_active' end
) as current_state,
from analytics.user as u
left join analytics.user_state_sparse as uss on uss.primary_user_id = u.primary_user_id
and uss.period_end between date('2024-12-07' - interval 12* 7 day) and '2024-12-08'
and uss.period_end between date(@run_date - interval 12* 7 day) and @previous_date
and uss.period = 'daily'
and uss.app_active_state = 'active'
and uss.registration_state = 'registered'
Expand All @@ -77,33 +79,39 @@ export const userActiveStateQuery = `
where previous_state != current_state
`;

interface GetUsersActiveState {
export const getUserActiveStateQuery = (runDate: Date): Query => {
const run_date = runDate.toISOString().split('T')[0];
const previous_date = subDays(runDate, 1).toISOString().split('T')[0];

return { query: userActiveStateQuery, params: { previous_date, run_date } };
};

export interface GetUsersActiveState {
inactiveUsers: string[];
downgradeUsers: string[];
reactivateUsers: string[];
}

interface UserActiveState {
export interface UserActiveState {
current_state: string;
previous_state: string;
primary_user_id: string;
}

const bigquery = new BigQuery();

export const queryFromBq = async (
query: string,
): Promise<UserActiveState[]> => {
const options: Query = { query };

const [job] = await bigquery.createQueryJob(options);
export const queryFromBq = async (query: Query): Promise<UserActiveState[]> => {
const [job] = await bigquery.createQueryJob(query);
const [rows] = await job.getQueryResults();

return rows;
};

export const getUsersActiveState = async (): Promise<GetUsersActiveState> => {
const usersFromBq = await queryFromBq(userActiveStateQuery);
export const getUsersActiveState = async (
runDate: Date,
): Promise<GetUsersActiveState> => {
const query = getUserActiveStateQuery(runDate);
const usersFromBq = await queryFromBq(query);
const inactiveUsers: string[] = [];
const downgradeUsers: string[] = [];
const reactivateUsers: string[] = [];
Expand Down
131 changes: 130 additions & 1 deletion src/common/mailing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import {
import CIORequest from 'customerio-node/dist/lib/request';
import { SendEmailRequestOptionalOptions } from 'customerio-node/lib/api/requests';
import { SendEmailRequestWithTemplate } from 'customerio-node/dist/lib/api/requests';
import { DataSource } from 'typeorm';
import { DataSource, In } from 'typeorm';
import {
Source,
User,
UserPersonalizedDigest,
UserPersonalizedDigestSendType,
UserPersonalizedDigestType,
} from '../entity';
import { blockingBatchRunner, callWithRetryDefault } from './async';
import { cioV2, generateIdentifyObject } from '../cio';
import { setTimeout } from 'node:timers/promises';
import { updateFlagsStatement } from './utils';
import { GetUsersActiveState } from './googleCloud';

export enum CioUnsubscribeTopic {
Marketing = '4',
Expand Down Expand Up @@ -171,3 +177,126 @@ export const addPrivateSourceJoinParams = ({

return urlObj.toString();
};

const ITEMS_PER_DESTROY = 4000;
const ITEMS_PER_IDENTIFY = 250;

interface SyncSubscriptionsWithActiveStateProps {
con: DataSource;
users: GetUsersActiveState;
}

export const syncSubscriptionsWithActiveState = async ({
con,
users: { inactiveUsers, downgradeUsers, reactivateUsers },
}: SyncSubscriptionsWithActiveStateProps) => {
await blockingBatchRunner({
data: reactivateUsers,
runner: async (batch) => {
const validReactivateUsers = await con.getRepository(User).find({
select: ['id'],
where: { id: In(batch), cioRegistered: false },
});

if (validReactivateUsers.length === 0) {
return true;
}

await blockingBatchRunner({
batchLimit: ITEMS_PER_IDENTIFY,
data: validReactivateUsers.map(({ id }) => id),
runner: async (ids) => {
const users = await con
.getRepository(User)
.find({ where: { id: In(ids) } });

const data = await Promise.all(
users.map((user) =>
generateIdentifyObject(con, JSON.parse(JSON.stringify(user))),
),
);

await callWithRetryDefault(() =>
cioV2.request.post('/users', { batch: data }),
);

await con
.getRepository(User)
.update({ id: In(ids) }, { cioRegistered: true });

await setTimeout(20); // wait for a bit to avoid rate limiting
},
});
},
});

// inactive for 12 weeks: remove from CIO
await blockingBatchRunner({
data: inactiveUsers,
runner: async (batch) => {
const validInactiveUsers = await con.getRepository(User).find({
select: ['id'],
where: { id: In(batch), cioRegistered: true },
});

if (validInactiveUsers.length === 0) {
return true;
}

await blockingBatchRunner({
batchLimit: ITEMS_PER_DESTROY,
data: validInactiveUsers.map(({ id }) => id),
runner: async (ids) => {
const data = ids.map((id) => ({
action: 'destroy',
type: 'person',
identifiers: { id },
}));

await callWithRetryDefault(() =>
cioV2.request.post('/users', { batch: data }),
);

await con.getRepository(User).update(
{ id: In(ids) },
{
cioRegistered: false,
acceptedMarketing: false,
followingEmail: false,
notificationEmail: false,
},
);

await setTimeout(20); // wait for a bit to avoid rate limiting
},
});
},
});

// inactive for 6 weeks: downgrade from daily to weekly digest
await blockingBatchRunner({
data: downgradeUsers,
runner: async (current) => {
const validDowngradeUsers = await con
.getRepository(User)
.createQueryBuilder('u')
.select('id')
.innerJoin(UserPersonalizedDigest, 'upd', 'u.id = upd."userId"')
.where('u.id IN (:...ids)', { ids: current })
.andWhere(`upd.flags->>'sendType' = 'daily'`)
.getRawMany<Pick<User, 'id'>>();

// set digest to weekly on Wednesday 9am
await con.getRepository(UserPersonalizedDigest).update(
{ userId: In(validDowngradeUsers.map(({ id }) => id)) },
{
preferredDay: 3,
preferredHour: 9,
flags: updateFlagsStatement({
sendType: UserPersonalizedDigestSendType.weekly,
}),
},
);
},
});
};
5 changes: 4 additions & 1 deletion src/common/users.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,10 @@ export enum LogoutReason {
KratosSessionAlreadyAvailable = 'kratos session already available',
}

const getAbsoluteDifferenceInDays: typeof differenceInDays = (date1, date2) => {
export const getAbsoluteDifferenceInDays: typeof differenceInDays = (
date1,
date2,
) => {
const day1 = startOfDay(date1);
const day2 = startOfDay(date2);

Expand Down
Loading

0 comments on commit 17dba79

Please sign in to comment.