A Generic MessagePump in C#

Tuesday, June 4, 2013

Every .NET application that calls Application.Run starts a message pump to process Windows messages. Underneath it all is a simple loop that calls the Win32 API functions GetMessage, TranslateMessage and DispatchMessage. I have applied the same principle to create a custom message pump class that can process messages of an arbitrary type.

How it Works

I decided to create a central MessagePump class and started by mirroring the ISynchronizeInvoke members (ISynchronizeInvoke.Invoke, ISynchronizeInvoke.BeginInvoke) in a new interface with broader method signatures. The most complex overloads allow a new message to be queued on a specified message processor:

    /// <summary>
    /// Provides a way to synchronously or asynchronously execute a message. 
    /// </summary>
    public interface ISynchronizedMessagePump
    {
        /// <summary>
        /// Gets a value indicating whether the caller must queue a message to execute it; false if the caller can bypass the queue.
        /// </summary>
        bool InvokeRequired { get; }

        /// <summary>
        /// Queues a message to the internal message pump thread to be asynchronously executed on the specified <see cref="T:System.Threading.IMessageProcessor`2{TMessage, TResult}"/>. 
        /// </summary>
        /// <param name="processor">The processor to execute the message on.</param>
        /// <param name="message">A message to add to the end of the queue.</param>
        /// <param name="enqueue">true to add the message to the queue; false to immediately execute the message if <see cref="InvokeRequired"/> is false.</param>
        /// <returns>An <see cref="IAsyncResult"/> interface that represents the asynchronous operation started by calling this method.</returns>
        IAsyncResult BeginInvoke(object processor, object message, bool enqueue);

        /// <summary>
        /// Waits until the process started by calling <see cref="BeginInvoke"/> completes, and then returns the value generated by the process. 
        /// </summary>
        /// <param name="result">An <see cref="IAsyncResult"/> interface that represents the asynchronous operation started by calling <see cref="BeginInvoke"/>.</param>
        /// <returns>An Object that represents the return value generated by the asynchronous operation.</returns>
        object EndInvoke(IAsyncResult result);

        /// <summary>
        /// Synchronously queues a message to the internal message pump thread to be executed on the specified <see cref="T:System.Threading.IMessageProcessor`2{TMessage, TResult}"/> and marshals the call to the creating thread.
        /// </summary>
        /// <param name="processor">The processor to execute the message on.</param>
        /// <param name="message">A message to add to the end of the queue.</param>
        /// <param name="enqueue">true to add the message to the queue; false to immediately execute the message if <see cref="InvokeRequired"/> is false.</param>
        /// <returns>An Object that represents the return value from the message being added, or null if the message has no return value.</returns>
        object Invoke(object processor, object message, bool enqueue);

        ...
    }

The MessagePump class is defined with two generic type parameters: the type of message to process and the type of result a processed message produces. Messages are added to a ConcurrentQueue, .NET's thread-safe first-in, first-out collection, by calling either MessagePump.BeginInvoke or MessagePump.Invoke from any thread. The private queue holds instances of SynchronizedMessagePumpAsyncResult, an implementation of IAsyncResult.
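
To make the role of the queued IAsyncResult objects more concrete, here is a minimal sketch of what such a class might look like. PumpAsyncResult and its members SetResult, SetException and GetResult are hypothetical names chosen for illustration; the real SynchronizedMessagePumpAsyncResult is not shown in this post and may be structured differently (for example, it also carries the processor and signals completion through its IsCompleted property).

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for SynchronizedMessagePumpAsyncResult (an assumption, not the real class).
public sealed class PumpAsyncResult<TMessage, TResult> : IAsyncResult, IDisposable
{
    private readonly ManualResetEvent _completed = new ManualResetEvent(false);
    private TResult _result;

    public PumpAsyncResult(TMessage message, object state = null)
    {
        Message = message;
        AsyncState = state;
    }

    public TMessage Message { get; private set; }
    public Exception Exception { get; private set; }

    // IAsyncResult members
    public object AsyncState { get; private set; }
    public WaitHandle AsyncWaitHandle { get { return _completed; } }
    public bool CompletedSynchronously { get { return false; } }
    public bool IsCompleted { get { return _completed.WaitOne(0); } }

    // Called by the pump thread when the processor returns.
    public void SetResult(TResult result)
    {
        _result = result;
        _completed.Set();
    }

    // Called by the pump thread when the processor throws.
    public void SetException(Exception exception)
    {
        Exception = exception;
        _completed.Set();
    }

    // Blocks until completion, then returns the result or throws a wrapper around the stored exception.
    public TResult GetResult()
    {
        _completed.WaitOne();
        if (Exception != null)
            throw new InvalidOperationException("The message processor failed.", Exception);
        return _result;
    }

    public void Dispose() { _completed.Dispose(); }
}

public static class Program
{
    public static void Main()
    {
        using (var pending = new PumpAsyncResult<string, int>("hello"))
        {
            // A thread pool thread plays the role of the pump thread here.
            ThreadPool.QueueUserWorkItem(_ => pending.SetResult(pending.Message.Length));
            Console.WriteLine(pending.GetResult()); // prints 5
        }
    }
}
```

A synchronous Invoke can then be thought of as queuing such an object and blocking on it, while BeginInvoke simply returns it to the caller.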

The message loop starts when the MessagePump class is initialized: a new thread is created that waits for a signal from a WaitHandle. When a message is added to the queue, the WaitHandle is signaled and the loop processes the queued messages on their respective IMessageProcessor:

        /// <summary>
        /// Executes messages until <see cref="Dispose"/> is called.
        /// </summary>
        private void RunMessageLoop()
        {
            try
            {
                for (; ; ) //loop
                {
                    //if no messages, wait for signal
                    if (_Messages.Count == 0)
                        _Reset.WaitOne();

                    try
                    {
                        do
                        {
                            SynchronizedMessagePumpAsyncResult msg;
                            //attempt to dequeue next message
                            if (_Messages.TryDequeue(out msg))
                            {
                                TMessage message = msg.Message;

                                try
                                {
                                    //execute the message and set the result
                                    msg.Result = (TResult)msg.Processor.Execute(message);
                                }
                                catch (Exception ex) //catch all exceptions (surfaced to the caller when the result is accessed)
                                {
                                    //set exception property
                                    msg.Exception = ex;
                                }
                                finally
                                {
                                    //signals WaitHandle
                                    msg.IsCompleted = true;
                                }
                            }
                        }
                        while (_Messages.Count > 0); //loop while messages
                    }
                    finally
                    {
                        //allows Pulse method to signal WaitHandle
                        lock (_SyncObj)
                            _Active = false;
                    }

                    //break if disposing
                    lock (_DisposingSyncObj)
                        if (_Disposing)
                            break;
                }
            }
            finally
            {
                //Close and Dispose release the same resources, so one call is enough
                _Reset.Dispose();
            }
        }
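
The loop above depends on fields and helper types that are not shown in this post. As a rough, self-contained illustration of the same wait, drain, repeat pattern, here is a simplified sketch. MiniPump and Post are hypothetical names; a Func delegate stands in for IMessageProcessor, and TaskCompletionSource stands in for SynchronizedMessagePumpAsyncResult. It also leaves out the _Active flag and Pulse method used by the real class.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified pump (an assumption for illustration, not the MessagePump class itself).
public sealed class MiniPump<TMessage, TResult> : IDisposable
{
    private sealed class WorkItem
    {
        public TMessage Message;
        public readonly TaskCompletionSource<TResult> Completion = new TaskCompletionSource<TResult>();
    }

    private readonly ConcurrentQueue<WorkItem> _messages = new ConcurrentQueue<WorkItem>();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Func<TMessage, TResult> _processor;
    private readonly Thread _thread;
    private volatile bool _disposing;

    public MiniPump(Func<TMessage, TResult> processor)
    {
        _processor = processor;
        _thread = new Thread(RunMessageLoop) { IsBackground = true };
        _thread.Start();
    }

    // Queues a message and wakes the pump thread; safe to call from any thread.
    public Task<TResult> Post(TMessage message)
    {
        var item = new WorkItem { Message = message };
        _messages.Enqueue(item);
        _signal.Set();
        return item.Completion.Task;
    }

    private void RunMessageLoop()
    {
        while (!_disposing)
        {
            _signal.WaitOne(); // sleep until Post or Dispose signals

            WorkItem item;
            while (_messages.TryDequeue(out item)) // drain everything queued so far
            {
                try { item.Completion.SetResult(_processor(item.Message)); }
                catch (Exception ex) { item.Completion.SetException(ex); }
            }
        }
    }

    public void Dispose()
    {
        _disposing = true;
        _signal.Set();   // wake the loop so it can observe the flag
        _thread.Join();  // wait for the loop to finish before releasing the handle
        _signal.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        using (var pump = new MiniPump<int, int>(x => x * x))
        {
            Task<int> first = pump.Post(3);
            Task<int> second = pump.Post(4);
            Console.WriteLine(first.Result + second.Result); // prints 25
        }
    }
}
```

Because an AutoResetEvent stays signaled until a waiter consumes it, a message posted between the last failed TryDequeue and the next WaitOne is not lost: the wait returns immediately and the queue is drained again.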

The IMessageProcessor interface provides a way to execute and process a message. A class that implements this interface can be passed to the MessagePump class constructor or to the invoke methods. You can create any type of processor for the MessagePump. Here is a simple processor that uses an event to consume messages:

    /// <summary>
    /// Provides a way to consume messages through an event handler.
    /// </summary>
    /// <typeparam name="TMessage">The type of message to process.</typeparam>
    /// <typeparam name="TResult">The type of result to return.</typeparam>
    public sealed class EventConsumableProcessor<TMessage, TResult> : IMessageProcessor<TMessage, TResult>
    {
        /// <summary>
        /// Occurs when a message is executed.
        /// </summary>
        public event EventHandler<MessageExecutingEventArgs<TMessage, TResult>> ExecuteMessage;

        /// <summary>
        /// Executes the specified message.
        /// </summary>
        /// <param name="message">The message to execute.</param>
        /// <returns>The executed message result, or the default value of <typeparamref name="TResult"/> if no handler is attached.</returns>
        TResult IMessageProcessor<TMessage, TResult>.Execute(TMessage message)
        {
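            //copy the delegate so a handler removed on another thread cannot cause a null call
            var handler = ExecuteMessage;
            if (handler != null)
            {
                //create the event data
                var result = new MessageExecutingEventArgs<TMessage, TResult>(message);

                //execute message
                handler(this, result);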

                //return result
                return result.Result;
            }
           
            return default(TResult);
        }
    }
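
The declarations of IMessageProcessor and MessageExecutingEventArgs are not shown in this post. Judging only from how they are used above, they might look roughly like the following sketch. Treat both shapes as assumptions; the versions in the download may differ. DelegateProcessor is a hypothetical second processor, included to show that anything implementing Execute can be plugged in.

```csharp
using System;

// Assumed shape, inferred from the explicit implementation of Execute above.
public interface IMessageProcessor<TMessage, TResult>
{
    TResult Execute(TMessage message);
}

// Assumed shape, inferred from how the event data is constructed and read.
public class MessageExecutingEventArgs<TMessage, TResult> : EventArgs
{
    public MessageExecutingEventArgs(TMessage message) { Message = message; }
    public TMessage Message { get; private set; }
    public TResult Result { get; set; }
}

// Hypothetical alternative processor that wraps a delegate instead of raising an event.
public sealed class DelegateProcessor<TMessage, TResult> : IMessageProcessor<TMessage, TResult>
{
    private readonly Func<TMessage, TResult> _handler;

    public DelegateProcessor(Func<TMessage, TResult> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        _handler = handler;
    }

    public TResult Execute(TMessage message) { return _handler(message); }
}

public static class Program
{
    public static void Main()
    {
        IMessageProcessor<string, int> processor = new DelegateProcessor<string, int>(s => s.Length);
        Console.WriteLine(processor.Execute("pump")); // prints 4
    }
}
```

A delegate-based processor avoids the event plumbing when there is exactly one consumer, while the event-based version allows handlers to be attached and removed after construction.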

Using

The actual implementation will vary depending on your needs. Here is an example that synchronizes writes to a stream and returns the position at which the data is stored:

            /// <summary>
            /// Message pump to synchronize data access.
            /// </summary>
            private MessagePump<byte[], long> _DataPump;
            /// <summary>
            /// Underlying stream to store data on.
            /// </summary>
            private MemoryStream _DataStream = new MemoryStream();

            private void Initialize()
            {
                //initialize a processor
                EventConsumableProcessor<byte[], long> processor = new EventConsumableProcessor<byte[], long>();

                //wire an event handler
                processor.ExecuteMessage += (sender, e) =>
                {
                    long position = _DataStream.Position; //get the start position
                    _DataStream.Write(e.Message, 0, e.Message.Length); //write data
                    e.Result = position; //set result to initial position
                };

                //initialize MessagePump with the processor
                _DataPump = new MessagePump<byte[], long>(processor);
            }

Data can be added asynchronously or synchronously to the stream:

            /// <summary>
            /// Asynchronously queues data to be added to the stream.
            /// </summary>
            /// <param name="data">The data to add.</param>
            /// <returns>An <see cref="IAsyncResult"/> interface that represents the asynchronous operation started by calling this method.</returns>
            public IAsyncResult AsynchronouslyAddData(byte[] data)
            {
                return _DataPump.BeginInvoke(data);
            }

            /// <summary>
            /// Synchronously adds data to the stream and returns the data position.
            /// </summary>
            /// <param name="data">The data to add.</param>
            /// <returns>The position in the stream the data is stored at.</returns>
            public long SynchronouslyAddData(byte[] data)
            {
                return _DataPump.Invoke(data);
            }

This could be expanded upon to allow reading of data as well, providing a thread-safe data pump.
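
One possible way to sketch that extension is to replace the byte[] message with a small request type that describes either a write or a read. StreamRequest, StreamResponse and StreamHandler are hypothetical names, not part of the download. The handler is called directly here to keep the example self-contained; in practice its Handle method would be wired to the processor's ExecuteMessage event so that it only ever runs on the pump thread.

```csharp
using System;
using System.IO;

// Hypothetical message type: a write carries Data, a read carries Position and Count.
public sealed class StreamRequest
{
    public byte[] Data { get; private set; }
    public long Position { get; private set; }
    public int Count { get; private set; }
    public bool IsWrite { get { return Data != null; } }

    public static StreamRequest Write(byte[] data)
    {
        return new StreamRequest { Data = data };
    }

    public static StreamRequest Read(long position, int count)
    {
        return new StreamRequest { Position = position, Count = count };
    }
}

// Hypothetical result type: Position is set for writes, Data for reads.
public sealed class StreamResponse
{
    public long Position { get; set; }
    public byte[] Data { get; set; }
}

public sealed class StreamHandler
{
    private readonly MemoryStream _stream = new MemoryStream();

    // Intended to run only on the pump thread, so no locking is done here.
    public StreamResponse Handle(StreamRequest request)
    {
        if (request.IsWrite)
        {
            _stream.Seek(0, SeekOrigin.End); // always append, since reads move the position
            long start = _stream.Position;
            _stream.Write(request.Data, 0, request.Data.Length);
            return new StreamResponse { Position = start };
        }

        var buffer = new byte[request.Count];
        _stream.Seek(request.Position, SeekOrigin.Begin);
        int read = _stream.Read(buffer, 0, buffer.Length);
        Array.Resize(ref buffer, read); // trim if fewer bytes were available
        return new StreamResponse { Position = request.Position, Data = buffer };
    }
}

public static class Program
{
    public static void Main()
    {
        var handler = new StreamHandler();
        handler.Handle(StreamRequest.Write(new byte[] { 1, 2, 3 }));
        long second = handler.Handle(StreamRequest.Write(new byte[] { 4, 5 })).Position;
        byte[] data = handler.Handle(StreamRequest.Read(second, 2)).Data;
        Console.WriteLine(second + ": " + BitConverter.ToString(data)); // prints 3: 04-05
    }
}
```

Seeking to the end before every write matters once reads are allowed, because a read leaves the stream position wherever it stopped.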

Download MessagePump and supporting classes
