concurrent_queue.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_concurrent_queue_H
00022 #define __TBB_concurrent_queue_H
00023 
00024 #include "tbb_stddef.h"
00025 #include "cache_aligned_allocator.h"
00026 #include <iterator>
00027 #include <new>
00028 
00029 namespace tbb {
00030 
//! Forward declaration of the queue template; the definition appears later in this header.
/** The default for the allocator parameter A (cache_aligned_allocator<T>) is supplied here. */
00031 template<typename T, class A = cache_aligned_allocator<T> > 
00032 class concurrent_queue;
00033 
00035 namespace internal {
00036 
// Forward declarations.  The two *_rep classes are never defined in this header and are
// only used through pointers here; the iterator classes are defined further below.
00037 class concurrent_queue_rep;
00038 class concurrent_queue_iterator_rep;
00039 class concurrent_queue_iterator_base_v3;
00040 template<typename Container, typename Value> class concurrent_queue_iterator;
00041 
00043 
//! Type-independent base class of concurrent_queue (privately inherits no_copy).
/** Holds the state shared by every concurrent_queue<T,A> instantiation and declares the
    exported operations that move items through untyped pointers.  Everything that depends
    on the item type is delegated to the pure virtual functions below, which
    concurrent_queue<T,A> implements.
    NOTE(review): the __TBB_EXPORTED_METHOD members are defined outside this header, so the
    order of data members and virtual functions is presumably part of the binary
    interface - confirm before reordering anything. */
00045 class concurrent_queue_base_v3: no_copy {
          //! Opaque representation; concurrent_queue_rep is only forward-declared in this header.
00047     concurrent_queue_rep* my_rep;
00048 
00049     friend class concurrent_queue_rep;
00050     friend struct micro_queue;
00051     friend class micro_queue_pop_finalizer;
00052     friend class concurrent_queue_iterator_rep;
00053     friend class concurrent_queue_iterator_base_v3;
00054 protected:
          //! Header placed at the start of each page of items.
          /** concurrent_queue<T,A> allocates sizeof(page) plus room for the items and
              addresses the items as an array that starts directly after this header. */
00056     struct page {
              //! Link to another page.  NOTE(review): presumably the next page of the same chain - confirm.
00057         page* next;
              //! NOTE(review): presumably a bitmask describing the item slots of this page - confirm.
00058         uintptr mask; 
00059     };
00060 
          //! Capacity value; this is what concurrent_queue::capacity() returns.
00062     ptrdiff_t my_capacity;
00063    
          //! Number of item slots per page; used to bounds-check slot indices and to size page allocations.
00065     size_t items_per_page;
00066 
          //! Size of one item in bytes as used when sizing page allocations.
          /** NOTE(review): presumably initialised from the constructor argument of the same
              name - confirm in the library source. */
00068     size_t item_size;
00069 
00070 private:
          //! Type-specific hook: construct a copy of the item at src in slot 'index' of page 'dst'.
00071     virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
          //! Type-specific hook: assign slot 'index' of page 'src' to the item at dst, then destroy that slot.
00072     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
00073 protected:
          //! Construct the base for items of the given size in bytes (defined outside this header).
00074     __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
          //! Virtual destructor (defined outside this header).
00075     virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
00076 
          //! Untyped implementation behind concurrent_queue::push; src points to the item to enqueue.
          /** NOTE(review): presumably waits while the queue is at capacity - confirm. */
00078     void __TBB_EXPORTED_METHOD internal_push( const void* src );
00079 
          //! Untyped implementation behind concurrent_queue::pop; dst points to the item that receives the value.
          /** NOTE(review): presumably waits while the queue is empty - confirm. */
00081     void __TBB_EXPORTED_METHOD internal_pop( void* dst );
00082 
          //! Untyped implementation behind concurrent_queue::push_if_not_full.
          /** NOTE(review): judging by the name, returns false without enqueuing when the
              queue is full - confirm. */
00084     bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
00085 
00087 
          //! Untyped implementation behind concurrent_queue::pop_if_present.
          /** NOTE(review): judging by the name, returns false and leaves the item at dst
              untouched when nothing is available - confirm. */
00088     bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
00089 
          //! Value reported by concurrent_queue::size().
          /** The result is signed and concurrent_queue::empty() tests it with <=0, so callers
              should not assume it is non-negative. */
00091     ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
00092 
          //! Untyped implementation behind concurrent_queue::set_capacity, which passes sizeof(T) as element_size.
00094     void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
00095 
          //! Hook that obtains storage for one page; implemented by concurrent_queue<T,A> with its allocator.
00097     virtual page *allocate_page() = 0;
00098 
          //! Hook that releases a page obtained from allocate_page().
00100     virtual void deallocate_page( page *p ) = 0;
00101 
          //! Called by ~concurrent_queue() after clear() has removed the remaining items.
00103     /* note that the name may be misleading, but it remains so due to a historical accident. */
00104     void __TBB_EXPORTED_METHOD internal_finish_clear() ;
00105 
          //! Error path; concurrent_queue::allocate_page calls it when the allocator returns a null pointer.
00107     void __TBB_EXPORTED_METHOD internal_throw_exception() const;
00108 
          //! Used by the concurrent_queue copy constructor on a freshly constructed queue, with src as the source.
00110     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
00111 
00112 private:
          //! Type-specific hook: copy-construct slot 'dindex' of page 'dst' from slot 'sindex' of page 'src'.
00113     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
00114 };
00115 
//! Unversioned alias for concurrent_queue_base_v3; the iterator classes below take the queue by this name.
00116 typedef concurrent_queue_base_v3 concurrent_queue_base ;
00117 
00119 
//! Type-independent base class of concurrent_queue_iterator.
/** Tracks the current item as an untyped pointer; the typed iterator casts it back to
    Value*.  assign(), advance(), the queue-taking constructor and the destructor are
    exported methods defined outside this header. */
00120 class concurrent_queue_iterator_base_v3 {
00122 
          //! Opaque iterator state; concurrent_queue_iterator_rep is only forward-declared in this header.
00123     concurrent_queue_iterator_rep* my_rep;
00124 
          // The comparison operators are friends because they read the protected my_item.
00125     template<typename C, typename T, typename U>
00126     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00127 
00128     template<typename C, typename T, typename U>
00129     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00130 protected:
          //! Pointer to the current item; NULL in a default-constructed iterator.
          /** Iterator equality is defined purely by comparing this pointer. */
00132     mutable void* my_item;
00133 
          //! Default constructor: no representation and no current item.
00135     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
00136 
          //! Copy constructor: starts from the empty state, then delegates to assign().
00138     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00139         assign(i);
00140     }
00141 
          //! Construct an iterator over the given queue (defined outside this header).
          /** concurrent_queue::begin() reaches this constructor through the typed iterator. */
00143     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue );
00144 
          //! Make this iterator a copy of i (defined outside this header).
00146     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
00147 
          //! Step to the next item (defined outside this header); used by the typed operator++.
00149     void __TBB_EXPORTED_METHOD advance();
00150 
          //! Destructor (defined outside this header).
00152     __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
00153 };
00154 
//! Unversioned alias for concurrent_queue_iterator_base_v3; concurrent_queue_iterator derives from it by this name.
00155 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00156 
00158 
//! Typed forward iterator over a concurrent_queue.
/** Container is the queue type and Value is either its value_type or the const-qualified
    value_type (see the iterator/const_iterator typedefs in concurrent_queue).  All
    traversal work is done by the untyped base class; this wrapper only adds the casts. */
00160 template<typename Container, typename Value>
00161 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
00162         public std::iterator<std::forward_iterator_tag,Value> {
      // On compilers other than plain MSVC the queue-taking constructor below stays private
      // and concurrent_queue is granted access through friendship; for MSVC the constructor
      // is made public instead.
00163 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00164     template<typename T, class A>
00165     friend class ::tbb::concurrent_queue;
00166 #else
00167 public: // workaround for MSVC
00168 #endif 
          //! Construct an iterator over the given queue; used by concurrent_queue::begin().
00170     concurrent_queue_iterator( const concurrent_queue_base& queue ) :
00171         concurrent_queue_iterator_base_v3(queue)
00172     {
00173     }
00174 public:
          //! Default constructor; the result has a NULL current item and is what concurrent_queue::end() returns.
00175     concurrent_queue_iterator() {}
00176 
          //! Construct from the iterator whose Value is Container::value_type.
          /** For the non-const iterator this is the copy constructor; for the const iterator
              it is the conversion from the non-const iterator. */
00179     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00180         concurrent_queue_iterator_base_v3(other)
00181     {}
00182 
          //! Assignment; delegates to the base class assign().
00184     concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00185         assign(other);
00186         return *this;
00187     }
00188 
          //! Reference to the current item.
          /** No NULL check is performed, so this must not be applied to a default-constructed
              (end) iterator. */
00190     Value& operator*() const {
00191         return *static_cast<Value*>(my_item);
00192     }
00193 
          //! Member access on the current item.
00194     Value* operator->() const {return &operator*();}
00195 
          //! Pre-increment: advance to the next item and return this iterator.
00197     concurrent_queue_iterator& operator++() {
00198         advance();
00199         return *this;
00200     }
00201 
          //! Post-increment: advance, returning a pointer to the item that was current before the call.
          /** Note that the return type is Value*, not a copy of the iterator. */
00203     Value* operator++(int) {
00204         Value* result = &operator*();
00205         operator++();
00206         return result;
00207     }
00208 }; // concurrent_queue_iterator
00209 
00210 
//! Two iterators over the same container type are equal when they point at the same item.
/** The value types T and U may differ, so an iterator can be compared with a const iterator. */
00211 template<typename C, typename T, typename U>
00212 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00213     return i.my_item==j.my_item;
00214 }
00215 
//! Two iterators over the same container type differ when they point at different items.
/** The value types T and U may differ, so an iterator can be compared with a const iterator. */
00216 template<typename C, typename T, typename U>
00217 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00218     return i.my_item!=j.my_item;
00219 }
00220 
00221 } // namespace internal
00222 
00224 
00226 
//! Queue of items of type T whose storage pages are obtained from an allocator of type A.
/** The typed front end of internal::concurrent_queue_base_v3: it implements the
    type-specific virtual hooks (item copy, assignment, destruction and page
    allocation) and forwards the public operations to the untyped internal_* methods of
    the base class, passing items by address. */
00229 template<typename T, class A>
00230 class concurrent_queue: public internal::concurrent_queue_base_v3 {
00231     template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
00232 
          //! A rebound to char, so that pages are allocated and released as raw bytes.
00234     typedef typename A::template rebind<char>::other page_allocator_type;
00235     page_allocator_type my_allocator;
00236 
          //! Scope guard that runs the destructor of the referenced T when the guard goes out of scope.
00238     class destroyer: internal::no_copy {
00239         T& my_value;
00240     public:
00241         destroyer( T& value ) : my_value(value) {}
00242         ~destroyer() {my_value.~T();}          
00243     };
00244 
          //! Reference to slot 'index' of a page; the items are laid out as an array right after the page header.
00245     T& get_ref( page& page, size_t index ) {
00246         __TBB_ASSERT( index<items_per_page, NULL );
00247         return static_cast<T*>(static_cast<void*>(&page+1))[index];
00248     }
00249 
          //! Copy-construct the T at src into slot 'index' of page 'dst' using placement new.
00250     /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
00251         new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); 
00252     }
00253 
          //! Copy-construct slot 'dindex' of page 'dst' from slot 'sindex' of page 'src'.
00254     /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00255         new( &get_ref(dst,dindex) ) T( static_cast<const T*>(static_cast<const void*>(&src+1))[sindex] );
00256     }
00257 
          //! Assign slot 'index' of page 'src' to the T at dst, then destroy the slot.
          /** The destroyer guard destroys the slot on scope exit, so the slot is destroyed
              even if the assignment throws. */
00258     /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00259         T& from = get_ref(src,index);
00260         destroyer d(from);
00261         *static_cast<T*>(dst) = from;
00262     }
00263 
          //! Allocate raw storage for one page: the header followed by items_per_page items.
          /** Calls internal_throw_exception() if the allocator returns a null pointer. */
00264     /*override*/ virtual page *allocate_page() {
00265         size_t n = sizeof(page) + items_per_page*item_size;
00266         page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
00267         if( !p ) internal_throw_exception(); 
00268         return p;
00269     }
00270 
          //! Release a page; the byte count is recomputed exactly as in allocate_page().
00271     /*override*/ virtual void deallocate_page( page *p ) {
00272         size_t n = sizeof(page) + items_per_page*item_size;
00273         my_allocator.deallocate( reinterpret_cast<char*>(p), n );
00274     }
00275 
00276 public:
          //! Type of the items stored in the queue.
00278     typedef T value_type;
00279 
          //! Allocator type supplied by the user.
00281     typedef A allocator_type;
00282 
          //! Reference to an item.
00284     typedef T& reference;
00285 
          //! Const reference to an item.
00287     typedef const T& const_reference;
00288 
00290 
          //! Type used for sizes and capacities; deliberately signed (see empty()).
00292     typedef std::ptrdiff_t size_type;
00293 
          //! Difference type.
00295     typedef std::ptrdiff_t difference_type;
00296 
          //! Construct an empty queue that allocates its pages through a copy of 'a'.
00298     explicit concurrent_queue(const allocator_type  &a = allocator_type()) : 
00299         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00300     {
00301     }
00302 
          //! Remove all remaining items, then let the base class finish the teardown.
00304     ~concurrent_queue();
00305 
          //! Enqueue a copy of source (forwards to internal_push).
00307     void push( const T& source ) {
00308         internal_push( &source );
00309     }
00310 
00312 
          //! Dequeue an item into destination (forwards to internal_pop).
00313     void pop( T& destination ) {
00314         internal_pop( &destination );
00315     }
00316 
00318 
          //! Forwards to internal_push_if_not_full and returns its result.
00320     bool push_if_not_full( const T& source ) {
00321         return internal_push_if_not_full( &source );
00322     }
00323 
00325 
          //! Forwards to internal_pop_if_present and returns its result.
00327     bool pop_if_present( T& destination ) {
00328         return internal_pop_if_present( &destination );
00329     }
00330 
00332 
          //! Value of internal_size(); the type is signed, so do not assume it is non-negative.
00335     size_type size() const {return internal_size();}
00336 
          //! True when size() is zero or negative.
00338     bool empty() const {return size()<=0;}
00339 
          //! Current capacity value stored in the base class.
00341     size_type capacity() const {
00342         return my_capacity;
00343     }
00344 
00346 
          //! Change the capacity (forwards to internal_set_capacity together with sizeof(T)).
00348     void set_capacity( size_type capacity ) {
00349         internal_set_capacity( capacity, sizeof(T) );
00350     }
00351 
          //! Allocator of the user-visible type, converted from the stored char-rebound allocator.
00353     allocator_type get_allocator() const { return this->my_allocator; }
00354 
          //! Pop and discard items until empty() reports true.
00356     void clear() ;
00357 
00358     typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
00359     typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
00360 
00361     //------------------------------------------------------------------------
00362     // The iterators are intended only for debugging.  They are slow and not thread safe.
00363     //------------------------------------------------------------------------
00364     iterator begin() {return iterator(*this);}
00365     iterator end() {return iterator();}
00366     const_iterator begin() const {return const_iterator(*this);}
00367     const_iterator end() const {return const_iterator();}
00368     
          //! Construct a queue holding copies of the items of src, allocating through a copy of 'a'.
00370     concurrent_queue( const concurrent_queue& src, const allocator_type &a = allocator_type()) : 
00371         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00372     {
00373         assign( src );
00374     }
00375 
          //! Construct a queue from the items in the range [begin,end), in iteration order.
          /** Each element is converted to T before it is enqueued, so the iterator's value
              type only needs to be convertible to T.  The result of push_if_not_full is
              ignored, as before. */
00377     template<typename InputIterator>
00378     concurrent_queue( InputIterator begin, InputIterator end, const allocator_type &a = allocator_type()) :
00379         concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
00380     {
00381         for( ; begin != end; ++begin )
                  // Use the typed overload rather than passing &*begin as an untyped pointer:
                  // copy_item reinterprets that pointer as const T*, which is undefined
                  // behaviour when *begin is not exactly a T, and taking the address fails
                  // to compile when *begin is a temporary.
00382             push_if_not_full(*begin);
00383     }
00384 }; 
00385 
//! Destructor: first removes any items still in the queue, then calls the base-class
//! internal_finish_clear().  The order matters: clear() runs before the final cleanup.
00386 template<typename T, class A>
00387 concurrent_queue<T,A>::~concurrent_queue() {
00388     clear();
00389     internal_finish_clear();
00390 }
00391 
//! Remove all items by popping them one at a time until empty() reports true.
/** Every iteration default-constructs a temporary T to receive the popped item, so T must
    be default-constructible for this function (and therefore the destructor) to compile. */
00392 template<typename T, class A>
00393 void concurrent_queue<T,A>::clear() {
00394     while( !empty() ) {
              // Scratch object that receives the popped item; destroyed at the end of each iteration.
00395         T value;
00396         internal_pop_if_present(&value);
00397     }
00398 }
00399 
00400 } // namespace tbb
00401 
00402 #endif /* __TBB_concurrent_queue_H */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.