Skip to content

Commit

Permalink
Flip argument order of takeUntil module
Browse files Browse the repository at this point in the history
The takeUntil module violates one of the first principles of functional
programming. Its argument order was (src, end) which is a data first
argument order.

This commit flips the argument order to (end, src) allowing us to better
use takeUntil in pipelines such as:

```js
const query = stream('');

// Start fetching the results but if they haven't
// arrived when a new query is emitted
// stop listening to the stream
const results = query
 .chain(q =>
    fromPromise(getResults(q))
      .pipe(takeUntil(query))
  )
```

This also fixes an issue with takeUntil where if the new endStream had a
value the dependent stream wouldn't update.
  • Loading branch information
Einar Norðfjörð committed Jul 12, 2018
1 parent 0b1a435 commit 96c915c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
logs
*.log

# vscode settings
.vscode

# Dependency directory
node_modules
bower_components
Expand Down
4 changes: 2 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ declare module 'flyd/module/switchlatest' {

declare module 'flyd/module/takeuntil' {
interface takeuntil {
<T, V>(source: flyd.Stream<T>, end: flyd.Stream<V>): flyd.Stream<T>;
<T>(source: flyd.Stream<T>): <V>(end: flyd.Stream<V>) => flyd.Stream<T>;
<T, V>(end: flyd.Stream<T>, src: flyd.Stream<V>): flyd.Stream<V>;
<T>(end: flyd.Stream<T>): <V>(src: flyd.Stream<V>) => flyd.Stream<V>;
}
const _takeuntil: takeuntil;
export = _takeuntil;
Expand Down
5 changes: 1 addition & 4 deletions module/switchlatest/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
var flyd = require('../../lib');
var takeUntil = require('../takeuntil');
var drop = require('ramda/src/drop');

var dropCurrentValue = flyd.transduce(drop(1));

module.exports = function(s) {
return flyd.combine(function(stream$, self) {
var value$ = stream$();
flyd.on(self, takeUntil(value$, dropCurrentValue(stream$)));
flyd.on(self, takeUntil(stream$, value$));
}, [s]);
};
6 changes: 3 additions & 3 deletions module/takeuntil/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ __Graph__
```
a: {---1---2---3---4}
b: {---------x------}
takeUntil(a, b): {---1---2--------}
takeUntil(b, a): {---1---2--------}
```

__Signature__

`Stream a -> Stream b -> Stream a`
`Stream a -> Stream b -> Stream b`

__Usage__

Expand All @@ -20,7 +20,7 @@ const takeUntil = require('flyd/module/takeuntil')

const source = flyd.stream()
const end = flyd.stream()
const result = takeUntil(source, end)
const result = takeUntil(end, source)

source(1)(2)
result() // 2
Expand Down
9 changes: 7 additions & 2 deletions module/takeuntil/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
var flyd = require('../../lib');
var drop = require('ramda/src/drop');

module.exports = flyd.curryN(2, function(src, term) {
return flyd.endsOn(flyd.merge(term, src.end), flyd.combine(function(src, self) {
var drop1 = flyd.transduce(drop(1));

module.exports = flyd.curryN(2, function(term, src) {
var end$ = flyd.merge(term.hasVal ? drop1(term) : term, src.end);

return flyd.endsOn(end$, flyd.combine(function(src, self) {
self(src());
}, [src]));
});
27 changes: 15 additions & 12 deletions module/takeuntil/test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,39 @@ var assert = require('assert');
var takeUntil = require('../index');

describe('takeUntil', function() {
it('emits values from first stream', function() {
it('emits values from source stream', function() {
var result = [];
var source = stream();
var terminator = stream();
var s = takeUntil(source, terminator);
flyd.map(function(v) { result.push(v); }, s);
source
.pipe(takeUntil(terminator))
.map(function(v) { result.push(v); });
source(1)(2)(3);
assert.deepEqual(result, [1, 2, 3]);
});
it('ends when value emitted from second stream', function() {
it('ends when value emitted from terminator stream', function() {
var result = [];
var source = stream();
var terminator = stream();
var s = takeUntil(source, terminator);
flyd.map(function(v) { result.push(v); }, s);
s(1);
var s = source
.pipe(takeUntil(terminator))
.map(function(v) { result.push(v); });
source(1);
terminator(true);
s(2);
source(2);
assert.deepEqual(result, [1]);
assert(s.end());
});
it('ends if source stream ends', function() {
var result = [];
var source = stream();
var terminator = stream();
var s = takeUntil(source, terminator);
flyd.map(function(v) { result.push(v); }, s);
s(1);
var s = source
.pipe(takeUntil(terminator))
.map(function(v) { result.push(v); });
source(1);
source.end(true);
s(2);
source(2);
assert.deepEqual(result, [1]);
assert(s.end());
});
Expand Down

0 comments on commit 96c915c

Please sign in to comment.