summaryrefslogtreecommitdiffstats
path: root/xbmc/utils/JobManager.cpp
diff options
context:
space:
mode:
authormanuel <manuel@mausz.at>2020-10-19 00:52:24 +0200
committermanuel <manuel@mausz.at>2020-10-19 00:52:24 +0200
commitbe933ef2241d79558f91796cc5b3a161f72ebf9c (patch)
treefe3ab2f130e20c99001f2d7a81d610c78c96a3f4 /xbmc/utils/JobManager.cpp
parent5f8335c1e49ce108ef3481863833c98efa00411b (diff)
downloadkodi-pvr-build-be933ef2241d79558f91796cc5b3a161f72ebf9c.tar.gz
kodi-pvr-build-be933ef2241d79558f91796cc5b3a161f72ebf9c.tar.bz2
kodi-pvr-build-be933ef2241d79558f91796cc5b3a161f72ebf9c.zip
sync with upstream
Diffstat (limited to 'xbmc/utils/JobManager.cpp')
-rw-r--r--xbmc/utils/JobManager.cpp423
1 files changed, 423 insertions, 0 deletions
diff --git a/xbmc/utils/JobManager.cpp b/xbmc/utils/JobManager.cpp
new file mode 100644
index 0000000..3c8e04b
--- /dev/null
+++ b/xbmc/utils/JobManager.cpp
@@ -0,0 +1,423 @@
1/*
2 * Copyright (C) 2005-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
4 *
5 * SPDX-License-Identifier: GPL-2.0-or-later
6 * See LICENSES/README.md for more information.
7 */
8
9#include "JobManager.h"
10
11#include "threads/SingleLock.h"
12#include "utils/XTimeUtils.h"
13#include "utils/log.h"
14
15#include <algorithm>
16#include <functional>
17#include <stdexcept>
18
19bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
20{
21 if (m_callback)
22 return m_callback->OnJobProgress(progress, total, this);
23 return false;
24}
25
26CJobWorker::CJobWorker(CJobManager *manager) : CThread("JobWorker")
27{
28 m_jobManager = manager;
29 Create(true); // start work immediately, and kill ourselves when we're done
30}
31
32CJobWorker::~CJobWorker()
33{
34 // while we should already be removed from the job manager, if an exception
35 // occurs during processing that we haven't caught, we may skip over that step.
36 // Thus, before we go out of scope, ensure the job manager knows we're gone.
37 m_jobManager->RemoveWorker(this);
38 if(!IsAutoDelete())
39 StopThread();
40}
41
42void CJobWorker::Process()
43{
44 SetPriority( GetMinPriority() );
45 while (true)
46 {
47 // request an item from our manager (this call is blocking)
48 CJob *job = m_jobManager->GetNextJob(this);
49 if (!job)
50 break;
51
52 bool success = false;
53 try
54 {
55 success = job->DoWork();
56 }
57 catch (...)
58 {
59 CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, job->GetType());
60 }
61 m_jobManager->OnJobComplete(success, job);
62 }
63}
64
65void CJobQueue::CJobPointer::CancelJob()
66{
67 CJobManager::GetInstance().CancelJob(m_id);
68 m_id = 0;
69}
70
71CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
72: m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
73{
74}
75
76CJobQueue::~CJobQueue()
77{
78 CancelJobs();
79}
80
81void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
82{
83 CSingleLock lock(m_section);
84 // check if this job is in our processing list
85 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
86 if (i != m_processing.end())
87 m_processing.erase(i);
88 // request a new job be queued
89 QueueNextJob();
90}
91
92void CJobQueue::CancelJob(const CJob *job)
93{
94 CSingleLock lock(m_section);
95 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
96 if (i != m_processing.end())
97 {
98 i->CancelJob();
99 m_processing.erase(i);
100 return;
101 }
102 Queue::iterator j = find(m_jobQueue.begin(), m_jobQueue.end(), job);
103 if (j != m_jobQueue.end())
104 {
105 j->FreeJob();
106 m_jobQueue.erase(j);
107 }
108}
109
110bool CJobQueue::AddJob(CJob *job)
111{
112 CSingleLock lock(m_section);
113 // check if we have this job already. If so, we're done.
114 if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
115 find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
116 {
117 delete job;
118 return false;
119 }
120
121 if (m_lifo)
122 m_jobQueue.push_back(CJobPointer(job));
123 else
124 m_jobQueue.push_front(CJobPointer(job));
125 QueueNextJob();
126
127 return true;
128}
129
130void CJobQueue::QueueNextJob()
131{
132 CSingleLock lock(m_section);
133 if (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
134 {
135 CJobPointer &job = m_jobQueue.back();
136 job.m_id = CJobManager::GetInstance().AddJob(job.m_job, this, m_priority);
137 m_processing.push_back(job);
138 m_jobQueue.pop_back();
139 }
140}
141
142void CJobQueue::CancelJobs()
143{
144 CSingleLock lock(m_section);
145 for_each(m_processing.begin(), m_processing.end(), [](CJobPointer& jp) { jp.CancelJob(); });
146 for_each(m_jobQueue.begin(), m_jobQueue.end(), [](CJobPointer& jp) { jp.FreeJob(); });
147 m_jobQueue.clear();
148 m_processing.clear();
149}
150
151bool CJobQueue::IsProcessing() const
152{
153 return CJobManager::GetInstance().m_running && (!m_processing.empty() || !m_jobQueue.empty());
154}
155
156bool CJobQueue::QueueEmpty() const
157{
158 CSingleLock lock(m_section);
159 return m_jobQueue.empty();
160}
161
162CJobManager &CJobManager::GetInstance()
163{
164 static CJobManager sJobManager;
165 return sJobManager;
166}
167
168CJobManager::CJobManager()
169{
170 m_jobCounter = 0;
171 m_running = true;
172 m_pauseJobs = false;
173}
174
175void CJobManager::Restart()
176{
177 CSingleLock lock(m_section);
178
179 if (m_running)
180 throw std::logic_error("CJobManager already running");
181 m_running = true;
182}
183
184void CJobManager::CancelJobs()
185{
186 CSingleLock lock(m_section);
187 m_running = false;
188
189 // clear any pending jobs
190 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
191 {
192 for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), [](CWorkItem& wi) { wi.FreeJob(); });
193 m_jobQueue[priority].clear();
194 }
195
196 // cancel any callbacks on jobs still processing
197 for_each(m_processing.begin(), m_processing.end(), [](CWorkItem& wi) { wi.Cancel(); });
198
199 // tell our workers to finish
200 while (m_workers.size())
201 {
202 lock.Leave();
203 m_jobEvent.Set();
204 std::this_thread::yield(); // yield after setting the event to give the workers some time to die
205 lock.Enter();
206 }
207}
208
209unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
210{
211 CSingleLock lock(m_section);
212
213 if (!m_running)
214 return 0;
215
216 // increment the job counter, ensuring 0 (invalid job) is never hit
217 m_jobCounter++;
218 if (m_jobCounter == 0)
219 m_jobCounter++;
220
221 // create a work item for this job
222 CWorkItem work(job, m_jobCounter, priority, callback);
223 m_jobQueue[priority].push_back(work);
224
225 StartWorkers(priority);
226 return work.m_id;
227}
228
229void CJobManager::CancelJob(unsigned int jobID)
230{
231 CSingleLock lock(m_section);
232
233 // check whether we have this job in the queue
234 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
235 {
236 JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
237 if (i != m_jobQueue[priority].end())
238 {
239 delete i->m_job;
240 m_jobQueue[priority].erase(i);
241 return;
242 }
243 }
244 // or if we're processing it
245 Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
246 if (it != m_processing.end())
247 it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
248}
249
250void CJobManager::StartWorkers(CJob::PRIORITY priority)
251{
252 CSingleLock lock(m_section);
253
254 // check how many free threads we have
255 if (m_processing.size() >= GetMaxWorkers(priority))
256 return;
257
258 // do we have any sleeping threads?
259 if (m_processing.size() < m_workers.size())
260 {
261 m_jobEvent.Set();
262 return;
263 }
264
265 // everyone is busy - we need more workers
266 m_workers.push_back(new CJobWorker(this));
267}
268
269CJob *CJobManager::PopJob()
270{
271 CSingleLock lock(m_section);
272 for (int priority = CJob::PRIORITY_DEDICATED; priority >= CJob::PRIORITY_LOW_PAUSABLE; --priority)
273 {
274 // Check whether we're pausing pausable jobs
275 if (priority == CJob::PRIORITY_LOW_PAUSABLE && m_pauseJobs)
276 continue;
277
278 if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
279 {
280 // pop the job off the queue
281 CWorkItem job = m_jobQueue[priority].front();
282 m_jobQueue[priority].pop_front();
283
284 // add to the processing vector
285 m_processing.push_back(job);
286 job.m_job->m_callback = this;
287 return job.m_job;
288 }
289 }
290 return NULL;
291}
292
293void CJobManager::PauseJobs()
294{
295 CSingleLock lock(m_section);
296 m_pauseJobs = true;
297}
298
299void CJobManager::UnPauseJobs()
300{
301 CSingleLock lock(m_section);
302 m_pauseJobs = false;
303}
304
305bool CJobManager::IsProcessing(const CJob::PRIORITY &priority) const
306{
307 CSingleLock lock(m_section);
308
309 if (m_pauseJobs)
310 return false;
311
312 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
313 {
314 if (priority == it->m_priority)
315 return true;
316 }
317 return false;
318}
319
320int CJobManager::IsProcessing(const std::string &type) const
321{
322 int jobsMatched = 0;
323 CSingleLock lock(m_section);
324
325 if (m_pauseJobs)
326 return 0;
327
328 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
329 {
330 if (type == std::string(it->m_job->GetType()))
331 jobsMatched++;
332 }
333 return jobsMatched;
334}
335
336CJob *CJobManager::GetNextJob(const CJobWorker *worker)
337{
338 CSingleLock lock(m_section);
339 while (m_running)
340 {
341 // grab a job off the queue if we have one
342 CJob *job = PopJob();
343 if (job)
344 return job;
345 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
346 lock.Leave();
347 bool newJob = m_jobEvent.WaitMSec(30000);
348 lock.Enter();
349 if (!newJob)
350 break;
351 }
352 // ensure no jobs have come in during the period after
353 // timeout and before we held the lock
354 CJob *job = PopJob();
355 if (job)
356 return job;
357 // have no jobs
358 RemoveWorker(worker);
359 return NULL;
360}
361
362bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
363{
364 CSingleLock lock(m_section);
365 // find the job in the processing queue, and check whether it's cancelled (no callback)
366 Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
367 if (i != m_processing.end())
368 {
369 CWorkItem item(*i);
370 lock.Leave(); // leave section prior to call
371 if (item.m_callback)
372 {
373 item.m_callback->OnJobProgress(item.m_id, progress, total, job);
374 return false;
375 }
376 }
377 return true; // couldn't find the job, or it's been cancelled
378}
379
380void CJobManager::OnJobComplete(bool success, CJob *job)
381{
382 CSingleLock lock(m_section);
383 // remove the job from the processing queue
384 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
385 if (i != m_processing.end())
386 {
387 // tell any listeners we're done with the job, then delete it
388 CWorkItem item(*i);
389 lock.Leave();
390 try
391 {
392 if (item.m_callback)
393 item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
394 }
395 catch (...)
396 {
397 CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, item.m_job->GetType());
398 }
399 lock.Enter();
400 Processing::iterator j = find(m_processing.begin(), m_processing.end(), job);
401 if (j != m_processing.end())
402 m_processing.erase(j);
403 lock.Leave();
404 item.FreeJob();
405 }
406}
407
408void CJobManager::RemoveWorker(const CJobWorker *worker)
409{
410 CSingleLock lock(m_section);
411 // remove our worker
412 Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
413 if (i != m_workers.end())
414 m_workers.erase(i); // workers auto-delete
415}
416
417unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority)
418{
419 static const unsigned int max_workers = 5;
420 if (priority == CJob::PRIORITY_DEDICATED)
421 return 10000; // A large number..
422 return max_workers - (CJob::PRIORITY_HIGH - priority);
423}