Welcome to Doom9's Forum, THE in-place to be for everyone interested in DVD conversion. Before you start posting please read the forum rules. By posting to this forum you agree to abide by the rules. |
16th April 2019, 10:13 | #1 | Link |
Formerly davidh*****
Join Date: Jan 2004
Posts: 2,496
|
Does anyone know anything about threadpools?
I usually internally multithread my filters, but with my latest one I'm running into a problem because the overhead for creating the threads is too high, and offsets any speed advantage.
I did a bit of Googling, and found out about threadpools - create a bunch of threads once, and then using conditional variables to signal to them to do work when required. That way, the thread creation overhead is removed, because the threads are created once on construction of the filter, not per frame (or several times per frame, as in my case). But, as usual, I don't really know what I'm doing and I can't get it to work properly. It seems like there's some kind of timing issue where one of the threads sometimes fails to get the notification. What's really annoying is that as soon as I add more code to debug it, that slows things down enough that the problem doesn't happen, which makes it hard to figure out. So does anyone here understand threadpools who could help me figure out what's going wrong? |
16th April 2019, 13:48 | #2 | Link | |
HeartlessS Usurer
Join Date: Dec 2009
Location: Over the rainbow
Posts: 10,980
|
Have you seen this:- https://forum.doom9.org/showthread.php?t=164407
Quote:
__________________
I sometimes post sober. StainlessS@MediaFire ::: AND/OR ::: StainlessS@SendSpace "Some infinities are bigger than other infinities", but how many of them are infinitely bigger ??? |
|
16th April 2019, 20:32 | #4 | Link |
Software Developer
Join Date: Jun 2005
Location: Last House on Slunk Street
Posts: 13,248
|
Put simply, you create a fixed number of "worker" threads once, and you keep them running continuously – instead of creating short-lived threads for each task.
Each "worker" thread executes an infinite loop that looks like this: Code:
void thread_main(void) { while(!g_isTerminating) { Task nextTask = g_queue.take(); nextTask.execute(); } } Now, in order to have some task executed by the thread pool, you simply push the task object onto the queue, e.g. from the "main" thread, and that's it: Code:
void main(void) { /* .... */ for(int i = 0; i < numberOfTasks; ++i) { Task taskToBeExecuted = new Task(/*task-specific parameters go here*/); g_queue.push(taskToBeExecuted); } /* .... */ } BTW: Because most "standard" queue implementations, such as C++'s std::queue, are not thread-safe, you will usually need to use a mutex in order to synchronize things. Also, in order to implement the case that the queue is empty – you want your threads to sleep in that case, instead of doing a highly inefficient "busy waiting" – you will probably have to use condition variables. Well, either that, or forget everything I said and just use C++11's std::async()
__________________
Go to https://standforukraine.com/ to find legitimate Ukrainian Charities 🇺🇦✊ Last edited by LoRd_MuldeR; 16th April 2019 at 21:36. |
17th April 2019, 23:24 | #6 | Link |
Software Developer
Join Date: Jun 2005
Location: Last House on Slunk Street
Posts: 13,248
|
I just wanted to add, that this can – for example – be implemented with one mutex and two semaphores.
The mutex is used to serialized all access to the shared queue. The first semaphore is used to count the "available" tasks in the queue; it blocks "worker" threads trying to acquire the next task while the queue is empty. And the second semaphore is used to count the "free" slots in the queue; it blocks the "main" thread when trying to enqueue another task while the queue has reached maximum length. Function called by the "main" thread to schedule another task: Code:
bool ThreadPool::schedule(ITask *const task) { MTHREAD_SEM_WAIT(&m_semFree); MTHREAD_MUTEX_LOCK(&m_lock); m_taskQueue.push(task); MTHREAD_SEM_POST(&m_semUsed); MTHREAD_MUTEX_UNLOCK(&m_lock); return true; } Code:
ITask *ThreadPool::nextTask(ThreadPool* pool) { ITask *task = NULL; MTHREAD_SEM_WAIT(&pool->m_semUsed); MTHREAD_MUTEX_LOCK(&pool->m_lock); task = pool->m_taskQueue.front(); pool->m_taskQueue.pop(); MTHREAD_SEM_POST(&pool->m_semFree); MTHREAD_MUTEX_UNLOCK(&pool->m_lock); return task; }
__________________
Go to https://standforukraine.com/ to find legitimate Ukrainian Charities 🇺🇦✊ Last edited by LoRd_MuldeR; 17th April 2019 at 23:34. |
18th April 2019, 11:38 | #7 | Link |
Formerly davidh*****
Join Date: Jan 2004
Posts: 2,496
|
Well, whatever I've cobbled together, it works. A lot of tutorials didn't seem all that bothered about how to tell if the jobs had actually finished - they were just fired and forgotten, which is not much use for a filter. I gave std::async a try, but that doesn't give you much of a speed increase, at least not the way I was doing it. I've made my own threadpool class which works the way I want it to, anyway (again a bit different from other implementations).
|
19th April 2019, 12:11 | #8 | Link | |
Software Developer
Join Date: Jun 2005
Location: Last House on Slunk Street
Posts: 13,248
|
Quote:
Function called by "main" thread to wait until all pending tasks are completed, in an efficient way: Code:
bool ThreadPool::wait(void) { MTHREAD_MUTEX_LOCK(&m_lockTask); while((m_runningTasks > 0) || (!m_taskQueue.empty())) { MTHREAD_COND_WAIT(&m_condAllDone, &m_lockTask); } MTHREAD_MUTEX_UNLOCK(&m_lockTask); return true; } Code:
void ThreadPool::finalizeTask(MTHREADPOOL_NS::ThreadPool* pool, ITask* task) { MTHREAD_MUTEX_LOCK(&pool->m_lockTask); pool->m_runningTasks--; if((pool->m_runningTasks == 0) && pool->m_taskQueue.empty()) { MTHREAD_COND_BROADCAST(&pool->m_condAllDone); } MTHREAD_MUTEX_UNLOCK(&pool->m_lockTask); }
__________________
Go to https://standforukraine.com/ to find legitimate Ukrainian Charities 🇺🇦✊ Last edited by LoRd_MuldeR; 19th April 2019 at 14:25. |
|
Thread Tools | Search this Thread |
Display Modes | |
|
|