linq.rx.js 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. // extension for RxJS
  2. (function (root) {
  // Alias the two libraries this extension bridges, failing fast with a
  // load-order hint when either one is absent from the host object.
  var Enumerable = root.Enumerable;
  if (Enumerable == null) {
    throw new Error("can't find Enumerable. linq.rx.js must load after linq.js");
  }

  var Rx = root.Rx;
  if (Rx == null) {
    throw new Error("can't find Rx. linq.rx.js must load after RxJS");
  }
  // Bridges a pull-based Enumerable into a push-based Rx observable.
  // Each step scheduled on `scheduler` pulls exactly one element from the
  // enumerator and forwards it to the observer, then re-schedules itself until
  // the enumerator is exhausted, throws, or the subscription is disposed.
  Enumerable.prototype.toObservable = function (scheduler) {
    /// <summary>Converts an enumerable sequence to an observable sequence.</summary>
    /// <param type="Scheduler" name="scheduler" optional="true">Rx.Scheduler. Default is CurrentThread.</param>
    /// <returns>The observable produced by Rx.Observable.createWithDisposable; a fresh enumerator is obtained for every subscription.</returns>
    var source = this;
    if (scheduler == null) scheduler = Rx.Scheduler.currentThread;

    return Rx.Observable.createWithDisposable(function (observer) {
      var disposable = new Rx.SingleAssignmentDisposable();
      var enumerator = source.getEnumerator();

      var cancelable = scheduler.scheduleRecursive(function (self) {
        var hasNext = false;
        var current;

        try {
          if (disposable.isDisposed) {
            // The subscriber went away mid-sequence. Release the enumerator
            // here as well; every other exit path already disposes it.
            enumerator.dispose();
            return;
          }

          hasNext = enumerator.moveNext();
          if (hasNext) current = enumerator.current();
          else enumerator.dispose();
        }
        catch (e) {
          // Report the failure even if releasing the enumerator throws too.
          try {
            enumerator.dispose();
          }
          finally {
            observer.onError(e);
          }
          return;
        }

        // Notifications are sent outside the try block so that an exception
        // thrown by the observer is not reported back to it through onError.
        if (hasNext) {
          observer.onNext(current);
          self(); // loop: schedule the next pull
        }
        else {
          observer.onCompleted();
        }
      });

      // Hand the scheduler's cancellation handle to the disposable that is
      // returned to the subscriber.
      disposable.disposable(cancelable);
      return disposable;
    });
  };
  // Snapshots an observable into an Enumerable. Only values delivered
  // synchronously during subscribe() are captured, because the subscription
  // is disposed as soon as subscribe() returns.
  Rx.Observable.prototype.toEnumerable = function () {
    /// <summary>Converts an observable sequence to an enumerable sequence. Notice:cold observable only.</summary>
    /// <returns>The result of Enumerable.defer over a factory that collects the values into an array.</returns>
    var observable = this;

    // Factory handed to Enumerable.defer: each invocation subscribes anew and
    // returns a fresh array (presumably invoked when enumeration starts;
    // confirm against linq.js).
    function collectValues() {
      var buffer = [];
      var subscription = observable.subscribe(function (value) {
        buffer.push(value);
      });
      subscription.dispose();
      return buffer;
    }

    return Enumerable.defer(collectValues);
  };
  59. })(this);