[C#] First steps into threading with C#, is this threadsafe?

Discussion in 'Mixed Languages' started by Stannieman, Oct 14, 2015.

  1. Stannieman

    Stannieman MDL Guru

    Sep 4, 2009
    2,232
    1,800
    90
    #1 Stannieman, Oct 14, 2015
    Last edited by a moderator: Apr 20, 2017
    Hi all.

    I'm writing a buffer/queue for 1 producer and multiple consumers.
    The catch is that, unlike with a standard BlockingQueue, all my consumers must read all bytes in the correct order. That's because different consumers use exactly the same input data do to different things.

    So I write a class that implements not only a buffer/queue, but also keeps a list of consumers and whether a consumer has already read what's currently in the buffer. Only when all consumers have read the producer can overwrite the buffer with new data, after which all consumers can read that again. So basically it's more a broadcaster than a buffer.
    If you want the queue to wait for a consumer to read, the consumer has to be registered first. If you deregister a consumer it will always get null returned when reading.

    Below is my code, it may be a bit over commented.
    Question 1: Is this thread safe.
    Question 2: Am I reinventing the wheel? Are there better ways to do it, like built in .NET stuff? If so I'd rather use the built in things, but then I'm still happy I've learned something new.

    Code:
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Collections;
    using System.Threading;
    
    namespace HashingTest
    {
        public class AllConsumersReadQueue
        {
            private ManualResetEventSlim _resetEventRead, _resetEventWrite;
            private byte[] _buffer;
            private IList<ConsumerReadStatus> _consumerStatusList;
            private bool _canWrite;
    
            public AllConsumersReadQueue()
            {
                _resetEventRead = new ManualResetEventSlim();
                _resetEventWrite = new ManualResetEventSlim();
                _consumerStatusList = new List<ConsumerReadStatus>();
                // At the beginning the queue is empty, and the consumer list knows
                // no consumer should read the current bytes in queue, so it's safe to write.
                _canWrite = true;
            }
    
            /**
                This method writes a new byte array into the queue.
            */
            public void addBytes(byte[] newBytes)
            {
                // If we can't write (not all consumers have read), let the sproducer wait.
                while (!_canWrite)
                {
                    _resetEventWrite.Wait();
                }
                // We can write, so lock the consumer list.
                lock (_consumerStatusList)
                {
                    // We can't write anymore after this write (consumers lust read first).
                    _canWrite = false;
                    // Tell the list that no consumer has read the new bytes yet.
                    foreach (ConsumerReadStatus status in _consumerStatusList)
                        status.HasRead = false;
                    // Set the new bytes and signal all consumers that they can start reading (we have new data now).
                    _buffer = newBytes;
                    _resetEventRead.Set();
                }
            }
    
            /**
               This method returns the bytes in the queue to a consumer.
           */
            public byte[] readBytes(IdentifiableConsumer consumer)
            {
                // If the consumer has no valid id (it's not registered) we return it null.
                if (consumer.Id < 0)
                    return null;
                else
                {
                    // Consumer is registered, if it can not read (has already read the current data) it must wait.
                    while (!_consumerStatusList[consumer.Id].CanRead)
                    {
                        _resetEventRead.Wait();
                    }
                    // We can read, so lock the list (required because we'll modify it).
                    lock (_consumerStatusList)
                    {
                        // Consumer could have been deregisterd when it was waiting, so we must check again.
                        if (consumer.Id >= 0)
                        {
                            // Tell the list that we have read the current bytes.
                            _consumerStatusList[consumer.Id].HasRead = true;
    
                            // This method will clear the write lock if all consumers have read.
                            ClearWriteLockOnAllRead();
    
                            return _buffer;
                        }
                        else
                            // Again we return null if the id unknown invalid.
                            return null;
                    }
                }
            }
    
            /**
                This method clears the writelock when all consumers have read.
            */
            private void ClearWriteLockOnAllRead()
            {
                // We assume all consumers have read
                bool allHaveRead = true;
    
                // Check if there's a consumer that hasn't read yet.
                foreach (ConsumerReadStatus status in _consumerStatusList && allHaveRead)
                    allHaveRead = status.HasRead;
    
                // No consumer has not read? This means we were the last one, so release write lock (producer can write again).
                if (allHaveRead)
                    _resetEventWrite.Set();
            }
            
            /**
                Registers a consumer, this makes sure the queue will not overwrite it's
                data until the registered consumer has read it.
            */
            public void RegisterConsumer(IdentifiableConsumer consumer)
            {
                // Lock list, we'll modify it.
                lock (_consumerStatusList)
                {
                    // Initialize a new ConsumerReadStatus instance and adds it to the list
                    _consumerStatusList.Add(new ConsumerReadStatus(consumer));
                    // Assigns a new Id to the consumer.
                    consumer.Id = _consumerStatusList.Count - 1;
                }
            }
    
            /**
                Registers a consumer. When a consumer is deregistered the queue will no longer
                wait for the consumer to read data
            */
            public void DeregisterConsumer(IdentifiableConsumer consumer)
            {
                // Lock list, we'll modify it.
                lock (_consumerStatusList)
                {
                    // Set's the id of the consumer to a negative value so the read method
                    // will know it's not a registered consumer.
                    consumer.Id = -1;
                    // Remove the consumer from the list.
                    _consumerStatusList.RemoveAt(consumer.Id);
    
                    // Set all the ids of the consumers to the new indexes of the list.
                    for (int i = consumer.Id; i < _consumerStatusList.Count; i++)
                        _consumerStatusList.Consumer.Id = i;
                }
            }
        }
    
    
        /**
            This class contains a reference to a consumer and an indicator telling whether
            the consumer has already read the bytes currently in the queue.
        */
        public class ConsumerReadStatus
        {
            private IdentifiableConsumer _consumer;
    
            public ConsumerReadStatus(IdentifiableConsumer consumer)
            {
                _consumer = consumer;
                HasRead = true;
            }
    
            public bool HasRead { get; set; }
    
            public IdentifiableConsumer Consumer
            {
                get
                {
                    return _consumer;
                }
            }
        }
    }
    


    The reason for letting the consumers having an id is that I can directly use that as in index for my list, otherwise I'd have to walk through the list every time to find the HasRead flag that belongs to the producer. The only "not so safe" thing here is that this is only reliable when only this particular instance of this class alters the Id property, and when creating the consumer the Id must be negative.
     
    Stop hovering to collapse... Click to collapse... Hover to expand... Click to expand...
  2. Michaela Joy

    Michaela Joy MDL Crazy Lady

    Jul 26, 2012
    3,618
    3,910
    120
    Stop hovering to collapse... Click to collapse... Hover to expand... Click to expand...
  3. Stannieman

    Stannieman MDL Guru

    Sep 4, 2009
    2,232
    1,800
    90
    #3 Stannieman, Oct 16, 2015
    Last edited: Oct 16, 2015
    (OP)
    I've already read a lot about threading. This is the first thing I make and I'm just not sure if I'm doing it right.
    As for the article implying it's not thread safe: That would be the case if I hadn't done all the threading stuff.
    My code is/tries to be a synchronized implementation, which should be thread safe if done correctly.
     
    Stop hovering to collapse... Click to collapse... Hover to expand... Click to expand...