monitor and threads
This commit is contained in:
@@ -39,7 +39,11 @@ set(SOURCES
|
||||
Threads.cpp
|
||||
)
|
||||
|
||||
set(LIBRARIES Boost::program_options Boost::serialization)
|
||||
set(LIBRARIES
|
||||
Boost::program_options
|
||||
Boost::serialization
|
||||
OpenMP::OpenMP_CXX
|
||||
)
|
||||
|
||||
set(libname ${PACKAGE_LIBPREFIX}Core)
|
||||
set(ULIB_SHARED_LIBRARIES ${ULIB_SHARED_LIBRARIES} ${libname} PARENT_SCOPE)
|
||||
|
||||
@@ -149,6 +149,46 @@ inline ScopedTimedLock makeScopedMutexLock(std::timed_mutex& mutex, int timeout_
|
||||
__ulib_lock; \
|
||||
__ulib_lock.unlock())
|
||||
|
||||
/**
|
||||
* @brief RecursiveMutex class wraps std::recursive_timed_mutex.
|
||||
*/
|
||||
class RecursiveMutex {
|
||||
public:
|
||||
RecursiveMutex() = default;
|
||||
~RecursiveMutex() = default;
|
||||
|
||||
/** @brief Locks the mutex, blocking if necessary. */
|
||||
void Lock() { m_Mutex.lock(); }
|
||||
|
||||
/** @brief Unlocks the mutex. */
|
||||
void Unlock() { m_Mutex.unlock(); }
|
||||
|
||||
/** @brief Tries to lock the mutex without blocking. */
|
||||
bool TryLock() { return m_Mutex.try_lock(); }
|
||||
|
||||
/** @brief Tries to lock the mutex within a timeout in milliseconds. */
|
||||
bool TryLockFor(int timeout_ms) {
|
||||
if (timeout_ms < 0) { Lock(); return true; }
|
||||
return m_Mutex.try_lock_for(std::chrono::milliseconds(timeout_ms));
|
||||
}
|
||||
|
||||
/** @brief RAII helper for scoped locking. */
|
||||
class ScopedLock {
|
||||
public:
|
||||
ScopedLock(RecursiveMutex &mutex) : m_Mutex(mutex) { m_Mutex.Lock(); }
|
||||
~ScopedLock() { m_Mutex.Unlock(); }
|
||||
private:
|
||||
RecursiveMutex &m_Mutex;
|
||||
ScopedLock(const ScopedLock&) = delete;
|
||||
ScopedLock& operator=(const ScopedLock&) = delete;
|
||||
};
|
||||
|
||||
private:
|
||||
std::recursive_timed_mutex m_Mutex;
|
||||
RecursiveMutex(const RecursiveMutex &) = delete;
|
||||
RecursiveMutex &operator=(const RecursiveMutex &) = delete;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Monitor class provides a base for objects that need thread-safe access.
|
||||
*/
|
||||
|
||||
@@ -26,6 +26,15 @@
|
||||
#include "Threads.h"
|
||||
#include <chrono>
|
||||
|
||||
#ifdef _OPENMP
|
||||
#include <omp.h>
|
||||
#endif
|
||||
|
||||
#ifdef __linux__
|
||||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#endif
|
||||
|
||||
namespace uLib {
|
||||
|
||||
Thread::Thread() : m_Running(false) {}
|
||||
@@ -81,4 +90,113 @@ void Thread::Yield() {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
|
||||
void Thread::SetAffinity(int cpu) {
|
||||
#ifdef __linux__
|
||||
if (m_Thread.joinable()) {
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(cpu, &cpuset);
|
||||
pthread_setaffinity_np(m_Thread.native_handle(), sizeof(cpu_set_t), &cpuset);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void Thread::SetAffinity(const std::vector<int>& cpus) {
|
||||
#ifdef __linux__
|
||||
if (m_Thread.joinable()) {
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
for (int cpu : cpus) {
|
||||
CPU_SET(cpu, &cpuset);
|
||||
}
|
||||
pthread_setaffinity_np(m_Thread.native_handle(), sizeof(cpu_set_t), &cpuset);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void Thread::SetNumThreads(int n) {
|
||||
#ifdef _OPENMP
|
||||
omp_set_num_threads(n);
|
||||
#endif
|
||||
}
|
||||
|
||||
int Thread::GetNumThreads() {
|
||||
#ifdef _OPENMP
|
||||
return omp_get_max_threads();
|
||||
#else
|
||||
return 1;
|
||||
#endif
|
||||
}
|
||||
|
||||
int Thread::GetThreadNum() {
|
||||
#ifdef _OPENMP
|
||||
return omp_get_thread_num();
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
// Team Implementation //
|
||||
|
||||
Team::Team(int num_threads) : m_Size(num_threads), m_UseOpenMP(false) {
|
||||
#ifdef _OPENMP
|
||||
m_UseOpenMP = true;
|
||||
if (m_Size > 0) omp_set_num_threads(m_Size);
|
||||
else m_Size = omp_get_max_threads();
|
||||
#else
|
||||
if (m_Size <= 0) m_Size = 1;
|
||||
#endif
|
||||
}
|
||||
|
||||
Team::~Team() {
|
||||
Wait();
|
||||
}
|
||||
|
||||
void Team::Run(Task* task) {
|
||||
if (!task) return;
|
||||
#ifdef _OPENMP
|
||||
if (m_UseOpenMP) {
|
||||
#pragma omp task
|
||||
task->Execute();
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
// Fallback to synchronous execution if no OpenMP
|
||||
task->Execute();
|
||||
}
|
||||
|
||||
void Team::Wait() {
|
||||
#ifdef _OPENMP
|
||||
if (m_UseOpenMP) {
|
||||
#pragma omp taskwait
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void Team::SetSize(int n) {
|
||||
m_Size = n;
|
||||
#ifdef _OPENMP
|
||||
if (m_UseOpenMP) omp_set_num_threads(m_Size);
|
||||
#endif
|
||||
}
|
||||
|
||||
void Team::SetAffinity(const std::vector<int>& cpus) {
|
||||
if (cpus.empty()) return;
|
||||
#ifdef __linux__
|
||||
#ifdef _OPENMP
|
||||
if (m_UseOpenMP) {
|
||||
#pragma omp parallel
|
||||
{
|
||||
int tid = omp_get_thread_num();
|
||||
int cpu = cpus[tid % cpus.size()];
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(cpu, &cpuset);
|
||||
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace uLib
|
||||
|
||||
@@ -29,6 +29,8 @@
|
||||
#include <thread>
|
||||
#include <functional>
|
||||
#include <atomic>
|
||||
#include <vector>
|
||||
#include <deque>
|
||||
#include "Core/Monitor.h"
|
||||
#include "Core/Object.h"
|
||||
|
||||
@@ -66,6 +68,26 @@ public:
|
||||
/** @brief Static helper to yield the current thread. */
|
||||
static void Yield();
|
||||
|
||||
/** @brief Returns the native handle of the thread. */
|
||||
std::thread::native_handle_type GetNativeHandle() { return m_Thread.native_handle(); }
|
||||
|
||||
/** @brief Sets CPU affinity for the thread. (Linux only) */
|
||||
void SetAffinity(int cpu);
|
||||
|
||||
/** @brief Sets CPU affinity for the thread using a list of CPUs. (Linux only) */
|
||||
void SetAffinity(const std::vector<int>& cpus);
|
||||
|
||||
// OpenMP Support //
|
||||
|
||||
/** @brief Sets the number of threads for OpenMP parallel regions. */
|
||||
static void SetNumThreads(int n);
|
||||
|
||||
/** @brief Returns the number of threads for OpenMP parallel regions. */
|
||||
static int GetNumThreads();
|
||||
|
||||
/** @brief Returns the ID of the current thread in an OpenMP parallel region. */
|
||||
static int GetThreadNum();
|
||||
|
||||
protected:
|
||||
// Internal thread entry point
|
||||
void ThreadEntryPoint();
|
||||
@@ -75,6 +97,51 @@ protected:
|
||||
mutable Mutex m_ThreadMutex;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Task class wraps a function call to be executed by a Team.
|
||||
*/
|
||||
class Task : public Object {
|
||||
public:
|
||||
Task(std::function<void()> func) : m_Func(func) {}
|
||||
virtual ~Task() = default;
|
||||
|
||||
/** @brief Executes the task. */
|
||||
virtual void Execute() { if (m_Func) m_Func(); }
|
||||
|
||||
protected:
|
||||
std::function<void()> m_Func;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Team class manages a group of threads and can execute Tasks.
|
||||
* This is designed to be compatible with OpenMP tasks and teams.
|
||||
*/
|
||||
class Team : public Object {
|
||||
public:
|
||||
Team(int num_threads = -1);
|
||||
virtual ~Team();
|
||||
|
||||
/** @brief Runs a task within the team. Uses OpenMP task if available. */
|
||||
void Run(Task* task);
|
||||
|
||||
/** @brief Waits for all tasks in the team to finish. */
|
||||
void Wait();
|
||||
|
||||
/** @brief Sets the number of threads for this team. */
|
||||
void SetSize(int n);
|
||||
|
||||
/** @brief Returns the number of threads in the team. */
|
||||
int GetSize() const { return m_Size; }
|
||||
|
||||
/** @brief Sets CPU affinity for all threads in the team. */
|
||||
void SetAffinity(const std::vector<int>& cpus);
|
||||
|
||||
protected:
|
||||
int m_Size;
|
||||
bool m_UseOpenMP;
|
||||
std::vector<Thread*> m_Threads;
|
||||
};
|
||||
|
||||
} // namespace uLib
|
||||
|
||||
#endif // U_CORE_THREADS_H
|
||||
|
||||
65
src/Core/testing/AffinityTest.cpp
Normal file
65
src/Core/testing/AffinityTest.cpp
Normal file
@@ -0,0 +1,65 @@
|
||||
#include "Core/Threads.h"
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <cassert>
|
||||
|
||||
#ifdef __linux__
|
||||
#include <pthread.h>
|
||||
#include <sched.h>
|
||||
#endif
|
||||
|
||||
using namespace uLib;
|
||||
|
||||
void TestThreadAffinity() {
|
||||
std::cout << "Testing Thread Affinity..." << std::endl;
|
||||
#ifdef __linux__
|
||||
Thread t;
|
||||
t.Start();
|
||||
t.SetAffinity(0); // Bind to CPU 0
|
||||
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
pthread_getaffinity_np(t.GetNativeHandle(), sizeof(cpu_set_t), &cpuset);
|
||||
assert(CPU_ISSET(0, &cpuset));
|
||||
|
||||
t.Join();
|
||||
std::cout << " Passed (Thread bound to CPU 0)." << std::endl;
|
||||
#else
|
||||
std::cout << " Affinity not supported on this OS, skipping." << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
void TestTeamAffinity() {
|
||||
std::cout << "Testing Team Affinity..." << std::endl;
|
||||
#ifdef __linux__
|
||||
#ifdef _OPENMP
|
||||
Team team(2);
|
||||
std::vector<int> cpus = {0, 1};
|
||||
team.SetAffinity(cpus);
|
||||
|
||||
// We check affinity inside a parallel region
|
||||
#pragma omp parallel
|
||||
{
|
||||
cpu_set_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
|
||||
int tid = Thread::GetThreadNum();
|
||||
int expected_cpu = cpus[tid % cpus.size()];
|
||||
assert(CPU_ISSET(expected_cpu, &cpuset));
|
||||
}
|
||||
std::cout << " Passed (Team threads bound correctly)." << std::endl;
|
||||
#endif
|
||||
#else
|
||||
std::cout << " Affinity not supported on this OS, skipping." << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
// Helper to get native handle if needed (oops, I forgot to add it to Thread class)
|
||||
// I'll add GetNativeHandle() to Thread class in Threads.h
|
||||
|
||||
int main() {
|
||||
TestThreadAffinity();
|
||||
TestTeamAffinity();
|
||||
std::cout << "All Affinity tests finished!" << std::endl;
|
||||
return 0;
|
||||
}
|
||||
@@ -25,6 +25,9 @@ set( TESTS
|
||||
HRPTest
|
||||
MutexTest
|
||||
ThreadsTest
|
||||
OpenMPTest
|
||||
TeamTest
|
||||
AffinityTest
|
||||
)
|
||||
|
||||
set(LIBRARIES
|
||||
@@ -33,6 +36,7 @@ set(LIBRARIES
|
||||
Boost::serialization
|
||||
Boost::program_options
|
||||
${ROOT_LIBRARIES}
|
||||
OpenMP::OpenMP_CXX
|
||||
)
|
||||
uLib_add_tests(Core)
|
||||
|
||||
|
||||
47
src/Core/testing/OpenMPTest.cpp
Normal file
47
src/Core/testing/OpenMPTest.cpp
Normal file
@@ -0,0 +1,47 @@
|
||||
#include "Core/Threads.h"
|
||||
#include <iostream>
|
||||
#include <cassert>
|
||||
|
||||
#ifdef _OPENMP
|
||||
#include <omp.h>
|
||||
#endif
|
||||
|
||||
using namespace uLib;
|
||||
|
||||
class OpenMPThread : public Thread {
|
||||
public:
|
||||
void Run() override {
|
||||
#ifdef _OPENMP
|
||||
Thread::SetNumThreads(2);
|
||||
int max = Thread::GetNumThreads();
|
||||
std::cout << " OpenMP max threads in uLib::Thread: " << max << std::endl;
|
||||
|
||||
int shared_counter = 0;
|
||||
#pragma omp parallel reduction(+:shared_counter)
|
||||
{
|
||||
shared_counter += 1;
|
||||
}
|
||||
std::cout << " Parallel region executed with " << shared_counter << " threads." << std::endl;
|
||||
assert(shared_counter <= max);
|
||||
#else
|
||||
std::cout << " OpenMP not available, skipping parallel check." << std::endl;
|
||||
assert(Thread::GetNumThreads() == 1);
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
int main() {
|
||||
std::cout << "Testing OpenMP compatibility..." << std::endl;
|
||||
#ifdef _OPENMP
|
||||
std::cout << " OpenMP is AVAILABLE." << std::endl;
|
||||
#else
|
||||
std::cout << " OpenMP is NOT available." << std::endl;
|
||||
#endif
|
||||
|
||||
OpenMPThread t;
|
||||
t.Start();
|
||||
t.Join();
|
||||
|
||||
std::cout << "OpenMP compatibility test finished!" << std::endl;
|
||||
return 0;
|
||||
}
|
||||
40
src/Core/testing/TeamTest.cpp
Normal file
40
src/Core/testing/TeamTest.cpp
Normal file
@@ -0,0 +1,40 @@
|
||||
#include "Core/Threads.h"
|
||||
#include <iostream>
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <vector>
|
||||
|
||||
using namespace uLib;
|
||||
|
||||
void TestTaskTeam() {
|
||||
std::cout << "Testing Task and Team..." << std::endl;
|
||||
|
||||
std::atomic<int> counter(0);
|
||||
auto task_func = [&]() {
|
||||
counter++;
|
||||
Thread::Sleep(10);
|
||||
};
|
||||
|
||||
Team team(4);
|
||||
std::cout << " Team size: " << team.GetSize() << std::endl;
|
||||
|
||||
#ifdef _OPENMP
|
||||
#pragma omp parallel
|
||||
#pragma omp single
|
||||
#endif
|
||||
{
|
||||
for (int i = 0; i < 20; ++i) {
|
||||
team.Run(new Task(task_func));
|
||||
}
|
||||
team.Wait();
|
||||
}
|
||||
|
||||
assert(counter == 20);
|
||||
std::cout << " Passed (counter: " << counter << ")." << std::endl;
|
||||
}
|
||||
|
||||
int main() {
|
||||
TestTaskTeam();
|
||||
std::cout << "All Team tests passed!" << std::endl;
|
||||
return 0;
|
||||
}
|
||||
@@ -68,6 +68,7 @@ vtkPolyData *vtkContainerBox::GetPolyData() const {
|
||||
|
||||
|
||||
void vtkContainerBox::contentUpdate() {
|
||||
RecursiveMutex::ScopedLock lock(this->m_UpdateMutex);
|
||||
if (!m_Content)
|
||||
return;
|
||||
|
||||
@@ -95,6 +96,7 @@ void vtkContainerBox::contentUpdate() {
|
||||
|
||||
|
||||
void vtkContainerBox::Update() {
|
||||
RecursiveMutex::ScopedLock lock(this->m_UpdateMutex);
|
||||
if (!m_Content) return;
|
||||
|
||||
if (m_BlockUpdate) {
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include <vector>
|
||||
#include "Core/Object.h"
|
||||
#include "Core/Property.h"
|
||||
#include "Core/Monitor.h"
|
||||
|
||||
// vtk classes forward declaration //
|
||||
class vtkProp;
|
||||
@@ -112,6 +113,7 @@ protected:
|
||||
void RemoveProp(vtkProp *prop);
|
||||
|
||||
std::vector<uLib::PropertyBase*> m_DisplayProperties;
|
||||
mutable uLib::RecursiveMutex m_UpdateMutex;
|
||||
|
||||
private:
|
||||
Puppet(const Puppet&) = delete;
|
||||
|
||||
Reference in New Issue
Block a user