Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Promise.completeAsync(). #1838

Open
wants to merge 5 commits into
base: 8.3.0-Dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions src/lime/app/Future.hx
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import lime.utils.Log;
var promise = new Promise<T>();
promise.future = this;

FutureWork.run(work, promise);
FutureWork.runSimpleJob(work, promise);
}
else
#end
Expand Down Expand Up @@ -308,7 +308,6 @@ import lime.utils.Log;
}
}

#if (lime_threads && !html5)
/**
The class that handles asynchronous `work` functions passed to `new Future()`.
**/
Expand All @@ -319,26 +318,42 @@ import lime.utils.Log;
@:dox(hide) class FutureWork
{
private static var threadPool:ThreadPool;
private static var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic}>;
private static var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic, progress:Int->Int->Dynamic}>;

public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;

@:allow(lime.app.Promise)
private static inline function cancelJob(id:Int):Void
{
threadPool.cancelJob(id);
}

#if (lime_threads && !html5)
@:allow(lime.app.Future)
private static function run<T>(work:Void->T, promise:Promise<T>):Void
private static function runSimpleJob<T>(work:Void->T, promise:Promise<T>):Void
{
run(threadPool_doWork, promise, work, MULTI_THREADED);
}
#end

@:allow(lime.app.Promise)
private static function run<T>(work:WorkFunction<State->WorkOutput->Void>, promise:Promise<T>, state:State, mode:ThreadMode):Int
{
if (threadPool == null)
{
threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);
threadPool.onProgress.add(threadPool_onProgress);

promises = new Map();
}

var jobID:Int = threadPool.run(threadPool_doWork, work);
promises[jobID] = {complete: promise.complete, error: promise.error};
var jobID:Int = threadPool.run(work, state, mode);
promises[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
return jobID;
}

// Event Handlers
Expand Down Expand Up @@ -368,6 +383,15 @@ import lime.utils.Log;
promise.error(error);
}

private static function threadPool_onProgress(progress:{progress:Int, total:Int}):Void
{
// ThreadPool doesn't enforce types, so check manually
if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt)
{
promises[threadPool.activeJob.id].progress(progress.progress, progress.total);
}
}

// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
{
Expand All @@ -392,4 +416,3 @@ import lime.utils.Log;
return threadPool != null ? threadPool.activeJobs : 0;
}
}
#end
100 changes: 88 additions & 12 deletions src/lime/app/Promise.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package lime.app;

import lime.app.Future;
import lime.system.ThreadPool;
import lime.system.WorkOutput;

/**
`Promise` is an implementation of Futures and Promises, with the exception that
in addition to "success" and "failure" states (represented as "complete" and "error"),
Expand All @@ -10,18 +14,20 @@ package lime.app;
for recipients of it's `Future` object. For example:

```haxe
function examplePromise ():Future<String> {

var promise = new Promise<String> ();
function examplePromise():Future<String>
{
var promise = new Promise<String>();

var progress = 0, total = 10;
var timer = new Timer (100);
timer.run = function () {
var timer = new Timer(100);
timer.run = function()
{

promise.progress (progress, total);
progress++;

if (progress == total) {
if (progress == total)
{

promise.complete ("Done!");
timer.stop ();
Expand All @@ -31,12 +37,11 @@ package lime.app;
};

return promise.future;

}

var future = examplePromise ();
future.onComplete (function (message) { trace (message); });
future.onProgress (function (loaded, total) { trace ("Progress: " + loaded + ", " + total); });
var future = examplePromise();
future.onComplete(function(message) { trace(message); });
future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); });
```
**/
#if !lime_debug
Expand Down Expand Up @@ -69,6 +74,8 @@ class Promise<T>
**/
public var isError(get, null):Bool;

private var jobID:Int = -1;

#if commonjs
private static function __init__()
{
Expand Down Expand Up @@ -96,11 +103,23 @@ class Promise<T>
**/
public function complete(data:T):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(complete.bind(data));
return this;
}

if (!future.isError)
{
future.isComplete = true;
future.value = data;

if (jobID != -1)
{
FutureWork.cancelJob(jobID);
jobID = -1;
}

if (future.__completeListeners != null)
{
for (listener in future.__completeListeners)
Expand All @@ -115,6 +134,45 @@ class Promise<T>
return this;
}

/**
Runs the given function asynchronously, and resolves this `Promise` with
the complete, error, and/or progress events sent by that function.
Sample usage:

```haxe
function examplePromise():Future<String>
{
var promise = new Promise<String>();
promise.completeAsync(function(state:State, output:WorkOutput):Void
{
output.sendProgress({progress:state.progress, total:10});
state.progress++;

if (state.progress == 10)
{
output.sendComplete("Done!");
}
},
{progress: 0}, MULTI_THREADED);

return promise.future;
}

var future = examplePromise();
future.onComplete(function(message) { trace(message); });
future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); });
```

@param doWork A function to perform work asynchronously. For best results,
see the guidelines in the `ThreadPool` class overview.
@param state The value to pass to `doWork`.
@param mode Which mode to run the job in: `SINGLE_THREADED` or `MULTI_THREADED`.
**/
public function completeAsync(doWork:WorkFunction<State->WorkOutput->Void>, ?state:State, ?mode:ThreadMode = MULTI_THREADED):Void
{
jobID = FutureWork.run(doWork, this, state, mode);
}

/**
Resolves this `Promise` with the complete, error and/or progress state
of another `Future`
Expand All @@ -137,11 +195,23 @@ class Promise<T>
**/
public function error(msg:Dynamic):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(error.bind(msg));
return this;
}

if (!future.isComplete)
{
future.isError = true;
future.error = msg;

if (jobID != -1)
{
FutureWork.cancelJob(jobID);
jobID = -1;
}

if (future.__errorListeners != null)
{
for (listener in future.__errorListeners)
Expand All @@ -164,6 +234,12 @@ class Promise<T>
**/
public function progress(progress:Int, total:Int):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(this.progress.bind(progress, total));
return this;
}

if (!future.isError && !future.isComplete)
{
if (future.__progressListeners != null)
Expand All @@ -179,12 +255,12 @@ class Promise<T>
}

// Get & Set Methods
@:noCompletion private function get_isComplete():Bool
@:noCompletion private inline function get_isComplete():Bool
{
return future.isComplete;
}

@:noCompletion private function get_isError():Bool
@:noCompletion private inline function get_isError():Bool
{
return future.isError;
}
Expand Down
Loading
Loading