diff --git a/libs/spoke-codegen/src/graphql/autosending.graphql b/libs/spoke-codegen/src/graphql/autosending.graphql index eaafff85c..8e6107c4d 100644 --- a/libs/spoke-codegen/src/graphql/autosending.graphql +++ b/libs/spoke-codegen/src/graphql/autosending.graphql @@ -3,6 +3,7 @@ fragment BasicAutosendingTarget on Campaign { title isStarted autosendStatus + autosendLimit } fragment DetailedAutosendingTarget on Campaign { @@ -28,6 +29,16 @@ fragment AutosendingTarget on Campaign { ...DetailedAutosendingTarget } +query GetCampaignAutosendingLimit($campaignId: String!) { + campaign(id: $campaignId) { + id + autosendLimit + stats { + countMessagedContacts + } + } +} + query CampaignsEligibleForAutosending( $organizationId: String! $isStarted: Boolean! @@ -41,7 +52,7 @@ query CampaignsEligibleForAutosending( ) { campaigns { ...BasicAutosendingTarget - ...DetailedAutosendingTarget @skip (if: $isBasic) + ...DetailedAutosendingTarget @skip(if: $isBasic) } } } @@ -60,3 +71,10 @@ mutation PauseAutosending($campaignId: String!) { autosendStatus } } + +mutation UpdateCampaignAutosendingLimit($campaignId: String!, $limit: Int) { + updateCampaignAutosendingLimit(campaignId: $campaignId, limit: $limit) { + id + autosendLimit + } +} diff --git a/migrations/20221016075536_add_autosend_limit.js b/migrations/20221016075536_add_autosend_limit.js new file mode 100644 index 000000000..17776b27f --- /dev/null +++ b/migrations/20221016075536_add_autosend_limit.js @@ -0,0 +1,295 @@ +exports.up = function up(knex) { + return knex.schema + .alterTable("all_campaign", (table) => { + table.integer("autosend_limit"); + table.integer("autosend_limit_max_contact_id"); + table + .foreign("autosend_limit_max_contact_id") + .references("campaign_contact.id"); + }) + .then(() => { + return knex.schema.raw(` + create or replace view campaign as + select + id, + organization_id, + title, + description, + is_started, + due_by, + created_at, + is_archived, + use_dynamic_assignment, + logo_image_url, + intro_html, + primary_color, + texting_hours_start, + texting_hours_end, + timezone, + creator_id, + is_autoassign_enabled, + limit_assignment_to_teams, + updated_at, + replies_stale_after_minutes, + landlines_filtered, + external_system_id, + is_approved, + autosend_status, + autosend_user_id, + messaging_service_sid, + autosend_limit, + autosend_limit_max_contact_id + from all_campaign + where is_template = false; + `); + }); +}; + +exports.down = function down(knex) { + return knex.schema + .raw( + ` + drop view campaign cascade; + create view campaign as + select + id, + organization_id, + title, + description, + is_started, + due_by, + created_at, + is_archived, + use_dynamic_assignment, + logo_image_url, + intro_html, + primary_color, + texting_hours_start, + texting_hours_end, + timezone, + creator_id, + is_autoassign_enabled, + limit_assignment_to_teams, + updated_at, + replies_stale_after_minutes, + landlines_filtered, + external_system_id, + is_approved, + autosend_status, + autosend_user_id, + messaging_service_sid + from all_campaign + where is_template = false; + + create or replace view assignable_campaigns as ( + select id, title, organization_id, limit_assignment_to_teams + from campaign + where is_started = true + and is_archived = false + and is_autoassign_enabled = true + ); + + create or replace view assignable_campaign_contacts as ( + select + campaign_contact.id, campaign_contact.campaign_id, + campaign_contact.message_status, campaign.texting_hours_end, + campaign_contact.timezone::text as contact_timezone + from campaign_contact + join campaign on campaign_contact.campaign_id = campaign.id + where assignment_id is null + and is_opted_out = false + and archived = false + and not exists ( + select 1 + from campaign_contact_tag + join tag on campaign_contact_tag.tag_id = tag.id + where tag.is_assignable = false + and campaign_contact_tag.campaign_contact_id = campaign_contact.id + ) + ); + + create or replace view assignable_needs_message as ( + select acc.id, acc.campaign_id, acc.message_status + from assignable_campaign_contacts as acc + join campaign on campaign.id = acc.campaign_id + where message_status = 'needsMessage' + and ( + ( acc.contact_timezone is null + and extract(hour from CURRENT_TIMESTAMP at time zone campaign.timezone) < campaign.texting_hours_end + and extract(hour from CURRENT_TIMESTAMP at time zone campaign.timezone) >= campaign.texting_hours_start + ) + or + ( campaign.texting_hours_end > extract(hour from (CURRENT_TIMESTAMP at time zone acc.contact_timezone) + interval '10 minutes') + and campaign.texting_hours_start <= extract(hour from (CURRENT_TIMESTAMP at time zone acc.contact_timezone)) + ) + ) + ); + + create or replace view assignable_campaigns_with_needs_message as ( + select * + from assignable_campaigns + where + exists ( + select 1 + from assignable_needs_message + where campaign_id = assignable_campaigns.id + ) + and not exists ( + select 1 + from campaign + where campaign.id = assignable_campaigns.id + and now() > date_trunc('day', (due_by + interval '24 hours') at time zone campaign.timezone) + ) + ); + + create or replace view assignable_needs_reply as ( + select acc.id, acc.campaign_id, acc.message_status + from assignable_campaign_contacts as acc + join campaign on campaign.id = acc.campaign_id + where message_status = 'needsResponse' + and ( + ( acc.contact_timezone is null + and extract(hour from CURRENT_TIMESTAMP at time zone campaign.timezone) < campaign.texting_hours_end + and extract(hour from CURRENT_TIMESTAMP at time zone campaign.timezone) >= campaign.texting_hours_start + ) + or + ( campaign.texting_hours_end > extract(hour from (CURRENT_TIMESTAMP at time zone acc.contact_timezone) + interval '2 minutes') + and campaign.texting_hours_start <= extract(hour from (CURRENT_TIMESTAMP at time zone acc.contact_timezone)) + ) + ) + ); + + create or replace view assignable_campaigns_with_needs_reply as ( + select * + from assignable_campaigns + where exists ( + select 1 + from assignable_needs_reply + where campaign_id = assignable_campaigns.id + ) + ); + + create or replace view assignable_needs_reply_with_escalation_tags as ( + select acc.id, acc.campaign_id, acc.message_status, acc.applied_escalation_tags + from assignable_campaign_contacts_with_escalation_tags as acc + join campaign on campaign.id = acc.campaign_id + where message_status = 'needsResponse' + and ( + ( acc.contact_timezone is null + and extract(hour from CURRENT_TIMESTAMP at time zone campaign.timezone) < campaign.texting_hours_end + and extract(hour from CURRENT_TIMESTAMP at time zone campaign.timezone) >= campaign.texting_hours_start + ) + or + ( campaign.texting_hours_end > extract(hour from (CURRENT_TIMESTAMP at time zone acc.contact_timezone) + interval '2 minutes') + and campaign.texting_hours_start <= extract(hour from (CURRENT_TIMESTAMP at time zone acc.contact_timezone)) + ) + ) + ); + + create or replace view public.missing_external_sync_question_response_configuration as + select + all_values.*, + external_system.id as system_id + from ( + select + istep.campaign_id, + istep.parent_interaction_id as interaction_step_id, + istep.answer_option as value, + exists ( + select 1 + from public.question_response as istep_qr + where + istep_qr.interaction_step_id = istep.parent_interaction_id + and istep_qr.value = istep.answer_option + ) as is_required + from public.interaction_step istep + where istep.parent_interaction_id is not null + union + select + qr_istep.campaign_id, + qr.interaction_step_id, + qr.value, + true as is_required + from public.question_response as qr + join public.interaction_step qr_istep on qr_istep.id = qr.interaction_step_id + ) all_values + join campaign on campaign.id = all_values.campaign_id + join external_system + on external_system.organization_id = campaign.organization_id + where + not exists ( + select 1 + from public.all_external_sync_question_response_configuration aqrc + where + all_values.campaign_id = aqrc.campaign_id + and external_system.id = aqrc.system_id + and all_values.interaction_step_id = aqrc.interaction_step_id + and all_values.value = aqrc.question_response_value + ); + + create or replace view sendable_campaigns as ( + select id, title, organization_id, limit_assignment_to_teams, autosend_status, is_autoassign_enabled + from campaign + where is_started and not is_archived + ); + + create or replace view assignable_campaigns as ( + select id, title, organization_id, limit_assignment_to_teams, autosend_status + from sendable_campaigns + where is_autoassign_enabled + ); + + create or replace view assignable_campaigns_with_needs_message as ( + select * + from assignable_campaigns + where + exists ( + select 1 + from assignable_needs_message + where campaign_id = assignable_campaigns.id + ) + and not exists ( + select 1 + from campaign + where campaign.id = assignable_campaigns.id + and now() > date_trunc('day', (due_by + interval '24 hours') at time zone campaign.timezone) + ) + and autosend_status <> 'sending' + ); + + create or replace view assignable_campaigns_with_needs_reply as ( + select * + from assignable_campaigns + where exists ( + select 1 + from assignable_needs_reply + where campaign_id = assignable_campaigns.id + ) + ); + + create or replace view autosend_campaigns_to_send as ( + select * + from sendable_campaigns + where + exists ( -- assignable contacts are valid for both autoassign and autosending + select 1 + from assignable_needs_message + where campaign_id = sendable_campaigns.id + ) + and not exists ( + select 1 + from campaign + where campaign.id = sendable_campaigns.id + and now() > date_trunc('day', (due_by + interval '24 hours') at time zone campaign.timezone) + ) + and autosend_status = 'sending' + ); + ` + ) + .then(() => + knex.schema.alterTable("all_campaign", (table) => { + table.dropColumn("autosend_limit"); + table.dropColumn("autosend_limit_max_contact_id"); + }) + ); +}; diff --git a/schema-dump.sql b/schema-dump.sql index 664cd40ca..5fa1e4823 100644 --- a/schema-dump.sql +++ b/schema-dump.sql @@ -223,6 +223,8 @@ CREATE TABLE public.all_campaign ( autosend_user_id integer, is_template boolean DEFAULT false NOT NULL, messaging_service_sid text, + autosend_limit integer, + autosend_limit_max_contact_id integer, CONSTRAINT campaign_autosend_status_check CHECK ((autosend_status = ANY (ARRAY['unstarted'::text, 'sending'::text, 'paused'::text, 'complete'::text]))) ); @@ -1479,7 +1481,9 @@ CREATE VIEW public.campaign AS all_campaign.is_approved, all_campaign.autosend_status, all_campaign.autosend_user_id, - all_campaign.messaging_service_sid + all_campaign.messaging_service_sid, + all_campaign.autosend_limit, + all_campaign.autosend_limit_max_contact_id FROM public.all_campaign WHERE (all_campaign.is_template = false); @@ -5060,6 +5064,14 @@ CREATE TRIGGER _500_user_team_updated_at BEFORE UPDATE ON public.user_team FOR E CREATE TRIGGER _500_user_updated_at BEFORE UPDATE ON public."user" FOR EACH ROW EXECUTE FUNCTION public.universal_updated_at(); +-- +-- Name: all_campaign all_campaign_autosend_limit_max_contact_id_foreign; Type: FK CONSTRAINT; Schema: public; Owner: postgres +-- + +ALTER TABLE ONLY public.all_campaign + ADD CONSTRAINT all_campaign_autosend_limit_max_contact_id_foreign FOREIGN KEY (autosend_limit_max_contact_id) REFERENCES public.campaign_contact(id); + + -- -- Name: all_campaign all_campaign_messaging_service_sid_foreign; Type: FK CONSTRAINT; Schema: public; Owner: postgres -- diff --git a/src/api/campaign.ts b/src/api/campaign.ts index 00eea6b34..4e0746809 100644 --- a/src/api/campaign.ts +++ b/src/api/campaign.ts @@ -90,6 +90,7 @@ export interface CampaignInput { timezone: string | null; repliesStaleAfter: number | null; messagingServiceSid: string | null; + autosendLimit: number | null; } export interface Campaign { @@ -127,6 +128,7 @@ export interface Campaign { deliverabilityStats: CampaignDeliverabilityStats; previewUrl?: string | null; messagingServiceSid?: string | null; + autosendLimit?: number | null; } export interface PaginatedCampaigns { @@ -264,6 +266,7 @@ export const schema = ` deliverabilityStats(filter: CampaignDeliverabilityStatsFilter): CampaignDeliverabilityStats! autosendStatus: String! messagingServiceSid: String + autosendLimit: Int } type CampaignEdge { @@ -331,5 +334,6 @@ export const schema = ` timezone: String repliesStaleAfter: Int messagingServiceSid: String + autosendLimit: Int } `; diff --git a/src/api/schema.ts b/src/api/schema.ts index ce976feb3..fdac602df 100644 --- a/src/api/schema.ts +++ b/src/api/schema.ts @@ -332,6 +332,7 @@ const rootSchema = ` markForSecondPass(campaignId: String!, input: SecondPassInput!): String! startAutosending(campaignId: String!): Campaign! pauseAutosending(campaignId: String!): Campaign! + updateCampaignAutosendingLimit(campaignId: String!, limit: Int): Campaign! unMarkForSecondPass(campaignId: String!): String! deleteNeedsMessage(campaignId: String!): String! insertLinkDomain(organizationId: String!, domain: String!, maxUsageCount: Int!): LinkDomain! diff --git a/src/containers/AdminAutosending/components/AutosendingBasicTargetRow.tsx b/src/containers/AdminAutosending/components/AutosendingBasicTargetRow.tsx index d37f4bc17..1836bbed5 100644 --- a/src/containers/AdminAutosending/components/AutosendingBasicTargetRow.tsx +++ b/src/containers/AdminAutosending/components/AutosendingBasicTargetRow.tsx @@ -9,14 +9,15 @@ import type { AutosendingTargetFragment } from "@spoke/spoke-codegen"; import React from "react"; import { Link } from "react-router-dom"; +import AutosendingLimitField from "./AutosendingLimitField"; import useChipStyles from "./chipStyles"; interface AutosendingTargetRowProps { target: AutosendingTargetFragment; organizationId: string; disabled?: boolean; - onStart?: () => Promise | unknown; - onPause?: () => Promise | unknown; + onStart: () => Promise | unknown; + onPause: () => Promise | unknown; } export const AutosendingTargetRow: React.FC = ( @@ -43,6 +44,9 @@ export const AutosendingTargetRow: React.FC = ( + + + {target.autosendStatus === "complete" ? undefined : target.autosendStatus === "sending" || diff --git a/src/containers/AdminAutosending/components/AutosendingLimitField.tsx b/src/containers/AdminAutosending/components/AutosendingLimitField.tsx new file mode 100644 index 000000000..c49fbbfa1 --- /dev/null +++ b/src/containers/AdminAutosending/components/AutosendingLimitField.tsx @@ -0,0 +1,83 @@ +import CircularProgress from "@material-ui/core/CircularProgress"; +import InputAdornment from "@material-ui/core/InputAdornment"; +import TextField from "@material-ui/core/TextField"; +import Tooltip from "@material-ui/core/Tooltip"; +import CheckCircleIcon from "@material-ui/icons/CheckCircle"; +import ErrorIcon from "@material-ui/icons/Error"; +import { + useGetCampaignAutosendingLimitQuery, + useUpdateCampaignAutosendingLimitMutation +} from "@spoke/spoke-codegen"; +import isNil from "lodash/isNil"; +import React, { useCallback, useMemo, useState } from "react"; +import { useDebouncedCallback } from "use-debounce"; + +export interface AutosendingLimitFieldProps { + campaignId: string; +} + +export const AutosendingLimitField: React.FC = ({ + campaignId +}) => { + const [inputValue, setInputValue] = useState(null); + + const { data, loading } = useGetCampaignAutosendingLimitQuery({ + variables: { campaignId } + }); + + const [ + updateAutosendingLimit, + { data: mutationData, loading: mutationLoading, error: mutationError } + ] = useUpdateCampaignAutosendingLimitMutation(); + + const debouncedUpdate = useDebouncedCallback((limit: number | null) => { + updateAutosendingLimit({ variables: { campaignId, limit } }); + }, 300); + + const countMessagedContacts = useMemo( + () => data?.campaign?.stats?.countMessagedContacts, + [data] + ); + + const handleOnChange = useCallback( + (event: React.ChangeEvent) => { + const limitInt = parseInt(event.target.value, 10); + const limit = Number.isNaN(limitInt) + ? null + : Math.max(limitInt, countMessagedContacts ?? 0); + setInputValue(isNil(limit) ? "" : `${limit}`); + debouncedUpdate(limit); + }, + [debouncedUpdate, countMessagedContacts] + ); + + return ( + + + + ) : mutationError !== undefined ? ( + + + + + + ) : mutationData !== undefined ? ( + + + + ) : undefined + }} + onChange={handleOnChange} + /> + ); +}; + +export default AutosendingLimitField; diff --git a/src/containers/AdminAutosending/components/AutosendingTargetRow.tsx b/src/containers/AdminAutosending/components/AutosendingTargetRow.tsx index fb5772f89..fcf8560cf 100644 --- a/src/containers/AdminAutosending/components/AutosendingTargetRow.tsx +++ b/src/containers/AdminAutosending/components/AutosendingTargetRow.tsx @@ -10,14 +10,15 @@ import type { AutosendingTargetFragment } from "@spoke/spoke-codegen"; import React from "react"; import { Link } from "react-router-dom"; +import AutosendingLimitField from "./AutosendingLimitField"; import useChipStyles from "./chipStyles"; interface AutosendingTargetRowProps { target: AutosendingTargetFragment; organizationId: string; disabled?: boolean; - onStart?: () => Promise | unknown; - onPause?: () => Promise | unknown; + onStart: () => Promise | unknown; + onPause: () => Promise | unknown; } export const AutosendingTargetRow: React.FC = ( @@ -55,6 +56,9 @@ export const AutosendingTargetRow: React.FC = ( + + + {target.autosendStatus === "complete" ? undefined : target.autosendStatus === "sending" || @@ -79,7 +83,6 @@ export const AutosendingTargetRow: React.FC = ( )} {target.contactsCount} - {target.deliverabilityStats.deliveredCount} {target.contactsCount! - diff --git a/src/containers/AdminAutosending/index.tsx b/src/containers/AdminAutosending/index.tsx index 8457aa4b9..bcf5bc8de 100644 --- a/src/containers/AdminAutosending/index.tsx +++ b/src/containers/AdminAutosending/index.tsx @@ -210,7 +210,7 @@ const AdminAutosending: React.FC = () => { {isBasic ? null : ( - + Progress Engagement @@ -219,6 +219,7 @@ const AdminAutosending: React.FC = () => { Campaign Autosending Status + Autosending Limit {/* Actions */} {isBasic ? null : ( diff --git a/src/schema.graphql b/src/schema.graphql index b5f090318..0caab3d28 100644 --- a/src/schema.graphql +++ b/src/schema.graphql @@ -299,6 +299,7 @@ type RootMutation { markForSecondPass(campaignId: String!, input: SecondPassInput!): String! startAutosending(campaignId: String!): Campaign! pauseAutosending(campaignId: String!): Campaign! + updateCampaignAutosendingLimit(campaignId: String!, limit: Int): Campaign! unMarkForSecondPass(campaignId: String!): String! deleteNeedsMessage(campaignId: String!): String! insertLinkDomain(organizationId: String!, domain: String!, maxUsageCount: Int!): LinkDomain! @@ -669,6 +670,7 @@ type Campaign { deliverabilityStats(filter: CampaignDeliverabilityStatsFilter): CampaignDeliverabilityStats! autosendStatus: String! messagingServiceSid: String + autosendLimit: Int } type CampaignEdge { @@ -736,6 +738,7 @@ input CampaignInput { timezone: String repliesStaleAfter: Int messagingServiceSid: String + autosendLimit: Int } diff --git a/src/server/api/campaign.js b/src/server/api/campaign.js index 307cec7b5..75db3f7e8 100644 --- a/src/server/api/campaign.js +++ b/src/server/api/campaign.js @@ -438,7 +438,8 @@ export const resolvers = { "isAutoassignEnabled", "createdAt", "landlinesFiltered", - "messagingServiceSid" + "messagingServiceSid", + "autosendLimit" ]), isApproved: (campaign) => isNil(campaign.is_approved) ? false : campaign.is_approved, diff --git a/src/server/api/schema.js b/src/server/api/schema.js index 34f8a2278..7ca092313 100644 --- a/src/server/api/schema.js +++ b/src/server/api/schema.js @@ -1979,6 +1979,60 @@ const rootMutations = { return result; }, + updateCampaignAutosendingLimit: async ( + _ignore, + { campaignId, limit }, + { user } + ) => { + const id = parseInt(campaignId, 10); + + const campaign = await r + .knex("all_campaign") + .where({ id }) + .first(["organization_id"]); + + const organizationId = campaign.organization_id; + await accessRequired(user, organizationId, "ADMIN", true); + + const updatedCampaign = + limit === null + ? await r + .knex("all_campaign") + .update({ + autosend_limit: null, + autosend_limit_max_contact_id: null + }) + .where({ id }) + .returning("*") + .then((rows) => rows[0]) + : await r.knex + .raw( + ` + update all_campaign + set + autosend_limit = ?::int, + autosend_limit_max_contact_id = ( + select max(id) + from ( + select id + from campaign_contact + where true + and campaign_id = ?::int + and archived = false + order by id asc + limit ?::int + ) campaign_contact_ids + ) + where id = ?::int + returning * + `, + [limit, id, limit, id] + ) + .then(({ rows }) => rows[0]); + + return updatedCampaign; + }, + unMarkForSecondPass: async (_ignore, { campaignId }, { user }) => { // verify permissions const campaign = await r diff --git a/src/server/api/types.ts b/src/server/api/types.ts index 8fa93ee3c..92338ed9f 100644 --- a/src/server/api/types.ts +++ b/src/server/api/types.ts @@ -86,6 +86,13 @@ export enum FilteredContactReason { OptedOut = "OPTEDOUT" } +export enum AutosendStatus { + Unstarted = "unstarted", + Sending = "sending", + Paused = "paused", + Complete = "complete" +} + export interface CampaignRecord { id: number; organization_id: number; @@ -111,6 +118,8 @@ export interface CampaignRecord { replies_stale_after_minutes: number | null; landlines_filtered: boolean; external_system_id: string | null; + autosend_status: AutosendStatus; + autosend_user_id: number; messaging_service_sid: string | null; } diff --git a/src/server/tasks/queue-autosend-initials.spec.ts b/src/server/tasks/queue-autosend-initials.spec.ts index 946116e73..ccf5154ba 100644 --- a/src/server/tasks/queue-autosend-initials.spec.ts +++ b/src/server/tasks/queue-autosend-initials.spec.ts @@ -1,6 +1,6 @@ import faker from "faker"; -import type { WorkerOptions } from "graphile-worker"; import { runTaskListOnce } from "graphile-worker"; +import type { PoolClient } from "pg"; import { Pool } from "pg"; import { @@ -14,24 +14,142 @@ import { import { config } from "../../config"; import type { CampaignContactRecord, + CampaignRecord, OrganizationRecord, UserRecord } from "../api/types"; +import { AutosendStatus, MessageStatusType } from "../api/types"; import { withClient } from "../utils"; import { QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER, queueAutoSendOrganizationInitials } from "./queue-autosend-initials"; +import { TASK_IDENTIFIER as RETRY_ISTEP_IDENTIFIER } from "./retry-interaction-step"; + +const TASK_IDENTIFIER = QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER; + +interface SetUpAutosendingOptions { + organizationId: number; + autosendUserId: number; + autosendStatus: AutosendStatus; + unmessagedCount: number; + messagedCount?: number; +} + +const setUpAutosending = async ( + client: PoolClient, + options: SetUpAutosendingOptions +) => { + const { + organizationId, + autosendUserId, + autosendStatus, + unmessagedCount, + messagedCount = 0 + } = options; + + const campaign = await createCampaign(client, { + organizationId, + isStarted: true, + autosendUserId, + autosendStatus + }); + await createInteractionStep(client, { + campaignId: campaign.id, + scriptOptions: ["Have a text {firstName}"] + }); + const assignment = await createAssignment(client, { + campaignId: campaign.id, + userId: autosendUserId + }); + const messagedContacts = await Promise.all( + [...Array(messagedCount)].map(() => + createCampaignContact(client, { + campaignId: campaign.id, + firstName: faker.name.firstName(), + messageStatus: MessageStatusType.Messaged + }) + ) + ); + const unmessagedContacts = await Promise.all( + [...Array(unmessagedCount)].map(() => + createCampaignContact(client, { + campaignId: campaign.id, + firstName: faker.name.firstName() + }) + ) + ); + + return { campaign, assignment, messagedContacts, unmessagedContacts }; +}; + +interface RunQueueAutosendOptions { + client: PoolClient; + pool: Pool; + organizationId: number; +} + +const runQueueAutosend = async ({ + client, + pool, + organizationId +}: RunQueueAutosendOptions) => { + await client.query(`select graphile_worker.add_job($1, $2)`, [ + TASK_IDENTIFIER, + { organization_id: organizationId } + ]); + + await runTaskListOnce( + { pgPool: pool }, + { [TASK_IDENTIFIER]: queueAutoSendOrganizationInitials }, + client + ); +}; + +const fetchTaskCount = async (client: PoolClient, campaignId: number) => + client + .query( + `select * from graphile_worker.jobs where task_identifier = $1 and payload->>'campaignId' = $2`, + [RETRY_ISTEP_IDENTIFIER, campaignId] + ) + .then(({ rowCount }) => rowCount); + +const runRetryInteractionSteps = async ( + client: PoolClient, + campaignId: number +) => { + await client.query<{ id: number }>( + ` + with cc_ids as ( + delete from graphile_worker.jobs + where true + and task_identifier = $1 + and payload->>'campaignId' = $2 + returning (payload->>'campaignContactId')::integer as id + ) + update campaign_contact + set message_status = 'messaged' + where id in (select id from cc_ids) + `, + [RETRY_ISTEP_IDENTIFIER, campaignId] + ); +}; + +const cleanUp = async (pool: Pool) => { + const taskIdentifiers = [TASK_IDENTIFIER, RETRY_ISTEP_IDENTIFIER]; + await pool.query( + `delete from graphile_worker.jobs where task_identifier = ANY($1)`, + [taskIdentifiers] + ); +}; describe("queue-autosend-organization-initials", () => { let pool: Pool; - let workerOptions: WorkerOptions; let texter: UserRecord; let organization: OrganizationRecord; beforeAll(async () => { pool = new Pool({ connectionString: config.TEST_DATABASE_URL }); - workerOptions = { pgPool: pool }; await withClient(pool, async (client) => { // Set up campaign contact texter = await createTexter(client, {}); @@ -39,47 +157,24 @@ describe("queue-autosend-organization-initials", () => { }); }); + afterEach(async () => { + await cleanUp(pool); + }); + afterAll(async () => { if (pool) await pool.end(); }); it("sends queues valid contacts when run", async () => { await withClient(pool, async (client) => { - const campaign = await createCampaign(client, { + const { campaign, assignment } = await setUpAutosending(client, { organizationId: organization.id, - isStarted: true, autosendUserId: texter.id, - autosendStatus: "sending" - }); - await createInteractionStep(client, { - campaignId: campaign.id, - scriptOptions: ["Have a text {firstName}"] - }); - const assignment = await createAssignment(client, { - campaignId: campaign.id, - userId: texter.id + autosendStatus: AutosendStatus.Sending, + unmessagedCount: 3 }); - await Promise.all( - [...Array(3)].map(() => - createCampaignContact(client, { - campaignId: campaign.id, - firstName: faker.name.firstName() - }) - ) - ); - await client.query(`select graphile_worker.add_job($1, $2)`, [ - "queue-autosend-organization-initials", - { organization_id: organization.id } - ]); - - await runTaskListOnce( - workerOptions, - { - "queue-autosend-organization-initials": queueAutoSendOrganizationInitials - }, - client - ); + await runQueueAutosend({ client, pool, organizationId: organization.id }); const { rows: contacts } = await client.query( `select * from campaign_contact where campaign_id = $1`, @@ -88,58 +183,21 @@ describe("queue-autosend-organization-initials", () => { expect(contacts).toHaveLength(3); expect(contacts[0].assignment_id).toBe(assignment.id); - const { - rowCount - } = await client.query( - `select * from graphile_worker.jobs where payload->>'campaignId' = $1`, - [campaign.id] - ); - expect(rowCount).toBe(3); - - await pool.query( - `delete from graphile_worker.jobs where task_identifier = ANY($1)`, - [["queue-autosend-organization-initials", "retry-interaction-step"]] - ); + const taskCount = await fetchTaskCount(client, campaign.id); + expect(taskCount).toBe(3); }); }); it("does not queue invalid contacts when run", async () => { await withClient(pool, async (client) => { - const campaign = await createCampaign(client, { + const { campaign, assignment } = await setUpAutosending(client, { organizationId: organization.id, - isStarted: true, autosendUserId: texter.id, - autosendStatus: "unstarted" - }); - await createInteractionStep(client, { - campaignId: campaign.id, - scriptOptions: ["Have a text {firstName}"] - }); - const assignment = await createAssignment(client, { - campaignId: campaign.id, - userId: texter.id + autosendStatus: AutosendStatus.Unstarted, + unmessagedCount: 3 }); - await Promise.all( - [...Array(3)].map(() => - createCampaignContact(client, { - campaignId: campaign.id, - firstName: faker.name.firstName() - }) - ) - ); - await client.query(`select graphile_worker.add_job($1, $2)`, [ - QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER, - { organization_id: organization.id } - ]); - - await runTaskListOnce( - workerOptions, - { - [QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER]: queueAutoSendOrganizationInitials - }, - client - ); + await runQueueAutosend({ client, pool, organizationId: organization.id }); const { rows: contacts } = await client.query( `select * from campaign_contact where campaign_id = $1`, @@ -151,23 +209,8 @@ describe("queue-autosend-organization-initials", () => { expect(contact.assignment_id).toBeNull(); } - const { - rowCount - } = await client.query( - `select * from graphile_worker.jobs where payload->>'campaignId' = $1`, - [campaign.id] - ); - expect(rowCount).toBe(0); - - await pool.query( - `delete from graphile_worker.jobs where task_identifier = ANY($1)`, - [ - [ - QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER, - "retry-interaction-step" - ] - ] - ); + const taskCount = await fetchTaskCount(client, campaign.id); + expect(taskCount).toBe(0); }); }); @@ -179,62 +222,103 @@ describe("queue-autosend-organization-initials", () => { // only relying on job_key it("does not double queue contacts after change to include assigned messages", async () => { await withClient(pool, async (client) => { - const campaign = await createCampaign(client, { + const { campaign } = await setUpAutosending(client, { organizationId: organization.id, - isStarted: true, autosendUserId: texter.id, - autosendStatus: "sending" + autosendStatus: AutosendStatus.Sending, + unmessagedCount: 6 }); - await createInteractionStep(client, { - campaignId: campaign.id, - scriptOptions: ["Have a text {firstName}"] + await runQueueAutosend({ client, pool, organizationId: organization.id }); + await runQueueAutosend({ client, pool, organizationId: organization.id }); + + const taskCount = await fetchTaskCount(client, campaign.id); + expect(taskCount).toBe(6); + }); + }); + + it("only queues up to the limit and then pauses", async () => { + await withClient(pool, async (client) => { + const unmessagedCount = 10; + const limit = 5; + + const { campaign, unmessagedContacts } = await setUpAutosending(client, { + organizationId: organization.id, + autosendUserId: texter.id, + autosendStatus: AutosendStatus.Sending, + unmessagedCount }); - await Promise.all( - [...Array(6)].map(() => - createCampaignContact(client, { - campaignId: campaign.id, - firstName: faker.name.firstName() - }) - ) + const unmessagedContactIds = unmessagedContacts.map((cc) => cc.id); + unmessagedContactIds.sort(); + + await client.query( + ` + update campaign + set + autosend_limit = $1, + autosend_limit_max_contact_id = $2 + where id = $3 + `, + [limit, unmessagedContactIds[limit - 1], campaign.id] ); - const runQueueAutoSendInitials = async () => { - await client.query(`select graphile_worker.add_job($1, $2)`, [ - QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER, - { organization_id: organization.id } - ]); + await runQueueAutosend({ client, pool, organizationId: organization.id }); - await runTaskListOnce( - workerOptions, - { - [QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER]: queueAutoSendOrganizationInitials - }, - client - ); - }; + const taskCount = await fetchTaskCount(client, campaign.id); + expect(taskCount).toBe(limit); - await runQueueAutoSendInitials(); - await runQueueAutoSendInitials(); + await runRetryInteractionSteps(client, campaign.id); + await runQueueAutosend({ client, pool, organizationId: organization.id }); const { - rowCount - } = await client.query( - `select * from graphile_worker.jobs where payload->>'campaignId' = $1`, + rows: [finalCampaign] + } = await client.query( + `select autosend_status from campaign where id = $1`, [campaign.id] ); - expect(rowCount).toBe(6); - - await pool.query( - `delete from graphile_worker.jobs where task_identifier = ANY($1)`, - [ - [ - QUEUE_AUTOSEND_ORGANIZATION_INITIALS_TASK_IDENTIFIER, - "retry-interaction-step" - ] - ] + expect(finalCampaign.autosend_status).toBe(AutosendStatus.Paused); + }); + }); + + it("marks completed campaign with autosend limit as completed", async () => { + await withClient(pool, async (client) => { + const messagedCount = 25; + + const { campaign, messagedContacts } = await setUpAutosending(client, { + organizationId: organization.id, + autosendUserId: texter.id, + autosendStatus: AutosendStatus.Sending, + messagedCount, + unmessagedCount: 0 + }); + + const maxMessagedContactId = Math.max( + ...messagedContacts.map((cc) => cc.id) + ); + + await client.query( + ` + update campaign + set + autosend_limit = $1, + autosend_limit_max_contact_id = $2 + where id = $3 + `, + [messagedCount, maxMessagedContactId, campaign.id] + ); + + await runQueueAutosend({ client, pool, organizationId: organization.id }); + await runRetryInteractionSteps(client, campaign.id); + await runQueueAutosend({ client, pool, organizationId: organization.id }); + + const { + rows: [finalCampaign] + } = await client.query( + `select * from campaign where id = $1`, + [campaign.id] ); + expect(finalCampaign.autosend_status).toBe(AutosendStatus.Complete); }); }); }); diff --git a/src/server/tasks/queue-autosend-initials.ts b/src/server/tasks/queue-autosend-initials.ts index d5196d56a..d60f97be1 100644 --- a/src/server/tasks/queue-autosend-initials.ts +++ b/src/server/tasks/queue-autosend-initials.ts @@ -1,5 +1,4 @@ import type { Task } from "graphile-worker"; -import { fromPairs } from "lodash"; import { config } from "../../config"; @@ -68,6 +67,10 @@ export const queueAutoSendOrganizationInitials: Task = async ( and cc.archived = false and cc.message_status = 'needsMessage' and cc.is_opted_out = false + and ( + c.autosend_limit_max_contact_id is null + or cc.id <= c.autosend_limit_max_contact_id + ) -- campaign requirements for autosending and c.is_archived = false and c.is_started = true @@ -168,45 +171,58 @@ export const queueAutoSendOrganizationInitials: Task = async ( [contactsToQueueInOneMinute, autosendingMps, organizationId] ); - const { rows: totalCountToSend } = await helpers.query<{ - campaign_id: string; - total_count_to_send: string; - }>( + const campaignIdsQueued = contactsQueued.map((cq) => cq.campaign_id); + + await helpers.query( ` - select cc.campaign_id, count(*) as total_count_to_send - from campaign_contact cc - join campaign c on cc.campaign_id = c.id + with sendable_contacts as ( + select id, campaign_id + from campaign_contact + where true + and archived = false + and message_status = 'needsMessage' + and is_opted_out = false + ), + campaign_summary_raw as ( + select + c.id as campaign_id, + c.autosend_limit_max_contact_id, + count(cc.id) as total_count_to_send, + min(cc.id) as min_cc_id + from campaign c + left join sendable_contacts cc on cc.campaign_id = c.id + where true + -- organization requirements for autosending + and c.organization_id = $1 + -- campaign requirements for autosending + and c.id = ANY ($2::integer[]) + and c.is_archived = false + and c.is_started = true + and c.autosend_status = 'sending' + group by 1, 2 + ), + campaign_summary as ( + select + campaign_id, + (case + when total_count_to_send = 0 then 'complete' + when ( + autosend_limit_max_contact_id is not null + and min_cc_id > autosend_limit_max_contact_id + ) then 'paused' + else null + end) as new_autosend_status + from campaign_summary_raw + ) + update campaign + set autosend_status = new_autosend_status + from campaign_summary where true - -- organization requirements for autosending - and c.organization_id = $1 - -- campaign requirements for autosending - and c.is_archived = false - and c.is_started = true - and c.autosend_status = 'sending' - and message_status = 'needsMessage' - and is_opted_out = false - group by 1 + and organization_id = $1 + and campaign_summary.campaign_id = campaign.id + and new_autosend_status is not null + and id = ANY($2::integer[]) `, - [organizationId] - ); - - const countToSendMap = fromPairs( - totalCountToSend.map(({ campaign_id, total_count_to_send }) => [ - campaign_id, - total_count_to_send - ]) - ); - - const toMarkAsDoneSending = contactsQueued - .filter((campaign) => { - const totalCountToSendForCampaign = - countToSendMap[campaign.campaign_id] || 0; - return totalCountToSendForCampaign === 0; - }) - .map((campaign) => campaign.campaign_id); - - await helpers.query( - `update campaign set autosend_status = 'complete' where id = ANY($1::integer[]) and organization_id = $2`, - [toMarkAsDoneSending, organizationId] + [organizationId, campaignIdsQueued] ); }; diff --git a/src/server/tasks/retry-interaction-step.ts b/src/server/tasks/retry-interaction-step.ts index 583363efe..40bbedfb8 100644 --- a/src/server/tasks/retry-interaction-step.ts +++ b/src/server/tasks/retry-interaction-step.ts @@ -17,6 +17,8 @@ import type { import { r } from "../models"; import { SendTimeMessagingError } from "../send-message-errors"; +export const TASK_IDENTIFIER = "retry-interaction-step"; + export interface RetryInteractionStepPayload { campaignContactId: number; unassignAfterSend?: boolean;