Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Create one common method for flatMap, flatMapFirst, flatMapLatest #793

Closed
xgrommx opened this issue Jun 30, 2015 · 13 comments
Closed

Create one common method for flatMap, flatMapFirst, flatMapLatest #793

xgrommx opened this issue Jun 30, 2015 · 13 comments

Comments

@xgrommx
Copy link
Contributor

xgrommx commented Jun 30, 2015

I think we need create one common method for flatMap, flatMapFirst, flatMapLatest. This code which I propose:

function f(name, source, selector, thisArg) {
    var operator = 'mergeAll';

    switch(name) {
      case 'flatMap':
        operator = 'mergeAll';
        break;
      case 'flatMapFirst':
        operator = 'switchFirst';
        break;
      case 'flatMapLatest':
        operator = 'switchLatest';
        break;
      default:
        operator = 'mergeAll';
        break;  
    }

    var selectorFunc = Rx.internals.bindCallback(selector, thisArg, 3);
    return source.map(function (x, i) {
      var result = selectorFunc(x, i, source);
      Rx.helpers.isPromise(result) && (result = Rx.Observable.fromPromise(result));
      (Rx.helpers.isArrayLike(result) || Rx.helpers.isIterable(result)) && (result = Rx.Observable.from(result));
      return result;
    })[operator]()
  };

function flatMap(name, selector, resultSelector, thisArg) {  
  if (Rx.helpers.isFunction(selector) && Rx.helpers.isFunction(resultSelector)) {
  return this[name](function (x, i) {
    var selectorResult = selector(x, i);
    Rx.helpers.isPromise(selectorResult) && (selectorResult = Rx.Observable.fromPromise(selectorResult));
    (Rx.helpers.isArrayLike(selectorResult) || Rx.helpers.isIterable(selectorResult)) && 
      (selectorResult = Rx.Observable.from(selectorResult));

    return selectorResult.map(function (y, i2) {
      return resultSelector(x, y, i, i2);
    });
  }, thisArg);
}

return Rx.helpers.isFunction(selector) ?
  f(name, this, selector, thisArg) :
  f(name, this, function () { return selector; }, thisArg);
}

Rx.Observable.prototype.flatMap = function (selector, resultSelector, thisArg) {
  return flatMap.call(this, 'flatMap', selector, resultSelector, thisArg);
};

Rx.Observable.prototype.flatMapFirst = function (selector, resultSelector, thisArg) {
  return flatMap.call(this, 'flatMapFirst', selector, resultSelector, thisArg);
};

Rx.Observable.prototype.flatMapLatest = function (selector, resultSelector, thisArg) {
  return flatMap.call(this, 'flatMapLatest', selector, resultSelector, thisArg);
};

Thoughts?

@xgrommx
Copy link
Contributor Author

xgrommx commented Jun 30, 2015

In addition probably we can get all operators using flatMapWithMaxConcurrent

@xgrommx
Copy link
Contributor Author

xgrommx commented Jun 30, 2015

concatMap could be created using flatMapWithMaxConcurrent(1, fn) and maybe should be renamed to flatMapConcat

flatMap could be created using flatMapWithMaxConcurrent(Number.POSITIVE_INFINITY, fn)

@xgrommx
Copy link
Contributor Author

xgrommx commented Jun 30, 2015

@mattpodwysocki How do you think about it?

@paulpdaniels
Copy link
Contributor

@xgrommx Have you tried this already? I think this will recurse indefinitely:

this[name](...) -> Observable.prototype[name] -> flatMap.call(this, name, ...) -> this[name](...)

@xgrommx
Copy link
Contributor Author

xgrommx commented Jun 30, 2015

@paulpdaniels Yes I tried it and it's working.

@xgrommx
Copy link
Contributor Author

xgrommx commented Jul 1, 2015

Hello @mattpodwysocki,

After this night I realized a new idea. We can create a common method for all flatMap so:

function f(source, selector, options, thisArg) {
    var selectorFunc = Rx.internals.bindCallback(selector, thisArg, 3);

    source = source.map(function (x, i) {
        var result = selectorFunc(x, i, source);
        Rx.helpers.isPromise(result) && (result = Rx.Observable.fromPromise(result));
        (Rx.helpers.isArrayLike(result) || Rx.helpers.isIterable(result)) && (result = Rx.Observable.from(result));
        return result;
    });

    if(options.first === true && options.limit === false) {
        // flatMapFirst
        return source.switchFirst();
    } else if(options.last === true && options.limit === false) {
        // flatMapLatest
        return source.switchLatest();
    } else if(options.last === false && options.first === false && options.limit !== false) {
        // flatMap or flatMapWithMaxConcurrent or flatMapConcat
        return source.merge(options.limit);
    } else if(options.last === false && options.first === false && options.limit === false) {
        return source.mergeAll();
    }
}

function flatMap(selector, resultSelector, options, thisArg) {
    if(Rx.helpers.isFunction(selector) && Rx.helpers.isFunction(resultSelector)) {
        var tempF = function (x, i) {
            var selectorResult = selector(x, i);
            Rx.helpers.isPromise(selectorResult) && (selectorResult = Rx.Observable.fromPromise(selectorResult));
            (Rx.helpers.isArrayLike(selectorResult) || Rx.helpers.isIterable(selectorResult)) &&
            (selectorResult = Rx.Observable.from(selectorResult));

            return selectorResult.map(function (y, i2) {
                return resultSelector(x, y, i, i2);
            });
        };

        if(options.first === true && options.limit === false) {
            return this.flatMapFirst(tempF, thisArg);
        } else if(options.last === true && options.limit === false) {
            return this.flatMapLatest(tempF, thisArg);
        } else if(options.last === false && options.first === false && limit !== false) {
            return this.flatMapWithMaxConcurrent(limit, tempF, thisArg);
        } else if(options.last === false && options.first === false && limit === false) {
            return this.flatMap(tempF, thisArg);
        }
    }

    return Rx.helpers.isFunction(selector) ?
        f(this, selector, options, thisArg) :
        f(this, Rx.helpers.identity, options, thisArg);
}

Rx.Observable.prototype.flatMap = function(selector, resultSelector, thisArg) {
    var options = {
        first: false,
        last: false,
        limit: false
    };

    return flatMap.call(this, selector, resultSelector, options, thisArg);
};

Rx.Observable.prototype.flatMapFirst = function(selector, resultSelector, thisArg) {
    var options = {
        first: true,
        last: false,
        limit: false
    };

    return flatMap.call(this, selector, resultSelector, options, thisArg);
};

Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisArg) {
    var options = {
        first: false,
        last: true,
        limit: false
    };

    return flatMap.call(this, selector, resultSelector, options, thisArg);
};

Rx.Observable.prototype.flatMapWithMaxConcurrent = function(limit, selector, resultSelector, thisArg) {
    var options = {
        first: false,
        last: false,
        limit: limit
    };

    return flatMap.call(this, selector, resultSelector, options, thisArg);
};

Rx.Observable.prototype.flatMapConcat = function(selector, resultSelector, thisArg) {
    var options = {
        first: false,
        last: false,
        limit: 1
    };

    return flatMap.call(this, selector, resultSelector, options, thisArg);
};

Tests:

// flatMap or flatMapWithMaxConcurrent(Number.POSITIVE_INFINITY)

Rx.Observable.for([1, 2, 3], _ => Rx.Observable.just(_).delay(100))
  .flatMap(x => Rx.Observable.interval(40).map(_ => x).take(4))
  .subscribe(x => console.log('flatMap', x), null, null);

// => flatMap 1
// => flatMap 1
// => flatMap 1
// => flatMap 2
// => flatMap 1
// => flatMap 2
// => flatMap 2
// => flatMap 3
// => flatMap 2
// => flatMap 3
// => flatMap 3
// => flatMap 3

// flatMapFirst

Rx.Observable.for([1, 2, 3], _ => Rx.Observable.just(_).delay(100))
  .flatMapFirst(x => Rx.Observable.interval(40).map(_ => x).take(4))
  .subscribe(x => console.log('flatMapFirst', x), null, null);

// => flatMapFirst 1
// => flatMapFirst 1  
// => flatMapFirst 1  
// => flatMapFirst 1
// => flatMapFirst 3
// => flatMapFirst 3  
// => flatMapFirst 3  
// => flatMapFirst 3   

// flatMapLatest

Rx.Observable.for([1, 2, 3], _ => Rx.Observable.just(_).delay(100))
  .flatMapLatest(x => Rx.Observable.interval(40).map(_ => x).take(4))
  .subscribe(x => console.log('flatMapLatest', x), null, null);

// => flatMapLatest 1
// => flatMapLatest 1
// => flatMapLatest 2
// => flatMapLatest 2
// => flatMapLatest 3
// => flatMapLatest 3  
// => flatMapLatest 3  
// => flatMapLatest 3   

// flatMapConcat or flatMapWithMaxConcurrent(1) aka concatMap

Rx.Observable.for([1, 2, 3], _ => Rx.Observable.just(_).delay(100))
  .flatMapConcat(x => Rx.Observable.interval(40).map(_ => x).take(4))
  .subscribe(x => console.log('flatMapConcat', x), null, null);

// => flatMapConcat 1
// => flatMapConcat 1  
// => flatMapConcat 1  
// => flatMapConcat 1
// => flatMapConcat 2
// => flatMapConcat 2  
// => flatMapConcat 2  
// => flatMapConcat 2
// => flatMapConcat 3
// => flatMapConcat 3  
// => flatMapConcat 3  
// => flatMapConcat 3  

// flatMapWithMaxConcurrent

Rx.Observable.for([1, 2, 3], _ => Rx.Observable.just(_).delay(100))
  .flatMapWithMaxConcurrent(2, x => Rx.Observable.interval(40).map(_ => x).take(4))
  .subscribe(x => console.log('flatMapWithMaxConcurrent', x), null, null);

// => flatMapWithMaxConcurrent 1
// => flatMapWithMaxConcurrent 1
// => flatMapWithMaxConcurrent 1
// => flatMapWithMaxConcurrent 2
// => flatMapWithMaxConcurrent 1
// => flatMapWithMaxConcurrent 2
// => flatMapWithMaxConcurrent 2
// => flatMapWithMaxConcurrent 3
// => flatMapWithMaxConcurrent 2
// => flatMapWithMaxConcurrent 3
// => flatMapWithMaxConcurrent 3
// => flatMapWithMaxConcurrent 3

@paulpdaniels
Copy link
Contributor

@xgrommx I like the idea but you can simplify to:

function f(source, selector, thisArg) {
    var selectorFunc = Rx.internals.bindCallback(selector, thisArg, 3);

    return source.map(function (x, i) {
        var result = selectorFunc(x, i, source);
        Rx.helpers.isPromise(result) && (result = Rx.Observable.fromPromise(result));
        (Rx.helpers.isArrayLike(result) || Rx.helpers.isIterable(result)) && (result = Rx.Observable.from(result));
        return result;
    });
}

function flatMap(selector, resultSelector, thisArg) {

    var s = selector;

    if(Rx.helpers.isFunction(selector) && Rx.helpers.isFunction(resultSelector)) {
        s = function (x, i) {
            var selectorResult = selector(x, i);
            Rx.helpers.isPromise(selectorResult) && (selectorResult = Rx.Observable.fromPromise(selectorResult));
            (Rx.helpers.isArrayLike(selectorResult) || Rx.helpers.isIterable(selectorResult)) &&
            (selectorResult = Rx.Observable.from(selectorResult));

            return selectorResult.map(function (y, i2) {
                return resultSelector(x, y, i, i2);
            });
        };
    } else if (!Rx.helpers.isFunction(selector)) {
       s = Rx.helpers.identity;
    }

    return f(this, s, thisArg);
}

Rx.Observable.prototype.flatMap = function(selector, resultSelector, thisArg) {
    return flatMap.call(this, selector, resultSelector, thisArg).mergeAll();
};

Rx.Observable.prototype.flatMapFirst = function(selector, resultSelector, thisArg) {
    return flatMap.call(this, selector, resultSelector, thisArg).switchFirst();
};

Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisArg) {
    return flatMap.call(this, selector, resultSelector, thisArg).switchLatest();
};

Rx.Observable.prototype.flatMapWithMaxConcurrent = function(limit, selector, resultSelector, thisArg) {
    return flatMap.call(this, selector, resultSelector, thisArg).merge(limit);
};

Rx.Observable.prototype.flatMapConcat = function(selector, resultSelector, thisArg) {
    return flatMap.call(this, selector, resultSelector, thisArg).merge(1);
};

I am also wondering, since flatMap is such a popular operator if it would make sense to also give it the perf treatment just like map and filter.

@xgrommx
Copy link
Contributor Author

xgrommx commented Jul 1, 2015

@paulpdaniels Awesome! Maybe it will be better way for all implementation of flatMap. All tests returned that was expected.

@xgrommx
Copy link
Contributor Author

xgrommx commented Jul 1, 2015

@paulpdaniels I was checked your implementations and they still working =)

@xgrommx
Copy link
Contributor Author

xgrommx commented Jul 3, 2015

Hi @mattpodwysocki. I could prepare pull request with the refactoring of flatMap.

@xgrommx
Copy link
Contributor Author

xgrommx commented Jul 3, 2015

Also I think mergeAll could be replaced with merge(Infinity)

@paulpdaniels
Copy link
Contributor

@xgrommx I actually started implementing this as part of a perf upgrade of flatMap. Not sure which version I should be targeting though v3 or a patch for 2.5.x

@mattpodwysocki
Copy link
Member

Since @paulpdaniels has created this PR, I think we're done here.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants