Skip to content

Commit

Permalink
feat: queue message payloads
Browse files Browse the repository at this point in the history
Add sent payloads to message
queue and resend if no response
to message inside MaxMessageSuspendTimeout

fixes #20507
  • Loading branch information
caalador committed Dec 20, 2024
1 parent a273c7d commit 5b7dfd6
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) {
}

/**
* Should only prepare resync after the if (locked ||
* Should only prepare resync after the (locked ||
* !isNextExpectedMessage(serverId)) {...} since
* stateTree.repareForResync() will remove the nodes, and if locked is
* true, it will return without handling the message, thus won't adding
* true, it will return without handling the message, thus won't add
* nodes back.
*
* This is related to https://github.com/vaadin/flow/issues/8699 It
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/
package com.vaadin.client.communication;

import java.util.ArrayList;
import java.util.List;

import com.google.gwt.core.client.GWT;
import com.google.gwt.user.client.Timer;

import com.vaadin.client.ConnectionIndicator;
import com.vaadin.client.Console;
Expand Down Expand Up @@ -67,6 +71,10 @@ public enum ResynchronizationState {

private JsonObject pushPendingMessage;

private List<JsonObject> messageQueue = new ArrayList<>();

private Timer resendMessageTimer;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -146,6 +154,8 @@ private void doSendInvocationsToServer() {
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.warn("Resynchronizing from server");
messageQueue.clear();
resetTimer();
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
Expand All @@ -166,7 +176,6 @@ protected void send(final JsonArray reqInvocations,
final JsonObject extraJson) {
registry.getRequestResponseTracker().startRequest();
send(preparePayload(reqInvocations, extraJson));

}

private JsonObject preparePayload(final JsonArray reqInvocations,
Expand All @@ -192,12 +201,33 @@ private JsonObject preparePayload(final JsonArray reqInvocations,

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
* given URI. Adds message to message queue and postpones sending if queue
* not empty.
*
* @param payload
* The contents of the request to send
*/
public void send(final JsonObject payload) {
if (!registry.getRequestResponseTracker().hasActiveRequest()) {
// Someone called send directly as request not set active.
// If queue empty add message and wait to send.
if (!messageQueue.isEmpty()) {
messageQueue.add(payload);
return;
}
}
messageQueue.add(payload);
sendPayload(payload);
}

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
*
* @param payload
* The contents of the request to send
*/
private void sendPayload(final JsonObject payload) {
if (push != null && push.isBidirectional()) {
// When using bidirectional transport, the payload is not resent
// to the server during reconnection attempts.
Expand All @@ -211,6 +241,30 @@ public void send(final JsonObject payload) {
} else {
Console.debug("send XHR");
registry.getXhrConnection().send(payload);

resetTimer();
// resend last payload if response hasn't come in.
resendMessageTimer = new Timer() {
@Override
public void run() {
resendMessageTimer = null;
if (!registry.getRequestResponseTracker()
.hasActiveRequest()) {
registry.getRequestResponseTracker().startRequest();
}
registry.getXhrConnection().send(payload);
}
};
resendMessageTimer.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
}
}

private void resetTimer() {
if (resendMessageTimer != null) {
resendMessageTimer.cancel();
resendMessageTimer = null;

}
}

Expand Down Expand Up @@ -289,6 +343,8 @@ public String getCommunicationMethodName() {
*/
public void resynchronize() {
if (requestResynchronize()) {
messageQueue.clear();
resetTimer();
sendInvocationsToServer();
}
}
Expand All @@ -311,12 +367,29 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
pushPendingMessage = null;
}
if (!messageQueue.isEmpty()) {
synchronized (messageQueue) {
// If queued message is the expected one. remove from queue
// and sen next message if any.
if (messageQueue.get(0)
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID)
+ 1 == nextExpectedId) {
resetTimer();
messageQueue.remove(0);
if (!messageQueue.isEmpty()) {
sendPayload(messageQueue.get(0));
}
}
}
}
return;
}
if (force) {
Console.debug(
"Forced update of clientId to " + clientToServerMessageId);
clientToServerMessageId = nextExpectedId;
messageQueue.clear();
resetTimer();
return;
}

Expand Down

0 comments on commit 5b7dfd6

Please sign in to comment.