AAX SDK 2.8.0
Avid Audio Extensions Development Kit
Loading...
Searching...
No Matches
AAX_CAtomicQueue.h
Go to the documentation of this file.
1/*================================================================================================*/
2/*
3 * Copyright 2015, 2023-2024 Avid Technology, Inc.
4 * All rights reserved.
5 *
6 * This file is part of the Avid AAX SDK.
7 *
8 * The AAX SDK is subject to commercial or open-source licensing.
9 *
10 * By using the AAX SDK, you agree to the terms of both the Avid AAX SDK License
11 * Agreement and Avid Privacy Policy.
12 *
13 * AAX SDK License: https://developer.avid.com/aax
14 * Privacy Policy: https://www.avid.com/legal/privacy-policy-statement
15 *
16 * Or: You may also use this code under the terms of the GPL v3 (see
17 * www.gnu.org/licenses).
18 *
19 * THE AAX SDK IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
20 * EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
21 * DISCLAIMED.
22 *
23 */
24
31/*================================================================================================*/
33#ifndef AAX_CATOMICQUEUE_H
34#define AAX_CATOMICQUEUE_H
36
37
38// AAX Includes
39#include "AAX_IPointerQueue.h"
40#include "AAX_Atomic.h"
41#include "AAX_CMutex.h"
42
43// Standard Includes
44#include <cstring>
45
46
64template <typename T, size_t S>
66{
67public:
68 virtual ~AAX_CAtomicQueue() {}
70
71public:
72 static const size_t template_size = S;
73
76
77public: // AAX_IContainer
78 virtual void Clear();
79
80public: // AAX_IPointerQueue
82 virtual value_type Pop();
83 virtual value_type Peek() const;
84
85private:
86 AAX_CMutex mMutex;
87 uint32_t mReadIdx;
88 uint32_t mWriteIdx;
89 value_type mRingBuffer[S];
90};
91
92
94
95template <typename T, size_t S>
98, mMutex()
99, mReadIdx(0)
100, mWriteIdx(0)
101{
102 Clear();
103}
104
105template <typename T, size_t S>
107{
108 std::memset((void*)mRingBuffer, 0x0, sizeof(mRingBuffer));
109}
110
111template <typename T, size_t S>
113{
114 if (NULL == inElem)
115 {
117 }
118
120
121 AAX_StLock_Guard guard(mMutex);
122 //
123 // Possible failure case without mutex is because of several write threads try to modify
124 // mWriteIdx concurrently
125 //
126 // Example:
127 //
128 // -
129 // Notation:
130 // First number - write thread number
131 // Second number - value number
132 // 1/15 - 1st thread that write number 15
133 //
134 // -
135 // Queue may look like this:
136 // mReadIdx
137 // |
138 // |..... | 4/3 | 4/4 | 1/4 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
139 // |
140 // mWriteIdx
141 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
142 //
143 // -
144 // Possible operation order (w stands for mWriteIdx, r - for mReadIdx):
145 //-------------------------------------------------------
146 // thread#| action | write index value | mWriteIdx |
147 // | | internal variable | |
148 //-------------------------------------------------------
149 // 5 | w++ | 2 | 2 |
150 //-------------------------------------------------------
151 // 6 | w++ | 3 | 3 |
152 //-------------------------------------------------------
153 // 5 | false | - | 2not=3 => 2--=1 |
154 //-------------------------------------------------------
155 // read | r++ | - | - |
156 //-------------------------------------------------------
157 // read | r++ | - | - |
158 //-------------------------------------------------------
159 // -
160 // Queue state:
161 // mReadIdx
162 // |
163 // |..... | 4/3 | 0 | 0 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
164 // |
165 // mWriteIdx
166 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
167 //
168 // -
169 //-------------------------------------------------------
170 // 6 | false | - | 3not=1 => 3--=2 | // place 3 is still not empty to write
171 //-------------------------------------------------------
172 //
173 // -
174 // Now, some other thread (5, for example) can successfully write
175 // it's value to queue and move mWriteIdx forward:
176 //
177 // -
178 // Queue state:
179 // mReadIdx
180 // |
181 // |..... | 4/3 | 0 | 5/1 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
182 // |
183 // mWriteIdx
184 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
185 //
186 // -
187 // Thus, we have one place with NULL value left. In the next round mReadIdx will
188 // stuck on place #1 (queue thinks that it's empty) until one of the write threads
189 // will write the value into place #1. It could be thread #5, so we have:
190 //
191 // -
192 // Queue state:
193 // mReadIdx
194 // |
195 // |..... | 9/1 | 5/9 | 5/1 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
196 // |
197 // mWriteIdx
198 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
199 //
200 // -
201 // And we will read 5/9 before 5/1
202 //
203 //
204 // Note that read/write both begin at index 1
205 const uint32_t idx = AAX_Atomic_IncThenGet_32(mWriteIdx);
206 const uint32_t widx = idx % S;
207
208 // Do the push. If the value at the current write index is non-NULL then we have filled the buffer.
209 const bool cxResult = AAX_Atomic_CompareAndExchange_Pointer(mRingBuffer[widx], (value_type)0x0, inElem);
210
211 if (false == cxResult)
212 {
214
215 const uint32_t ridx = (0 == idx) ? S : idx-1;
216
217 // Note the write index has already been incremented, so in the event of an overflow we must
218 // return the write index to its previous location.
219 //
220 // Note: if multiple write threads encounter concurrent push overflows then the write pointer
221 // will not be fully decremented back to the overflow location, and the read index will need
222 // to increment multiple positions to clear the overflow state.
223// const bool resetResult = AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, ridx);
224 AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, ridx);
225
226// printf("AAX_CAtomicQueue: overflow - reset: %s, idx: %lu, widx: %lu, inElem: %p\n",
227// resetResult ? "yes" : " no",
228// (unsigned long)idx,
229// (unsigned long)widx,
230// inElem);
231 }
232 else
233 {
235
236 // Handle wraparound
237 //
238 // There may be multiple write threads pushing elements at the same time, so we use
239 // (wrapped index < raw index) instead of (raw index == boundary)
240 //
241 // This assumes overhead between S and UINT_32_MAX of at least as many elements as
242 // there are write threads.
243
244// bool exchResult = false;
245 if (widx < idx)
246 {
247// exchResult =
248 AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, widx);
249 }
250
251// printf("AAX_CAtomicQueue: pushed - reset: %s, idx: %lu, widx: %lu, inElem: %p\n",
252// (widx < idx) ? exchResult ? "yes" : " no" : "n/a",
253// (unsigned long)idx,
254// (unsigned long)widx,
255// inElem);
256 }
257
258 return result;
259}
260
261template <typename T, size_t S>
263{
264 // Note that read/write both begin at index 1
265 mReadIdx = (mReadIdx+1) % template_size;
266 value_type const val = AAX_Atomic_Exchange_Pointer(mRingBuffer[mReadIdx], (value_type)0x0);
267
268// printf("AAX_CAtomicQueue: popped - reset: %s, idx: %lu, val: %p\n",
269// (0x0 == val) ? "yes" : " no",
270// (unsigned long)mReadIdx,
271// val);
272
273 if (0x0 == val)
274 {
275 // If the value is NULL then no value has yet been written to this location. Decrement the read index
276 --mReadIdx; // No need to handle wraparound since the read index will be incremented before the next read
277 }
278
279 return val;
280}
281
282template <typename T, size_t S>
284{
285 // I don't think that we need a memory barrier here because:
286 // a) mReadIdx will only be modified from the read thread, and therefore presumably
287 // using the same CPU (or at least I can't see any way for mReadIndex modification
288 // ordering to be a problem between Peek() and Pop() on a single thread.)
289 // b) We don't care if mRingBuffer modifications are run out of order between the read
290 // and write threads, as long as they are "close".
291 const uint32_t testIdx = (mReadIdx+1) % template_size;
292 return AAX_Atomic_Load_Pointer(&mRingBuffer[testIdx]);
293}
294
295// Attempt to support multiple read threads
296//
297// This approach is broken in the following scenario:
298//
299// Thread | Operation
300// A Pop v enter
301// A Pop - increment/get read index (get 1)
302// A Pop - exchange pointer (get 0x0)
303// other Push ptr1
304// other Push ptr2
305// B Pop v enter
306// B Pop - increment/get read index (get 2)
307// B Pop - exchange pointer (get ptr2)
308// ERROR: popped ptr2 before ptr1
309// B Pop ^ exit
310// A Pop - decrement read index (set 1)
311// A Pop ^ exit
312// any Pop v enter
313// any Pop - increment/get read index (get 2)
314// any Pop - exchange pointer (get 0x0)
315// ERROR: should be ptr2
316// This NULL state continues for further Pop calls until either Push wraps around
317// or another pair of concurrent calls to Pop just happens to re-aligign the read
318// index by incrementing twice before any reads occur
319// any Pop - decrement read index (set 1)
320// any Pop ^ exit
321//
322// This could be fixed by incrementing the read index until either a non-NULL value is found or
323// the initial position is reached, but that would have terrible performance.
324//
325// In any case, assuming a single read thread is optimal when we want maximum performance for read
326// operations, since this requires the fewest number of atomic operations in the read methods
327/*
328template <typename T, size_t S>
329inline typename AAX_CAtomicQueue<T, S>::value_type AAX_CAtomicQueue<T, S>::Pop()
330{
331 const uint32_t idx = AAX_Atomic_IncThenGet_32(mReadIdx);
332 const uint32_t widx = idx % S;
333
334 value_type const val = AAX_Atomic_Exchange_Pointer(mRingBuffer[widx], (value_type)0x0);
335
336 if (0x0 == val)
337 {
338 // If the value is NULL then no value has yet been written to this location. Decrement the read index
339 AAX_Atomic_DecThenGet_32(mReadIdx);
340 }
341 else
342 {
343 // Handle wraparound (assumes some overhead between S and UINT_32_MAX)
344 if (widx < idx)
345 {
346 AAX_Atomic_CompareAndExchange_32(mReadIdx, idx, widx);
347 }
348 }
349
350 return val;
351}
352 */
353
355
357#endif /* defined(AAX_CATOMICQUEUE_H) */
Abstract interface for a basic FIFO queue of pointers-to-objects.
Atomic operation utilities.
uint32_t AAX_CALLBACK AAX_Atomic_IncThenGet_32(uint32_t &ioData)
Increments a 32-bit value and returns the result.
bool AAX_CALLBACK AAX_Atomic_CompareAndExchange_32(volatile uint32_t &ioValue, uint32_t inCompareValue, uint32_t inExchangeValue)
Perform a compare and exchange operation on a 32-bit value.
bool AAX_CALLBACK AAX_Atomic_CompareAndExchange_Pointer(TPointer *&ioValue, TPointer *inCompareValue, TPointer *inExchangeValue)
Perform a compare and exchange operation on a pointer value.
Definition: AAX_Atomic.h:87
TPointer *AAX_CALLBACK AAX_Atomic_Load_Pointer(TPointer const *const volatile *inValue)
Atomically loads a pointer value.
TPointer *AAX_CALLBACK AAX_Atomic_Exchange_Pointer(TPointer *&ioValue, TPointer *inExchangeValue)
Perform an exchange operation on a pointer value.
Definition: AAX_Atomic.h:63
Definition: AAX_CAtomicQueue.h:66
virtual value_type Peek() const
virtual value_type Pop()
virtual AAX_IContainer::EStatus Push(value_type inElem)
static const size_t template_size
The size used for this template instance.
Definition: AAX_CAtomicQueue.h:72
virtual ~AAX_CAtomicQueue()
Definition: AAX_CAtomicQueue.h:68
virtual void Clear()
AAX_IPointerQueue< T >::template_type template_type
The type used for this template instance.
Definition: AAX_CAtomicQueue.h:74
AAX_IPointerQueue< T >::value_type value_type
The type of values stored in this queue.
Definition: AAX_CAtomicQueue.h:75
Mutex with try lock functionality.
Definition: AAX_CMutex.h:40
Helper class for working with mutex.
Definition: AAX_CMutex.h:60
EStatus
Definition: AAX_IContainer.h:48
@ eStatus_Unsupported
Operation is unsupported.
Definition: AAX_IContainer.h:53
@ eStatus_Unavailable
An internal resource was not available.
Definition: AAX_IContainer.h:52
@ eStatus_Overflow
Internal buffer overflow.
Definition: AAX_IContainer.h:50
@ eStatus_Success
Operation succeeded.
Definition: AAX_IContainer.h:49
Definition: AAX_IPointerQueue.h:45
T * value_type
The type of values stored in this queue.
Definition: AAX_IPointerQueue.h:51
T template_type
The type used for this template instance.
Definition: AAX_IPointerQueue.h:50