Observables are similar to iterables in that they yield values until either an Exception gets raised or the sequence completes. But in contrast to iterables one can not explicitly request an Observable to yield the next value but the next value will be yielded when it is generated.
Thus a good example for an Observable is an event. Every time the event gets raised, the Observable produces its next value and sends it down the pipeline.
The power of RxPython comes from the fact that it provides a set of Operators to create, merge, split and await Observables. The operators are named like SQL keywords on purpouse to make the library easier to use and understand.
In the following documentation the operators are split up into specific domains like aggregation, blocking and multiple to make the overall navigation easier.
Some most of the names of the operators are directly ported from the C# original implementation, some were renamed to integrate with the python native names more directly. It is clear that many of the things are not implemented in the most ideomatic way. This is just because the first milestone was to get the whole library working. Further optimisations can be made later.
Represents an Observable that calls subscribe every time an Observer subscribes.
The observer will be passed as parameter to subscribe.
subscribe should return a Disposable.
Represents an Observable that can be connected and disconnected.
Returns the result of subscribing to subject.
Subscribes subject to source and returns Disposable to cancel the connection.
Represents an Observable that has a key. Most likely it resulted from a groupby call.
The key of this Observable.
Aggregates the values of the Observable. When the source completes, resultSelector(accumulation) is yielded as next value.
Yields True if predicate(value) == True for all values.
Yields True if predicate(value) == True for any value.
Yields the average value of selector(value) for all values.
Yields True if equals(value, onNextValue) == True for any value.
Yields how often predicate(value) == True.
Yields the value at index index or completes exceptionally with IndexError.
Yields the value at index index or default.
Yields the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").
Yields the first value where predicate(value) == True or default.
Yields True if the current Observable contains no values.
Yields the last value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").
Yields the last value or default.
Yields the maximum value. The maximum value is the value where compareTo(value, currentMax) returns 1 at the end or completes exceptionally with InvalidOperationException("No elements in observable").
Yields the maximum value. The maximum value is the value where compareTo(keySelector(value), currentMax) returns 1 at the end.
Yields the minimum value. The minimum value is the value where compareTo(value, currentMin) returns -1 at the end or completes exceptionally with InvalidOperationException("No elements in observable").
Yields the minimum value. The minimum value is the value where compareTo(keySelector(value), currentMin) returns -1 at the end.
Yields True if both Observables yield the same values in the same order and complete.
Yields the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable") if no value was found or with InvalidOperationException("More than one element in observable") if more than one value was found.
Yields the first value where predicate(value) == True, default if no value was found or completes exceptionally with InvalidOperationException("More than one element in observable") if more than one value was found.
Yields the sum of selector(value).
Yields a dict having every value inserted as dict[keySelector(value)] = elementSelector(value).
Completes exceptionally with Exception("Duplicate key: ...") if multiple values have the same key.
Yields a list containing all values.
Returns a ConnectableObservable that on connect causes the current Observable to push values into subject.
Returns an Observable sequence that contains the values of a sequence produced by multicasting the current Observable within the selector function.
Equivalent to:
if initialValue == None:
return self.multicast(Subject())
else:
return self.multicast(BehaviorSubject(intialValue))
Equivalent to:
if initialValue == None:
return self.multicastIndividual(lambda: Subject(), selector)
else:
return self.multicastIndividual(lambda: BehaviorSubject(initialValue), selector)
Equivalent to:
if selector == None:
return self.multicast(AsyncSubject())
else:
return self.multicastIndividual(lambda: AsyncSubject(), selector)
Returns an Observable sequence that stays connected to the current Observable as long as there is at least one subscription to the Observable sequence.
Note
Can only be used on a ConnectableObservable.
The initial accumulator is getInitialCollector().
On every value accumulator = merge(accumulator, value) is called.
Returns an iterable whos next value is the current accumulator which then gets replaced by getNowCollector(accumulator).
Returns the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").
Returns the first value where predicate(value) == True or default.
Calls onNext(value) for every value in the sequence. Blocks until the sequence ends.
Calls onNext(value, index) for every value in the sequence. Blocks until the sequence ends.
Returns the last value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").
Returns the last value where predicate(value) == True or default.
Returns an iterator that blocks for the next values but in contrast to getIterator() does not buffer values.
Which means that the iterator returns the value that arrived latest but it will not return a value twice.
Returns an iterator that returns values even if no new values have arrived. It is a sampling iterator.
This means that the iterator can yield duplicates.
Returns an iterator that blocks until the next value arrives.
If values arrive before the iterator moves to the next value, they will be dropped. next() only starts waiting for the next value after the iterator requests for it.
Yields the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable") if no value was found or with InvalidOperationException("More than one element in observable") if more than one value was found.
Yields the first value where predicate(value) == True, default if no value was found or completes exceptionally with InvalidOperationException("More than one element in observable") if more than one value was found.
Schedules subscriptions to the current Observable immediately on scheduler.
Whenever an onNext, onError, or onCompleted event happens, the invocation of the corresponding function on all observers is scheduled on scheduler.
Whenever an onNext, onError, or onCompleted event happens, the invocation of the corresponding function on all observers is synchronized with gate.
If gate == None then gate = RLock().
The synchronisation guarantees that only a single observer sees an onNext, onError, or onComplete call at the same time.
Returns an Observable that calls subscribe every time an Observer subscribes.
The observer will be passed as parameter to subscribe.
subscribe should return a Disposable.
Returns an Observable that subscribes observers to the Observable returned by observableFactory.
It calls observableFactory every time a subscription happens.
Returns an Observable that instantly completes on every subscription.
Returns an Observable who represents the following generator:
currentState = initialState
while condition(currentState):
yield resultSelector(currentState)
currentState = iterate(currentState)
The values are scheduled on scheduler.
Returns an Observable that has no values and never completes.
Returns an Observable that yields count values beginning from start and then completes.
The values are scheduled on scheduler.
Returns an Observable that yields value for count times and then completes.
If count == None value gets yielded indefinetly.
The values are scheduled on scheduler.
Returns an Observable that yields value and then completes.
The value is scheduled on scheduler.
Returns an Observable that yields the result of action() scheduled for execution on scheduler.
Returns an Observable that completes exceptionally with exception.
The exception is scheduled on scheduler.
Returns an Observable that whenever an observer subscribes, calls resourceFactory() then observableFactory(resource) and subscribes the observer to the Observable returned by observableFactory
The resource returned by resourceFactory must have a dispose method that is invoked once the Observable returned by observableFactory has completed.
Returns an Observable that yields the value or exception of future. If future is canceled the Observable completes exceptionally with Exception(“Future was cancelled”).
Returns an Observable that yields all values from iterable on scheduler.
Returns an Observable that calls addHandler(onNext) when the first Observer subscribes. Further subscriber share the same underlying handler.
When the last subscriber unsubscribes, removeHandler(onNext) is called.
Subscribes Observer to sources[selector()]. If the key returned by selector() is not found in sources then the observer is subscribed to defaultSource.
If defaultSource == None then defaultSource = Observable.empty(scheduler).
Concatenates the Observable sequences obtained by running the resultSelector for each element in the given iterable.
Subscribes Observer to thenSource if condition() returns True otherwise to elseSource.
If elseSource == None then elseSource = Observable.empty(scheduler).
Repeats source as long as condition() returns True and at least once. condition() is checked whenever source completes.
Repeats source as long as condition() returns True. condition() is checked whenever source completes.
Subscribes Observer to the first Observable that yields a value including self.
See amb().
Continues an Observable that is terminated by an exception that is an instance of exceptionType with the Observable produced by handler(exception).
Continues an Observable that is terminated by an exception with the next Observable.
Concatenates self with all sources.
See concat().
Whenever a value (leftValue) is yielded from the current Observable, a new Observable sequence is created. All previously arrived rightValues are replayed on the new Observable.
Whenever a value (rightValue) is yielded from right it is yielded on all Observable sequences created from leftValues.
Observables created from leftValue will complete after leftDurationSelector(leftValue) yields the first value or completes normally.
rightValue values are remembered until rightDurationSelector(rightValue) yields the first value or completes normally.
Whenever a value (leftValue) is yielded from the current Observable, resultSelector(leftValue, rightValue) is invoked for all previousely arrived values on right.
Whenever a value (rightValue) is yielded from right, resultSelector(leftValue, rightValue) is invoked for all previousely arrived values on the current Observable.
leftValue values are remembered until leftDurationSelector(leftValue) yields the first value or completes normally.
rightValue values are remembered until rightDurationSelector(rightValue) yields the first value or completes normally.
Merges all Observable values in an Observable.
If maxConcurrency > 0 then maxConcurrency events can happen at the same time.
Continues an Observable that is terminated normally or by an exception with the next Observable.
Merges onNextObservable, onErrorObservable and onCompletedObservable respectively whenever the corresponding event happens.
Skips values until other yields the first value or completes.
Skips values until the timer created on scheduler completes after time.
Transforms an Observable of Observable values into an Observable producing values only from the most recent Observable.
Takes values until other yields the first value or completes.
Takes values until the timer created on scheduler completes after time.
Merges all Observable into one observable sequence by combining their elements in a pairwise fashion.
See zip()
Hides the original type of the Observable.
Buffers count values and yields them as list. Creates a new buffer every skip values.
Yields default if the current Observable is empty.
Turns an Observable of Notification values into and Observable representing this notifications.
Yields distinct values of the current Observable:
1 1 2 1 3 -> 1 2 3
Yields distinct values of the current Observable until the value changes:
1 1 2 1 3 -> 1 2 1 3
Invoke onNext on each value, onError on exception and onComplete on completion of the Observable.
Invokes action if the Observable completes normally or exceptionally.
Yields GroupedObservable sequences that yield elementSelector(value) for all values where keySelector(value) matches their key.
Yields GroupedObservable sequences that yield elementSelector(value) for all values where keySelector(value) matches their key.
Every GroupedObservable completes when the Observable returned by durationSelector(key) yields the first value or completes normally.
Ignores all values from the original Observable.
Turns values, exception and completion into Notification values. Completes when the original Observable completes normally or exceptionally.
Yields all values from the current Observable where isinstance(value, tpe) == True.
Repeats the original Observable count times.
Retries the original Observable count times until it does not complete exceptionally.
Applies result = accumulator(result, value) over the values of the Observable and yields each intermediate result
Yields selector(value) for all values in the current Observable.
Yields selector(value, index) for all values in the current Observable.
Merges the Observable sequence returned by onNext(value), onError(exception) and onCompleted().
Merges the Observable sequence returned by onNext(value, index), onError(exception, index) and onCompleted().
Skips count values.
Skips values while predicate(value) == True.
Skips values while predicate(value, index) == True.
Skips the last count values.
Prepends values to the Observable.
Takes count values.
Takes values while predicate(value) == True.
Takes values while predicate(value, index) == True.
Takes the last count values.
Takes the last count values and yields them as list.
Yields only values where predicate(value) == True.
Yields only values where predicate(value, index) == True.
Yields an Observable every skip values that yields it self the next count values.
Buffers values for timeSpan. Creates a new buffer every timeShift. Uses scheduler to create timers.
Buffers values for timeSpan or until count many arrived. Uses scheduler to create timers.
Delays all values and normal completion for dueTime. All events are scheduled on scheduler.
Delays an Observable until dueTime. The time from now until dueTime is recorded and all values and normal completion is delayed as in delayRelative(). All events are scheduled on scheduler.
Delays subscription until subscriptionDelayObservable yields the first value or completes normally.
If subscriptionDelayObservable == None subscription happens immediately.
All values are delayed until the Observable returned by delayDurationSelector(value) yields the first value or completes normally.
Delays subscription to the current observable for dueTime and schedules it on scheduler.
Delays subscription to the current observable to dueTime and schedules it on scheduler.
Returns an Observable who represents the following generator:
currentState = initialState
while condition(currentState):
yield resultSelector(currentState)
currentState = iterate(currentState)
but whose values are delayed for timeSelector(value) time.
Returns an Observable who represents the following generator:
currentState = initialState
while condition(currentState):
yield resultSelector(currentState)
currentState = iterate(currentState)
but whose values are delayed to time timeSelector(value).
Returns an Observable sequence that yields an increasing index every period.
Values are scheduled on scheduler.
Yields the latest value every interval. Can yield duplicates.
Uses scheduler to create timers.
Yields the latest value whenever sampler yields a value.
Skips values for time. scheduler is required to create a timer.
Skips values starting time before the Observable completes. Values are yielded on scheduler.
Takes values for time. scheduler is required to create a timer.
Takes values starting time before the Observable completes. Values are yielded on scheduler.
Takes values starting time before the Observable completes and yields them as list
Ignores values which are followed by another value before dueTime elapsed.
Ignores values which are followed by another value before the Observable returned by durationSelector(value) yields the first value or completes normally.
Yields { value = value, interval = scheduler.now() - lastArrivalTime }
Returns the current Observable sequence or other if the current Observable sequence did not yield any value nor complete until dueTime elapsed.
Returns the current Observable sequence or other if the current Observable sequence did not yield any value nor complete until dueTime.
Applies a timeout policy to the observable sequence based on an initial timeout duration for the first element, and a timeout duration computed for each subsequent element.
If the next element isn’t received within the computed duration starting from its predecessor, the other observable sequence is used to produce future messages from that point on.
Creates an Observable sequence that yields 0 after dueTime and the completes.
If period != None then the Observable does not complete but yield 1, 2, ... every period.
Creates an Observable sequence that yields 0 at dueTime and the completes.
If period != None then the Observable does not complete but yield 1, 2, ... every period.
Yields { value = value, timestamp = scheduler.now() }
Yields an Observable every timeShift that yields it self the next values for timeSpan.
Yields Observable values that yield the values from the current Observable for timeSpan or until count number of values have arrived.