00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_H
00022 #define __TBB_concurrent_queue_H
00023
00024 #include "tbb_stddef.h"
00025 #include "cache_aligned_allocator.h"
00026 #include <iterator>
00027 #include <new>
00028
00029 namespace tbb {
00030
00031 template<typename T, class A = cache_aligned_allocator<T> >
00032 class concurrent_queue;
00033
00035 namespace internal {
00036
// Forward declarations of implementation types that the classes below only
// refer to by pointer, reference, or friendship.
class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<class Container, class Value>
class concurrent_queue_iterator;
00041
00043
00045 class concurrent_queue_base_v3: no_copy {
00047 concurrent_queue_rep* my_rep;
00048
00049 friend class concurrent_queue_rep;
00050 friend struct micro_queue;
00051 friend class micro_queue_pop_finalizer;
00052 friend class concurrent_queue_iterator_rep;
00053 friend class concurrent_queue_iterator_base_v3;
00054 protected:
00056 struct page {
00057 page* next;
00058 uintptr mask;
00059 };
00060
00062 ptrdiff_t my_capacity;
00063
00065 size_t items_per_page;
00066
00068 size_t item_size;
00069
00070 private:
00071 virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
00072 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
00073 protected:
00074 __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
00075 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
00076
00078 void __TBB_EXPORTED_METHOD internal_push( const void* src );
00079
00081 void __TBB_EXPORTED_METHOD internal_pop( void* dst );
00082
00084 bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
00085
00087
00088 bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
00089
00091 ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
00092
00094 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
00095
00097 virtual page *allocate_page() = 0;
00098
00100 virtual void deallocate_page( page *p ) = 0;
00101
00103
00104 void __TBB_EXPORTED_METHOD internal_finish_clear() ;
00105
00107 void __TBB_EXPORTED_METHOD internal_throw_exception() const;
00108
00110 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
00111
00112 private:
00113 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
00114 };
00115
00116 typedef concurrent_queue_base_v3 concurrent_queue_base ;
00117
00119
00120 class concurrent_queue_iterator_base_v3 {
00122
00123 concurrent_queue_iterator_rep* my_rep;
00124
00125 template<typename C, typename T, typename U>
00126 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00127
00128 template<typename C, typename T, typename U>
00129 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00130 protected:
00132 mutable void* my_item;
00133
00135 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
00136
00138 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00139 assign(i);
00140 }
00141
00143 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue );
00144
00146 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
00147
00149 void __TBB_EXPORTED_METHOD advance();
00150
00152 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
00153 };
00154
00155 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00156
00158
00160 template<typename Container, typename Value>
00161 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
00162 public std::iterator<std::forward_iterator_tag,Value> {
00163 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00164 template<typename T, class A>
00165 friend class ::tbb::concurrent_queue;
00166 #else
00167 public:
00168 #endif
00170 concurrent_queue_iterator( const concurrent_queue_base& queue ) :
00171 concurrent_queue_iterator_base_v3(queue)
00172 {
00173 }
00174 public:
00175 concurrent_queue_iterator() {}
00176
00179 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00180 concurrent_queue_iterator_base_v3(other)
00181 {}
00182
00184 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00185 assign(other);
00186 return *this;
00187 }
00188
00190 Value& operator*() const {
00191 return *static_cast<Value*>(my_item);
00192 }
00193
00194 Value* operator->() const {return &operator*();}
00195
00197 concurrent_queue_iterator& operator++() {
00198 advance();
00199 return *this;
00200 }
00201
00203 Value* operator++(int) {
00204 Value* result = &operator*();
00205 operator++();
00206 return result;
00207 }
00208 };
00209
00210
00211 template<typename C, typename T, typename U>
00212 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00213 return i.my_item==j.my_item;
00214 }
00215
00216 template<typename C, typename T, typename U>
00217 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00218 return i.my_item!=j.my_item;
00219 }
00220
00221 }
00222
00224
00226
00229 template<typename T, class A>
00230 class concurrent_queue: public internal::concurrent_queue_base_v3 {
00231 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00232
00234 typedef typename A::template rebind<char>::other page_allocator_type;
00235 page_allocator_type my_allocator;
00236
00238 class destroyer: internal::no_copy {
00239 T& my_value;
00240 public:
00241 destroyer( T& value ) : my_value(value) {}
00242 ~destroyer() {my_value.~T();}
00243 };
00244
00245 T& get_ref( page& page, size_t index ) {
00246 __TBB_ASSERT( index<items_per_page, NULL );
00247 return static_cast<T*>(static_cast<void*>(&page+1))[index];
00248 }
00249
00250 virtual void copy_item( page& dst, size_t index, const void* src ) {
00251 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
00252 }
00253
00254 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00255 new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00256 }
00257
00258 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00259 T& from = get_ref(src,index);
00260 destroyer d(from);
00261 *static_cast<T*>(dst) = from;
00262 }
00263
00264 virtual page *allocate_page() {
00265 size_t n = sizeof(page) + items_per_page*item_size;
00266 page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
00267 if( !p ) internal_throw_exception();
00268 return p;
00269 }
00270
00271 virtual void deallocate_page( page *p ) {
00272 size_t n = sizeof(page) + items_per_page*item_size;
00273 my_allocator.deallocate( reinterpret_cast<char*>(p), n );
00274 }
00275
00276 public:
00278 typedef T value_type;
00279
00281 typedef A allocator_type;
00282
00284 typedef T& reference;
00285
00287 typedef const T& const_reference;
00288
00290
00292 typedef std::ptrdiff_t size_type;
00293
00295 typedef std::ptrdiff_t difference_type;
00296
00298 explicit concurrent_queue(const allocator_type &a = allocator_type()) :
00299 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00300 {
00301 }
00302
00304 ~concurrent_queue();
00305
00307 void push( const T& source ) {
00308 internal_push( &source );
00309 }
00310
00312
00313 void pop( T& destination ) {
00314 internal_pop( &destination );
00315 }
00316
00318
00320 bool push_if_not_full( const T& source ) {
00321 return internal_push_if_not_full( &source );
00322 }
00323
00325
00327 bool pop_if_present( T& destination ) {
00328 return internal_pop_if_present( &destination );
00329 }
00330
00332
00335 size_type size() const {return internal_size();}
00336
00338 bool empty() const {return size()<=0;}
00339
00341 size_type capacity() const {
00342 return my_capacity;
00343 }
00344
00346
00348 void set_capacity( size_type capacity ) {
00349 internal_set_capacity( capacity, sizeof(T) );
00350 }
00351
00353 allocator_type get_allocator() const { return this->my_allocator; }
00354
00356 void clear() ;
00357
00358 typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
00359 typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
00360
00361
00362
00363
00364 iterator begin() {return iterator(*this);}
00365 iterator end() {return iterator();}
00366 const_iterator begin() const {return const_iterator(*this);}
00367 const_iterator end() const {return const_iterator();}
00368
00370 concurrent_queue( const concurrent_queue& src, const allocator_type &a = allocator_type()) :
00371 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00372 {
00373 assign( src );
00374 }
00375
00377 template<typename InputIterator>
00378 concurrent_queue( InputIterator begin, InputIterator end, const allocator_type &a = allocator_type()) :
00379 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00380 {
00381 for( ; begin != end; ++begin )
00382 internal_push_if_not_full(&*begin);
00383 }
00384 };
00385
00386 template<typename T, class A>
00387 concurrent_queue<T,A>::~concurrent_queue() {
00388 clear();
00389 internal_finish_clear();
00390 }
00391
00392 template<typename T, class A>
00393 void concurrent_queue<T,A>::clear() {
00394 while( !empty() ) {
00395 T value;
00396 internal_pop_if_present(&value);
00397 }
00398 }
00399
00400 }
00401
00402 #endif