diff --git a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/WebAsyncTaskInfo.java b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/WebAsyncTaskInfo.java index 27ad56cb38..e168a54597 100644 --- a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/WebAsyncTaskInfo.java +++ b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/WebAsyncTaskInfo.java @@ -16,16 +16,19 @@ */ package io.cloudbeaver.model; +import org.jkiss.code.NotNull; import org.jkiss.dbeaver.model.runtime.AbstractJob; /** - * Web connection info + * Web async task info */ public class WebAsyncTaskInfo { - private String id; - private String name; - private boolean running; + @NotNull + private final String id; + @NotNull + private final String name; + private boolean running = false; private Object result; private Object extendedResult; private String status; @@ -33,27 +36,21 @@ public class WebAsyncTaskInfo { private AbstractJob job; - public WebAsyncTaskInfo(String id, String name) { + public WebAsyncTaskInfo(@NotNull String id, @NotNull String name) { this.id = id; this.name = name; } + @NotNull public String getId() { return id; } - public void setId(String id) { - this.id = id; - } - + @NotNull public String getName() { return name; } - public void setName(String name) { - this.name = name; - } - public boolean isRunning() { return running; } diff --git a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java index 9a27954e00..2fa8b79262 100644 --- a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java +++ b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/WebSession.java @@ -26,11 +26,13 @@ import io.cloudbeaver.model.WebServerMessage; import io.cloudbeaver.model.app.ServletApplication; import io.cloudbeaver.model.app.ServletAuthApplication; +import io.cloudbeaver.model.session.monitor.TaskProgressMonitor; import io.cloudbeaver.model.user.WebUser; import io.cloudbeaver.service.DBWSessionHandler; import io.cloudbeaver.service.sql.WebSQLConstants; import io.cloudbeaver.utils.CBModelConstants; import io.cloudbeaver.utils.WebDataSourceUtils; +import io.cloudbeaver.utils.WebEventUtils; import org.eclipse.core.runtime.IAdaptable; import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.Status; @@ -57,7 +59,6 @@ import org.jkiss.dbeaver.model.runtime.AbstractJob; import org.jkiss.dbeaver.model.runtime.BaseProgressMonitor; import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor; -import org.jkiss.dbeaver.model.runtime.ProxyProgressMonitor; import org.jkiss.dbeaver.model.security.SMAdminController; import org.jkiss.dbeaver.model.security.SMConstants; import org.jkiss.dbeaver.model.security.SMController; @@ -511,7 +512,7 @@ public DBRProgressMonitor getProgressMonitor() { /////////////////////////////////////////////////////// // Async model - public WebAsyncTaskInfo getAsyncTask(String taskId, String taskName, boolean create) { + public WebAsyncTaskInfo getAsyncTask(@NotNull String taskId, @NotNull String taskName, boolean create) { synchronized (asyncTasks) { WebAsyncTaskInfo taskInfo = asyncTasks.get(taskId); if (taskInfo == null && create) { @@ -528,7 +529,6 @@ public WebAsyncTaskInfo asyncTaskStatus(String taskId, boolean removeOnFinish) t if (taskInfo == null) { throw new DBWebException("Task '" + taskId + "' not found"); } - taskInfo.setRunning(taskInfo.getJob() != null && !taskInfo.getJob().isFinished()); if (removeOnFinish && !taskInfo.isRunning()) { asyncTasks.remove(taskId); } @@ -551,7 +551,7 @@ public boolean asyncTaskCancel(String taskId) throws DBWebException { return true; } - public WebAsyncTaskInfo createAndRunAsyncTask(String taskName, WebAsyncTaskProcessor runnable) { + public WebAsyncTaskInfo createAndRunAsyncTask(@NotNull String taskName, @NotNull WebAsyncTaskProcessor runnable) { int taskId = TASK_ID.incrementAndGet(); WebAsyncTaskInfo asyncTask = getAsyncTask(String.valueOf(taskId), taskName, true); @@ -560,7 +560,8 @@ public WebAsyncTaskInfo createAndRunAsyncTask(String taskName, WebAsyncTaskProce protected IStatus run(DBRProgressMonitor monitor) { int curTaskCount = taskCount.incrementAndGet(); - TaskProgressMonitor taskMonitor = new TaskProgressMonitor(monitor, asyncTask); + DBRProgressMonitor taskMonitor = new TaskProgressMonitor(monitor, WebSession.this, asyncTask); + try { Number queryLimit = application.getAppConfiguration().getResourceQuota(WebSQLConstants.QUOTA_PROP_QUERY_LIMIT); if (queryLimit != null && curTaskCount > queryLimit.intValue()) { @@ -572,7 +573,6 @@ protected IStatus run(DBRProgressMonitor monitor) { asyncTask.setResult(runnable.getResult()); asyncTask.setExtendedResult(runnable.getExtendedResults()); asyncTask.setStatus("Finished"); - asyncTask.setRunning(false); } catch (InvocationTargetException e) { addSessionError(e.getTargetException()); asyncTask.setJobError(e.getTargetException()); @@ -580,6 +580,8 @@ protected IStatus run(DBRProgressMonitor monitor) { asyncTask.setJobError(e); } finally { taskCount.decrementAndGet(); + asyncTask.setRunning(false); + WebEventUtils.sendAsyncTaskEvent(WebSession.this, asyncTask); } return Status.OK_STATUS; } @@ -989,27 +991,6 @@ public void subTask(String name) { } } - private static class TaskProgressMonitor extends ProxyProgressMonitor { - - private final WebAsyncTaskInfo asyncTask; - - public TaskProgressMonitor(DBRProgressMonitor original, WebAsyncTaskInfo asyncTask) { - super(original); - this.asyncTask = asyncTask; - } - - @Override - public void beginTask(String name, int totalWork) { - super.beginTask(name, totalWork); - asyncTask.setStatus(name); - } - - @Override - public void subTask(String name) { - super.subTask(name); - asyncTask.setStatus(name); - } - } private record PersistentAttribute(Object value) { } diff --git a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/monitor/TaskProgressMonitor.java b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/monitor/TaskProgressMonitor.java new file mode 100644 index 0000000000..d8be4778a7 --- /dev/null +++ b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/model/session/monitor/TaskProgressMonitor.java @@ -0,0 +1,55 @@ +/* + * DBeaver - Universal Database Manager + * Copyright (C) 2010-2024 DBeaver Corp and others + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudbeaver.model.session.monitor; + +import io.cloudbeaver.model.WebAsyncTaskInfo; +import io.cloudbeaver.model.session.WebSession; +import io.cloudbeaver.utils.WebEventUtils; +import org.jkiss.code.NotNull; +import org.jkiss.dbeaver.model.runtime.DBRProgressMonitor; +import org.jkiss.dbeaver.model.runtime.ProxyProgressMonitor; + +/** + * Task progress monitor. + * Used by async GQL requests. + */ +public class TaskProgressMonitor extends ProxyProgressMonitor { + + @NotNull + private final WebAsyncTaskInfo asyncTask; + private final WebSession webSession; + + public TaskProgressMonitor(DBRProgressMonitor original, @NotNull WebSession webSession, @NotNull WebAsyncTaskInfo asyncTask) { + super(original); + this.webSession = webSession; + this.asyncTask = asyncTask; + } + + @Override + public void beginTask(String name, int totalWork) { + super.beginTask(name, totalWork); + asyncTask.setStatus(name); + WebEventUtils.sendAsyncTaskEvent(webSession, asyncTask); + } + + @Override + public void subTask(String name) { + super.subTask(name); + asyncTask.setStatus(name); + WebEventUtils.sendAsyncTaskEvent(webSession, asyncTask); + } +} \ No newline at end of file diff --git a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/utils/WebEventUtils.java b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/utils/WebEventUtils.java index 2e51485347..9a83accafd 100644 --- a/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/utils/WebEventUtils.java +++ b/server/bundles/io.cloudbeaver.model/src/io/cloudbeaver/utils/WebEventUtils.java @@ -16,7 +16,9 @@ */ package io.cloudbeaver.utils; +import io.cloudbeaver.model.WebAsyncTaskInfo; import io.cloudbeaver.model.session.WebSession; +import org.jkiss.code.NotNull; import org.jkiss.dbeaver.model.app.DBPProject; import org.jkiss.dbeaver.model.websocket.WSConstants; import org.jkiss.dbeaver.model.websocket.event.WSEvent; @@ -25,6 +27,7 @@ import org.jkiss.dbeaver.model.websocket.event.datasource.WSDatasourceFolderEvent; import org.jkiss.dbeaver.model.websocket.event.resource.WSResourceProperty; import org.jkiss.dbeaver.model.websocket.event.resource.WSResourceUpdatedEvent; +import org.jkiss.dbeaver.model.websocket.event.session.WSSessionTaskInfoEvent; import java.util.List; @@ -182,4 +185,14 @@ public static void addRmResourceUpdatedEvent( ServletAppUtils.getServletApplication().getEventController().addEvent(event); } + public static void sendAsyncTaskEvent(@NotNull WebSession webSession, @NotNull WebAsyncTaskInfo taskInfo) { + webSession.addSessionEvent( + new WSSessionTaskInfoEvent( + taskInfo.getId(), + taskInfo.getStatus(), + taskInfo.isRunning() + ) + ); + } + } \ No newline at end of file diff --git a/server/bundles/io.cloudbeaver.server/schema/service.events.graphqls b/server/bundles/io.cloudbeaver.server/schema/service.events.graphqls index a970eae7f4..b82da85fdd 100644 --- a/server/bundles/io.cloudbeaver.server/schema/service.events.graphqls +++ b/server/bundles/io.cloudbeaver.server/schema/service.events.graphqls @@ -33,7 +33,9 @@ enum CBServerEventId { cb_object_permissions_updated, cb_subject_permissions_updated, - cb_database_output_log_updated + cb_database_output_log_updated, + + cb_session_task_info_updated @since(version: "24.3.1") } # Events sent by client @@ -56,6 +58,9 @@ enum CBEventTopic { cb_object_permissions, cb_subject_permissions, cb_database_output_log, + + cb_session_task, @since(version: "24.3.1") + cb_datasource_connection, cb_delete_temp_folder } @@ -182,6 +187,14 @@ type WSOutputLogInfo { # Add more fields as needed } +# Async task info status event +type WSAsyncTaskInfo @since(version: "24.3.1") { + id: CBServerEventId! + taskId: ID! + statusName: String + running: Boolean! +} + # Datasource disconnect event type WSDataSourceDisconnectEvent implements CBServerEvent { id: CBServerEventId! diff --git a/server/bundles/io.cloudbeaver.server/src/io/cloudbeaver/service/sql/WebSQLQueryResults.java b/server/bundles/io.cloudbeaver.server/src/io/cloudbeaver/service/sql/WebSQLQueryResults.java index 8dd7690a51..1140ed2304 100644 --- a/server/bundles/io.cloudbeaver.server/src/io/cloudbeaver/service/sql/WebSQLQueryResults.java +++ b/server/bundles/io.cloudbeaver.server/src/io/cloudbeaver/service/sql/WebSQLQueryResults.java @@ -25,11 +25,9 @@ import org.jkiss.dbeaver.model.data.DBDDocument; import org.jkiss.dbeaver.model.exec.DBCException; import org.jkiss.dbeaver.model.meta.Property; -import org.jkiss.utils.Pair; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * Web SQL query results. diff --git a/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContext.ts b/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContext.ts index 923974606e..2416dd02ee 100644 --- a/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContext.ts +++ b/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContext.ts @@ -8,7 +8,8 @@ import { computed, makeObservable, observable } from 'mobx'; import type { ITask, TaskScheduler } from '@cloudbeaver/core-executor'; -import type { AsyncTaskInfo, AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import type { AsyncTaskInfo, GraphQLService } from '@cloudbeaver/core-sdk'; import type { ConnectionExecutionContextResource, IConnectionExecutionContextInfo } from './ConnectionExecutionContextResource.js'; import type { IConnectionExecutionContext } from './IConnectionExecutionContext.js'; diff --git a/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContextService.ts b/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContextService.ts index 1cdf42acb3..3d367a3c64 100644 --- a/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContextService.ts +++ b/webapp/packages/core-connections/src/ConnectionExecutionContext/ConnectionExecutionContextService.ts @@ -8,7 +8,8 @@ import { injectable } from '@cloudbeaver/core-di'; import { TaskScheduler } from '@cloudbeaver/core-executor'; import { CachedMapAllKey, ResourceKeyUtils } from '@cloudbeaver/core-resource'; -import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import { GraphQLService } from '@cloudbeaver/core-sdk'; import { MetadataMap } from '@cloudbeaver/core-utils'; import type { IConnectionInfoParams } from '../CONNECTION_INFO_PARAM_SCHEMA.js'; diff --git a/webapp/packages/core-sdk/src/AsyncTask/AsyncTask.ts b/webapp/packages/core-root/src/AsyncTask/AsyncTask.ts similarity index 75% rename from webapp/packages/core-sdk/src/AsyncTask/AsyncTask.ts rename to webapp/packages/core-root/src/AsyncTask/AsyncTask.ts index 92da43a00c..3b5e84d0fa 100644 --- a/webapp/packages/core-sdk/src/AsyncTask/AsyncTask.ts +++ b/webapp/packages/core-root/src/AsyncTask/AsyncTask.ts @@ -8,14 +8,10 @@ import { computed, makeObservable, observable } from 'mobx'; import { type ISyncExecutor, SyncExecutor } from '@cloudbeaver/core-executor'; +import { type AsyncTaskInfo, ServerInternalError, type WsAsyncTaskInfo } from '@cloudbeaver/core-sdk'; import { uuid } from '@cloudbeaver/core-utils'; -import type { AsyncTaskInfo } from '../sdk.js'; -import { ServerInternalError } from '../ServerInternalError.js'; - export class AsyncTask { - readonly id: string; - get cancelled(): boolean { return this._cancelled; } @@ -32,8 +28,13 @@ export class AsyncTask { return this.innerPromise; } + get id(): string { + return this._id; + } + readonly onStatusChange: ISyncExecutor; + private _id: string; private _cancelled: boolean; private taskInfo: AsyncTaskInfo | null; private resolve!: (value: AsyncTaskInfo) => void; @@ -41,11 +42,11 @@ export class AsyncTask { private readonly innerPromise: Promise; private updatingAsync: boolean; private readonly init: () => Promise; - private readonly cancel: (info: AsyncTaskInfo) => Promise; + private readonly cancel: (id: string) => Promise; private initPromise: Promise | null; - constructor(init: () => Promise, cancel: (info: AsyncTaskInfo) => Promise) { - this.id = uuid(); + constructor(init: () => Promise, cancel: (id: string) => Promise) { + this._id = uuid(); this.init = init; this.cancel = cancel; this._cancelled = false; @@ -78,14 +79,20 @@ export class AsyncTask { } async updateInfoAsync(getter: (task: AsyncTask) => Promise): Promise { + const init = this.info === null; + if (this.updatingAsync) { + if (!init) { + /* With websockets we encounter a situation when we receive status update before the task is initialized. + We save the update in pendingEvents, but we can't update the task because updatingAsync is still true, + so we need to wait a bit before retrying */ + setTimeout(() => this.updateInfoAsync.call(this, getter), 100); + } return; } this.updatingAsync = true; try { - const init = this.info === null; - if (this._cancelled && init) { throw new Error('Task was cancelled'); } @@ -119,6 +126,13 @@ export class AsyncTask { } } + public updateStatus(info: WsAsyncTaskInfo): void { + if (this.taskInfo) { + this.taskInfo.status = info.statusName; + this.onStatusChange.execute(this.taskInfo); + } + } + private updateInfo(info: AsyncTaskInfo): void { this.taskInfo = info; @@ -134,7 +148,7 @@ export class AsyncTask { private async cancelTask(): Promise { if (this.info) { - await this.cancel(this.info); + await this.cancel(this.info.id); } } } diff --git a/webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoEventHandler.ts b/webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoEventHandler.ts new file mode 100644 index 0000000000..00673e5ead --- /dev/null +++ b/webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoEventHandler.ts @@ -0,0 +1,23 @@ +/* + * CloudBeaver - Cloud Database Manager + * Copyright (C) 2020-2024 DBeaver Corp and others + * + * Licensed under the Apache License, Version 2.0. + * you may not use this file except in compliance with the License. + */ +import { injectable } from '@cloudbeaver/core-di'; +import type { WsAsyncTaskInfo } from '@cloudbeaver/core-sdk'; + +import { TopicEventHandler } from '../ServerEventEmitter/TopicEventHandler.js'; +import { type ISessionEvent, type SessionEventId, SessionEventSource, SessionEventTopic } from '../SessionEventSource.js'; + +@injectable() +export class AsyncTaskInfoEventHandler extends TopicEventHandler { + constructor(sessionEventSource: SessionEventSource) { + super(SessionEventTopic.CbSessionTask, sessionEventSource); + } + + map(event: any): WsAsyncTaskInfo { + return event; + } +} diff --git a/webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoService.ts b/webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoService.ts new file mode 100644 index 0000000000..9f94d3331c --- /dev/null +++ b/webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoService.ts @@ -0,0 +1,157 @@ +/* + * CloudBeaver - Cloud Database Manager + * Copyright (C) 2020-2024 DBeaver Corp and others + * + * Licensed under the Apache License, Version 2.0. + * you may not use this file except in compliance with the License. + */ +import type { Subscription } from 'rxjs'; + +import { Disposable, injectable } from '@cloudbeaver/core-di'; +import { type AsyncTaskInfo, GraphQLService, type WsAsyncTaskInfo } from '@cloudbeaver/core-sdk'; + +import type { Unsubscribe } from '../ServerEventEmitter/IServerEventEmitter.js'; +import { ServerEventId } from '../SessionEventSource.js'; +import { AsyncTask } from './AsyncTask.js'; +import { AsyncTaskInfoEventHandler } from './AsyncTaskInfoEventHandler.js'; + +@injectable() +export class AsyncTaskInfoService extends Disposable { + private readonly tasks: Map; + private readonly taskIdAliases: Map; + private readonly pendingEvents: Map; + private connection: Subscription | null; + private onEventUnsubscribe: Unsubscribe | null; + + constructor( + private readonly graphQLService: GraphQLService, + private readonly asyncTaskInfoEventHandler: AsyncTaskInfoEventHandler, + ) { + super(); + this.tasks = new Map(); + this.taskIdAliases = new Map(); + this.pendingEvents = new Map(); + this.connection = null; + this.handleEvent = this.handleEvent.bind(this); + + this.onEventUnsubscribe = asyncTaskInfoEventHandler.onEvent(ServerEventId.CbSessionTaskInfoUpdated, this.handleEvent); + } + + private async updateTask(task: AsyncTask, data: WsAsyncTaskInfo) { + if (data.running === false) { + await task.updateInfoAsync(async () => { + const { taskInfo } = await this.graphQLService.sdk.getAsyncTaskInfo({ + taskId: data.taskId, + removeOnFinish: false, + }); + + return taskInfo; + }); + } else { + task.updateStatus(data); + } + } + + private async handleEvent(data: WsAsyncTaskInfo) { + const task = this.getTask(data.taskId); + + if (!task) { + this.pendingEvents.set(data.taskId, data); + return; + } + + await this.updateTask(task, data); + } + + override dispose(): void { + this.connection?.unsubscribe(); + this.onEventUnsubscribe?.(); + } + + create(getter: () => Promise): AsyncTask { + const task = new AsyncTask(getter, this.cancelTask.bind(this)); + + this.tasks.set(task.id, task); + task.onStatusChange.addHandler(info => { + if (this.taskIdAliases.get(info.id)) { + return; + } + + this.taskIdAliases.set(info.id, task.id); + + const pendingEvent = this.pendingEvents.get(info.id); + if (pendingEvent) { + this.pendingEvents.delete(info.id); + this.updateTask(task, pendingEvent); + } + }); + + if (this.tasks.size === 1) { + this.connection = this.asyncTaskInfoEventHandler.eventsSubject.connect(); + } + + return task; + } + + private getTask(taskId: string): AsyncTask | undefined { + let task = this.tasks.get(taskId); + + if (!task) { + const internalId = this.taskIdAliases.get(taskId); + + if (internalId) { + task = this.tasks.get(internalId); + } + } + + return task; + } + + async run(task: AsyncTask): Promise { + if (task.info === null) { + await task.run(); + } + + return task.promise; + } + + async remove(taskId: string): Promise { + const task = this.getTask(taskId); + + if (!task) { + return; + } + + if (task.pending) { + throw new Error('Cant remove unfinished task'); + } + this.tasks.delete(task.id); + if (task.info) { + this.taskIdAliases.delete(task.info.id); + this.pendingEvents.delete(task.info.id); + } + if (this.tasks.size === 0) { + this.connection?.unsubscribe(); + this.connection = null; + } + + if (task.info !== null) { + await this.graphQLService.sdk.getAsyncTaskInfo({ + taskId: task.info.id, + removeOnFinish: true, + }); + } + } + + async cancel(taskId: string): Promise { + const task = this.getTask(taskId); + + await task?.cancelAsync(); + } + + private async cancelTask(id: string): Promise { + await this.graphQLService.sdk.asyncTaskCancel({ + taskId: id, + }); + } +} diff --git a/webapp/packages/core-root/src/ServerEventEmitter/IServerEventEmitter.ts b/webapp/packages/core-root/src/ServerEventEmitter/IServerEventEmitter.ts index e1676861ec..3bd5897217 100644 --- a/webapp/packages/core-root/src/ServerEventEmitter/IServerEventEmitter.ts +++ b/webapp/packages/core-root/src/ServerEventEmitter/IServerEventEmitter.ts @@ -10,7 +10,7 @@ import type { Observable } from 'rxjs'; import type { ISyncExecutor } from '@cloudbeaver/core-executor'; export type IServerEventCallback = (data: T) => any; -export type Subscription = () => void; +export type Unsubscribe = () => void; export interface IBaseServerEvent { id: TID; @@ -25,9 +25,9 @@ export interface IServerEventEmitter< > { readonly onInit: ISyncExecutor; - onEvent(id: TEventID, callback: IServerEventCallback, mapTo?: (event: TEvent) => T): Subscription; + onEvent(id: TEventID, callback: IServerEventCallback, mapTo?: (event: TEvent) => T): Unsubscribe; - on(callback: IServerEventCallback, mapTo?: (event: TEvent) => T, filter?: (event: TEvent) => boolean): Subscription; + on(callback: IServerEventCallback, mapTo?: (event: TEvent) => T, filter?: (event: TEvent) => boolean): Unsubscribe; multiplex(topicId: TTopic, mapTo?: (event: TEvent) => T): Observable; diff --git a/webapp/packages/core-root/src/ServerEventEmitter/TopicEventHandler.ts b/webapp/packages/core-root/src/ServerEventEmitter/TopicEventHandler.ts index cd0c19714d..0b264835a8 100644 --- a/webapp/packages/core-root/src/ServerEventEmitter/TopicEventHandler.ts +++ b/webapp/packages/core-root/src/ServerEventEmitter/TopicEventHandler.ts @@ -11,11 +11,11 @@ import { type ISyncExecutor, SyncExecutor } from '@cloudbeaver/core-executor'; import type { IResource } from '@cloudbeaver/core-resource'; import { compose } from '@cloudbeaver/core-utils'; -import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Subscription } from './IServerEventEmitter.js'; +import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Unsubscribe } from './IServerEventEmitter.js'; interface ISubscribedResourceInfo { listeners: number; - subscription: Subscription; + disposeSubscription: Unsubscribe; } export abstract class TopicEventHandler< @@ -28,7 +28,7 @@ export abstract class TopicEventHandler< readonly onInit: ISyncExecutor; readonly eventsSubject: Connectable; - private subscription: Subscription | null; + private disposeSubscription: Unsubscribe | null; private readonly activeResources: Array>; private readonly subscribedResources: Map, ISubscribedResourceInfo>; private readonly serverSubject?: Observable; @@ -41,7 +41,7 @@ export abstract class TopicEventHandler< this.subject = new Subject(); this.activeResources = []; this.subscribedResources = new Map(); - this.subscription = null; + this.disposeSubscription = null; this.serverSubject = this.emitter.multiplex(topic, this.map); this.eventsSubject = connectable(merge(this.subject, this.serverSubject), { connector: () => new Subject(), @@ -60,7 +60,7 @@ export abstract class TopicEventHandler< callback: IServerEventCallback, mapTo: (event: TEvent) => T = event => event as unknown as T, resource?: IResource, - ): Subscription { + ): Unsubscribe { if (resource) { this.registerResource(resource); } @@ -86,7 +86,7 @@ export abstract class TopicEventHandler< mapTo: (param: TEvent) => T = event => event as unknown as T, filterFn: (param: TEvent) => boolean = () => true, resource?: IResource, - ): Subscription { + ): Unsubscribe { if (resource) { this.registerResource(resource); } @@ -118,10 +118,10 @@ export abstract class TopicEventHandler< if (resource.useTracker.isResourceInUse) { this.activeResources.push(resource); - if (!this.subscription) { + if (!this.disposeSubscription) { // console.log('Subscribe: ', resource.getName()); const sub = this.eventsSubject.connect(); - this.subscription = () => sub.unsubscribe(); + this.disposeSubscription = () => sub.unsubscribe(); } } } @@ -132,8 +132,8 @@ export abstract class TopicEventHandler< if (this.activeResources.length === 0) { // console.log('Unsubscribe: ', resource.getName()); - this.subscription?.(); - this.subscription = null; + this.disposeSubscription?.(); + this.disposeSubscription = null; } } @@ -143,10 +143,10 @@ export abstract class TopicEventHandler< if (!info) { info = { listeners: 0, - subscription: this.resourceUseHandler.bind(this, resource), + disposeSubscription: this.resourceUseHandler.bind(this, resource), }; this.subscribedResources.set(resource, info); - resource.useTracker.onUse.addHandler(info.subscription); + resource.useTracker.onUse.addHandler(info.disposeSubscription); // console.log('Register: ', resource.getName()); } @@ -161,7 +161,7 @@ export abstract class TopicEventHandler< if (info.listeners === 0) { this.removeActiveResource(resource); - resource.useTracker.onUse.removeHandler(info.subscription); + resource.useTracker.onUse.removeHandler(info.disposeSubscription); this.subscribedResources.delete(resource); // console.log('Unregister: ', resource.getName()); } diff --git a/webapp/packages/core-root/src/SessionEventSource.ts b/webapp/packages/core-root/src/SessionEventSource.ts index 8cdbd674d8..47246afaeb 100644 --- a/webapp/packages/core-root/src/SessionEventSource.ts +++ b/webapp/packages/core-root/src/SessionEventSource.ts @@ -35,7 +35,7 @@ import { } from '@cloudbeaver/core-sdk'; import { NetworkStateService } from './NetworkStateService.js'; -import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Subscription } from './ServerEventEmitter/IServerEventEmitter.js'; +import type { IBaseServerEvent, IServerEventCallback, IServerEventEmitter, Unsubscribe } from './ServerEventEmitter/IServerEventEmitter.js'; import { SessionExpireService } from './SessionExpireService.js'; export { ServerEventId, SessionEventTopic, ClientEventId }; @@ -111,7 +111,7 @@ export class SessionEventSource implements IServerEventEmitter(id: SessionEventId, callback: IServerEventCallback, mapTo: (event: ISessionEvent) => T = e => e as T): Subscription { + onEvent(id: SessionEventId, callback: IServerEventCallback, mapTo: (event: ISessionEvent) => T = e => e as T): Unsubscribe { const sub = this.eventsSubject .pipe( filter(event => event.id === id), @@ -128,7 +128,7 @@ export class SessionEventSource implements IServerEventEmitter, mapTo: (event: ISessionEvent) => T = e => e as T, filterFn: (event: ISessionEvent) => boolean = () => true, - ): Subscription { + ): Unsubscribe { const sub = this.eventsSubject.pipe(filter(filterFn), map(mapTo)).subscribe(callback); return () => { diff --git a/webapp/packages/core-root/src/index.ts b/webapp/packages/core-root/src/index.ts index 0568e3490c..c4ff9be7f0 100644 --- a/webapp/packages/core-root/src/index.ts +++ b/webapp/packages/core-root/src/index.ts @@ -37,4 +37,6 @@ export * from './ServerNodeService.js'; export * from './ServerResourceQuotasResource.js'; export * from './WindowEventsService.js'; export * from './ServerLicenseStatusResource.js'; +export * from './AsyncTask/AsyncTask.js'; +export * from './AsyncTask/AsyncTaskInfoService.js'; export * from './manifest.js'; diff --git a/webapp/packages/core-root/src/manifest.ts b/webapp/packages/core-root/src/manifest.ts index 097c833731..55e524427a 100644 --- a/webapp/packages/core-root/src/manifest.ts +++ b/webapp/packages/core-root/src/manifest.ts @@ -13,6 +13,8 @@ export const coreRootManifest: PluginManifest = { }, providers: [ + () => import('./AsyncTask/AsyncTaskInfoService.js').then(m => m.AsyncTaskInfoService), + () => import('./AsyncTask/AsyncTaskInfoEventHandler.js').then(m => m.AsyncTaskInfoEventHandler), () => import('./FeaturesResource.js').then(m => m.FeaturesResource), () => import('./NetworkStateService.js').then(m => m.NetworkStateService), () => import('./SessionPermissionsResource.js').then(m => m.SessionPermissionsResource), diff --git a/webapp/packages/core-sdk/src/AsyncTask/AsyncTaskInfoService.ts b/webapp/packages/core-sdk/src/AsyncTask/AsyncTaskInfoService.ts deleted file mode 100644 index a8c17e2fd5..0000000000 --- a/webapp/packages/core-sdk/src/AsyncTask/AsyncTaskInfoService.ts +++ /dev/null @@ -1,105 +0,0 @@ -/* - * CloudBeaver - Cloud Database Manager - * Copyright (C) 2020-2024 DBeaver Corp and others - * - * Licensed under the Apache License, Version 2.0. - * you may not use this file except in compliance with the License. - */ -import { injectable } from '@cloudbeaver/core-di'; -import { cancellableTimeout } from '@cloudbeaver/core-utils'; - -import { GraphQLService } from '../GraphQLService.js'; -import type { AsyncTaskInfo } from '../sdk.js'; -import { AsyncTask } from './AsyncTask.js'; - -const DELAY_BETWEEN_TRIES = 250; - -@injectable() -export class AsyncTaskInfoService { - private readonly tasks: Map; - - constructor(private readonly graphQLService: GraphQLService) { - this.tasks = new Map(); - } - - create(getter: () => Promise): AsyncTask { - const task = new AsyncTask(getter, this.cancelTask.bind(this)); - this.tasks.set(task.id, task); - - if (this.tasks.size === 1) { - setTimeout(() => this.update(), 1); - } - - return task; - } - - async run(task: AsyncTask): Promise { - if (task.info === null) { - await task.run(); - } - - return task.promise; - } - - async remove(taskId: string): Promise { - const task = this.tasks.get(taskId); - - if (!task) { - return; - } - - if (task.pending) { - throw new Error('Cant remove unfinished task'); - } - - this.tasks.delete(taskId); - - if (task.info !== null) { - await this.graphQLService.sdk.getAsyncTaskInfo({ - taskId: task.info.id, - removeOnFinish: true, - }); - } - } - - async cancel(taskId: string): Promise { - const task = this.tasks.get(taskId); - - await task?.cancelAsync(); - } - - private async update() { - while (this.tasks.size > 0) { - const tasks = Array.from(this.tasks.values()); - - for (const task of tasks) { - this.updateTask(task); - } - - await cancellableTimeout(DELAY_BETWEEN_TRIES); - } - } - - private async updateTask(task: AsyncTask): Promise { - try { - if (task.pending && task.info) { - await task.updateInfoAsync(async task => { - const { taskInfo } = await this.graphQLService.sdk.getAsyncTaskInfo({ - taskId: task.info!.id, - removeOnFinish: false, - }); - - return taskInfo; - }); - } - } catch (e: any) { - console.log('Failed to check async task status', e); - } - } - - private async cancelTask(info: AsyncTaskInfo): Promise { - await this.graphQLService.sdk.asyncTaskCancel({ - taskId: info.id, - }); - } -} diff --git a/webapp/packages/core-sdk/src/index.ts b/webapp/packages/core-sdk/src/index.ts index 5178763c9d..249468e52c 100644 --- a/webapp/packages/core-sdk/src/index.ts +++ b/webapp/packages/core-sdk/src/index.ts @@ -5,8 +5,6 @@ * Licensed under the Apache License, Version 2.0. * you may not use this file except in compliance with the License. */ -export * from './AsyncTask/AsyncTask.js'; -export * from './AsyncTask/AsyncTaskInfoService.js'; export * from './Extensions/uploadBlobResultSetExtension.js'; export * from './CustomGraphQLClient.js'; export * from './DetailsError.js'; diff --git a/webapp/packages/core-sdk/src/manifest.ts b/webapp/packages/core-sdk/src/manifest.ts index 2c4e1da5c4..95224df4ed 100644 --- a/webapp/packages/core-sdk/src/manifest.ts +++ b/webapp/packages/core-sdk/src/manifest.ts @@ -13,7 +13,6 @@ export const coreSDKManifest: PluginManifest = { }, providers: [ - () => import('./AsyncTask/AsyncTaskInfoService.js').then(m => m.AsyncTaskInfoService), () => import('./EnvironmentService.js').then(m => m.EnvironmentService), () => import('./GraphQLService.js').then(m => m.GraphQLService), ], diff --git a/webapp/packages/plugin-data-import/src/DataImportService.ts b/webapp/packages/plugin-data-import/src/DataImportService.ts index a5bdfb61d5..eac7386089 100644 --- a/webapp/packages/plugin-data-import/src/DataImportService.ts +++ b/webapp/packages/plugin-data-import/src/DataImportService.ts @@ -10,8 +10,8 @@ import { computed, makeObservable } from 'mobx'; import { ProcessSnackbar } from '@cloudbeaver/core-blocks'; import { injectable } from '@cloudbeaver/core-di'; import { NotificationService } from '@cloudbeaver/core-events'; -import { EAdminPermission, SessionPermissionsResource } from '@cloudbeaver/core-root'; -import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService, EAdminPermission, SessionPermissionsResource } from '@cloudbeaver/core-root'; +import { GraphQLService } from '@cloudbeaver/core-sdk'; import { getProgressPercent } from '@cloudbeaver/core-utils'; import { DataImportSettingsService } from './DataImportSettingsService.js'; diff --git a/webapp/packages/plugin-data-viewer-result-set-grouping/package.json b/webapp/packages/plugin-data-viewer-result-set-grouping/package.json index 3ccc8f0ee5..a36cab5dbd 100644 --- a/webapp/packages/plugin-data-viewer-result-set-grouping/package.json +++ b/webapp/packages/plugin-data-viewer-result-set-grouping/package.json @@ -24,6 +24,7 @@ "@cloudbeaver/core-di": "^0", "@cloudbeaver/core-dialogs": "^0", "@cloudbeaver/core-localization": "^0", + "@cloudbeaver/core-root": "^0", "@cloudbeaver/core-sdk": "^0", "@cloudbeaver/core-ui": "^0", "@cloudbeaver/core-utils": "^0", diff --git a/webapp/packages/plugin-data-viewer-result-set-grouping/src/useGroupingDataModel.ts b/webapp/packages/plugin-data-viewer-result-set-grouping/src/useGroupingDataModel.ts index 6b224257dd..d2afd05501 100644 --- a/webapp/packages/plugin-data-viewer-result-set-grouping/src/useGroupingDataModel.ts +++ b/webapp/packages/plugin-data-viewer-result-set-grouping/src/useGroupingDataModel.ts @@ -11,7 +11,8 @@ import { useEffect } from 'react'; import { useObjectRef, useResource } from '@cloudbeaver/core-blocks'; import { ConnectionInfoResource, createConnectionParam } from '@cloudbeaver/core-connections'; import { IServiceProvider, useService } from '@cloudbeaver/core-di'; -import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import { GraphQLService } from '@cloudbeaver/core-sdk'; import { isObjectsEqual } from '@cloudbeaver/core-utils'; import { DatabaseDataAccessMode, diff --git a/webapp/packages/plugin-data-viewer-result-set-grouping/tsconfig.json b/webapp/packages/plugin-data-viewer-result-set-grouping/tsconfig.json index 82e1f8a12b..0c7f65848f 100644 --- a/webapp/packages/plugin-data-viewer-result-set-grouping/tsconfig.json +++ b/webapp/packages/plugin-data-viewer-result-set-grouping/tsconfig.json @@ -24,6 +24,9 @@ { "path": "../core-localization/tsconfig.json" }, + { + "path": "../core-root/tsconfig.json" + }, { "path": "../core-sdk/tsconfig.json" }, diff --git a/webapp/packages/plugin-data-viewer/src/ContainerDataSource.ts b/webapp/packages/plugin-data-viewer/src/ContainerDataSource.ts index bce69e823d..f6a57d0409 100644 --- a/webapp/packages/plugin-data-viewer/src/ContainerDataSource.ts +++ b/webapp/packages/plugin-data-viewer/src/ContainerDataSource.ts @@ -10,9 +10,8 @@ import { computed, makeObservable, observable } from 'mobx'; import type { ConnectionExecutionContextService, IConnectionExecutionContext, IConnectionExecutionContextInfo } from '@cloudbeaver/core-connections'; import type { IServiceProvider } from '@cloudbeaver/core-di'; import type { ITask } from '@cloudbeaver/core-executor'; +import type { AsyncTask, AsyncTaskInfoService } from '@cloudbeaver/core-root'; import { - AsyncTask, - AsyncTaskInfoService, GraphQLService, ResultDataFormat, type SqlExecuteInfo, diff --git a/webapp/packages/plugin-data-viewer/src/DataViewerTableService.ts b/webapp/packages/plugin-data-viewer/src/DataViewerTableService.ts index 7b8ba3b633..6000fd4682 100644 --- a/webapp/packages/plugin-data-viewer/src/DataViewerTableService.ts +++ b/webapp/packages/plugin-data-viewer/src/DataViewerTableService.ts @@ -8,7 +8,8 @@ import { type Connection, ConnectionExecutionContextService, createConnectionParam } from '@cloudbeaver/core-connections'; import { injectable, IServiceProvider } from '@cloudbeaver/core-di'; import { EObjectFeature, type NavNode, NavNodeManagerService } from '@cloudbeaver/core-navigation-tree'; -import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import { GraphQLService } from '@cloudbeaver/core-sdk'; import { ContainerDataSource } from './ContainerDataSource.js'; import { DatabaseDataModel } from './DatabaseDataModel/DatabaseDataModel.js'; diff --git a/webapp/packages/plugin-data-viewer/src/ResultSet/ResultSetDataSource.ts b/webapp/packages/plugin-data-viewer/src/ResultSet/ResultSetDataSource.ts index ec4d6df294..3aa856dbf2 100644 --- a/webapp/packages/plugin-data-viewer/src/ResultSet/ResultSetDataSource.ts +++ b/webapp/packages/plugin-data-viewer/src/ResultSet/ResultSetDataSource.ts @@ -10,7 +10,8 @@ import { makeObservable, observable } from 'mobx'; import type { IConnectionExecutionContext, IConnectionExecutionContextInfo } from '@cloudbeaver/core-connections'; import type { IServiceProvider } from '@cloudbeaver/core-di'; import type { ITask } from '@cloudbeaver/core-executor'; -import type { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import type { GraphQLService } from '@cloudbeaver/core-sdk'; import { DatabaseDataSource } from '../DatabaseDataModel/DatabaseDataSource.js'; import { type IDatabaseDataOptions } from '../DatabaseDataModel/IDatabaseDataOptions.js'; diff --git a/webapp/packages/plugin-sql-editor/src/QueryDataSource.ts b/webapp/packages/plugin-sql-editor/src/QueryDataSource.ts index 4da5c9d2e6..03c0d04ae2 100644 --- a/webapp/packages/plugin-sql-editor/src/QueryDataSource.ts +++ b/webapp/packages/plugin-sql-editor/src/QueryDataSource.ts @@ -10,8 +10,8 @@ import { makeObservable, observable } from 'mobx'; import type { IConnectionExecutionContextInfo } from '@cloudbeaver/core-connections'; import type { IServiceProvider } from '@cloudbeaver/core-di'; import type { ITask } from '@cloudbeaver/core-executor'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; import { - AsyncTaskInfoService, GraphQLService, ResultDataFormat, type SqlExecuteInfo, diff --git a/webapp/packages/plugin-sql-editor/src/SqlResultTabs/ExecutionPlan/SqlExecutionPlanService.ts b/webapp/packages/plugin-sql-editor/src/SqlResultTabs/ExecutionPlan/SqlExecutionPlanService.ts index 9e0b6ef4ba..618c7b08eb 100644 --- a/webapp/packages/plugin-sql-editor/src/SqlResultTabs/ExecutionPlan/SqlExecutionPlanService.ts +++ b/webapp/packages/plugin-sql-editor/src/SqlResultTabs/ExecutionPlan/SqlExecutionPlanService.ts @@ -11,7 +11,8 @@ import { ConnectionExecutionContextService } from '@cloudbeaver/core-connections import { injectable } from '@cloudbeaver/core-di'; import { NotificationService } from '@cloudbeaver/core-events'; import type { ITask } from '@cloudbeaver/core-executor'; -import { AsyncTaskInfoService, GraphQLService, type SqlExecutionPlan } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import { GraphQLService, type SqlExecutionPlan } from '@cloudbeaver/core-sdk'; import { uuid } from '@cloudbeaver/core-utils'; import type { ISqlEditorTabState } from '../../ISqlEditorTabState.js'; diff --git a/webapp/packages/plugin-sql-editor/src/SqlResultTabs/SqlQueryService.ts b/webapp/packages/plugin-sql-editor/src/SqlResultTabs/SqlQueryService.ts index c2f370ed11..d0c389446f 100644 --- a/webapp/packages/plugin-sql-editor/src/SqlResultTabs/SqlQueryService.ts +++ b/webapp/packages/plugin-sql-editor/src/SqlResultTabs/SqlQueryService.ts @@ -10,7 +10,8 @@ import { makeObservable, observable } from 'mobx'; import { ConnectionExecutionContextService, ConnectionInfoResource, createConnectionParam } from '@cloudbeaver/core-connections'; import { injectable, IServiceProvider } from '@cloudbeaver/core-di'; import { NotificationService } from '@cloudbeaver/core-events'; -import { AsyncTaskInfoService, GraphQLService } from '@cloudbeaver/core-sdk'; +import { AsyncTaskInfoService } from '@cloudbeaver/core-root'; +import { GraphQLService } from '@cloudbeaver/core-sdk'; import { DatabaseDataAccessMode, DatabaseDataModel,