I'm starting to use the Reactive Extensions (Rx) v2 and I'm having some trouble with
.Publish()
.
I have simplified test case in my WPF application, reduced to:
IObservable<Message> Messages;
MessageWatcherCancellationTokenSource = new CancellationTokenSource();
Messages = (from v in Observable.Interval(TimeSpan.FromSeconds(1)).Do(v => Debug.WriteLine("Interval {0}", v))
select new Message(ItemNumber.RespondTemperature, (byte)(15 + (10 * (v % 2)))));
Messages
.Where(m => m.ItemNumber == ItemNumber.RespondTemperature)
.ObserveOnDispatcher()
.Subscribe((Message m) => {
Debug.WriteLine("UpdateSequence " + m.ToString());
UpdateSequence(m.Payload);
}, MessageWatcherCancellationTokenSource.Token);
(Assigning to
Messages
and subscribing are adjacent
only for testing. They will not be in the final application.)
This works fine.
However, if I add
.Publish()
to the end of the assignment to
Messages
:
Messages = (from v in Observable.Interval(TimeSpan.FromSeconds(1)).Do(v => Debug.WriteLine("Interval {0}", v))
select new Message(ItemNumber.RespondTemperature, (byte)(15 + (10 * (v % 2))))).Publish();
Then
nothing happens. The only difference is the
.Publish()
The intent for the
Publish()
is that this will eventually be asynchronously received communications, parsed into
Message
instances, that can be subscribed throughout the application, each one filtering (
.Where(...)
) for the type(s) of Messages of interest for that "handler". So every subscriber must be able to see every Message.
I thought that is what Publish accomplished. Am I mistaken?
===========
Update: I added a call to
.Connect()
to the
Publish()
and that seems to have fixed it:
var published = (from v in Observable.Interval(TimeSpan.FromSeconds(1)).Do(v => Debug.WriteLine("Interval {0}", v))
select new Message(ItemNumber.RespondTemperature, (byte)(15 + (10 * (v % 2))))).Publish();
Messages = published;
PublishedMessages = published.Connect();
But I'm still not sure
why. I can find examples of subscriptions to
Publish()
ed sources that never use
Connect()
but still supposedly work.