Skip to content

Commit

Permalink
CB-3508 create events for session async tasks (#3036)
Browse files Browse the repository at this point in the history
* CB-3508 create events for session async tasks

* CB-3508 remove redundant events

* CB-3508 update async web task event logic

* CB-6000 refactor: migrate AsyncTask code to core-root package

* CB-6000: fix

* CB-3508 use only update async task event

* CB-3508 add result info to async task event

* CB-6000 feat: change the way of updating task state

* CB-6000 refactor: rename type and property

* CB-6000 fix: use id from server

* CB-3508 do not have results in task info event

* CB-6000 changes after BE update

* CB-6000 fix: set only status on status update

* CB-3508 fixes after review

* CB-6000 refactor: task id

return back task id generation and saving on creation. Added second map filled on running and linking backend Id and frontend Id. Added helper method to get task

* CB-6000 refactor: handle race condition

add pending events map to save last status if it's came earlier than we saved the task

* CB-6000 refactor: AsyncTask creation

* CB-3508 fixes build

* CB-6000 fix: retry update task info later if locked

* CB-6000 refactor: add a comment about delayed update

---------

Co-authored-by: Andrey Sychev <nukemoore@gmail.com>
Co-authored-by: Sychev Andrey <44414066+SychevAndrey@users.noreply.github.com>
Co-authored-by: mr-anton-t <42037741+mr-anton-t@users.noreply.github.com>
Co-authored-by: sergeyteleshev <iamsergeyteleshev@gmail.com>
Co-authored-by: Daria Marutkina <125263541+dariamarutkina@users.noreply.github.com>
  • Loading branch information
6 people authored Dec 24, 2024
1 parent 449b66f commit 78aafaf
Show file tree
Hide file tree
Showing 29 changed files with 350 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,41 @@
*/
package io.cloudbeaver.model;

import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.model.runtime.AbstractJob;

/**
* Web connection info
* Web async task info
*/
public class WebAsyncTaskInfo {

private String id;
private String name;
private boolean running;
@NotNull
private final String id;
@NotNull
private final String name;
private boolean running = false;
private Object result;
private Object extendedResult;
private String status;
private Throwable jobError;

private AbstractJob job;

public WebAsyncTaskInfo(String id, String name) {
public WebAsyncTaskInfo(@NotNull String id, @NotNull String name) {
this.id = id;
this.name = name;
}

@NotNull
public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

@NotNull
public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public boolean isRunning() {
return running;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import io.cloudbeaver.model.WebServerMessage;
import io.cloudbeaver.model.app.ServletApplication;
import io.cloudbeaver.model.app.ServletAuthApplication;
import io.cloudbeaver.model.session.monitor.TaskProgressMonitor;
import io.cloudbeaver.model.user.WebUser;
import io.cloudbeaver.service.DBWSessionHandler;
import io.cloudbeaver.service.sql.WebSQLConstants;
import io.cloudbeaver.utils.CBModelConstants;
import io.cloudbeaver.utils.WebDataSourceUtils;
import io.cloudbeaver.utils.WebEventUtils;
import org.eclipse.core.runtime.IAdaptable;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
Expand All @@ -57,7 +59,6 @@
import org.jkiss.dbeaver.model.runtime.AbstractJob;
import org.jkiss.dbeaver.model.runtime.BaseProgressMonitor;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.ProxyProgressMonitor;
import org.jkiss.dbeaver.model.security.SMAdminController;
import org.jkiss.dbeaver.model.security.SMConstants;
import org.jkiss.dbeaver.model.security.SMController;
Expand Down Expand Up @@ -511,7 +512,7 @@ public DBRProgressMonitor getProgressMonitor() {
///////////////////////////////////////////////////////
// Async model

public WebAsyncTaskInfo getAsyncTask(String taskId, String taskName, boolean create) {
public WebAsyncTaskInfo getAsyncTask(@NotNull String taskId, @NotNull String taskName, boolean create) {
synchronized (asyncTasks) {
WebAsyncTaskInfo taskInfo = asyncTasks.get(taskId);
if (taskInfo == null && create) {
Expand All @@ -528,7 +529,6 @@ public WebAsyncTaskInfo asyncTaskStatus(String taskId, boolean removeOnFinish) t
if (taskInfo == null) {
throw new DBWebException("Task '" + taskId + "' not found");
}
taskInfo.setRunning(taskInfo.getJob() != null && !taskInfo.getJob().isFinished());
if (removeOnFinish && !taskInfo.isRunning()) {
asyncTasks.remove(taskId);
}
Expand All @@ -551,7 +551,7 @@ public boolean asyncTaskCancel(String taskId) throws DBWebException {
return true;
}

public WebAsyncTaskInfo createAndRunAsyncTask(String taskName, WebAsyncTaskProcessor<?> runnable) {
public WebAsyncTaskInfo createAndRunAsyncTask(@NotNull String taskName, @NotNull WebAsyncTaskProcessor<?> runnable) {
int taskId = TASK_ID.incrementAndGet();
WebAsyncTaskInfo asyncTask = getAsyncTask(String.valueOf(taskId), taskName, true);

Expand All @@ -560,7 +560,8 @@ public WebAsyncTaskInfo createAndRunAsyncTask(String taskName, WebAsyncTaskProce
protected IStatus run(DBRProgressMonitor monitor) {
int curTaskCount = taskCount.incrementAndGet();

TaskProgressMonitor taskMonitor = new TaskProgressMonitor(monitor, asyncTask);
DBRProgressMonitor taskMonitor = new TaskProgressMonitor(monitor, WebSession.this, asyncTask);

try {
Number queryLimit = application.getAppConfiguration().getResourceQuota(WebSQLConstants.QUOTA_PROP_QUERY_LIMIT);
if (queryLimit != null && curTaskCount > queryLimit.intValue()) {
Expand All @@ -572,14 +573,15 @@ protected IStatus run(DBRProgressMonitor monitor) {
asyncTask.setResult(runnable.getResult());
asyncTask.setExtendedResult(runnable.getExtendedResults());
asyncTask.setStatus("Finished");
asyncTask.setRunning(false);
} catch (InvocationTargetException e) {
addSessionError(e.getTargetException());
asyncTask.setJobError(e.getTargetException());
} catch (Exception e) {
asyncTask.setJobError(e);
} finally {
taskCount.decrementAndGet();
asyncTask.setRunning(false);
WebEventUtils.sendAsyncTaskEvent(WebSession.this, asyncTask);
}
return Status.OK_STATUS;
}
Expand Down Expand Up @@ -989,27 +991,6 @@ public void subTask(String name) {
}
}

private static class TaskProgressMonitor extends ProxyProgressMonitor {

private final WebAsyncTaskInfo asyncTask;

public TaskProgressMonitor(DBRProgressMonitor original, WebAsyncTaskInfo asyncTask) {
super(original);
this.asyncTask = asyncTask;
}

@Override
public void beginTask(String name, int totalWork) {
super.beginTask(name, totalWork);
asyncTask.setStatus(name);
}

@Override
public void subTask(String name) {
super.subTask(name);
asyncTask.setStatus(name);
}
}

private record PersistentAttribute(Object value) {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2024 DBeaver Corp and others
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudbeaver.model.session.monitor;

import io.cloudbeaver.model.WebAsyncTaskInfo;
import io.cloudbeaver.model.session.WebSession;
import io.cloudbeaver.utils.WebEventUtils;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor;
import org.jkiss.dbeaver.model.runtime.ProxyProgressMonitor;

/**
* Task progress monitor.
* Used by async GQL requests.
*/
public class TaskProgressMonitor extends ProxyProgressMonitor {

@NotNull
private final WebAsyncTaskInfo asyncTask;
private final WebSession webSession;

public TaskProgressMonitor(DBRProgressMonitor original, @NotNull WebSession webSession, @NotNull WebAsyncTaskInfo asyncTask) {
super(original);
this.webSession = webSession;
this.asyncTask = asyncTask;
}

@Override
public void beginTask(String name, int totalWork) {
super.beginTask(name, totalWork);
asyncTask.setStatus(name);
WebEventUtils.sendAsyncTaskEvent(webSession, asyncTask);
}

@Override
public void subTask(String name) {
super.subTask(name);
asyncTask.setStatus(name);
WebEventUtils.sendAsyncTaskEvent(webSession, asyncTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package io.cloudbeaver.utils;

import io.cloudbeaver.model.WebAsyncTaskInfo;
import io.cloudbeaver.model.session.WebSession;
import org.jkiss.code.NotNull;
import org.jkiss.dbeaver.model.app.DBPProject;
import org.jkiss.dbeaver.model.websocket.WSConstants;
import org.jkiss.dbeaver.model.websocket.event.WSEvent;
Expand All @@ -25,6 +27,7 @@
import org.jkiss.dbeaver.model.websocket.event.datasource.WSDatasourceFolderEvent;
import org.jkiss.dbeaver.model.websocket.event.resource.WSResourceProperty;
import org.jkiss.dbeaver.model.websocket.event.resource.WSResourceUpdatedEvent;
import org.jkiss.dbeaver.model.websocket.event.session.WSSessionTaskInfoEvent;

import java.util.List;

Expand Down Expand Up @@ -182,4 +185,14 @@ public static void addRmResourceUpdatedEvent(
ServletAppUtils.getServletApplication().getEventController().addEvent(event);
}

public static void sendAsyncTaskEvent(@NotNull WebSession webSession, @NotNull WebAsyncTaskInfo taskInfo) {
webSession.addSessionEvent(
new WSSessionTaskInfoEvent(
taskInfo.getId(),
taskInfo.getStatus(),
taskInfo.isRunning()
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ enum CBServerEventId {
cb_object_permissions_updated,
cb_subject_permissions_updated,

cb_database_output_log_updated
cb_database_output_log_updated,

cb_session_task_info_updated @since(version: "24.3.1")
}

# Events sent by client
Expand All @@ -56,6 +58,9 @@ enum CBEventTopic {
cb_object_permissions,
cb_subject_permissions,
cb_database_output_log,

cb_session_task, @since(version: "24.3.1")

cb_datasource_connection,
cb_delete_temp_folder
}
Expand Down Expand Up @@ -182,6 +187,14 @@ type WSOutputLogInfo {
# Add more fields as needed
}

# Async task info status event
type WSAsyncTaskInfo @since(version: "24.3.1") {
id: CBServerEventId!
taskId: ID!
statusName: String
running: Boolean!
}

# Datasource disconnect event
type WSDataSourceDisconnectEvent implements CBServerEvent {
id: CBServerEventId!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import org.jkiss.dbeaver.model.data.DBDDocument;
import org.jkiss.dbeaver.model.exec.DBCException;
import org.jkiss.dbeaver.model.meta.Property;
import org.jkiss.utils.Pair;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Web SQL query results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import { computed, makeObservable, observable } from 'mobx';

import type { ITask, TaskScheduler } from '@cloudbeaver/core-executor';
import type { AsyncTaskInfo, AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk';
import { AsyncTaskInfoService } from '@cloudbeaver/core-root';
import type { AsyncTaskInfo, GraphQLService } from '@cloudbeaver/core-sdk';

import type { ConnectionExecutionContextResource, IConnectionExecutionContextInfo } from './ConnectionExecutionContextResource.js';
import type { IConnectionExecutionContext } from './IConnectionExecutionContext.js';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import { injectable } from '@cloudbeaver/core-di';
import { TaskScheduler } from '@cloudbeaver/core-executor';
import { CachedMapAllKey, ResourceKeyUtils } from '@cloudbeaver/core-resource';
import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk';
import { AsyncTaskInfoService } from '@cloudbeaver/core-root';
import { GraphQLService } from '@cloudbeaver/core-sdk';
import { MetadataMap } from '@cloudbeaver/core-utils';

import type { IConnectionInfoParams } from '../CONNECTION_INFO_PARAM_SCHEMA.js';
Expand Down
Loading

0 comments on commit 78aafaf

Please sign in to comment.