From a467b7385b1c558512973f780a3bad68683ef3fc Mon Sep 17 00:00:00 2001 From: AndreaRigoni Date: Wed, 25 Mar 2026 11:04:37 +0000 Subject: [PATCH] monitor and threads --- src/Core/CMakeLists.txt | 6 +- src/Core/Monitor.h | 40 ++++++++++ src/Core/Threads.cpp | 118 ++++++++++++++++++++++++++++++ src/Core/Threads.h | 67 +++++++++++++++++ src/Core/testing/AffinityTest.cpp | 65 ++++++++++++++++ src/Core/testing/CMakeLists.txt | 4 + src/Core/testing/OpenMPTest.cpp | 47 ++++++++++++ src/Core/testing/TeamTest.cpp | 40 ++++++++++ src/Vtk/Math/vtkContainerBox.cpp | 2 + src/Vtk/uLibVtkInterface.h | 2 + 10 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 src/Core/testing/AffinityTest.cpp create mode 100644 src/Core/testing/OpenMPTest.cpp create mode 100644 src/Core/testing/TeamTest.cpp diff --git a/src/Core/CMakeLists.txt b/src/Core/CMakeLists.txt index fcc6d17..e64459f 100644 --- a/src/Core/CMakeLists.txt +++ b/src/Core/CMakeLists.txt @@ -39,7 +39,11 @@ set(SOURCES Threads.cpp ) -set(LIBRARIES Boost::program_options Boost::serialization) +set(LIBRARIES + Boost::program_options + Boost::serialization + OpenMP::OpenMP_CXX +) set(libname ${PACKAGE_LIBPREFIX}Core) set(ULIB_SHARED_LIBRARIES ${ULIB_SHARED_LIBRARIES} ${libname} PARENT_SCOPE) diff --git a/src/Core/Monitor.h b/src/Core/Monitor.h index 52e6b27..b47f76b 100644 --- a/src/Core/Monitor.h +++ b/src/Core/Monitor.h @@ -149,6 +149,46 @@ inline ScopedTimedLock makeScopedMutexLock(std::timed_mutex& mutex, int timeout_ __ulib_lock; \ __ulib_lock.unlock()) +/** + * @brief RecursiveMutex class wraps std::recursive_timed_mutex. + */ +class RecursiveMutex { +public: + RecursiveMutex() = default; + ~RecursiveMutex() = default; + + /** @brief Locks the mutex, blocking if necessary. */ + void Lock() { m_Mutex.lock(); } + + /** @brief Unlocks the mutex. */ + void Unlock() { m_Mutex.unlock(); } + + /** @brief Tries to lock the mutex without blocking. */ + bool TryLock() { return m_Mutex.try_lock(); } + + /** @brief Tries to lock the mutex within a timeout in milliseconds. */ + bool TryLockFor(int timeout_ms) { + if (timeout_ms < 0) { Lock(); return true; } + return m_Mutex.try_lock_for(std::chrono::milliseconds(timeout_ms)); + } + + /** @brief RAII helper for scoped locking. */ + class ScopedLock { + public: + ScopedLock(RecursiveMutex &mutex) : m_Mutex(mutex) { m_Mutex.Lock(); } + ~ScopedLock() { m_Mutex.Unlock(); } + private: + RecursiveMutex &m_Mutex; + ScopedLock(const ScopedLock&) = delete; + ScopedLock& operator=(const ScopedLock&) = delete; + }; + +private: + std::recursive_timed_mutex m_Mutex; + RecursiveMutex(const RecursiveMutex &) = delete; + RecursiveMutex &operator=(const RecursiveMutex &) = delete; +}; + /** * @brief Monitor class provides a base for objects that need thread-safe access. */ diff --git a/src/Core/Threads.cpp b/src/Core/Threads.cpp index ff44879..afb846e 100644 --- a/src/Core/Threads.cpp +++ b/src/Core/Threads.cpp @@ -26,6 +26,15 @@ #include "Threads.h" #include +#ifdef _OPENMP +#include +#endif + +#ifdef __linux__ +#include +#include +#endif + namespace uLib { Thread::Thread() : m_Running(false) {} @@ -81,4 +90,113 @@ void Thread::Yield() { std::this_thread::yield(); } +void Thread::SetAffinity(int cpu) { +#ifdef __linux__ + if (m_Thread.joinable()) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpu, &cpuset); + pthread_setaffinity_np(m_Thread.native_handle(), sizeof(cpu_set_t), &cpuset); + } +#endif +} + +void Thread::SetAffinity(const std::vector& cpus) { +#ifdef __linux__ + if (m_Thread.joinable()) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + for (int cpu : cpus) { + CPU_SET(cpu, &cpuset); + } + pthread_setaffinity_np(m_Thread.native_handle(), sizeof(cpu_set_t), &cpuset); + } +#endif +} + +void Thread::SetNumThreads(int n) { +#ifdef _OPENMP + omp_set_num_threads(n); +#endif +} + +int Thread::GetNumThreads() { +#ifdef _OPENMP + return omp_get_max_threads(); +#else + return 1; +#endif +} + +int Thread::GetThreadNum() { +#ifdef _OPENMP + return omp_get_thread_num(); +#else + return 0; +#endif +} + +// Team Implementation // + +Team::Team(int num_threads) : m_Size(num_threads), m_UseOpenMP(false) { +#ifdef _OPENMP + m_UseOpenMP = true; + if (m_Size > 0) omp_set_num_threads(m_Size); + else m_Size = omp_get_max_threads(); +#else + if (m_Size <= 0) m_Size = 1; +#endif +} + +Team::~Team() { + Wait(); +} + +void Team::Run(Task* task) { + if (!task) return; +#ifdef _OPENMP + if (m_UseOpenMP) { + #pragma omp task + task->Execute(); + return; + } +#endif + // Fallback to synchronous execution if no OpenMP + task->Execute(); +} + +void Team::Wait() { +#ifdef _OPENMP + if (m_UseOpenMP) { + #pragma omp taskwait + } +#endif +} + +void Team::SetSize(int n) { + m_Size = n; +#ifdef _OPENMP + if (m_UseOpenMP) omp_set_num_threads(m_Size); +#endif +} + +void Team::SetAffinity(const std::vector& cpus) { + if (cpus.empty()) return; +#ifdef __linux__ +#ifdef _OPENMP + if (m_UseOpenMP) { + #pragma omp parallel + { + int tid = omp_get_thread_num(); + int cpu = cpus[tid % cpus.size()]; + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpu, &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + } + } +#endif +#endif +} + } // namespace uLib diff --git a/src/Core/Threads.h b/src/Core/Threads.h index 75a9d8d..96b5fa2 100644 --- a/src/Core/Threads.h +++ b/src/Core/Threads.h @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include "Core/Monitor.h" #include "Core/Object.h" @@ -66,6 +68,26 @@ public: /** @brief Static helper to yield the current thread. */ static void Yield(); + /** @brief Returns the native handle of the thread. */ + std::thread::native_handle_type GetNativeHandle() { return m_Thread.native_handle(); } + + /** @brief Sets CPU affinity for the thread. (Linux only) */ + void SetAffinity(int cpu); + + /** @brief Sets CPU affinity for the thread using a list of CPUs. (Linux only) */ + void SetAffinity(const std::vector& cpus); + + // OpenMP Support // + + /** @brief Sets the number of threads for OpenMP parallel regions. */ + static void SetNumThreads(int n); + + /** @brief Returns the number of threads for OpenMP parallel regions. */ + static int GetNumThreads(); + + /** @brief Returns the ID of the current thread in an OpenMP parallel region. */ + static int GetThreadNum(); + protected: // Internal thread entry point void ThreadEntryPoint(); @@ -75,6 +97,51 @@ protected: mutable Mutex m_ThreadMutex; }; +/** + * @brief Task class wraps a function call to be executed by a Team. + */ +class Task : public Object { +public: + Task(std::function func) : m_Func(func) {} + virtual ~Task() = default; + + /** @brief Executes the task. */ + virtual void Execute() { if (m_Func) m_Func(); } + +protected: + std::function m_Func; +}; + +/** + * @brief Team class manages a group of threads and can execute Tasks. + * This is designed to be compatible with OpenMP tasks and teams. + */ +class Team : public Object { +public: + Team(int num_threads = -1); + virtual ~Team(); + + /** @brief Runs a task within the team. Uses OpenMP task if available. */ + void Run(Task* task); + + /** @brief Waits for all tasks in the team to finish. */ + void Wait(); + + /** @brief Sets the number of threads for this team. */ + void SetSize(int n); + + /** @brief Returns the number of threads in the team. */ + int GetSize() const { return m_Size; } + + /** @brief Sets CPU affinity for all threads in the team. */ + void SetAffinity(const std::vector& cpus); + +protected: + int m_Size; + bool m_UseOpenMP; + std::vector m_Threads; +}; + } // namespace uLib #endif // U_CORE_THREADS_H diff --git a/src/Core/testing/AffinityTest.cpp b/src/Core/testing/AffinityTest.cpp new file mode 100644 index 0000000..3b56c36 --- /dev/null +++ b/src/Core/testing/AffinityTest.cpp @@ -0,0 +1,65 @@ +#include "Core/Threads.h" +#include +#include +#include + +#ifdef __linux__ +#include +#include +#endif + +using namespace uLib; + +void TestThreadAffinity() { + std::cout << "Testing Thread Affinity..." << std::endl; +#ifdef __linux__ + Thread t; + t.Start(); + t.SetAffinity(0); // Bind to CPU 0 + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + pthread_getaffinity_np(t.GetNativeHandle(), sizeof(cpu_set_t), &cpuset); + assert(CPU_ISSET(0, &cpuset)); + + t.Join(); + std::cout << " Passed (Thread bound to CPU 0)." << std::endl; +#else + std::cout << " Affinity not supported on this OS, skipping." << std::endl; +#endif +} + +void TestTeamAffinity() { + std::cout << "Testing Team Affinity..." << std::endl; +#ifdef __linux__ +#ifdef _OPENMP + Team team(2); + std::vector cpus = {0, 1}; + team.SetAffinity(cpus); + + // We check affinity inside a parallel region + #pragma omp parallel + { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + int tid = Thread::GetThreadNum(); + int expected_cpu = cpus[tid % cpus.size()]; + assert(CPU_ISSET(expected_cpu, &cpuset)); + } + std::cout << " Passed (Team threads bound correctly)." << std::endl; +#endif +#else + std::cout << " Affinity not supported on this OS, skipping." << std::endl; +#endif +} + +// Helper to get native handle if needed (oops, I forgot to add it to Thread class) +// I'll add GetNativeHandle() to Thread class in Threads.h + +int main() { + TestThreadAffinity(); + TestTeamAffinity(); + std::cout << "All Affinity tests finished!" << std::endl; + return 0; +} diff --git a/src/Core/testing/CMakeLists.txt b/src/Core/testing/CMakeLists.txt index a3b0d52..c6f5af6 100644 --- a/src/Core/testing/CMakeLists.txt +++ b/src/Core/testing/CMakeLists.txt @@ -25,6 +25,9 @@ set( TESTS HRPTest MutexTest ThreadsTest + OpenMPTest + TeamTest + AffinityTest ) set(LIBRARIES @@ -33,6 +36,7 @@ set(LIBRARIES Boost::serialization Boost::program_options ${ROOT_LIBRARIES} + OpenMP::OpenMP_CXX ) uLib_add_tests(Core) diff --git a/src/Core/testing/OpenMPTest.cpp b/src/Core/testing/OpenMPTest.cpp new file mode 100644 index 0000000..50a6f7a --- /dev/null +++ b/src/Core/testing/OpenMPTest.cpp @@ -0,0 +1,47 @@ +#include "Core/Threads.h" +#include +#include + +#ifdef _OPENMP +#include +#endif + +using namespace uLib; + +class OpenMPThread : public Thread { +public: + void Run() override { +#ifdef _OPENMP + Thread::SetNumThreads(2); + int max = Thread::GetNumThreads(); + std::cout << " OpenMP max threads in uLib::Thread: " << max << std::endl; + + int shared_counter = 0; + #pragma omp parallel reduction(+:shared_counter) + { + shared_counter += 1; + } + std::cout << " Parallel region executed with " << shared_counter << " threads." << std::endl; + assert(shared_counter <= max); +#else + std::cout << " OpenMP not available, skipping parallel check." << std::endl; + assert(Thread::GetNumThreads() == 1); +#endif + } +}; + +int main() { + std::cout << "Testing OpenMP compatibility..." << std::endl; +#ifdef _OPENMP + std::cout << " OpenMP is AVAILABLE." << std::endl; +#else + std::cout << " OpenMP is NOT available." << std::endl; +#endif + + OpenMPThread t; + t.Start(); + t.Join(); + + std::cout << "OpenMP compatibility test finished!" << std::endl; + return 0; +} diff --git a/src/Core/testing/TeamTest.cpp b/src/Core/testing/TeamTest.cpp new file mode 100644 index 0000000..8e832a8 --- /dev/null +++ b/src/Core/testing/TeamTest.cpp @@ -0,0 +1,40 @@ +#include "Core/Threads.h" +#include +#include +#include +#include + +using namespace uLib; + +void TestTaskTeam() { + std::cout << "Testing Task and Team..." << std::endl; + + std::atomic counter(0); + auto task_func = [&]() { + counter++; + Thread::Sleep(10); + }; + + Team team(4); + std::cout << " Team size: " << team.GetSize() << std::endl; + +#ifdef _OPENMP + #pragma omp parallel + #pragma omp single +#endif + { + for (int i = 0; i < 20; ++i) { + team.Run(new Task(task_func)); + } + team.Wait(); + } + + assert(counter == 20); + std::cout << " Passed (counter: " << counter << ")." << std::endl; +} + +int main() { + TestTaskTeam(); + std::cout << "All Team tests passed!" << std::endl; + return 0; +} diff --git a/src/Vtk/Math/vtkContainerBox.cpp b/src/Vtk/Math/vtkContainerBox.cpp index 6de9944..ecb893e 100644 --- a/src/Vtk/Math/vtkContainerBox.cpp +++ b/src/Vtk/Math/vtkContainerBox.cpp @@ -68,6 +68,7 @@ vtkPolyData *vtkContainerBox::GetPolyData() const { void vtkContainerBox::contentUpdate() { + RecursiveMutex::ScopedLock lock(this->m_UpdateMutex); if (!m_Content) return; @@ -95,6 +96,7 @@ void vtkContainerBox::contentUpdate() { void vtkContainerBox::Update() { + RecursiveMutex::ScopedLock lock(this->m_UpdateMutex); if (!m_Content) return; if (m_BlockUpdate) { diff --git a/src/Vtk/uLibVtkInterface.h b/src/Vtk/uLibVtkInterface.h index 619564c..e359cc5 100644 --- a/src/Vtk/uLibVtkInterface.h +++ b/src/Vtk/uLibVtkInterface.h @@ -31,6 +31,7 @@ #include #include "Core/Object.h" #include "Core/Property.h" +#include "Core/Monitor.h" // vtk classes forward declaration // class vtkProp; @@ -112,6 +113,7 @@ protected: void RemoveProp(vtkProp *prop); std::vector m_DisplayProperties; + mutable uLib::RecursiveMutex m_UpdateMutex; private: Puppet(const Puppet&) = delete;