Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: queue message payloads #20749

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ protected void handleJSON(final ValueMap valueMap) {
}

/**
* Should only prepare resync after the if (locked ||
* Should only prepare resync after the (locked ||
* !isNextExpectedMessage(serverId)) {...} since
* stateTree.repareForResync() will remove the nodes, and if locked is
* true, it will return without handling the message, thus won't adding
* true, it will return without handling the message, thus won't add
* nodes back.
*
* This is related to https://github.com/vaadin/flow/issues/8699 It
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
*/
package com.vaadin.client.communication;

import java.util.ArrayList;
import java.util.List;

import com.google.gwt.core.client.GWT;
import com.google.gwt.user.client.Timer;

import com.vaadin.client.ConnectionIndicator;
import com.vaadin.client.Console;
Expand Down Expand Up @@ -67,6 +71,10 @@ public enum ResynchronizationState {

private JsonObject pushPendingMessage;

private List<JsonObject> messageQueue = new ArrayList<>();

private Timer resendMessageTimer;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -121,6 +129,12 @@ private void doSendInvocationsToServer() {
registry.getRequestResponseTracker().startRequest();
send(payload);
return;
} else if (hasQueuedMessages() && resendMessageTimer == null) {
if (!registry.getRequestResponseTracker().hasActiveRequest()) {
registry.getRequestResponseTracker().startRequest();
}
sendPayload(messageQueue.get(0));
return;
}

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
Expand All @@ -146,6 +160,8 @@ private void doSendInvocationsToServer() {
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.warn("Resynchronizing from server");
messageQueue.clear();
resetTimer();
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
Expand All @@ -166,7 +182,6 @@ protected void send(final JsonArray reqInvocations,
final JsonObject extraJson) {
registry.getRequestResponseTracker().startRequest();
send(preparePayload(reqInvocations, extraJson));

}

private JsonObject preparePayload(final JsonArray reqInvocations,
Expand All @@ -177,10 +192,6 @@ private JsonObject preparePayload(final JsonArray reqInvocations,
payload.put(ApplicationConstants.CSRF_TOKEN, csrfToken);
}
payload.put(ApplicationConstants.RPC_INVOCATIONS, reqInvocations);
payload.put(ApplicationConstants.SERVER_SYNC_ID,
registry.getMessageHandler().getLastSeenServerSyncId());
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);
if (extraJson != null) {
for (String key : extraJson.keys()) {
JsonValue value = extraJson.get(key);
Expand All @@ -192,12 +203,34 @@ private JsonObject preparePayload(final JsonArray reqInvocations,

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
* given URI. Adds message to message queue and postpones sending if queue
* not empty.
*
* @param payload
* The contents of the request to send
*/
public void send(final JsonObject payload) {
if (!messageQueue.isEmpty()) {
messageQueue.add(payload);
return;
}
messageQueue.add(payload);
sendPayload(payload);
}

/**
* Sends an asynchronous or synchronous UIDL request to the server using the
* given URI.
*
* @param payload
* The contents of the request to send
*/
private void sendPayload(final JsonObject payload) {
payload.put(ApplicationConstants.SERVER_SYNC_ID,
registry.getMessageHandler().getLastSeenServerSyncId());
payload.put(ApplicationConstants.CLIENT_TO_SERVER_ID,
clientToServerMessageId++);

if (push != null && push.isBidirectional()) {
// When using bidirectional transport, the payload is not resent
// to the server during reconnection attempts.
Expand All @@ -211,6 +244,31 @@ public void send(final JsonObject payload) {
} else {
Console.debug("send XHR");
registry.getXhrConnection().send(payload);

resetTimer();
// resend last payload if response hasn't come in.
resendMessageTimer = new Timer() {
@Override
public void run() {
resendMessageTimer
.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
if (!registry.getRequestResponseTracker()
.hasActiveRequest()) {
registry.getRequestResponseTracker().startRequest();
}
registry.getXhrConnection().send(payload);
}
};
resendMessageTimer.schedule(registry.getApplicationConfiguration()
.getMaxMessageSuspendTimeout() + 500);
}
}

private void resetTimer() {
if (resendMessageTimer != null) {
resendMessageTimer.cancel();
resendMessageTimer = null;
}
}

Expand Down Expand Up @@ -289,6 +347,8 @@ public String getCommunicationMethodName() {
*/
public void resynchronize() {
if (requestResynchronize()) {
messageQueue.clear();
resetTimer();
sendInvocationsToServer();
}
}
Expand All @@ -311,12 +371,26 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
ApplicationConstants.CLIENT_TO_SERVER_ID) < nextExpectedId) {
pushPendingMessage = null;
}
if (!messageQueue.isEmpty()) {
synchronized (messageQueue) {
// If queued message is the expected one. remove from queue
// and sen next message if any.
if (messageQueue.get(0)
.getNumber(ApplicationConstants.CLIENT_TO_SERVER_ID)
+ 1 == nextExpectedId) {
resetTimer();
messageQueue.remove(0);
}
}
}
return;
}
if (force) {
Console.debug(
"Forced update of clientId to " + clientToServerMessageId);
clientToServerMessageId = nextExpectedId;
messageQueue.clear();
resetTimer();
return;
}

Expand All @@ -337,6 +411,7 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
} else {
// Server has not yet seen all our messages
// Do nothing as they will arrive eventually
resetTimer();
}
}

Expand Down Expand Up @@ -372,4 +447,8 @@ void clearResynchronizationState() {
ResynchronizationState getResynchronizationState() {
return resynchronizationState;
}

public boolean hasQueuedMessages() {
return !messageQueue.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ public void endRequest() {
hasActiveRequest = false;

if ((registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending())
&& (registry.getServerRpcQueue().isFlushPending())
|| registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER
|| registry.getMessageSender().hasQueuedMessages())) {
// Send the pending RPCs immediately.
// This might be an unnecessary optimization as ServerRpcQueue has a
// finally scheduled command which trigger the send if we do not do
Expand Down
1 change: 1 addition & 0 deletions flow-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@
<module>test-react-adapter</module>
<module>test-react-adapter/pom-production.xml</module>
<module>test-legacy-frontend</module>
<module>test-client-queue</module>
</modules>
</profile>
<profile>
Expand Down
94 changes: 94 additions & 0 deletions flow-tests/test-client-queue/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flow-tests</artifactId>
<groupId>com.vaadin</groupId>
<version>24.7-SNAPSHOT</version>
</parent>
<artifactId>flow-client-queue-test</artifactId>
<name>Test Flow client queue</name>

<packaging>war</packaging>

<dependencies>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>flow-test-resources</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>vaadin-dev-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.vaadin</groupId>
<artifactId>flow-html-components-testbench</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Auto run clean to remove previous mode compilation -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<executions>
<execution>
<id>auto-clean</id>
<phase>initialize</phase>
<goals>
<goal>clean</goal>
</goals>
<!-- <configuration>-->
<!-- <filesets>-->
<!-- <fileset>-->
<!-- <directory>${project.basedir}</directory>-->
<!-- <includes>-->
<!-- <include>package*.json</include>-->
<!-- <include>types.d.ts</include>-->
<!-- <include>tsconfig.json</include>-->
<!-- <include>pnpm-lock.yaml</include>-->
<!-- <include>bun.lockb</include>-->
<!-- <include>node_modules/**</include>-->
<!-- </includes>-->
<!-- </fileset>-->
<!-- </filesets>-->
<!-- </configuration>-->
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Run flow plugin to build frontend -->
<plugin>
<groupId>com.vaadin</groupId>
<artifactId>flow-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>prepare-frontend</goal>
<!-- <goal>build-frontend</goal>-->
</goals>
</execution>
</executions>
<configuration>
<frontendHotdeploy>true</frontendHotdeploy>
</configuration>
</plugin>
<!-- Run jetty before integration tests, and stop after -->
<plugin>
<groupId>org.eclipse.jetty.ee10</groupId>
<artifactId>jetty-ee10-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Loading
Loading