Branch data Line data Source code
1 : : /* 2 : : ** Copyright (C) 2020 CERN 3 : : ** 4 : : ** This software is provided 'as-is', without any express or implied 5 : : ** warranty. In no event will the authors be held liable for any damages 6 : : ** arising from the use of this software. 7 : : ** 8 : : ** Permission is granted to anyone to use this software for any purpose, 9 : : ** including commercial applications, and to alter it and redistribute it 10 : : ** freely, subject to the following restrictions: 11 : : ** 12 : : ** 1. The origin of this software must not be misrepresented; you must not 13 : : ** claim that you wrote the original software. If you use this software 14 : : ** in a product, an acknowledgment in the product documentation would be 15 : : ** appreciated but is not required. 16 : : ** 2. Altered source versions must be plainly marked as such, and must not be 17 : : ** misrepresented as being the original software. 18 : : ** 3. This notice may not be removed or altered from any source distribution. 19 : : ** 20 : : ** Created on: 2020-02-26T10:01:02+01:00 21 : : ** Author: Sylvain Fargier <sfargier> <sylvain.fargier@cern.ch> 22 : : ** 23 : : */ 24 : : 25 : : #include "Worker.hpp" 26 : : 27 : : #include <cerrno> 28 : : 29 : : #include <logger/Logger.hpp> 30 : : 31 : : #include "local-config.h" 32 : : 33 : : #ifdef HAVE_PTHREAD_NAME 34 : : #include <pthread.h> 35 : : #endif 36 : : 37 : : using namespace ccut; 38 : : using namespace logger; 39 : : 40 : 6 : Worker::Worker(const std::string &name, 41 : : std::size_t numThreads, 42 : 6 : std::size_t maxSize) : 43 : 6 : m_queue(name, maxSize) 44 : : { 45 : 6 : m_threads.reserve(numThreads); 46 : 6 : start(); 47 : 6 : } 48 : : 49 : 6 : Worker::~Worker() 50 : : { 51 : 6 : stop(); 52 : 6 : } 53 : : 54 : 1002 : const Worker::SharedTask &Worker::post(const Worker::SharedTask &task) 55 : : { 56 : 1002 : m_queue.post(task); 57 : 1002 : return task; 58 : : } 59 : : 60 : 501 : Worker::SharedTask Worker::post(const std::function<void()> &funTask) 61 : : { 62 : 501 : Worker::SharedTask task(new FunTask(funTask)); 63 : 501 : m_queue.post(task); 64 : 501 : return task; 65 : 0 : } 66 : : 67 : 6 : void Worker::start() 68 : : { 69 : 6 : if (!m_threads.empty()) 70 : 0 : return; 71 : : 72 : 32 : for (std::size_t i = 0; i < m_threads.capacity(); ++i) 73 : : { 74 : 26 : m_threads.push_back(std::unique_ptr<std::thread>( 75 : 52 : new std::thread(&Worker::thread_func, this))); 76 : : } 77 : : } 78 : : 79 : 6 : void Worker::stop() 80 : : { 81 : 6 : m_queue.clear(true); 82 : : 83 : 32 : for (std::size_t i = 0; i < m_threads.size(); ++i) 84 : : { 85 : 26 : m_threads[i]->join(); 86 : : } 87 : 6 : m_threads.clear(); 88 : 6 : } 89 : : 90 : 0 : const std::string &Worker::name() const 91 : : { 92 : 0 : return m_queue.getQueueName(); 93 : : } 94 : : 95 : 1349 : void Worker::thread_func() 96 : : { 97 : : #ifdef HAVE_PTHREAD_NAME 98 : : const std::string threadName = name().substr(0, 16); 99 : : pthread_setname_np(pthread_self(), threadName.c_str()); 100 : : #endif 101 : : 102 : : while (true) 103 : : { 104 : : try 105 : : { 106 : 1349 : Worker::SharedTask task = m_queue.pop(); 107 : : 108 : 1325 : if (task->setState(Task::RUNNING)) 109 : : { 110 : 1007 : task->run(); 111 : : } 112 : 1325 : task->setState(Task::FINISHED); 113 : 1324 : } 114 : 26 : catch (ccut::Exception &ex) 115 : : { 116 : 26 : if (ex.getErrorCode() == ErrorCode::Interrupted) 117 : : { 118 : : /* time to die */ 119 : 52 : return; 120 : : } 121 : 0 : error("ccut:worker") << ex.what(); 122 : 26 : } 123 : 1323 : } 124 : : } 125 : : 126 : 1504 : Worker::Task::Task() : m_state(Worker::Task::READY), m_error(Worker::Task::OK) 127 : 1504 : {} 128 : : 129 : 1504 : Worker::Task::~Task() 130 : : { 131 : 1504 : if (m_state != Task::FINISHED) 132 : : { 133 : 0 : abort(); 134 : : } 135 : 1504 : } 136 : : 137 : 4 : Worker::Task::State Worker::Task::state() const 138 : : { 139 : 4 : const std::lock_guard<std::mutex> lock(m_lock); 140 : 4 : return m_state; 141 : 4 : } 142 : : 143 : 2 : Worker::Task::Error Worker::Task::error() const 144 : : { 145 : 2 : const std::lock_guard<std::mutex> lock(m_lock); 146 : 2 : return m_error; 147 : 2 : } 148 : : 149 : 1 : const std::string &Worker::Task::errorString() const 150 : : { 151 : 1 : const std::lock_guard<std::mutex> lock(m_lock); 152 : 1 : return m_errorString; 153 : 1 : } 154 : : 155 : 1502 : bool Worker::Task::wait(unsigned long msecs) const 156 : : { 157 : 1502 : return wait(std::chrono::milliseconds(msecs)); 158 : : } 159 : : 160 : 1502 : bool Worker::Task::wait(const std::chrono::milliseconds &msecs) const 161 : : { 162 : 1502 : std::unique_lock<std::mutex> lock(m_lock); 163 : : 164 : 1502 : if (m_state == Task::FINISHED) 165 : 1496 : return true; 166 : : 167 : 6 : if (msecs.count()) 168 : : { 169 : 6 : m_cond.wait_for(lock, msecs); 170 : : } 171 : : else 172 : : { 173 : 0 : m_cond.wait(lock); 174 : : } 175 : 6 : return (m_state == Task::FINISHED); 176 : 1502 : } 177 : : 178 : 502 : bool Worker::Task::abort() 179 : : { 180 : 502 : std::unique_lock<std::mutex> lock(m_lock); 181 : 502 : if (m_state != Task::RUNNING) 182 : : { 183 : 497 : m_error = Task::ABORTED; 184 : 497 : m_state = Task::FINISHED; 185 : 497 : lock.unlock(); 186 : 497 : m_cond.notify_all(); 187 : 497 : return true; 188 : : } 189 : 5 : return false; 190 : 502 : } 191 : : 192 : 2645 : bool Worker::Task::setState(State state) 193 : : { 194 : 2645 : std::unique_lock<std::mutex> lock(m_lock); 195 : 2646 : switch (state) 196 : : { 197 : 0 : case Task::READY: return false; 198 : 1323 : case Task::RUNNING: 199 : 1323 : if (m_state != Task::READY) 200 : 317 : return false; 201 : 1006 : m_state = Task::RUNNING; 202 : 1006 : break; 203 : 1325 : case Task::FINISHED: 204 : 1325 : m_state = Task::FINISHED; 205 : 1325 : lock.unlock(); 206 : 1325 : m_cond.notify_all(); 207 : 1325 : break; 208 : : } 209 : : 210 : 2329 : return true; 211 : 2646 : } 212 : : 213 : 1 : void Worker::Task::setError(const std::string &error) 214 : : { 215 : 1 : std::unique_lock<std::mutex> lock(m_lock); 216 : 1 : m_state = Task::FINISHED; 217 : 1 : m_errorString = error; 218 : 1 : m_error = Task::FAILED; 219 : 1 : lock.unlock(); 220 : 1 : m_cond.notify_all(); 221 : 1 : } 222 : : 223 : 501 : Worker::FunTask::FunTask(const std::function<void()> &fun) : m_fun(fun) {} 224 : : 225 : 500 : void Worker::FunTask::run() 226 : : { 227 : : try 228 : : { 229 : 500 : m_fun(); 230 : : } 231 : 1 : catch (const std::string &error) 232 : : { 233 : 1 : setError(error); 234 : 1 : } 235 : 501 : }