summaryrefslogtreecommitdiffstats
path: root/xbmc/utils/ActorProtocol.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'xbmc/utils/ActorProtocol.cpp')
-rw-r--r--xbmc/utils/ActorProtocol.cpp371
1 files changed, 371 insertions, 0 deletions
diff --git a/xbmc/utils/ActorProtocol.cpp b/xbmc/utils/ActorProtocol.cpp
new file mode 100644
index 0000000..f83d2fc
--- /dev/null
+++ b/xbmc/utils/ActorProtocol.cpp
@@ -0,0 +1,371 @@
1/*
2 * Copyright (C) 2005-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
4 *
5 * SPDX-License-Identifier: LGPL-2.1-or-later
6 * See LICENSES/README.md for more information.
7 */
8
9#include "ActorProtocol.h"
10
11#include "threads/Event.h"
12
13#include <cstring>
14
15using namespace Actor;
16
17void Message::Release()
18{
19 bool skip;
20 origin.Lock();
21 skip = isSync ? !isSyncFini : false;
22 isSyncFini = true;
23 origin.Unlock();
24
25 if (skip)
26 return;
27
28 // free data buffer
29 if (data != buffer)
30 delete [] data;
31
32 payloadObj.reset();
33
34 // delete event in case of sync message
35 delete event;
36
37 origin.ReturnMessage(this);
38}
39
40bool Message::Reply(int sig, void *data /* = NULL*/, size_t size /* = 0 */)
41{
42 if (!isSync)
43 {
44 if (isOut)
45 return origin.SendInMessage(sig, data, size);
46 else
47 return origin.SendOutMessage(sig, data, size);
48 }
49
50 origin.Lock();
51
52 if (!isSyncTimeout)
53 {
54 Message *msg = origin.GetMessage();
55 msg->signal = sig;
56 msg->isOut = !isOut;
57 replyMessage = msg;
58 if (data)
59 {
60 if (size > sizeof(msg->buffer))
61 msg->data = new uint8_t[size];
62 else
63 msg->data = msg->buffer;
64 memcpy(msg->data, data, size);
65 }
66 }
67
68 origin.Unlock();
69
70 if (event)
71 event->Set();
72
73 return true;
74}
75
76Protocol::~Protocol()
77{
78 Message *msg;
79 Purge();
80 while (!freeMessageQueue.empty())
81 {
82 msg = freeMessageQueue.front();
83 freeMessageQueue.pop();
84 delete msg;
85 }
86}
87
88Message *Protocol::GetMessage()
89{
90 Message *msg;
91
92 CSingleLock lock(criticalSection);
93
94 if (!freeMessageQueue.empty())
95 {
96 msg = freeMessageQueue.front();
97 freeMessageQueue.pop();
98 }
99 else
100 msg = new Message(*this);
101
102 msg->isSync = false;
103 msg->isSyncFini = false;
104 msg->isSyncTimeout = false;
105 msg->event = NULL;
106 msg->data = NULL;
107 msg->payloadSize = 0;
108 msg->replyMessage = NULL;
109
110 return msg;
111}
112
113void Protocol::ReturnMessage(Message *msg)
114{
115 CSingleLock lock(criticalSection);
116
117 freeMessageQueue.push(msg);
118}
119
120bool Protocol::SendOutMessage(int signal,
121 const void* data /* = NULL */,
122 size_t size /* = 0 */,
123 Message* outMsg /* = NULL */)
124{
125 Message *msg;
126 if (outMsg)
127 msg = outMsg;
128 else
129 msg = GetMessage();
130
131 msg->signal = signal;
132 msg->isOut = true;
133
134 if (data)
135 {
136 if (size > sizeof(msg->buffer))
137 msg->data = new uint8_t[size];
138 else
139 msg->data = msg->buffer;
140 memcpy(msg->data, data, size);
141 }
142
143 { CSingleLock lock(criticalSection);
144 outMessages.push(msg);
145 }
146 if (containerOutEvent)
147 containerOutEvent->Set();
148
149 return true;
150}
151
152bool Protocol::SendOutMessage(int signal, CPayloadWrapBase *payload, Message *outMsg)
153{
154 Message *msg;
155 if (outMsg)
156 msg = outMsg;
157 else
158 msg = GetMessage();
159
160 msg->signal = signal;
161 msg->isOut = true;
162
163 msg->payloadObj.reset(payload);
164
165 { CSingleLock lock(criticalSection);
166 outMessages.push(msg);
167 }
168 if (containerOutEvent)
169 containerOutEvent->Set();
170
171 return true;
172}
173
174bool Protocol::SendInMessage(int signal,
175 const void* data /* = NULL */,
176 size_t size /* = 0 */,
177 Message* outMsg /* = NULL */)
178{
179 Message *msg;
180 if (outMsg)
181 msg = outMsg;
182 else
183 msg = GetMessage();
184
185 msg->signal = signal;
186 msg->isOut = false;
187
188 if (data)
189 {
190 if (size > sizeof(msg->data))
191 msg->data = new uint8_t[size];
192 else
193 msg->data = msg->buffer;
194 memcpy(msg->data, data, size);
195 }
196
197 { CSingleLock lock(criticalSection);
198 inMessages.push(msg);
199 }
200 if (containerInEvent)
201 containerInEvent->Set();
202
203 return true;
204}
205
206bool Protocol::SendInMessage(int signal, CPayloadWrapBase *payload, Message *outMsg)
207{
208 Message *msg;
209 if (outMsg)
210 msg = outMsg;
211 else
212 msg = GetMessage();
213
214 msg->signal = signal;
215 msg->isOut = false;
216
217 msg->payloadObj.reset(payload);
218
219 { CSingleLock lock(criticalSection);
220 inMessages.push(msg);
221 }
222 if (containerInEvent)
223 containerInEvent->Set();
224
225 return true;
226}
227
228bool Protocol::SendOutMessageSync(
229 int signal, Message** retMsg, int timeout, const void* data /* = NULL */, size_t size /* = 0 */)
230{
231 Message *msg = GetMessage();
232 msg->isOut = true;
233 msg->isSync = true;
234 msg->event = new CEvent;
235 msg->event->Reset();
236 SendOutMessage(signal, data, size, msg);
237
238 if (!msg->event->WaitMSec(timeout))
239 {
240 const CSingleLock lock(criticalSection);
241 if (msg->replyMessage)
242 *retMsg = msg->replyMessage;
243 else
244 {
245 *retMsg = NULL;
246 msg->isSyncTimeout = true;
247 }
248 }
249 else
250 *retMsg = msg->replyMessage;
251
252 msg->Release();
253
254 if (*retMsg)
255 return true;
256 else
257 return false;
258}
259
260bool Protocol::SendOutMessageSync(int signal, Message **retMsg, int timeout, CPayloadWrapBase *payload)
261{
262 Message *msg = GetMessage();
263 msg->isOut = true;
264 msg->isSync = true;
265 msg->event = new CEvent;
266 msg->event->Reset();
267 SendOutMessage(signal, payload, msg);
268
269 if (!msg->event->WaitMSec(timeout))
270 {
271 const CSingleLock lock(criticalSection);
272 if (msg->replyMessage)
273 *retMsg = msg->replyMessage;
274 else
275 {
276 *retMsg = NULL;
277 msg->isSyncTimeout = true;
278 }
279 }
280 else
281 *retMsg = msg->replyMessage;
282
283 msg->Release();
284
285 if (*retMsg)
286 return true;
287 else
288 return false;
289}
290
291bool Protocol::ReceiveOutMessage(Message **msg)
292{
293 CSingleLock lock(criticalSection);
294
295 if (outMessages.empty() || outDefered)
296 return false;
297
298 *msg = outMessages.front();
299 outMessages.pop();
300
301 return true;
302}
303
304bool Protocol::ReceiveInMessage(Message **msg)
305{
306 CSingleLock lock(criticalSection);
307
308 if (inMessages.empty() || inDefered)
309 return false;
310
311 *msg = inMessages.front();
312 inMessages.pop();
313
314 return true;
315}
316
317
318void Protocol::Purge()
319{
320 Message *msg;
321
322 while (ReceiveInMessage(&msg))
323 msg->Release();
324
325 while (ReceiveOutMessage(&msg))
326 msg->Release();
327}
328
329void Protocol::PurgeIn(int signal)
330{
331 Message *msg;
332 std::queue<Message*> msgs;
333
334 CSingleLock lock(criticalSection);
335
336 while (!inMessages.empty())
337 {
338 msg = inMessages.front();
339 inMessages.pop();
340 if (msg->signal != signal)
341 msgs.push(msg);
342 }
343 while (!msgs.empty())
344 {
345 msg = msgs.front();
346 msgs.pop();
347 inMessages.push(msg);
348 }
349}
350
351void Protocol::PurgeOut(int signal)
352{
353 Message *msg;
354 std::queue<Message*> msgs;
355
356 CSingleLock lock(criticalSection);
357
358 while (!outMessages.empty())
359 {
360 msg = outMessages.front();
361 outMessages.pop();
362 if (msg->signal != signal)
363 msgs.push(msg);
364 }
365 while (!msgs.empty())
366 {
367 msg = msgs.front();
368 msgs.pop();
369 outMessages.push(msg);
370 }
371}