Old 16th April 2019, 11:13   #1  |  Link
wonkey_monkey
Formerly davidh*****
 
 
Join Date: Jan 2004
Posts: 1,822
Does anyone know anything about threadpools?

I usually internally multithread my filters, but with my latest one I'm running into a problem because the overhead for creating the threads is too high, and offsets any speed advantage.

I did a bit of Googling, and found out about threadpools - create a bunch of threads once, and then use condition variables to signal them to do work when required. That way, the thread creation overhead is removed, because the threads are created once on construction of the filter, not per frame (or several times per frame, as in my case).

But, as usual, I don't really know what I'm doing and I can't get it to work properly. It seems like there's some kind of timing issue where one of the threads sometimes fails to get the notification. What's really annoying is that as soon as I add more code to debug it, that slows things down enough that the problem doesn't happen, which makes it hard to figure out.
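
For reference, a common cause of exactly this symptom (a worker occasionally missing its notification, and the problem vanishing once debug code changes the timing) is a "lost wakeup": the notify happens before the worker has started waiting, and nothing records that it happened. Whether that is the bug here can't be told from the description alone. Below is a minimal sketch of the usual remedy, assuming C++11; the WorkSignal struct and all its names are hypothetical and not taken from the filter in question. The idea is to keep a flag under the mutex and wait on the flag, not on the notification itself.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot "work is ready" signal; all names are illustrative.
struct WorkSignal {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;  // the real state; the notification is only a hint

    void post() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            ready = true;  // change the state while holding the mutex
        }
        cv.notify_one();
    }

    void wait_and_consume() {
        std::unique_lock<std::mutex> lock(mtx);
        // The predicate is checked before sleeping, so a post() that happened
        // earlier is not missed; it also absorbs spurious wakeups.
        cv.wait(lock, [this] { return ready; });
        ready = false;
    }
};

// Posts BEFORE the worker starts waiting; the worker still gets through.
inline int run_signal_demo() {
    WorkSignal sig;
    int result = 0;
    sig.post();
    std::thread worker([&] {
        sig.wait_and_consume();
        result = 42;
    });
    worker.join();
    assert(!sig.ready);  // the flag was consumed by the worker
    return result;
}
```

With a bare cv.wait(lock) and no flag, the same demo could hang, because the notification sent before the wait would simply be discarded.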

So does anyone here understand threadpools who could help me figure out what's going wrong?
__________________
My AviSynth filters / I'm the Doctor
Old 16th April 2019, 14:48   #2  |  Link
StainlessS
HeartlessS Usurer
 
 
Join Date: Dec 2009
Location: Over the rainbow
Posts: 7,196
Have you seen this:- https://forum.doom9.org/showthread.php?t=164407

Quote:
Originally Posted by cretindesalpes View Post
AVSTP is a programming library for Avisynth plug-in developers. It helps supporting native multi-threading in plug-ins. It works by sharing a thread pool between multiple plug-ins, so the number of threads stays low whatever the number of instantiated plug-ins. This helps saving resources, especially when working in an Avisynth MT environment.

There is a low-level API using a C interface to access the shared functions of the library. There is also a set of C++ classes on the top of that, for convenient use.
No idea how any of that works.
__________________
I sometimes post sober.
StainlessS@MediaFire ::: AND/OR ::: StainlessS@SendSpace

"Some infinities are bigger than other infinities", but how many of them are infinitely bigger ???
Old 16th April 2019, 17:56   #3  |  Link
jpsdr
Registered User
 
Join Date: Oct 2002
Location: France
Posts: 1,758
Take a look at the code of my plugins.
__________________
My github.
Old 16th April 2019, 21:32   #4  |  Link
LoRd_MuldeR
Software Developer
 
 
Join Date: Jun 2005
Location: Last House on Slunk Street
Posts: 13,047
Put simply, you create a fixed number of "worker" threads once, and you keep them running continuously – instead of creating short-lived threads for each task.

Each "worker" thread executes an infinite loop that looks like this:
Code:
void thread_main(void)
{
    while(!g_isTerminating)
    {
        Task nextTask = g_queue.take();
        nextTask.execute();
    }
}
In the above code, it is assumed that the queue g_queue is global, i.e. shared between all worker threads (as well as with the rest of the application), and that it is synchronized, i.e. thread-safe. Also, it is assumed that take() returns the "next" item from the queue and removes that item from the queue at the same time, both in a single "atomic" operation. Last but not least, it is assumed that take() will block for as long as the queue is empty.

Now, in order to have some task executed by the thread pool, you simply push the task object onto the queue, e.g. from the "main" thread, and that's it:

Code:
int main(void)
{
    /* .... */

    for(int i = 0; i < numberOfTasks; ++i)
    {
        /* Task is held by value here, matching what take() returns in the worker loop */
        Task taskToBeExecuted{/*task-specific parameters go here*/};
        g_queue.push(taskToBeExecuted);
    }

    /* .... */
}
This is actually an example of the well-known producer-consumer pattern. Here the "main" thread is the producer, i.e. it produces tasks, and the "worker" threads are the consumers, i.e. they consume tasks.

BTW: Because most "standard" queue implementations, such as C++'s std::queue, are not thread-safe, you will usually need a mutex to synchronize access. Also, to handle the case where the queue is empty (you want your threads to sleep in that case, instead of doing highly inefficient "busy waiting"), you will probably have to use condition variables. Well, either that, or forget everything I said and just use C++11's std::async().
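
To make the mutex-plus-condition-variable idea concrete, here is a minimal sketch of a blocking queue with the push()/take() behaviour assumed above. It assumes C++11; the BlockingQueue class and the demo function are hypothetical names for illustration, not code from any existing library. The demo stops its workers with one negative "sentinel" value per worker, which is only one possible shutdown scheme: a plain termination flag on its own would not wake a thread that is already blocked inside take().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical thread-safe queue: push() never blocks, take() blocks while empty.
template <typename T>
class BlockingQueue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(item));
        }
        m_notEmpty.notify_one();  // wake one sleeping consumer, if any
    }

    T take() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Sleep (no busy waiting) until there is something to take.
        m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
        T item = std::move(m_items.front());
        m_items.pop();  // fetch and remove under one lock: "atomic" for callers
        return item;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::queue<T> m_items;
};

// Two workers add up the values 1..10; a negative value tells a worker to stop.
inline int run_queue_demo() {
    BlockingQueue<int> queue;
    std::mutex sumMutex;
    int sum = 0;
    std::vector<std::thread> workers;
    for (int w = 0; w < 2; ++w) {
        workers.emplace_back([&] {
            for (;;) {
                const int value = queue.take();
                if (value < 0) break;  // sentinel: no more work
                std::lock_guard<std::mutex> lock(sumMutex);
                sum += value;
            }
        });
    }
    for (int i = 1; i <= 10; ++i) queue.push(i);
    for (int w = 0; w < 2; ++w) queue.push(-1);  // one sentinel per worker
    for (auto &t : workers) t.join();
    assert(sum == 55);
    return sum;
}
```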
__________________
There was of course no way of knowing whether you were being watched at any given moment.
How often, or on what system, the Thought Police plugged in on any individual wire was guesswork.



Last edited by LoRd_MuldeR; 16th April 2019 at 22:36.
Old 16th April 2019, 23:53   #5  |  Link
wonkey_monkey
Formerly davidh*****
 
 
Join Date: Jan 2004
Posts: 1,822
Thanks, I'm starting to get somewhere now.
Old 18th April 2019, 00:24   #6  |  Link
LoRd_MuldeR
Software Developer
 
 
Join Date: Jun 2005
Location: Last House on Slunk Street
Posts: 13,047
I just wanted to add that this can – for example – be implemented with one mutex and two semaphores.

The mutex is used to serialize all access to the shared queue. The first semaphore is used to count the "available" tasks in the queue; it blocks "worker" threads trying to acquire the next task while the queue is empty. And the second semaphore is used to count the "free" slots in the queue; it blocks the "main" thread when trying to enqueue another task while the queue has reached maximum length.

Function called by the "main" thread to schedule another task:
Code:
bool ThreadPool::schedule(ITask *const task)
{
	MTHREAD_SEM_WAIT(&m_semFree);
	MTHREAD_MUTEX_LOCK(&m_lock);

	m_taskQueue.push(task);
	MTHREAD_SEM_POST(&m_semUsed);

	MTHREAD_MUTEX_UNLOCK(&m_lock);
	return true;
}
Function called by a "worker" thread to acquire the next task to be executed:
Code:
ITask *ThreadPool::nextTask(ThreadPool* pool)
{
	ITask *task = NULL;

	MTHREAD_SEM_WAIT(&pool->m_semUsed);
	MTHREAD_MUTEX_LOCK(&pool->m_lock);

	task = pool->m_taskQueue.front();
	pool->m_taskQueue.pop();
	MTHREAD_SEM_POST(&pool->m_semFree);

	MTHREAD_MUTEX_UNLOCK(&pool->m_lock);
	return task;
}
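
The MTHREAD_* macros are not defined in this thread, so the two functions above cannot be compiled on their own. As a self-contained illustration of the same one-mutex, two-semaphore scheme, here is a minimal sketch assuming C++11. The Semaphore and BoundedIntQueue classes are hypothetical stand-ins written for this example, not the library behind the macros. One deliberate difference: this sketch posts the semaphore after releasing the mutex, whereas the code above posts while still holding it; both orders work.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical counting semaphore built from a mutex and a condition variable.
class Semaphore {
public:
    explicit Semaphore(std::size_t initial) : m_count(initial) {}

    void post() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cond.notify_one();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::size_t m_count;
};

// Bounded queue: m_semFree counts free slots, m_semUsed counts queued items.
class BoundedIntQueue {
public:
    explicit BoundedIntQueue(std::size_t capacity)
        : m_semFree(capacity), m_semUsed(0) {}

    void push(int value) {
        m_semFree.wait();  // blocks the producer while the queue is full
        {
            std::lock_guard<std::mutex> lock(m_lock);
            m_items.push(value);
        }
        m_semUsed.post();  // one more item available
    }

    int take() {
        m_semUsed.wait();  // blocks the consumer while the queue is empty
        int value;
        {
            std::lock_guard<std::mutex> lock(m_lock);
            value = m_items.front();
            m_items.pop();
        }
        m_semFree.post();  // one more free slot
        return value;
    }

private:
    std::mutex m_lock;
    std::queue<int> m_items;
    Semaphore m_semFree;
    Semaphore m_semUsed;
};

// One producer pushes 1..100 through a 4-slot queue; one consumer sums them.
inline long run_bounded_demo() {
    BoundedIntQueue queue(4);
    long sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < 100; ++i) sum += queue.take();
    });
    for (int i = 1; i <= 100; ++i) queue.push(i);
    consumer.join();
    assert(sum == 5050);
    return sum;
}
```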
Last edited by LoRd_MuldeR; 18th April 2019 at 00:34.
Old 18th April 2019, 12:38   #7  |  Link
wonkey_monkey
Formerly davidh*****
 
 
Join Date: Jan 2004
Posts: 1,822
Well, whatever I've cobbled together, it works. A lot of tutorials didn't seem all that bothered about how to tell if the jobs had actually finished - they were just fired and forgotten, which is not much use for a filter. I gave std::async a try, but that doesn't give you much of a speed increase, at least not the way I was doing it. I've made my own threadpool class which works the way I want it to, anyway (again a bit different from other implementations).
Old 19th April 2019, 13:11   #8  |  Link
LoRd_MuldeR
Software Developer
 
 
Join Date: Jun 2005
Location: Last House on Slunk Street
Posts: 13,047
Quote:
Originally Posted by wonkey_monkey View Post
A lot of tutorials didn't seem all that bothered about how to tell if the jobs had actually finished - they were just fired and forgotten, which is not much use for a filter.
This depends a lot on whether you want to wait for particular tasks to complete (e.g. via a "future" object), or whether you just want to wait until all scheduled tasks have completed.
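
For the first case, waiting on particular tasks, a minimal sketch using C++11's std::async and std::future might look like the following. The function names and the chunked-sum workload are made up for illustration. Note that the standard does not say whether std::launch::async reuses threads from a pool or starts a fresh one for each call, so this approach does not necessarily avoid thread-creation overhead.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical per-chunk job: sum the elements in [begin, end).
inline long sum_range(const std::vector<int> &data, std::size_t begin, std::size_t end) {
    return std::accumulate(data.begin() + static_cast<std::ptrdiff_t>(begin),
                           data.begin() + static_cast<std::ptrdiff_t>(end), 0L);
}

// Splits the data into up to `parts` chunks, runs each chunk asynchronously,
// then waits for every individual result through its future.
inline long parallel_sum(const std::vector<int> &data, std::size_t parts) {
    assert(parts > 0);
    std::vector<std::future<long>> futures;
    const std::size_t chunk = (data.size() + parts - 1) / parts;  // round up
    for (std::size_t begin = 0; begin < data.size(); begin += chunk) {
        const std::size_t end = std::min(begin + chunk, data.size());
        futures.push_back(std::async(std::launch::async, [&data, begin, end] {
            return sum_range(data, begin, end);
        }));
    }
    long total = 0;
    for (auto &f : futures) {
        total += f.get();  // blocks until this particular task has finished
    }
    return total;
}
```

Calling get() on each future is what turns "fire and forget" into "fire and wait": the caller cannot return before every chunk has delivered its result.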

Function called by "main" thread to wait until all pending tasks are completed, in an efficient way:
Code:
bool ThreadPool::wait(void)
{
	MTHREAD_MUTEX_LOCK(&m_lockTask);

	while((m_runningTasks > 0) || (!m_taskQueue.empty()))
	{
		MTHREAD_COND_WAIT(&m_condAllDone, &m_lockTask);
	}

	MTHREAD_MUTEX_UNLOCK(&m_lockTask);
	return true;
}
The corresponding function called by "worker" thread after a task has completed:
Code:
void ThreadPool::finalizeTask(MTHREADPOOL_NS::ThreadPool* pool, ITask* task)
{
	MTHREAD_MUTEX_LOCK(&pool->m_lockTask);

	pool->m_runningTasks--;
	if((pool->m_runningTasks == 0) && pool->m_taskQueue.empty())
	{
		MTHREAD_COND_BROADCAST(&pool->m_condAllDone);
	}

	MTHREAD_MUTEX_UNLOCK(&pool->m_lockTask);
}
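
Putting the pieces together, here is a minimal self-contained sketch of a pool with a "wait until everything is done" call, assuming C++11. The MiniPool class is hypothetical and is not the ThreadPool class quoted above. It makes two simplifications of its own: a single mutex guards both the task queue and the running-task counter, and the counter is incremented at the moment a task is dequeued, so there is never an instant where the queue is empty and the counter is zero while a task is still about to start.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: threads are created once and reused for every task.
class MiniPool {
public:
    explicit MiniPool(std::size_t threads) {
        for (std::size_t i = 0; i < threads; ++i)
            m_workers.emplace_back([this] { workerLoop(); });
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_condWork.notify_all();
        for (auto &t : m_workers) t.join();
    }

    void schedule(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_condWork.notify_one();
    }

    // Blocks until the queue is empty AND no task is still executing.
    void wait() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condAllDone.wait(lock, [this] { return m_tasks.empty() && m_running == 0; });
    }

private:
    void workerLoop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;) {
            m_condWork.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
            if (m_tasks.empty()) return;  // only reached when stopping
            std::function<void()> task = std::move(m_tasks.front());
            m_tasks.pop();
            ++m_running;    // counted as running before the lock is released
            lock.unlock();
            task();         // run the task without holding the lock
            lock.lock();
            --m_running;
            if (m_tasks.empty() && m_running == 0) m_condAllDone.notify_all();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_condWork;
    std::condition_variable m_condAllDone;
    std::queue<std::function<void()>> m_tasks;
    std::size_t m_running = 0;
    bool m_stop = false;
    std::vector<std::thread> m_workers;
};

// Simulates three "frames", each split into eight jobs, waiting after each frame.
inline int run_pool_demo() {
    std::atomic<int> counter{0};
    MiniPool pool(4);
    for (int frame = 0; frame < 3; ++frame) {
        for (int i = 0; i < 8; ++i) pool.schedule([&counter] { ++counter; });
        pool.wait();
        assert(counter == (frame + 1) * 8);  // all jobs of this frame are done
    }
    return counter;
}
```

This matches the filter use case described earlier in the thread: the threads live as long as the pool object, and each frame only pays for scheduling and waiting, not for thread creation.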
Last edited by LoRd_MuldeR; 19th April 2019 at 15:25.