pipeline.h

00001 /*
00002     Copyright 2005-2009 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_pipeline_H 
00022 #define __TBB_pipeline_H 
00023 
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include <cstddef>
00027 
00028 namespace tbb {
00029 
00030 class pipeline;
00031 class filter;
00032 
namespace internal {

// Encodes a pipeline interface version into the version bits (bits 1-3) of a
// filter mode byte: the result is (x-2) shifted left by one.
// The argument for PIPELINE_VERSION should be an integer between 2 and 9,
// which maps onto the eight values that fit in those three bits.
// The argument and the whole expansion are parenthesized so the macro keeps
// its meaning when the argument is an expression or when the expansion is
// combined with higher-precedence operators such as '+'.
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)

//! Unsigned integer type used for token values.
typedef unsigned long Token;

//! Signed integer type of the same width as Token.
/** NOTE(review): presumably holds differences between Token values - confirm against the implementation. */
typedef long tokendiff_t;

// Forward declarations only; both classes are defined outside this header.
class stage_task;
class ordered_buffer;

} // namespace internal
00046 
00048 
00049 class filter: internal::no_copy {
00050 private:
00052     static filter* not_in_pipeline() {return reinterpret_cast<filter*>(internal::intptr(-1));}
00053     
00055     static const unsigned char filter_is_serial = 0x1; 
00056 
00058     // The bit was not set for parallel filters in TBB 2.1 and earlier,
00059     // but is_ordered() function always treats parallel filters as out of order
00060     static const unsigned char filter_is_out_of_order = 0x1<<4;  
00061 
00062     static const unsigned char current_version = __TBB_PIPELINE_VERSION(4);
00063     static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
00064 public:
00065     enum mode {
00067         parallel = current_version | filter_is_out_of_order, 
00069         serial_in_order = current_version | filter_is_serial,
00071         serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00073         serial = serial_in_order
00074     };
00075 protected:
00076     filter( bool is_serial_ ) : 
00077         next_filter_in_pipeline(not_in_pipeline()),
00078         input_buffer(NULL),
00079         my_filter_mode(static_cast<unsigned char>(is_serial_ ? serial : parallel)),
00080         prev_filter_in_pipeline(not_in_pipeline()),
00081         my_pipeline(NULL)
00082     {}
00083     
00084     filter( mode filter_mode ) :
00085         next_filter_in_pipeline(not_in_pipeline()),
00086         input_buffer(NULL),
00087         my_filter_mode(static_cast<unsigned char>(filter_mode)),
00088         prev_filter_in_pipeline(not_in_pipeline()),
00089         my_pipeline(NULL)
00090     {}
00091 
00092 public:
00094     bool is_serial() const {
00095         return bool( my_filter_mode & filter_is_serial );
00096     }  
00097     
00098     // ! True if filter must receive stream in order.
00099     bool is_ordered() const {
00100         return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00101     }
00102 
00104 
00105     virtual void* operator()( void* item ) = 0;
00106 
00108 
00109     virtual __TBB_EXPORTED_METHOD ~filter();
00110 
00111 #if __TBB_EXCEPTIONS
00113 
00115     virtual void finalize( void* /*item*/ ) {};
00116 #endif
00117 
00118 private:
00120     filter* next_filter_in_pipeline;
00121 
00123     internal::ordered_buffer* input_buffer;
00124 
00125     friend class internal::stage_task;
00126     friend class pipeline;
00127 
00129     const unsigned char my_filter_mode;
00130 
00132     filter* prev_filter_in_pipeline;
00133 
00135     pipeline* my_pipeline;
00136 };
00137 
00139 
00140 class pipeline {
00141 public:
00143     __TBB_EXPORTED_METHOD pipeline();
00144 
00147     virtual __TBB_EXPORTED_METHOD ~pipeline();
00148 
00150     void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00151 
00153     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00154 
00155 #if __TBB_EXCEPTIONS
00157     void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00158 #endif
00159 
00161     void __TBB_EXPORTED_METHOD clear();
00162 
00163 private:
00164     friend class internal::stage_task;
00165     friend class filter;
00166 
00168     filter* filter_list;
00169 
00171     filter* filter_end;
00172 
00174     empty_task* end_counter;
00175 
00177     atomic<internal::Token> input_tokens;
00178 
00180     internal::Token token_counter;
00181 
00183     bool end_of_input;
00184 
00186     void remove_filter( filter& filter_ );
00187 
00189     void __TBB_EXPORTED_METHOD inject_token( task& self );
00190 
00191 #if __TBB_EXCEPTIONS
00193     void clear_filters();
00194 #endif
00195 };
00196 
00197 } // tbb
00198 
00199 #endif /* __TBB_pipeline_H */

Copyright © 2005-2009 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.