AAX SDK 2.6.1
Avid Audio Extensions Development Kit
Loading...
Searching...
No Matches
AAX_CAtomicQueue.h
Go to the documentation of this file.
1/*================================================================================================*/
2/*
3 * Copyright 2015, 2023 Avid Technology, Inc.
4 * All rights reserved.
5 *
6 * CONFIDENTIAL: this document contains confidential information of Avid. Do
7 * not disclose to any third party. Use of the information contained in this
8 * document is subject to an Avid SDK license.
9 *
10 */
11
18/*================================================================================================*/
20#ifndef AAX_CATOMICQUEUE_H
21#define AAX_CATOMICQUEUE_H
23
24
25// AAX Includes
26#include "AAX_IPointerQueue.h"
27#include "AAX_Atomic.h"
28#include "AAX_CMutex.h"
29
30// Standard Includes
31#include <cstring>
32
33
51template <typename T, size_t S>
53{
54public:
55 virtual ~AAX_CAtomicQueue() {}
57
58public:
59 static const size_t template_size = S;
60
63
64public: // AAX_IContainer
65 virtual void Clear();
66
67public: // AAX_IPointerQueue
69 virtual value_type Pop();
70 virtual value_type Peek() const;
71
72private:
73 AAX_CMutex mMutex;
74 uint32_t mReadIdx;
75 uint32_t mWriteIdx;
76 value_type mRingBuffer[S];
77};
78
79
81
82template <typename T, size_t S>
85, mMutex()
86, mReadIdx(0)
87, mWriteIdx(0)
88{
89 Clear();
90}
91
92template <typename T, size_t S>
94{
95 std::memset((void*)mRingBuffer, 0x0, sizeof(mRingBuffer));
96}
97
98template <typename T, size_t S>
100{
101 if (NULL == inElem)
102 {
104 }
105
107
108 AAX_StLock_Guard guard(mMutex);
109 //
110 // Without the mutex, a failure is possible because several write threads may try to
111 // modify mWriteIdx concurrently
112 //
113 // Example:
114 //
115 // -
116 // Notation:
117 // First number - write thread number
118 // Second number - value number
119 // 1/15 - value number 15, written by the 1st thread
120 //
121 // -
122 // Queue may look like this:
123 // mReadIdx
124 // |
125 // |..... | 4/3 | 4/4 | 1/4 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
126 // |
127 // mWriteIdx
128 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
129 //
130 // -
131 // Possible operation order (w stands for mWriteIdx, r - for mReadIdx):
132 //-------------------------------------------------------
133 // thread#| action | write index value | mWriteIdx |
134 // | | internal variable | |
135 //-------------------------------------------------------
136 // 5 | w++ | 2 | 2 |
137 //-------------------------------------------------------
138 // 6 | w++ | 3 | 3 |
139 //-------------------------------------------------------
140 // 5 | false | - | 2not=3 => 2--=1 |
141 //-------------------------------------------------------
142 // read | r++ | - | - |
143 //-------------------------------------------------------
144 // read | r++ | - | - |
145 //-------------------------------------------------------
146 // -
147 // Queue state:
148 // mReadIdx
149 // |
150 // |..... | 4/3 | 0 | 0 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
151 // |
152 // mWriteIdx
153 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
154 //
155 // -
156 //-------------------------------------------------------
157 // 6 | false | - | 3not=1 => 3--=2 | // place 3 is still not empty to write
158 //-------------------------------------------------------
159 //
160 // -
161 // Now, some other thread (5, for example) can successfully write
162 // its value to the queue and move mWriteIdx forward:
163 //
164 // -
165 // Queue state:
166 // mReadIdx
167 // |
168 // |..... | 4/3 | 0 | 5/1 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
169 // |
170 // mWriteIdx
171 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
172 //
173 // -
174 // Thus, we have one place with a NULL value left. In the next round mReadIdx will
175 // be stuck on place #1 (the queue thinks that it's empty) until one of the write threads
176 // writes a value into place #1. It could be thread #5, so we have:
177 //
178 // -
179 // Queue state:
180 // mReadIdx
181 // |
182 // |..... | 9/1 | 5/9 | 5/1 | 1/5 | 2/7 | 2/8 | 2/9 | .....|
183 // |
184 // mWriteIdx
185 // place# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | .....|
186 //
187 // -
188 // And we will read 5/9 before 5/1
189 //
190 //
191 // Note that read/write both begin at index 1
192 const uint32_t idx = AAX_Atomic_IncThenGet_32(mWriteIdx);
193 const uint32_t widx = idx % S;
194
195 // Do the push. If the value at the current write index is non-NULL then we have filled the buffer.
196 const bool cxResult = AAX_Atomic_CompareAndExchange_Pointer(mRingBuffer[widx], (value_type)0x0, inElem);
197
198 if (false == cxResult)
199 {
201
202 const uint32_t ridx = (0 == idx) ? S : idx-1;
203
204 // Note the write index has already been incremented, so in the event of an overflow we must
205 // return the write index to its previous location.
206 //
207 // Note: if multiple write threads encounter concurrent push overflows then the write pointer
208 // will not be fully decremented back to the overflow location, and the read index will need
209 // to increment multiple positions to clear the overflow state.
210// const bool resetResult = AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, ridx);
211 AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, ridx);
212
213// printf("AAX_CAtomicQueue: overflow - reset: %s, idx: %lu, widx: %lu, inElem: %p\n",
214// resetResult ? "yes" : " no",
215// (unsigned long)idx,
216// (unsigned long)widx,
217// inElem);
218 }
219 else
220 {
222
223 // Handle wraparound
224 //
225 // There may be multiple write threads pushing elements at the same time, so we use
226 // (wrapped index < raw index) instead of (raw index == boundary)
227 //
228 // This assumes overhead between S and UINT_32_MAX of at least as many elements as
229 // there are write threads.
230
231// bool exchResult = false;
232 if (widx < idx)
233 {
234// exchResult =
235 AAX_Atomic_CompareAndExchange_32(mWriteIdx, idx, widx);
236 }
237
238// printf("AAX_CAtomicQueue: pushed - reset: %s, idx: %lu, widx: %lu, inElem: %p\n",
239// (widx < idx) ? exchResult ? "yes" : " no" : "n/a",
240// (unsigned long)idx,
241// (unsigned long)widx,
242// inElem);
243 }
244
245 return result;
246}
247
248template <typename T, size_t S>
250{
251 // Note that read/write both begin at index 1
252 mReadIdx = (mReadIdx+1) % template_size;
253 value_type const val = AAX_Atomic_Exchange_Pointer(mRingBuffer[mReadIdx], (value_type)0x0);
254
255// printf("AAX_CAtomicQueue: popped - reset: %s, idx: %lu, val: %p\n",
256// (0x0 == val) ? "yes" : " no",
257// (unsigned long)mReadIdx,
258// val);
259
260 if (0x0 == val)
261 {
262 // If the value is NULL then no value has yet been written to this location. Decrement the read index
263 --mReadIdx; // No need to handle wraparound since the read index will be incremented before the next read
264 }
265
266 return val;
267}
268
269template <typename T, size_t S>
271{
272 // I don't think that we need a memory barrier here because:
273 // a) mReadIdx will only be modified from the read thread, and therefore presumably
274 // using the same CPU (or at least I can't see any way for mReadIdx modification
275 // ordering to be a problem between Peek() and Pop() on a single thread.)
276 // b) We don't care if mRingBuffer modifications are run out of order between the read
277 // and write threads, as long as they are "close".
278 const uint32_t testIdx = (mReadIdx+1) % template_size;
279 return AAX_Atomic_Load_Pointer(&mRingBuffer[testIdx]);
280}
281
282// Attempt to support multiple read threads
283//
284// This approach is broken in the following scenario:
285//
286// Thread | Operation
287// A Pop v enter
288// A Pop - increment/get read index (get 1)
289// A Pop - exchange pointer (get 0x0)
290// other Push ptr1
291// other Push ptr2
292// B Pop v enter
293// B Pop - increment/get read index (get 2)
294// B Pop - exchange pointer (get ptr2)
295// ERROR: popped ptr2 before ptr1
296// B Pop ^ exit
297// A Pop - decrement read index (set 1)
298// A Pop ^ exit
299// any Pop v enter
300// any Pop - increment/get read index (get 2)
301// any Pop - exchange pointer (get 0x0)
302// ERROR: should be ptr2
303// This NULL state continues for further Pop calls until either Push wraps around
304// or another pair of concurrent calls to Pop just happens to re-align the read
305// index by incrementing twice before any reads occur
306// any Pop - decrement read index (set 1)
307// any Pop ^ exit
308//
309// This could be fixed by incrementing the read index until either a non-NULL value is found or
310// the initial position is reached, but that would have terrible performance.
311//
312// In any case, assuming a single read thread is optimal when we want maximum performance for read
313// operations, since this requires the fewest number of atomic operations in the read methods
314/*
315template <typename T, size_t S>
316inline typename AAX_CAtomicQueue<T, S>::value_type AAX_CAtomicQueue<T, S>::Pop()
317{
318 const uint32_t idx = AAX_Atomic_IncThenGet_32(mReadIdx);
319 const uint32_t widx = idx % S;
320
321 value_type const val = AAX_Atomic_Exchange_Pointer(mRingBuffer[widx], (value_type)0x0);
322
323 if (0x0 == val)
324 {
325 // If the value is NULL then no value has yet been written to this location. Decrement the read index
326 AAX_Atomic_DecThenGet_32(mReadIdx);
327 }
328 else
329 {
330 // Handle wraparound (assumes some overhead between S and UINT_32_MAX)
331 if (widx < idx)
332 {
333 AAX_Atomic_CompareAndExchange_32(mReadIdx, idx, widx);
334 }
335 }
336
337 return val;
338}
339 */
340
342
344#endif /* defined(AAX_CATOMICQUEUE_H) */
Abstract interface for a basic FIFO queue of pointers-to-objects.
Atomic operation utilities.
uint32_t AAX_CALLBACK AAX_Atomic_IncThenGet_32(uint32_t &ioData)
Increments a 32-bit value and returns the result.
bool AAX_CALLBACK AAX_Atomic_CompareAndExchange_32(volatile uint32_t &ioValue, uint32_t inCompareValue, uint32_t inExchangeValue)
Perform a compare and exchange operation on a 32-bit value.
bool AAX_CALLBACK AAX_Atomic_CompareAndExchange_Pointer(TPointer *&ioValue, TPointer *inCompareValue, TPointer *inExchangeValue)
Perform a compare and exchange operation on a pointer value.
Definition: AAX_Atomic.h:74
TPointer *AAX_CALLBACK AAX_Atomic_Load_Pointer(TPointer const *const volatile *inValue)
Atomically loads a pointer value.
TPointer *AAX_CALLBACK AAX_Atomic_Exchange_Pointer(TPointer *&ioValue, TPointer *inExchangeValue)
Perform an exchange operation on a pointer value.
Definition: AAX_Atomic.h:50
Definition: AAX_CAtomicQueue.h:53
virtual value_type Peek() const
virtual value_type Pop()
virtual AAX_IContainer::EStatus Push(value_type inElem)
static const size_t template_size
The size used for this template instance.
Definition: AAX_CAtomicQueue.h:59
virtual ~AAX_CAtomicQueue()
Definition: AAX_CAtomicQueue.h:55
virtual void Clear()
AAX_IPointerQueue< T >::template_type template_type
The type used for this template instance.
Definition: AAX_CAtomicQueue.h:61
AAX_IPointerQueue< T >::value_type value_type
The type of values stored in this queue.
Definition: AAX_CAtomicQueue.h:62
Mutex with try lock functionality.
Definition: AAX_CMutex.h:27
Helper class for working with mutex.
Definition: AAX_CMutex.h:47
EStatus
Definition: AAX_IContainer.h:35
@ eStatus_Unsupported
Operation is unsupported.
Definition: AAX_IContainer.h:40
@ eStatus_Unavailable
An internal resource was not available.
Definition: AAX_IContainer.h:39
@ eStatus_Overflow
Internal buffer overflow.
Definition: AAX_IContainer.h:37
@ eStatus_Success
Operation succeeded.
Definition: AAX_IContainer.h:36
Definition: AAX_IPointerQueue.h:32
T * value_type
The type of values stored in this queue.
Definition: AAX_IPointerQueue.h:38
T template_type
The type used for this template instance.
Definition: AAX_IPointerQueue.h:37