observable

Observables are similar to iterables in that they yield values until either an Exception gets raised or the sequence completes. But in contrast to iterables one can not explicitly request an Observable to yield the next value but the next value will be yielded when it is generated.

Thus a good example for an Observable is an event. Every time the event gets raised, the Observable produces its next value and sends it down the pipeline.

The power of RxPython comes from the fact that it provides a set of Operators to create, merge, split and await Observables. The operators are named like SQL keywords on purpouse to make the library easier to use and understand.

In the following documentation the operators are split up into specific domains like aggregation, blocking and multiple to make the overall navigation easier.

Some most of the names of the operators are directly ported from the C# original implementation, some were renamed to integrate with the python native names more directly. It is clear that many of the things are not implemented in the most ideomatic way. This is just because the first milestone was to get the whole library working. Further optimisations can be made later.

class rx.observable.Observable
subscribe(observer)
subscribe([onNext=None, onError=None, onCompleted=None])

If no observer is provided but instead any of the methods onNext, onError, or onCompleted an anonymous Observer will be created.

Subscribes the observer for the sequence.

class rx.observable.AnonymouseObservable(subscribe)

Represents an Observable that calls subscribe every time an Observer subscribes.

The observer will be passed as parameter to subscribe.

subscribe should return a Disposable.

class rx.observable.ConnectableObservable(source, subject)

Represents an Observable that can be connected and disconnected.

subscribe(observer)
subscribe([onNext=None, onError=None, onCompleted=None])

Returns the result of subscribing to subject.

connect()

Subscribes subject to source and returns Disposable to cancel the connection.

class rx.observable.GroupObservable

Represents an Observable that has a key. Most likely it resulted from a groupby call.

key

The key of this Observable.

Aggregation

class rx.observable.Observable
aggregate(seed, accumulator[, resultSelector=identity])

Aggregates the values of the Observable. When the source completes, resultSelector(accumulation) is yielded as next value.

all(predicate)

Yields True if predicate(value) == True for all values.

any([predicate=truePredicate])

Yields True if predicate(value) == True for any value.

average([selector=identity])

Yields the average value of selector(value) for all values.

contains(value[, equals=defaultEquals])

Yields True if equals(value, onNextValue) == True for any value.

count([predicate=truePredicate])

Yields how often predicate(value) == True.

elementAt(index)

Yields the value at index index or completes exceptionally with IndexError.

elementAtOrDefault(index[, default=None])

Yields the value at index index or default.

firstAsync([predicate=truePredicate])

Yields the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").

firstAsyncOrDefault([predicate=truePredicate, default=None])

Yields the first value where predicate(value) == True or default.

isEmpty()

Yields True if the current Observable contains no values.

lastAsync([predicate=truePredicate])

Yields the last value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").

lastAsyncOrDefault([predicate=truePredicate, default=None])

Yields the last value or default.

max([compareTo=defaultCompareTo])

Yields the maximum value. The maximum value is the value where compareTo(value, currentMax) returns 1 at the end or completes exceptionally with InvalidOperationException("No elements in observable").

maxBy(keySelector[, compareTo=defaultCompareTo])

Yields the maximum value. The maximum value is the value where compareTo(keySelector(value), currentMax) returns 1 at the end.

min([compareTo=defaultCompareTo])

Yields the minimum value. The minimum value is the value where compareTo(value, currentMin) returns -1 at the end or completes exceptionally with InvalidOperationException("No elements in observable").

minBy(keySelector[, compareTo=defaultCompareTo])

Yields the minimum value. The minimum value is the value where compareTo(keySelector(value), currentMin) returns -1 at the end.

sequenceEqual(other[, equals=defaultEquals])

Yields True if both Observables yield the same values in the same order and complete.

singleAsync([predicate=truePredicate])

Yields the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable") if no value was found or with InvalidOperationException("More than one element in observable") if more than one value was found.

singleAsyncOrDefault([predicate=truePredicate, default=None])

Yields the first value where predicate(value) == True, default if no value was found or completes exceptionally with InvalidOperationException("More than one element in observable") if more than one value was found.

sum([selector=identity])

Yields the sum of selector(value).

toDictionary([keySelector=identity, elementSelector=identity])

Yields a dict having every value inserted as dict[keySelector(value)] = elementSelector(value).

Completes exceptionally with Exception("Duplicate key: ...") if multiple values have the same key.

toList()

Yields a list containing all values.

Binding

class rx.observable.Observable
multicast(subject)

Returns a ConnectableObservable that on connect causes the current Observable to push values into subject.

multicastIndividual(subjectSelector, selector)

Returns an Observable sequence that contains the values of a sequence produced by multicasting the current Observable within the selector function.

publish([initialValue=None])

Equivalent to:

if initialValue == None:
    return self.multicast(Subject())
else:
    return self.multicast(BehaviorSubject(intialValue))
publishIndividual(selector[, initialValue=None])

Equivalent to:

if initialValue == None:
    return self.multicastIndividual(lambda: Subject(), selector)
else:
    return self.multicastIndividual(lambda: BehaviorSubject(initialValue), selector)
publishLast([selector=None])

Equivalent to:

if selector == None:
    return self.multicast(AsyncSubject())
else:
    return self.multicastIndividual(lambda: AsyncSubject(), selector)
refCount()

Returns an Observable sequence that stays connected to the current Observable as long as there is at least one subscription to the Observable sequence.

Note

Can only be used on a ConnectableObservable.

replay([bufferSize=sys.maxsize, window=sys.maxsize, selector=None, scheduler=Scheduler.currentThread])
If selector == None
Returns a ConnectableObservable that shares a single subscription to the current Observable replaying bufferSize notifications that are not older than window.
Else
Returns an Observable sequence that is the result of invoking selector on a ConnectableObservable that shares a single subscription to the current Observable replaying bufferSize notifications that are not older than window.

Blocking

class rx.observable.Observable
collect(getInitialCollector, merge[, getNewCollector=lambda _: getInitialCollector()])

The initial accumulator is getInitialCollector().

On every value accumulator = merge(accumulator, value) is called.

Returns an iterable whos next value is the current accumulator which then gets replaced by getNowCollector(accumulator).

first([predicate=truePredicate])

Returns the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").

firstOrDefault([predicate=truePredicate, default=None])

Returns the first value where predicate(value) == True or default.

forEach(onNext)

Calls onNext(value) for every value in the sequence. Blocks until the sequence ends.

forEachEnumerate(onNext)

Calls onNext(value, index) for every value in the sequence. Blocks until the sequence ends.

getIterator()
__iter__()

Returns an iterator that yields all values of the sequence.

last([predicate=truePredicate])

Returns the last value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable").

lastOrDefault([predicate=truePredicate, default=None])

Returns the last value where predicate(value) == True or default.

latest()

Returns an iterator that blocks for the next values but in contrast to getIterator() does not buffer values.

Which means that the iterator returns the value that arrived latest but it will not return a value twice.

mostRecent(initialValue)

Returns an iterator that returns values even if no new values have arrived. It is a sampling iterator.

This means that the iterator can yield duplicates.

next()

Returns an iterator that blocks until the next value arrives.

If values arrive before the iterator moves to the next value, they will be dropped. next() only starts waiting for the next value after the iterator requests for it.

singleAsync([predicate=truePredicate])

Yields the first value where predicate(value) == True or completes exceptionally with InvalidOperationException("No elements in observable") if no value was found or with InvalidOperationException("More than one element in observable") if more than one value was found.

singleAsyncOrDefault([predicate=truePredicate, default=None])

Yields the first value where predicate(value) == True, default if no value was found or completes exceptionally with InvalidOperationException("More than one element in observable") if more than one value was found.

wait()

Is a synonym for last()

Concurrent

class rx.observable.Observable
subscribeOn(scheduler)

Schedules subscriptions to the current Observable immediately on scheduler.

observeOn(scheduler)

Whenever an onNext, onError, or onCompleted event happens, the invocation of the corresponding function on all observers is scheduled on scheduler.

synchronize([gate=None])

Whenever an onNext, onError, or onCompleted event happens, the invocation of the corresponding function on all observers is synchronized with gate.

If gate == None then gate = RLock().

The synchronisation guarantees that only a single observer sees an onNext, onError, or onComplete call at the same time.

Creation

class rx.observable.Observable
static create(subscribe)

Returns an Observable that calls subscribe every time an Observer subscribes.

The observer will be passed as parameter to subscribe.

subscribe should return a Disposable.

static defer(observableFactory)

Returns an Observable that subscribes observers to the Observable returned by observableFactory.

It calls observableFactory every time a subscription happens.

static empty()

Returns an Observable that instantly completes on every subscription.

static generate(initialState, condition, iterate, resultSelector[, scheduler=Scheduler.iteration])

Returns an Observable who represents the following generator:

currentState = initialState

while condition(currentState):
        yield resultSelector(currentState)
        currentState = iterate(currentState)

The values are scheduled on scheduler.

static never()

Returns an Observable that has no values and never completes.

static range(start, count[, scheduler=Scheduler.iteration])

Returns an Observable that yields count values beginning from start and then completes.

The values are scheduled on scheduler.

static repeatValue(value[, count=None, scheduler=Scheduler.iteration])

Returns an Observable that yields value for count times and then completes.

If count == None value gets yielded indefinetly.

The values are scheduled on scheduler.

static returnValue(value[, scheduler=Scheduler.constantTimeOperations])

Returns an Observable that yields value and then completes.

The value is scheduled on scheduler.

static start(action[, scheduler=Scheduler.default])

Returns an Observable that yields the result of action() scheduled for execution on scheduler.

static throw(exception[, scheduler=Scheduler.constantTimeOperations])

Returns an Observable that completes exceptionally with exception.

The exception is scheduled on scheduler.

static using(resourceFactory, observableFactory)

Returns an Observable that whenever an observer subscribes, calls resourceFactory() then observableFactory(resource) and subscribes the observer to the Observable returned by observableFactory

The resource returned by resourceFactory must have a dispose method that is invoked once the Observable returned by observableFactory has completed.

static fromFuture(future)

Returns an Observable that yields the value or exception of future. If future is canceled the Observable completes exceptionally with Exception(“Future was cancelled”).

static fromIterable(iterable[, scheduler=Scheduler.default])

Returns an Observable that yields all values from iterable on scheduler.

static fromEvent(addHandler, removeHandler[, scheduler=Scheduler.default])

Returns an Observable that calls addHandler(onNext) when the first Observer subscribes. Further subscriber share the same underlying handler.

When the last subscriber unsubscribes, removeHandler(onNext) is called.

Imperative

class rx.observable.Observable
static case(selector, sources[, defaultSource=None])
static case(selector, sources[, scheduler=Scheduler.constantTimeOperations])

Subscribes Observer to sources[selector()]. If the key returned by selector() is not found in sources then the observer is subscribed to defaultSource.

If defaultSource == None then defaultSource = Observable.empty(scheduler).

static iterableFor(iterable, resultSelector)

Concatenates the Observable sequences obtained by running the resultSelector for each element in the given iterable.

static branch(condition, thenSource[, elseSource=None])
static branch(condition, thenSource[, scheduler=Scheduler.constantTimeOperations])

Subscribes Observer to thenSource if condition() returns True otherwise to elseSource.

If elseSource == None then elseSource = Observable.empty(scheduler).

doWhile(condition, source)

Repeats source as long as condition() returns True and at least once. condition() is checked whenever source completes.

loop(condition, source)

Repeats source as long as condition() returns True. condition() is checked whenever source completes.

Multiple

class rx.observable.Observable
amb(*others)

Subscribes Observer to the first Observable that yields a value including self.

static amb(first, *others)

See amb().

catchException(handler[, exceptionType=Exception])

Continues an Observable that is terminated by an exception that is an instance of exceptionType with the Observable produced by handler(exception).

catchFallback(*sources)

Continues an Observable that is terminated by an exception with the next Observable.

concat(*sources)

Concatenates self with all sources.

static concat(*sources)

See concat().

groupJoin(right, leftDurationSelector, rightDurationSelector, resultSelector)

Whenever a value (leftValue) is yielded from the current Observable, a new Observable sequence is created. All previously arrived rightValues are replayed on the new Observable.

Whenever a value (rightValue) is yielded from right it is yielded on all Observable sequences created from leftValues.

Observables created from leftValue will complete after leftDurationSelector(leftValue) yields the first value or completes normally.

rightValue values are remembered until rightDurationSelector(rightValue) yields the first value or completes normally.

join(right, leftDurationSelector, rightDurationSelector, resultSelector)

Whenever a value (leftValue) is yielded from the current Observable, resultSelector(leftValue, rightValue) is invoked for all previousely arrived values on right.

Whenever a value (rightValue) is yielded from right, resultSelector(leftValue, rightValue) is invoked for all previousely arrived values on the current Observable.

leftValue values are remembered until leftDurationSelector(leftValue) yields the first value or completes normally.

rightValue values are remembered until rightDurationSelector(rightValue) yields the first value or completes normally.

merge([maxConcurrency=0])

Merges all Observable values in an Observable.

If maxConcurrency > 0 then maxConcurrency events can happen at the same time.

static onErrorResumeNext(*sources)

Continues an Observable that is terminated normally or by an exception with the next Observable.

selectMany(onNextObservable[, onErrorObservable=None, onCompletedObservable=None])

Merges onNextObservable, onErrorObservable and onCompletedObservable respectively whenever the corresponding event happens.

skipUntil(other)

Skips values until other yields the first value or completes.

skipUntil(time[, scheduler=Scheduler.timeBasedOperation])

Skips values until the timer created on scheduler completes after time.

switch()

Transforms an Observable of Observable values into an Observable producing values only from the most recent Observable.

takeUntil(other)

Takes values until other yields the first value or completes.

takeUntil(time[, scheduler=Scheduler.timeBasedOperation])

Takes values until the timer created on scheduler completes after time.

zip(*others[, resultSelector=lambda *x: x])

Merges all Observable into one observable sequence by combining their elements in a pairwise fashion.

static zip(*sources[, resultSelector=lambda *x: x])

See zip()

Single

class rx.observable.Observable
asObservable()

Hides the original type of the Observable.

buffer(count[, skip=count])

Buffers count values and yields them as list. Creates a new buffer every skip values.

defaultIfEmpty([default=None])

Yields default if the current Observable is empty.

dematerialize()

Turns an Observable of Notification values into and Observable representing this notifications.

distinct([keySelector=identity])

Yields distinct values of the current Observable:

1 1 2 1 3 -> 1 2 3
distinctUntilChanged([keySelector=identity, equals=defaultEquals])

Yields distinct values of the current Observable until the value changes:

1 1 2 1 3 -> 1 2 1 3
do([onNext=noop, onError=noop, onCompleted=noop])

Invoke onNext on each value, onError on exception and onComplete on completion of the Observable.

doFinally(action)

Invokes action if the Observable completes normally or exceptionally.

groupBy([keySelector=identity, elementSelector=identity])

Yields GroupedObservable sequences that yield elementSelector(value) for all values where keySelector(value) matches their key.

groupByUntil(keySelector, elementSelector, durationSelector)

Yields GroupedObservable sequences that yield elementSelector(value) for all values where keySelector(value) matches their key.

Every GroupedObservable completes when the Observable returned by durationSelector(key) yields the first value or completes normally.

ignoreElements()

Ignores all values from the original Observable.

materialize()

Turns values, exception and completion into Notification values. Completes when the original Observable completes normally or exceptionally.

ofType(tpe)

Yields all values from the current Observable where isinstance(value, tpe) == True.

repeatSelf([count=indefinite])

Repeats the original Observable count times.

retry([count=indefinite])

Retries the original Observable count times until it does not complete exceptionally.

scan([seed=None, accumulator=None])

Applies result = accumulator(result, value) over the values of the Observable and yields each intermediate result

select(selector)

Yields selector(value) for all values in the current Observable.

selectEnumerate(selector)

Yields selector(value, index) for all values in the current Observable.

selectMany(onNext[, onError=None, onCompleted=None])

Merges the Observable sequence returned by onNext(value), onError(exception) and onCompleted().

selectManyEnumerate(onNext[, onError=None, onCompleted=None])

Merges the Observable sequence returned by onNext(value, index), onError(exception, index) and onCompleted().

skip(count)

Skips count values.

skipWhile(predicate)

Skips values while predicate(value) == True.

skipWhileEnumerate(predicate)

Skips values while predicate(value, index) == True.

skipLast(count)

Skips the last count values.

startWith(*values)

Prepends values to the Observable.

take(count)

Takes count values.

takeWhile(predicate)

Takes values while predicate(value) == True.

takeWhileEnumerate(predicate)

Takes values while predicate(value, index) == True.

takeLast(count)

Takes the last count values.

takeLastBuffer(count)

Takes the last count values and yields them as list.

where(predicate)

Yields only values where predicate(value) == True.

whereEnumerate(predicate)

Yields only values where predicate(value, index) == True.

window(count[, skip=count])

Yields an Observable every skip values that yields it self the next count values.

Time

class rx.observable.Observable
bufferWithTime(timeSpan[, timeShift=timeSpan, scheduler=Scheduler.timeBasedOperation])

Buffers values for timeSpan. Creates a new buffer every timeShift. Uses scheduler to create timers.

bufferWithTimeAndCount(timeSpan, count[, scheduler=Scheduler.timeBasedOperation])

Buffers values for timeSpan or until count many arrived. Uses scheduler to create timers.

delayRelative(dueTime[, scheduler=Scheduler.timeBasedOperation])

Delays all values and normal completion for dueTime. All events are scheduled on scheduler.

delayAbsolute(dueTime[, scheduler=Scheduler.timeBasedOperation])

Delays an Observable until dueTime. The time from now until dueTime is recorded and all values and normal completion is delayed as in delayRelative(). All events are scheduled on scheduler.

delayIndividual(delayDurationSelector[, subscriptionDelayObservable=None])

Delays subscription until subscriptionDelayObservable yields the first value or completes normally.

If subscriptionDelayObservable == None subscription happens immediately.

All values are delayed until the Observable returned by delayDurationSelector(value) yields the first value or completes normally.

delaySubscriptionRelative(dueTime[, scheduler=Scheduler.timeBasedOperation])

Delays subscription to the current observable for dueTime and schedules it on scheduler.

delaySubscriptionAbsolute(dueTime[, scheduler=Scheduler.timeBasedOperation])

Delays subscription to the current observable to dueTime and schedules it on scheduler.

static generateRelative(initialState, condition, iterate, resultSelector, timeSelector[, scheduler=Scheduler.timeBasedOperation])

Returns an Observable who represents the following generator:

currentState = initialState

while condition(currentState):
        yield resultSelector(currentState)
        currentState = iterate(currentState)

but whose values are delayed for timeSelector(value) time.

static generateAbsolute(initialState, condition, iterate, resultSelector, timeSelector[, scheduler=Scheduler.timeBasedOperation])

Returns an Observable who represents the following generator:

currentState = initialState

while condition(currentState):
        yield resultSelector(currentState)
        currentState = iterate(currentState)

but whose values are delayed to time timeSelector(value).

static interval(period[, scheduler=Scheduler.timeBasedOperation])

Returns an Observable sequence that yields an increasing index every period.

Values are scheduled on scheduler.

sampleWithTime(interval[, scheduler=Scheduler.timeBasedOperation])

Yields the latest value every interval. Can yield duplicates.

Uses scheduler to create timers.

sampleWithObservable(sampler)

Yields the latest value whenever sampler yields a value.

skipWithTime(time[, scheduler=Scheduler.timeBasedOperation])

Skips values for time. scheduler is required to create a timer.

skipLastWithTime(time[, scheduler=Scheduler.timeBasedOperation])

Skips values starting time before the Observable completes. Values are yielded on scheduler.

takeWithTime(time[, scheduler=Scheduler.timeBasedOperation])

Takes values for time. scheduler is required to create a timer.

takeLastWithTime(time[, scheduler=Scheduler.timeBasedOperation])

Takes values starting time before the Observable completes. Values are yielded on scheduler.

takeLastBufferWithTime(time[, scheduler=Scheduler.timeBasedOperation])

Takes values starting time before the Observable completes and yields them as list

throttle(dueTime[, scheduler=Scheduler.timeBasedOperation])

Ignores values which are followed by another value before dueTime elapsed.

throttleIndividual(durationSelector)

Ignores values which are followed by another value before the Observable returned by durationSelector(value) yields the first value or completes normally.

timeInterval([scheduler=Scheduler.timeBasedOperation])

Yields { value = value, interval = scheduler.now() - lastArrivalTime }

timeoutRelative(dueTime[, other=Observable.throw(Exception("Timeout in observable")), scheduler=Scheduler.timeBasedOperation])

Returns the current Observable sequence or other if the current Observable sequence did not yield any value nor complete until dueTime elapsed.

timeoutAbsolute(dueTime[, other=Observable.throw(Exception("Timeout in observable")), scheduler=Scheduler.timeBasedOperation])

Returns the current Observable sequence or other if the current Observable sequence did not yield any value nor complete until dueTime.

timeoutIndividual(durationSelector[, firstTimeout=Observable.never(), other=Observable.throw(Exception("Timeout in observable")))

Applies a timeout policy to the observable sequence based on an initial timeout duration for the first element, and a timeout duration computed for each subsequent element.

If the next element isn’t received within the computed duration starting from its predecessor, the other observable sequence is used to produce future messages from that point on.

static timerRelative(dueTime[, period=None, scheduler=Scheduler.timeBasedOperation])

Creates an Observable sequence that yields 0 after dueTime and the completes.

If period != None then the Observable does not complete but yield 1, 2, ... every period.

static timerAbsolute(dueTime[, period=None, scheduler=Scheduler.timeBasedOperation])

Creates an Observable sequence that yields 0 at dueTime and the completes.

If period != None then the Observable does not complete but yield 1, 2, ... every period.

timestamp([scheduler=Scheduler.timeBasedOperation])

Yields { value = value, timestamp = scheduler.now() }

windowWithTime(timeSpan[, timeShift=timeSpan, scheduler=Scheduler.timeBasedOperation])

Yields an Observable every timeShift that yields it self the next values for timeSpan.

windowWithTimeAndCount(timeSpan, count[, scheduler=Scheduler.timeBasedOperation])

Yields Observable values that yield the values from the current Observable for timeSpan or until count number of values have arrived.

Table Of Contents

Previous topic

notification

Next topic

observer

This Page