Branch data Line data Source code
1 : : /* 2 : : ** Copyright (C) 2020 CERN 3 : : ** 4 : : ** This software is provided 'as-is', without any express or implied 5 : : ** warranty. In no event will the authors be held liable for any damages 6 : : ** arising from the use of this software. 7 : : ** 8 : : ** Permission is granted to anyone to use this software for any purpose, 9 : : ** including commercial applications, and to alter it and redistribute it 10 : : ** freely, subject to the following restrictions: 11 : : ** 12 : : ** 1. The origin of this software must not be misrepresented; you must not 13 : : ** claim that you wrote the original software. If you use this software 14 : : ** in a product, an acknowledgment in the product documentation would be 15 : : ** appreciated but is not required. 16 : : ** 2. Altered source versions must be plainly marked as such, and must not be 17 : : ** misrepresented as being the original software. 18 : : ** 3. This notice may not be removed or altered from any source distribution. 19 : : ** 20 : : ** Created on: 2020-10-06T10:01:02+01:00 21 : : ** Author: Matteo Ferrari <matteof> <matteo.ferari.1@cern.ch> 22 : : ** 23 : : */ 24 : : 25 : : #include "Thread.hpp" 26 : : 27 : : #include <cstring> 28 : : 29 : : #include <logger/Logger.hpp> 30 : : #include <pthread.h> 31 : : 32 : : #include "local-config.h" // for HAVE_PTHREAD_NAME 33 : : 34 : : using namespace logger; 35 : : 36 : : namespace ccut { 37 : : 38 : 21 : Thread::Thread(const std::string &name) : 39 : 21 : m_name(name), 40 : 21 : m_started(false), 41 : 21 : m_waked(false), 42 : 21 : m_policy(SCHED_OTHER), 43 : 21 : m_priority(-1), 44 : 42 : m_interval(std::chrono::milliseconds::zero()) 45 : 21 : {} 46 : : 47 : 0 : Thread::Thread(Thread &&other) noexcept : 48 : 0 : m_name(other.m_name), 49 : 0 : m_started(false), 50 : 0 : m_waked(false), 51 : 0 : m_policy(other.m_policy), 52 : 0 : m_priority(other.m_priority), 53 : 0 : m_interval(other.m_interval) 54 : 0 : {} 55 : : 56 : 21 : Thread::~Thread() 57 : : { 58 : 21 : stop(); 59 : 21 : } 60 : : 61 : 13 : void Thread::start() 62 : : { 63 : 13 : std::lock_guard<std::mutex> lock(m_mutex); 64 : 13 : if (m_started.load()) 65 : : { 66 : 0 : warning("ccut:thread") << "Can't start thread [" << m_name << "]. " 67 : 0 : << "Thread is already started!"; 68 : 0 : return; 69 : : } 70 : : 71 : 13 : m_started.store(true); 72 : 13 : m_waked = false; 73 : 13 : m_thread.reset(new std::thread(&Thread::_thread_func, this)); 74 : : 75 : 13 : if (m_policy != SCHED_OTHER || m_priority != -1) 76 : : { 77 : 0 : _set_thread_priority(m_policy, m_priority); 78 : : } 79 : 13 : } 80 : : 81 : 37 : void Thread::stop() 82 : : { 83 : 37 : std::unique_lock<std::mutex> lock(m_mutex); 84 : 37 : m_started.store(false); 85 : : 86 : 37 : std::unique_ptr<std::thread> th; 87 : 37 : th.swap(m_thread); 88 : 37 : lock.unlock(); 89 : 37 : wake(); 90 : : 91 : 37 : if (th) 92 : 13 : th->join(); 93 : 37 : } 94 : : 95 : 1 : void Thread::run() 96 : : { 97 : 1 : std::unique_lock<std::mutex> lock(m_mutex); 98 : 1 : if (m_started.load()) 99 : 0 : return; 100 : : 101 : 1 : m_started.store(true); 102 : 1 : m_waked = false; 103 : 1 : lock.unlock(); 104 : 1 : _thread_func(); 105 : 1 : } 106 : : 107 : 47 : void Thread::wake() 108 : : { 109 : 47 : std::lock_guard<std::mutex> lock(m_mutex); 110 : 47 : m_waked = true; 111 : 47 : m_cond.notify_all(); 112 : 47 : } 113 : : 114 : 1 : void Thread::setPriority(int policy, int32_t priority) 115 : : { 116 : 1 : std::lock_guard<std::mutex> lock(m_mutex); 117 : 1 : if (_set_thread_priority(policy, priority)) 118 : : { 119 : 0 : m_policy = policy; 120 : 0 : m_priority = priority; 121 : : } 122 : 1 : } 123 : : 124 : 1 : void Thread::setInterval(const std::chrono::milliseconds &ms) 125 : : { 126 : 1 : std::lock_guard<std::mutex> lock(m_mutex); 127 : 1 : m_interval = ms; 128 : 1 : } 129 : : 130 : 14 : void Thread::_thread_func() 131 : : { 132 : : #ifdef HAVE_PTHREAD_NAME 133 : : if (!m_name.empty()) 134 : : { 135 : : const std::string threadName = m_name.substr(0, 16); 136 : : pthread_setname_np(pthread_self(), threadName.c_str()); 137 : : } 138 : : #endif 139 : : 140 : 14 : thread_enter(); 141 : 33 : while (m_started.load()) 142 : : { 143 : 19 : thread_func(); 144 : : 145 : 19 : std::unique_lock<std::mutex> lock(m_mutex); 146 : 19 : if (!m_waked) 147 : : { 148 : 12 : if (m_interval.count() == 0) 149 : 7 : m_cond.wait(lock); 150 : : else 151 : 5 : m_cond.wait_for(lock, m_interval); 152 : : } 153 : 19 : m_waked = false; 154 : 19 : } 155 : 14 : thread_exit(); 156 : 14 : } 157 : : 158 : 1 : bool Thread::_set_thread_priority(int policy, int32_t priority) 159 : : { 160 : 1 : if (m_started.load()) 161 : : { 162 : : sched_param sch; 163 : : int currentPolicy; 164 : 1 : pthread_getschedparam(m_thread->native_handle(), ¤tPolicy, &sch); 165 : 1 : sch.sched_priority = priority; 166 : 1 : if (pthread_setschedparam(m_thread->native_handle(), policy, &sch)) 167 : : { 168 : 2 : error("ccut:thread") 169 : 1 : << "Failed to set policy and proprity to thread [" << m_name 170 : 1 : << "]: " 171 : 1 : << "Error: " << std::strerror(errno); 172 : 1 : return false; 173 : : } 174 : : } 175 : 0 : return true; 176 : : } 177 : : 178 : : } // namespace ccut