20 #ifndef _WORKQUEUE_H_INCLUDED_
21 #define _WORKQUEUE_H_INCLUDED_
51 const std::string name;
55 unsigned n_workers_exited = 0;
58 unsigned n_threads = 0;
59 pthread_t *threads =
nullptr;
91 bool start(
unsigned nworkers,
void *(*workproc)(
void *),
void *arg)
93 const std::lock_guard<Mutex> protect(mutex);
97 assert(n_threads == 0);
98 assert(threads ==
nullptr);
101 n_threads = nworkers;
102 threads =
new pthread_t[n_threads];
104 for (
unsigned i = 0; i < nworkers; i++) {
106 if ((err = pthread_create(&threads[i], 0, workproc, arg))) {
107 LOGERR((
"WorkQueue:%s: pthread_create failed, err %d\n",
123 const std::lock_guard<Mutex> protect(mutex);
125 queue.emplace(std::forward<U>(u));
138 const std::lock_guard<Mutex> protect(mutex);
142 while (n_workers_exited < n_threads) {
144 client_cond.
wait(mutex);
149 for (
unsigned i = 0; i < n_threads; ++i) {
151 pthread_join(threads[i], &status);
159 n_workers_exited = 0;
169 const std::lock_guard<Mutex> protect(mutex);
174 while (queue.empty()) {
175 worker_cond.
wait(mutex);
180 tp = std::move(queue.front());
195 const std::lock_guard<Mutex> protect(mutex);
void workerExit()
Advertise exit and abort queue.
bool put(U &&u)
Add item to work queue, called from client.
A WorkQueue manages the synchronisation around a queue of work items, where a number of client thread...
void wait(PosixMutex &mutex)
void setTerminateAndWait()
Tell the workers to exit, and wait for them.
bool take(T &tp)
Take task from queue.
bool start(unsigned nworkers, void *(*workproc)(void *), void *arg)
Start the worker threads.
WorkQueue(const char *_name)
Create a WorkQueue.
WorkQueue & operator=(const WorkQueue &)=delete