From 5759c98bff7ca3a0996a6d5334f25341dd39dd0c Mon Sep 17 00:00:00 2001 From: Mikael Grankvist Date: Thu, 19 Dec 2024 09:28:17 +0200 Subject: [PATCH] feat: queue message payloads Add sent payloads to message queue and resend if no response to message inside MaxMessageSuspendTimeout fixes #20507 --- .../client/communication/MessageHandler.java | 4 +- .../client/communication/MessageSender.java | 77 ++++++++++++++++++- 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java b/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java index 98533e45c1e..9963c03cbf0 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/MessageHandler.java @@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) { } /** - * Should only prepare resync after the if (locked || + * Should only prepare resync after the (locked || * !isNextExpectedMessage(serverId)) {...} since * stateTree.repareForResync() will remove the nodes, and if locked is - * true, it will return without handling the message, thus won't adding + * true, it will return without handling the message, thus won't add * nodes back. * * This is related to https://github.com/vaadin/flow/issues/8699 It diff --git a/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java b/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java index 6182fde8123..61ccc0bff96 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/MessageSender.java @@ -15,7 +15,11 @@ */ package com.vaadin.client.communication; +import java.util.ArrayList; +import java.util.List; + import com.google.gwt.core.client.GWT; +import com.google.gwt.user.client.Timer; import com.vaadin.client.ConnectionIndicator; import com.vaadin.client.Console; @@ -67,6 +71,10 @@ public enum ResynchronizationState { private JsonObject pushPendingMessage; + private List messageQueue = new ArrayList<>(); + + private Timer resendMessageTimer; + /** * Creates a new instance connected to the given registry. * @@ -146,6 +154,8 @@ private void doSendInvocationsToServer() { if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) { resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE; Console.warn("Resynchronizing from server"); + messageQueue.clear(); + resetTimer(); extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true); } if (showLoadingIndicator) { @@ -166,7 +176,6 @@ protected void send(final JsonArray reqInvocations, final JsonObject extraJson) { registry.getRequestResponseTracker().startRequest(); send(preparePayload(reqInvocations, extraJson)); - } private JsonObject preparePayload(final JsonArray reqInvocations, @@ -192,12 +201,33 @@ private JsonObject preparePayload(final JsonArray reqInvocations, /** * Sends an asynchronous or synchronous UIDL request to the server using the - * given URI. + * given URI. Adds message to message queue and postpones sending if queue + * not empty. * * @param payload * The contents of the request to send */ public void send(final JsonObject payload) { + if (!registry.getRequestResponseTracker().hasActiveRequest()) { + // Someone called send directly as request not set active. + // If queue empty add message and wait to send. + if (!messageQueue.isEmpty()) { + messageQueue.add(payload); + return; + } + } + messageQueue.add(payload); + sendPayload(payload); + } + + /** + * Sends an asynchronous or synchronous UIDL request to the server using the + * given URI. + * + * @param payload + * The contents of the request to send + */ + private void sendPayload(final JsonObject payload) { if (push != null && push.isBidirectional()) { // When using bidirectional transport, the payload is not resent // to the server during reconnection attempts. @@ -211,6 +241,30 @@ public void send(final JsonObject payload) { } else { Console.debug("send XHR"); registry.getXhrConnection().send(payload); + + resetTimer(); + // resend last payload if response hasn't come in. + resendMessageTimer = new Timer() { + @Override + public void run() { + resendMessageTimer = null; + if (!registry.getRequestResponseTracker() + .hasActiveRequest()) { + registry.getRequestResponseTracker().startRequest(); + } + registry.getXhrConnection().send(payload); + } + }; + resendMessageTimer.schedule(registry.getApplicationConfiguration() + .getMaxMessageSuspendTimeout() + 500); + } + } + + private void resetTimer() { + if (resendMessageTimer != null) { + resendMessageTimer.cancel(); + resendMessageTimer = null; + } } @@ -289,6 +343,8 @@ public String getCommunicationMethodName() { */ public void resynchronize() { if (requestResynchronize()) { + messageQueue.clear(); + resetTimer(); sendInvocationsToServer(); } } @@ -311,12 +367,29 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) { ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) { pushPendingMessage = null; } + if (!messageQueue.isEmpty()) { + synchronized (messageQueue) { + // If queued message is the expected one. remove from queue + // and sen next message if any. + if (messageQueue.get(0) + .getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID) + + 1 == nextExpectedId) { + resetTimer(); + messageQueue.remove(0); + if (!messageQueue.isEmpty()) { + sendPayload(messageQueue.get(0)); + } + } + } + } return; } if (force) { Console.debug( "Forced update of clientId to " + clientToServerMessageId); clientToServerMessageId = nextExpectedId; + messageQueue.clear(); + resetTimer(); return; }