Branch data Line data Source code
1 : : /* 2 : : * Queue.h 3 : : * Template queue for posting data between threads 4 : : * Created on: Oct 20, 2014 5 : : * Author: mdonze 6 : : */ 7 : : 8 : : #ifndef QUEUE_H_ 9 : : #define QUEUE_H_ 10 : : 11 : : #include <cerrno> 12 : : #include <chrono> 13 : : #include <condition_variable> 14 : : #include <cstddef> 15 : : #include <mutex> 16 : : #include <sstream> 17 : : 18 : : #include <queue> 19 : : 20 : : #include "Exception.hpp" 21 : : 22 : : namespace ccut { 23 : : /** 24 : : * @addtogroup Threading 25 : : * @{ 26 : : */ 27 : : 28 : : template<typename T> 29 : : class QueueDeleter 30 : : { 31 : : public: 32 : : QueueDeleter() = default; 33 : 187 : void operator()(const T &) const {} 34 : : }; 35 : : 36 : : /* 37 : : * holly crap ! partial class template specialization ! wizz ! 38 : : */ 39 : : template<typename T> 40 : : class QueueDeleter<T *> 41 : : { 42 : : public: 43 : : QueueDeleter() = default; 44 : 1 : void operator()(T *const &t) const { delete t; } 45 : : }; 46 : : 47 : : /** 48 : : * Queue for posting messages 49 : : * 50 : : */ 51 : : template<typename T> 52 : : class Queue 53 : : { 54 : : public: 55 : : /** 56 : : * @brief Queue constructor 57 : : * @param[in] queueName name of the queue (used in exception messages) 58 : : * @param[in] maxSize maximum size of the queue 59 : : */ 60 : : Queue(const std::string &queueName, std::size_t maxSize); 61 : : 62 : : /* disable copy*/ 63 : : Queue(const Queue<T> &) = delete; 64 : : Queue<T> &operator=(const Queue<T> &) = delete; 65 : : /* allow rvalue construct */ 66 : : Queue(Queue<T> &&); 67 : : 68 : : virtual ~Queue(); 69 : : 70 : : /** 71 : : * @brief post an element in the queue 72 : : * @throws Exception if queue is full 73 : : */ 74 : : int post(const T &service); 75 : : int post(T &&service); 76 : : 77 : : /** 78 : : * @brief pop an element from the queue 79 : : * @details waits until an event is queued 80 : : */ 81 : : T pop(); 82 : : 83 : : /** 84 : : * @brief pop an element from the queue, throwing on timeout 85 : : * @param[in] timeOut timeout value in miliseconds 86 : : * 87 : : * @throws Exception on timeout 88 : : */ 89 : : T pop(unsigned long timeOut); 90 : : T pop(const std::chrono::milliseconds &msecs); 91 : : 92 : : /** 93 : : * @brief clear the queue 94 : : * @param[in] flush awake pending threads 95 : : * @details awoken threads will throw an Exception 96 : : */ 97 : : void clear(bool flush = false); 98 : : 99 : : /** 100 : : * @brief return number of items in the queue 101 : : * @return number of items in the queue 102 : : */ 103 : : std::size_t count() const; 104 : : 105 : : /** 106 : : * @brief retrieve queue name 107 : : */ 108 : 0 : inline const std::string &getQueueName() const { return m_queueName; } 109 : : 110 : : protected: 111 : : std::size_t m_maxSize; 112 : : std::string m_queueName; 113 : : std::queue<T> m_queue; 114 : : mutable std::mutex m_mutex; 115 : : std::condition_variable m_condVar; 116 : : }; 117 : : 118 : : template<typename T> 119 : 12 : Queue<T>::Queue(const std::string &queueName, std::size_t maxSize) : 120 : 12 : m_maxSize(maxSize), 121 : 12 : m_queueName(queueName) 122 : 12 : {} 123 : : 124 : : template<typename T> 125 : : Queue<T>::Queue(Queue &&other) : 126 : : m_maxSize(other.m_maxSize), 127 : : m_queueName(std::move(other.m_queueName)) 128 : : {} 129 : : 130 : : template<typename T> 131 : 12 : Queue<T>::~Queue() 132 : : { 133 : 12 : clear(); 134 : 12 : } 135 : : 136 : : template<typename T> 137 : 1504 : int Queue<T>::post(const T &element) 138 : : { 139 : 1504 : return post(T(element)); /* use move implementation */ 140 : : } 141 : : 142 : : template<typename T> 143 : 3569 : int Queue<T>::post(T &&element) 144 : : { 145 : 3569 : const std::lock_guard<std::mutex> lock(m_mutex); 146 : 3569 : if (m_queue.size() >= m_maxSize) 147 : : { 148 : 2 : QueueDeleter<T>()(element); 149 : 2 : std::ostringstream oss; 150 : 2 : oss << "Queue::post : Queue " << m_queueName 151 : 2 : << ((m_maxSize == 0) ? " is flushed!" : " is full!"); 152 : 2 : throw Exception(ErrorCode::Overflow, oss.str()); 153 : 2 : } 154 : 3567 : m_queue.push(std::move(element)); 155 : 3567 : m_condVar.notify_one(); 156 : 7134 : return m_queue.size(); 157 : 3569 : } 158 : : 159 : : template<typename T> 160 : 3407 : T Queue<T>::pop() 161 : : { 162 : 3407 : std::unique_lock<std::mutex> lock(m_mutex); 163 : : // Wait for the queue to have something 164 : 3410 : if (m_queue.empty()) 165 : : { 166 : 79 : if (m_maxSize != 0) 167 : 71 : m_condVar.wait(lock); 168 : 79 : if (m_queue.empty() || (m_maxSize == 0)) 169 : : { 170 : 29 : std::ostringstream oss; 171 : 29 : oss << "Queue::pop : Queue " << m_queueName << " explicit wakeup"; 172 : 29 : throw Exception(ErrorCode::Interrupted, oss.str()); 173 : 29 : } 174 : : } 175 : 3381 : T ret = std::move(m_queue.front()); 176 : 3381 : m_queue.pop(); 177 : 6759 : return ret; 178 : 3410 : } 179 : : 180 : : template<typename T> 181 : : T Queue<T>::pop(unsigned long timeOut) 182 : : { 183 : : return pop(std::chrono::milliseconds(timeOut)); 184 : : } 185 : : 186 : : template<typename T> 187 : : T Queue<T>::pop(const std::chrono::milliseconds &msecs) 188 : : { 189 : : std::unique_lock<std::mutex> lock(m_mutex); 190 : : // Wait for the queue to have something 191 : : if (m_queue.empty()) 192 : : { 193 : : if ((m_maxSize != 0) && 194 : : (m_condVar.wait_for(lock, msecs) == std::cv_status::timeout)) 195 : : { 196 : : std::ostringstream oss; 197 : : oss << "Queue::pop : Queue " << m_queueName 198 : : << " timeout while waiting for element!"; 199 : : throw Exception(ErrorCode::Timeout, oss.str()); 200 : : } 201 : : else if (m_queue.empty() || // cppcheck-suppress knownConditionTrueFalse 202 : : (m_maxSize == 0)) 203 : : { 204 : : std::ostringstream oss; 205 : : oss << "Queue::pop : Queue " << m_queueName << " explicit wakeup"; 206 : : throw Exception(ErrorCode::Interrupted, oss.str()); 207 : : } 208 : : } 209 : : T ret = std::move(m_queue.front()); 210 : : m_queue.pop(); 211 : : return ret; 212 : : } 213 : : 214 : : template<typename T> 215 : 4 : std::size_t Queue<T>::count() const 216 : : { 217 : 4 : const std::lock_guard<std::mutex> lock(m_mutex); 218 : 8 : return m_queue.size(); 219 : 4 : } 220 : : 221 : : template<typename T> 222 : 20 : void Queue<T>::clear(bool flush) 223 : : { 224 : 20 : const std::lock_guard<std::mutex> lock(m_mutex); 225 : 206 : while (!m_queue.empty()) 226 : : { 227 : 186 : T ret = std::move(m_queue.front()); 228 : 186 : m_queue.pop(); 229 : 186 : QueueDeleter<T>()(ret); 230 : : } 231 : 20 : if (flush) 232 : : { 233 : 8 : m_maxSize = 0; 234 : 8 : m_condVar.notify_all(); 235 : : } 236 : 20 : } 237 : : 238 : : /** @} */ 239 : : } // namespace ccut 240 : : 241 : : #endif /* QUEUE_H_ */