summaryrefslogtreecommitdiffstats
path: root/xbmc/utils/EventStream.h
diff options
context:
space:
mode:
Diffstat (limited to 'xbmc/utils/EventStream.h')
-rw-r--r--xbmc/utils/EventStream.h100
1 files changed, 100 insertions, 0 deletions
diff --git a/xbmc/utils/EventStream.h b/xbmc/utils/EventStream.h
new file mode 100644
index 0000000..42a17df
--- /dev/null
+++ b/xbmc/utils/EventStream.h
@@ -0,0 +1,100 @@
1/*
2 * Copyright (C) 2016-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
4 *
5 * SPDX-License-Identifier: GPL-2.0-or-later
6 * See LICENSES/README.md for more information.
7 */
8
9#pragma once
10
11#include "EventStreamDetail.h"
12#include "JobManager.h"
13#include "threads/CriticalSection.h"
14#include "threads/SingleLock.h"
15
16#include <algorithm>
17#include <memory>
18#include <vector>
19
20
21template<typename Event>
22class CEventStream
23{
24public:
25
26 template<typename A>
27 void Subscribe(A* owner, void (A::*fn)(const Event&))
28 {
29 auto subscription = std::make_shared<detail::CSubscription<Event, A>>(owner, fn);
30 CSingleLock lock(m_criticalSection);
31 m_subscriptions.emplace_back(std::move(subscription));
32 }
33
34 template<typename A>
35 void Unsubscribe(A* obj)
36 {
37 std::vector<std::shared_ptr<detail::ISubscription<Event>>> toCancel;
38 {
39 CSingleLock lock(m_criticalSection);
40 auto it = m_subscriptions.begin();
41 while (it != m_subscriptions.end())
42 {
43 if ((*it)->IsOwnedBy(obj))
44 {
45 toCancel.push_back(*it);
46 it = m_subscriptions.erase(it);
47 }
48 else
49 {
50 ++it;
51 }
52 }
53 }
54 for (auto& subscription : toCancel)
55 subscription->Cancel();
56 }
57
58protected:
59 std::vector<std::shared_ptr<detail::ISubscription<Event>>> m_subscriptions;
60 CCriticalSection m_criticalSection;
61};
62
63
64template<typename Event>
65class CEventSource : public CEventStream<Event>
66{
67public:
68 explicit CEventSource() : m_queue(false, 1, CJob::PRIORITY_HIGH) {};
69
70 template<typename A>
71 void Publish(A event)
72 {
73 CSingleLock lock(this->m_criticalSection);
74 auto& subscriptions = this->m_subscriptions;
75 auto task = [subscriptions, event](){
76 for (auto& s: subscriptions)
77 s->HandleEvent(event);
78 };
79 lock.Leave();
80 m_queue.Submit(std::move(task));
81 }
82
83private:
84 CJobQueue m_queue;
85};
86
87template<typename Event>
88class CBlockingEventSource : public CEventStream<Event>
89{
90public:
91 template<typename A>
92 void HandleEvent(A event)
93 {
94 CSingleLock lock(this->m_criticalSection);
95 for (const auto& subscription : this->m_subscriptions)
96 {
97 subscription->HandleEvent(event);
98 }
99 }
100};