LCOV - code coverage report
Current view: top level - src - Queue.hpp (source / functions) Hit Total Coverage
Test: lcov.info Lines: 52 53 98.1 %
Date: 2025-04-27 01:14:20 Functions: 29 37 78.4 %
Branches: 0 0 -

           Branch data     Line data    Source code
       1                 :            : /*
       2                 :            :  * Queue.h
       3                 :            :  *  Template queue for posting data between threads
       4                 :            :  *  Created on: Oct 20, 2014
       5                 :            :  *      Author: mdonze
       6                 :            :  */
       7                 :            : 
       8                 :            : #ifndef QUEUE_H_
       9                 :            : #define QUEUE_H_
      10                 :            : 
      11                 :            : #include <cerrno>
      12                 :            : #include <chrono>
      13                 :            : #include <condition_variable>
      14                 :            : #include <cstddef>
      15                 :            : #include <mutex>
      16                 :            : #include <sstream>
      17                 :            : 
      18                 :            : #include <queue>
      19                 :            : 
      20                 :            : #include "Exception.hpp"
      21                 :            : 
      22                 :            : namespace ccut {
      23                 :            : /**
      24                 :            :  * @addtogroup Threading
      25                 :            :  * @{
      26                 :            :  */
      27                 :            : 
      28                 :            : template<typename T>
      29                 :            : class QueueDeleter
      30                 :            : {
      31                 :            : public:
      32                 :            :     QueueDeleter() = default;
      33                 :        187 :     void operator()(const T &) const {}
      34                 :            : };
      35                 :            : 
      36                 :            : /*
      37                 :            :  * holly crap ! partial class template specialization ! wizz !
      38                 :            :  */
      39                 :            : template<typename T>
      40                 :            : class QueueDeleter<T *>
      41                 :            : {
      42                 :            : public:
      43                 :            :     QueueDeleter() = default;
      44                 :          1 :     void operator()(T *const &t) const { delete t; }
      45                 :            : };
      46                 :            : 
      47                 :            : /**
      48                 :            :  * Queue for posting messages
      49                 :            :  *
      50                 :            :  */
      51                 :            : template<typename T>
      52                 :            : class Queue
      53                 :            : {
      54                 :            : public:
      55                 :            :     /**
      56                 :            :      * @brief Queue constructor
      57                 :            :      * @param[in] queueName name of the queue (used in exception messages)
      58                 :            :      * @param[in] maxSize   maximum size of the queue
      59                 :            :      */
      60                 :            :     Queue(const std::string &queueName, std::size_t maxSize);
      61                 :            : 
      62                 :            :     /* disable copy*/
      63                 :            :     Queue(const Queue<T> &) = delete;
      64                 :            :     Queue<T> &operator=(const Queue<T> &) = delete;
      65                 :            :     /* allow rvalue construct */
      66                 :            :     Queue(Queue<T> &&);
      67                 :            : 
      68                 :            :     virtual ~Queue();
      69                 :            : 
      70                 :            :     /**
      71                 :            :      * @brief post an element in the queue
      72                 :            :      * @throws Exception if queue is full
      73                 :            :      */
      74                 :            :     int post(const T &service);
      75                 :            :     int post(T &&service);
      76                 :            : 
      77                 :            :     /**
      78                 :            :      * @brief pop an element from the queue
      79                 :            :      * @details waits until an event is queued
      80                 :            :      */
      81                 :            :     T pop();
      82                 :            : 
      83                 :            :     /**
      84                 :            :      * @brief pop an element from the queue, throwing on timeout
      85                 :            :      * @param[in]  timeOut timeout value in miliseconds
      86                 :            :      *
      87                 :            :      * @throws Exception on timeout
      88                 :            :      */
      89                 :            :     T pop(unsigned long timeOut);
      90                 :            :     T pop(const std::chrono::milliseconds &msecs);
      91                 :            : 
      92                 :            :     /**
      93                 :            :      * @brief clear the queue
      94                 :            :      * @param[in] flush awake pending threads
      95                 :            :      * @details awoken threads will throw an Exception
      96                 :            :      */
      97                 :            :     void clear(bool flush = false);
      98                 :            : 
      99                 :            :     /**
     100                 :            :      * @brief return number of items in the queue
     101                 :            :      * @return number of items in the queue
     102                 :            :      */
     103                 :            :     std::size_t count() const;
     104                 :            : 
     105                 :            :     /**
     106                 :            :      * @brief retrieve queue name
     107                 :            :      */
     108                 :          0 :     inline const std::string &getQueueName() const { return m_queueName; }
     109                 :            : 
     110                 :            : protected:
     111                 :            :     std::size_t m_maxSize;
     112                 :            :     std::string m_queueName;
     113                 :            :     std::queue<T> m_queue;
     114                 :            :     mutable std::mutex m_mutex;
     115                 :            :     std::condition_variable m_condVar;
     116                 :            : };
     117                 :            : 
     118                 :            : template<typename T>
     119                 :         12 : Queue<T>::Queue(const std::string &queueName, std::size_t maxSize) :
     120                 :         12 :     m_maxSize(maxSize),
     121                 :         12 :     m_queueName(queueName)
     122                 :         12 : {}
     123                 :            : 
     124                 :            : template<typename T>
     125                 :            : Queue<T>::Queue(Queue &&other) :
     126                 :            :     m_maxSize(other.m_maxSize),
     127                 :            :     m_queueName(std::move(other.m_queueName))
     128                 :            : {}
     129                 :            : 
     130                 :            : template<typename T>
     131                 :         12 : Queue<T>::~Queue()
     132                 :            : {
     133                 :         12 :     clear();
     134                 :         12 : }
     135                 :            : 
     136                 :            : template<typename T>
     137                 :       1504 : int Queue<T>::post(const T &element)
     138                 :            : {
     139                 :       1504 :     return post(T(element)); /* use move implementation */
     140                 :            : }
     141                 :            : 
     142                 :            : template<typename T>
     143                 :       3569 : int Queue<T>::post(T &&element)
     144                 :            : {
     145                 :       3569 :     const std::lock_guard<std::mutex> lock(m_mutex);
     146                 :       3569 :     if (m_queue.size() >= m_maxSize)
     147                 :            :     {
     148                 :          2 :         QueueDeleter<T>()(element);
     149                 :          2 :         std::ostringstream oss;
     150                 :          2 :         oss << "Queue::post : Queue " << m_queueName
     151                 :          2 :             << ((m_maxSize == 0) ? " is flushed!" : " is full!");
     152                 :          2 :         throw Exception(ErrorCode::Overflow, oss.str());
     153                 :          2 :     }
     154                 :       3567 :     m_queue.push(std::move(element));
     155                 :       3567 :     m_condVar.notify_one();
     156                 :       7134 :     return m_queue.size();
     157                 :       3569 : }
     158                 :            : 
     159                 :            : template<typename T>
     160                 :       3407 : T Queue<T>::pop()
     161                 :            : {
     162                 :       3407 :     std::unique_lock<std::mutex> lock(m_mutex);
     163                 :            :     // Wait for the queue to have something
     164                 :       3410 :     if (m_queue.empty())
     165                 :            :     {
     166                 :         79 :         if (m_maxSize != 0)
     167                 :         71 :             m_condVar.wait(lock);
     168                 :         79 :         if (m_queue.empty() || (m_maxSize == 0))
     169                 :            :         {
     170                 :         29 :             std::ostringstream oss;
     171                 :         29 :             oss << "Queue::pop : Queue " << m_queueName << " explicit wakeup";
     172                 :         29 :             throw Exception(ErrorCode::Interrupted, oss.str());
     173                 :         29 :         }
     174                 :            :     }
     175                 :       3381 :     T ret = std::move(m_queue.front());
     176                 :       3381 :     m_queue.pop();
     177                 :       6759 :     return ret;
     178                 :       3410 : }
     179                 :            : 
     180                 :            : template<typename T>
     181                 :            : T Queue<T>::pop(unsigned long timeOut)
     182                 :            : {
     183                 :            :     return pop(std::chrono::milliseconds(timeOut));
     184                 :            : }
     185                 :            : 
     186                 :            : template<typename T>
     187                 :            : T Queue<T>::pop(const std::chrono::milliseconds &msecs)
     188                 :            : {
     189                 :            :     std::unique_lock<std::mutex> lock(m_mutex);
     190                 :            :     // Wait for the queue to have something
     191                 :            :     if (m_queue.empty())
     192                 :            :     {
     193                 :            :         if ((m_maxSize != 0) &&
     194                 :            :             (m_condVar.wait_for(lock, msecs) == std::cv_status::timeout))
     195                 :            :         {
     196                 :            :             std::ostringstream oss;
     197                 :            :             oss << "Queue::pop : Queue " << m_queueName
     198                 :            :                 << " timeout while waiting for element!";
     199                 :            :             throw Exception(ErrorCode::Timeout, oss.str());
     200                 :            :         }
     201                 :            :         else if (m_queue.empty() || // cppcheck-suppress knownConditionTrueFalse
     202                 :            :                  (m_maxSize == 0))
     203                 :            :         {
     204                 :            :             std::ostringstream oss;
     205                 :            :             oss << "Queue::pop : Queue " << m_queueName << " explicit wakeup";
     206                 :            :             throw Exception(ErrorCode::Interrupted, oss.str());
     207                 :            :         }
     208                 :            :     }
     209                 :            :     T ret = std::move(m_queue.front());
     210                 :            :     m_queue.pop();
     211                 :            :     return ret;
     212                 :            : }
     213                 :            : 
     214                 :            : template<typename T>
     215                 :          4 : std::size_t Queue<T>::count() const
     216                 :            : {
     217                 :          4 :     const std::lock_guard<std::mutex> lock(m_mutex);
     218                 :          8 :     return m_queue.size();
     219                 :          4 : }
     220                 :            : 
     221                 :            : template<typename T>
     222                 :         20 : void Queue<T>::clear(bool flush)
     223                 :            : {
     224                 :         20 :     const std::lock_guard<std::mutex> lock(m_mutex);
     225                 :        206 :     while (!m_queue.empty())
     226                 :            :     {
     227                 :        186 :         T ret = std::move(m_queue.front());
     228                 :        186 :         m_queue.pop();
     229                 :        186 :         QueueDeleter<T>()(ret);
     230                 :            :     }
     231                 :         20 :     if (flush)
     232                 :            :     {
     233                 :          8 :         m_maxSize = 0;
     234                 :          8 :         m_condVar.notify_all();
     235                 :            :     }
     236                 :         20 : }
     237                 :            : 
     238                 :            : /** @} */
     239                 :            : } // namespace ccut
     240                 :            : 
     241                 :            : #endif /* QUEUE_H_ */

Generated by: LCOV version 1.14