I'm starting to use the Reactive Extensions (Rx) v2 and I'm having some trouble with .Publish().
I have a simplified test case in my WPF application, reduced to:
C#
// Class level:
IObservable<Message> Messages;

// In a button's ICommand.Execute() handler:
MessageWatcherCancellationTokenSource = new CancellationTokenSource();
Messages = (from v in Observable.Interval(TimeSpan.FromSeconds(1)).Do(v => Debug.WriteLine("Interval {0}", v))
            select new Message(ItemNumber.RespondTemperature, (byte)(15 + (10 * (v % 2)))));
Messages
    .Where(m => m.ItemNumber == ItemNumber.RespondTemperature)
    .ObserveOnDispatcher()
    .Subscribe((Message m) => {
        Debug.WriteLine("UpdateSequence " + m.ToString());
        UpdateSequence(m.Payload);
    }, MessageWatcherCancellationTokenSource.Token);

(The assignment to Messages and the subscription are adjacent only for testing. They will be in separate places in the final application.)

This works fine.

However, if I add .Publish() to the end of the assignment to Messages:
Messages = (from v in Observable.Interval(TimeSpan.FromSeconds(1)).Do(v => Debug.WriteLine("Interval {0}", v))
            select new Message(ItemNumber.RespondTemperature, (byte)(15 + (10 * (v % 2))))).Publish();

Then nothing happens. The only difference is the .Publish() call.

The intent of the Publish() is that Messages will eventually carry asynchronously received communications, parsed into Message instances, that can be subscribed to throughout the application, with each subscriber filtering (.Where(...)) for the type(s) of Message of interest to that "handler". So every subscriber must be able to see every Message.
I thought that was what Publish accomplished. Am I mistaken?
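
For illustration, here is a minimal console sketch of that intent: one published stream, several subscribers, each filtering with .Where(...). It is not the WPF code above. The class name PublishSketch, the tuple standing in for Message, and the "Temperature"/"Status" kinds are all hypothetical, and it assumes the System.Reactive package is referenced. Publish() returns an IConnectableObservable<T>, which only subscribes to the underlying source once Connect() is called on it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishSketch
{
    public static (List<int> Temps, List<int> Others) Run()
    {
        var temps = new List<int>();
        var others = new List<int>();

        // Hypothetical stand-in for the parsed message stream:
        // (Kind, Payload) tuples instead of the real Message type.
        var source = new[]
        {
            (Kind: "Temperature", Payload: 15),
            (Kind: "Status",      Payload: 1),
            (Kind: "Temperature", Payload: 25),
        }.ToObservable();

        // Publish() wraps the source in a connectable observable.
        // Subscribing to it does NOT start the source.
        var published = source.Publish();

        // Each "handler" filters for the kinds it cares about.
        var s1 = published.Where(m => m.Kind == "Temperature")
                          .Subscribe(m => temps.Add(m.Payload));
        var s2 = published.Where(m => m.Kind != "Temperature")
                          .Subscribe(m => others.Add(m.Payload));

        // Only now is the source subscribed to (once), and every
        // subscriber sees every item before its own filter applies.
        var connection = published.Connect();

        connection.Dispose();
        s1.Dispose();
        s2.Dispose();
        return (temps, others);
    }
}
```

Calling PublishSketch.Run() should leave 15 and 25 in Temps and 1 in Others. If the Connect() line is removed, both lists stay empty, which matches the "nothing happens" behaviour described above.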

===========
Update: I added a call to .Connect() on the result of Publish(), and that seems to have fixed it:
C#
var published = (from v in Observable.Interval(TimeSpan.FromSeconds(1)).Do(v => Debug.WriteLine("Interval {0}", v))
                 select new Message(ItemNumber.RespondTemperature, (byte)(15 + (10 * (v % 2))))).Publish();
Messages = published;
PublishedMessages = published.Connect();  // keep track of the IDisposable for cleanup

But I'm still not sure why. I can find examples of subscriptions to Publish()ed sources that never use Connect() but still supposedly work.
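
One possible explanation for such examples, offered as a guess since I can't see which ones are meant: they may chain .RefCount() after .Publish(). RefCount() turns the connectable observable back into a plain IObservable<T> that connects when the first subscriber arrives and disconnects when the last one leaves, so no explicit Connect() appears in the code. A minimal sketch, assuming the System.Reactive package is referenced; the class name RefCountSketch and the Subject<int> source are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RefCountSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical hot source standing in for incoming messages.
        var source = new Subject<int>();

        // Publish().RefCount(): connects on the first subscription,
        // disconnects when the last subscription is disposed.
        var shared = source
            .Do(v => log.Add("source " + v))
            .Publish()
            .RefCount();

        source.OnNext(0);   // no subscribers yet, so not connected: nothing logged

        var a = shared.Subscribe(v => log.Add("A " + v));   // triggers the connect
        var b = shared.Subscribe(v => log.Add("B " + v));   // shares the same connection

        source.OnNext(1);   // Do runs once, then A and B each receive the value

        a.Dispose();
        b.Dispose();        // last subscriber gone: disconnects from the source

        source.OnNext(2);   // disconnected again: nothing logged
        return log;
    }
}
```

Run() should return "source 1", "A 1", "B 1". The explicit Connect() in the update above gives manual control over when the source starts and stops. RefCount() ties that lifetime to the subscribers instead, which may or may not suit a message stream that should keep running with no handlers attached.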
Posted
Updated 18-Jan-16 10:36am
v2

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


