From eb4214f4fe16feb0e83901b43459cb99489ea197 Mon Sep 17 00:00:00 2001 From: "zhuzhenfeng.code" Date: Fri, 7 Oct 2022 18:17:30 +0800 Subject: [PATCH 1/4] typo: doc Get not MustGet Change-Id: I1c5bce2c9e4057d0669a0c7dd5df81e39849f6f1 --- result.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/result.go b/result.go index ee707c0..fa0a3b3 100644 --- a/result.go +++ b/result.go @@ -61,7 +61,7 @@ func (r Result[T]) Error() error { return r.err } -// MustGet returns value and error. +// Get returns value and error. // Play: https://go.dev/play/p/8KyX3z6TuNo func (r Result[T]) Get() (T, error) { if r.isErr { From 0804f725065d1c536e9b780917f78b4c1af5aaa4 Mon Sep 17 00:00:00 2001 From: "zhuzhenfeng.code" Date: Fri, 7 Oct 2022 18:19:51 +0800 Subject: [PATCH 2/4] remove unused time.Now Change-Id: Ib2d617a6c20f6951559281796049615e7a0d9e1d --- option.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/option.go b/option.go index e9c5918..29bdaaa 100644 --- a/option.go +++ b/option.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "reflect" - "time" ) var optionNoSuchElement = fmt.Errorf("no such element") @@ -186,7 +185,6 @@ func (o *Option[T]) UnmarshalJSON(b []byte) error { } o.isPresent = true - time.Now() return nil } From e85639c683f9e16cab09878c1bce5d843a618394 Mon Sep 17 00:00:00 2001 From: "zhuzhenfeng.code" Date: Fri, 7 Oct 2022 18:24:42 +0800 Subject: [PATCH 3/4] fix some future doc typo Change-Id: Ic811f01a130863a81cde9e79c22a1fd8b025f1b4 --- future.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/future.go b/future.go index fba44ef..faf2345 100644 --- a/future.go +++ b/future.go @@ -46,7 +46,7 @@ func (f *Future[T]) reject(err error) { } } -// Catch is called when Future is resolved. It returns a new Future. +// Then is called when Future is resolved. It returns a new Future. func (f *Future[T]) Then(cb func(T) (T, error)) *Future[T] { f.mu.Lock() defer f.mu.Unlock() @@ -171,7 +171,7 @@ func (f *Future[T]) Result() Result[T] { return TupleToResult(f.Collect()) } -// Result wraps Collect and returns a Result. +// Either wraps Collect and returns a Either. func (f *Future[T]) Either() Either[error, T] { v, err := f.Collect() if err != nil { From 86c5a1d715a2d5b257b292275132b4fa429a8c38 Mon Sep 17 00:00:00 2001 From: Youga Hu Date: Fri, 14 Oct 2022 15:31:08 +0800 Subject: [PATCH 4/4] adjust future implement --- future.go | 160 +++++++++++++++++++++++++------------------------ future_test.go | 15 +++++ 2 files changed, 96 insertions(+), 79 deletions(-) diff --git a/future.go b/future.go index faf2345..f8e1407 100644 --- a/future.go +++ b/future.go @@ -7,14 +7,12 @@ import ( // NewFuture instanciate a new future. func NewFuture[T any](cb func(resolve func(T), reject func(error))) *Future[T] { future := Future[T]{ - mu: sync.RWMutex{}, - next: nil, + cb: cb, cancelCb: func() {}, + done: make(chan struct{}), } - go func() { - cb(future.resolve, future.reject) - }() + future.active() return &future } @@ -22,28 +20,43 @@ func NewFuture[T any](cb func(resolve func(T), reject func(error))) *Future[T] { // Future represents a value which may or may not currently be available, but will be // available at some point, or an exception if that value could not be made available. type Future[T any] struct { - mu sync.RWMutex + mu sync.Mutex - next func(T, error) + cb func(func(T), func(error)) cancelCb func() + next *Future[T] + done chan struct{} + result Result[T] +} + +func (f *Future[T]) active() { + go f.cb(f.resolve, f.reject) +} + +func (f *Future[T]) activeSync() { + f.cb(f.resolve, f.reject) } func (f *Future[T]) resolve(value T) { - f.mu.RLock() - defer f.mu.RUnlock() + f.mu.Lock() + defer f.mu.Unlock() + f.result = Ok(value) if f.next != nil { - f.next(value, nil) + f.next.activeSync() } + close(f.done) } func (f *Future[T]) reject(err error) { - f.mu.RLock() - defer f.mu.RUnlock() + f.mu.Lock() + defer f.mu.Unlock() + f.result = Err[T](err) if f.next != nil { - f.next(empty[T](), err) + f.next.activeSync() } + close(f.done) } // Then is called when Future is resolved. It returns a new Future. @@ -51,30 +64,31 @@ func (f *Future[T]) Then(cb func(T) (T, error)) *Future[T] { f.mu.Lock() defer f.mu.Unlock() - future := &Future[T]{ - mu: sync.RWMutex{}, - next: nil, + f.next = &Future[T]{ + cb: func(resolve func(T), reject func(error)) { + if f.result.IsError() { + reject(f.result.Error()) + return + } + newValue, err := cb(f.result.MustGet()) + if err != nil { + reject(err) + return + } + resolve(newValue) + }, cancelCb: func() { f.Cancel() }, + done: make(chan struct{}), } - f.next = func(value T, err error) { - if err != nil { - future.reject(err) - return - } - - newValue, err := cb(value) - if err != nil { - future.reject(err) - return - } - - future.resolve(newValue) + select { + case <-f.done: + f.next.active() + default: } - - return future + return f.next } // Catch is called when Future is rejected. It returns a new Future. @@ -82,30 +96,31 @@ func (f *Future[T]) Catch(cb func(error) (T, error)) *Future[T] { f.mu.Lock() defer f.mu.Unlock() - future := &Future[T]{ - mu: sync.RWMutex{}, - next: nil, + f.next = &Future[T]{ + cb: func(resolve func(T), reject func(error)) { + if f.result.IsOk() { + resolve(f.result.MustGet()) + return + } + newValue, err := cb(f.result.Error()) + if err != nil { + reject(err) + return + } + resolve(newValue) + }, cancelCb: func() { f.Cancel() }, + done: make(chan struct{}), } - f.next = func(value T, err error) { - if err == nil { - future.resolve(value) - return - } - - newValue, err := cb(err) - if err != nil { - future.reject(err) - return - } - - future.resolve(newValue) + select { + case <-f.done: + f.next.active() + default: } - - return future + return f.next } // Finally is called when Future is processed either resolved or rejected. It returns a new Future. @@ -113,25 +128,27 @@ func (f *Future[T]) Finally(cb func(T, error) (T, error)) *Future[T] { f.mu.Lock() defer f.mu.Unlock() - future := &Future[T]{ - mu: sync.RWMutex{}, - next: nil, + f.next = &Future[T]{ + cb: func(resolve func(T), reject func(error)) { + newValue, err := cb(f.result.Get()) + if err != nil { + reject(err) + return + } + resolve(newValue) + }, cancelCb: func() { f.Cancel() }, + done: make(chan struct{}), } - f.next = func(value T, err error) { - newValue, err := cb(value, err) - if err != nil { - future.reject(err) - return - } - - future.resolve(newValue) + select { + case <-f.done: + f.next.active() + default: } - - return future + return f.next } // Cancel cancels the Future chain. @@ -147,23 +164,8 @@ func (f *Future[T]) Cancel() { // Collect awaits and return result of the Future. func (f *Future[T]) Collect() (T, error) { - done := make(chan struct{}) - - var a T - var b error - - f.mu.Lock() - f.next = func(value T, err error) { - a = value - b = err - - done <- struct{}{} - } - f.mu.Unlock() - - <-done - - return a, b + <-f.done + return f.result.Get() } // Result wraps Collect and returns a Result. diff --git a/future_test.go b/future_test.go index c8433c1..7af6b54 100644 --- a/future_test.go +++ b/future_test.go @@ -1,6 +1,7 @@ package mo import ( + "fmt" "sync/atomic" "testing" "time" @@ -308,3 +309,17 @@ func TestFutureResultEither(t *testing.T) { is.NotNil(either.Left()) is.Equal(assert.AnError, either.MustLeft()) } + +func TestFutureCompleteBeforeThen(t *testing.T) { + completed := make(chan struct{}) + fut := NewFuture(func(resolve func(int), reject func(error)) { + resolve(1) + close(completed) + }) + + <-completed + fut.Then(func(in int) (int, error) { + fmt.Println(in) // will never been print + return in, nil + }).Collect() // deadlock +}