Skip to content

Commit

Permalink
Merge branch 'master' of github.com:samber/mo
Browse files Browse the repository at this point in the history
  • Loading branch information
samber committed Nov 28, 2022
2 parents 1eb9f77 + 1eb2bf5 commit 1a6b71f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 84 deletions.
164 changes: 83 additions & 81 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,131 +7,148 @@ import (
// NewFuture instanciate a new future.
func NewFuture[T any](cb func(resolve func(T), reject func(error))) *Future[T] {
future := Future[T]{
mu: sync.RWMutex{},
next: nil,
cb: cb,
cancelCb: func() {},
done: make(chan struct{}),
}

go func() {
cb(future.resolve, future.reject)
}()
future.active()

return &future
}

// Future represents a value which may or may not currently be available, but will be
// available at some point, or an exception if that value could not be made available.
type Future[T any] struct {
mu sync.RWMutex
mu sync.Mutex

next func(T, error)
cb func(func(T), func(error))
cancelCb func()
next *Future[T]
done chan struct{}
result Result[T]
}

func (f *Future[T]) active() {
go f.cb(f.resolve, f.reject)
}

func (f *Future[T]) activeSync() {
f.cb(f.resolve, f.reject)
}

func (f *Future[T]) resolve(value T) {
f.mu.RLock()
defer f.mu.RUnlock()
f.mu.Lock()
defer f.mu.Unlock()

f.result = Ok(value)
if f.next != nil {
f.next(value, nil)
f.next.activeSync()
}
close(f.done)
}

func (f *Future[T]) reject(err error) {
f.mu.RLock()
defer f.mu.RUnlock()
f.mu.Lock()
defer f.mu.Unlock()

f.result = Err[T](err)
if f.next != nil {
f.next(empty[T](), err)
f.next.activeSync()
}
close(f.done)
}

// Catch is called when Future is resolved. It returns a new Future.
// Then is called when Future is resolved. It returns a new Future.
func (f *Future[T]) Then(cb func(T) (T, error)) *Future[T] {
f.mu.Lock()
defer f.mu.Unlock()

future := &Future[T]{
mu: sync.RWMutex{},
next: nil,
f.next = &Future[T]{
cb: func(resolve func(T), reject func(error)) {
if f.result.IsError() {
reject(f.result.Error())
return
}
newValue, err := cb(f.result.MustGet())
if err != nil {
reject(err)
return
}
resolve(newValue)
},
cancelCb: func() {
f.Cancel()
},
done: make(chan struct{}),
}

f.next = func(value T, err error) {
if err != nil {
future.reject(err)
return
}

newValue, err := cb(value)
if err != nil {
future.reject(err)
return
}

future.resolve(newValue)
select {
case <-f.done:
f.next.active()
default:
}

return future
return f.next
}

// Catch is called when Future is rejected. It returns a new Future.
func (f *Future[T]) Catch(cb func(error) (T, error)) *Future[T] {
f.mu.Lock()
defer f.mu.Unlock()

future := &Future[T]{
mu: sync.RWMutex{},
next: nil,
f.next = &Future[T]{
cb: func(resolve func(T), reject func(error)) {
if f.result.IsOk() {
resolve(f.result.MustGet())
return
}
newValue, err := cb(f.result.Error())
if err != nil {
reject(err)
return
}
resolve(newValue)
},
cancelCb: func() {
f.Cancel()
},
done: make(chan struct{}),
}

f.next = func(value T, err error) {
if err == nil {
future.resolve(value)
return
}

newValue, err := cb(err)
if err != nil {
future.reject(err)
return
}

future.resolve(newValue)
select {
case <-f.done:
f.next.active()
default:
}

return future
return f.next
}

// Finally is called when Future is processed either resolved or rejected. It returns a new Future.
func (f *Future[T]) Finally(cb func(T, error) (T, error)) *Future[T] {
f.mu.Lock()
defer f.mu.Unlock()

future := &Future[T]{
mu: sync.RWMutex{},
next: nil,
f.next = &Future[T]{
cb: func(resolve func(T), reject func(error)) {
newValue, err := cb(f.result.Get())
if err != nil {
reject(err)
return
}
resolve(newValue)
},
cancelCb: func() {
f.Cancel()
},
done: make(chan struct{}),
}

f.next = func(value T, err error) {
newValue, err := cb(value, err)
if err != nil {
future.reject(err)
return
}

future.resolve(newValue)
select {
case <-f.done:
f.next.active()
default:
}

return future
return f.next
}

// Cancel cancels the Future chain.
Expand All @@ -147,31 +164,16 @@ func (f *Future[T]) Cancel() {

// Collect awaits and return result of the Future.
func (f *Future[T]) Collect() (T, error) {
done := make(chan struct{})

var a T
var b error

f.mu.Lock()
f.next = func(value T, err error) {
a = value
b = err

done <- struct{}{}
}
f.mu.Unlock()

<-done

return a, b
<-f.done
return f.result.Get()
}

// Result wraps Collect and returns a Result.
func (f *Future[T]) Result() Result[T] {
return TupleToResult(f.Collect())
}

// Result wraps Collect and returns a Result.
// Either wraps Collect and returns a Either.
func (f *Future[T]) Either() Either[error, T] {
v, err := f.Collect()
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions future_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mo

import (
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -308,3 +309,17 @@ func TestFutureResultEither(t *testing.T) {
is.NotNil(either.Left())
is.Equal(assert.AnError, either.MustLeft())
}

func TestFutureCompleteBeforeThen(t *testing.T) {
completed := make(chan struct{})
fut := NewFuture(func(resolve func(int), reject func(error)) {
resolve(1)
close(completed)
})

<-completed
fut.Then(func(in int) (int, error) {
fmt.Println(in) // will never been print
return in, nil
}).Collect() // deadlock
}
2 changes: 0 additions & 2 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"reflect"
"time"
)

var optionNoSuchElement = fmt.Errorf("no such element")
Expand Down Expand Up @@ -186,7 +185,6 @@ func (o *Option[T]) UnmarshalJSON(b []byte) error {
}

o.isPresent = true
time.Now()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion result.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (r Result[T]) Error() error {
return r.err
}

// MustGet returns value and error.
// Get returns value and error.
// Play: https://go.dev/play/p/8KyX3z6TuNo
func (r Result[T]) Get() (T, error) {
if r.isErr {
Expand Down

0 comments on commit 1a6b71f

Please sign in to comment.