00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_pipeline_H
00022 #define __TBB_pipeline_H
00023
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include <cstddef>
00027
00028 namespace tbb {
00029
00030 class pipeline;
00031 class filter;
00032
//! Implementation details shared by filter and pipeline.
namespace internal {

//! Encode interface version x as a tag for the filter mode byte.
/** Computes ((x)-2)<<1 after narrowing (x)-2 to unsigned char, so version 4
    yields 0x4. Both the argument and the whole expansion are parenthesized,
    which keeps the macro correct inside larger expressions (e.g. V(4)+1) and
    with compound arguments (e.g. V(c ? 4 : 3)). */
#define __TBB_PIPELINE_VERSION(x) (static_cast<unsigned char>((x)-2)<<1)

//! Unsigned integer type of the pipeline's token counters.
typedef unsigned long Token;
//! Signed counterpart of Token.
/** NOTE(review): presumably holds differences between Token values -- confirm
    against the out-of-line implementation. */
typedef long tokendiff_t;

// Forward declarations only; neither class is defined in this header.
class stage_task;
class ordered_buffer;

} // namespace internal
00046
00048
00049 class filter: internal::no_copy {
00050 private:
00052 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(internal::intptr(-1));}
00053
00055 static const unsigned char filter_is_serial = 0x1;
00056
00058
00059
00060 static const unsigned char filter_is_out_of_order = 0x1<<4;
00061
00062 static const unsigned char current_version = __TBB_PIPELINE_VERSION(4);
00063 static const unsigned char version_mask = 0x7<<1;
00064 public:
00065 enum mode {
00067 parallel = current_version | filter_is_out_of_order,
00069 serial_in_order = current_version | filter_is_serial,
00071 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00073 serial = serial_in_order
00074 };
00075 protected:
00076 filter( bool is_serial_ ) :
00077 next_filter_in_pipeline(not_in_pipeline()),
00078 input_buffer(NULL),
00079 my_filter_mode(static_cast<unsigned char>(is_serial_ ? serial : parallel)),
00080 prev_filter_in_pipeline(not_in_pipeline()),
00081 my_pipeline(NULL)
00082 {}
00083
00084 filter( mode filter_mode ) :
00085 next_filter_in_pipeline(not_in_pipeline()),
00086 input_buffer(NULL),
00087 my_filter_mode(static_cast<unsigned char>(filter_mode)),
00088 prev_filter_in_pipeline(not_in_pipeline()),
00089 my_pipeline(NULL)
00090 {}
00091
00092 public:
00094 bool is_serial() const {
00095 return bool( my_filter_mode & filter_is_serial );
00096 }
00097
00098
00099 bool is_ordered() const {
00100 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00101 }
00102
00104
00105 virtual void* operator()( void* item ) = 0;
00106
00108
00109 virtual __TBB_EXPORTED_METHOD ~filter();
00110
00111 #if __TBB_EXCEPTIONS
00113
00115 virtual void finalize( void* ) {};
00116 #endif
00117
00118 private:
00120 filter* next_filter_in_pipeline;
00121
00123 internal::ordered_buffer* input_buffer;
00124
00125 friend class internal::stage_task;
00126 friend class pipeline;
00127
00129 const unsigned char my_filter_mode;
00130
00132 filter* prev_filter_in_pipeline;
00133
00135 pipeline* my_pipeline;
00136 };
00137
00139
00140 class pipeline {
00141 public:
00143 __TBB_EXPORTED_METHOD pipeline();
00144
00147 virtual __TBB_EXPORTED_METHOD ~pipeline();
00148
00150 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00151
00153 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00154
00155 #if __TBB_EXCEPTIONS
00157 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00158 #endif
00159
00161 void __TBB_EXPORTED_METHOD clear();
00162
00163 private:
00164 friend class internal::stage_task;
00165 friend class filter;
00166
00168 filter* filter_list;
00169
00171 filter* filter_end;
00172
00174 empty_task* end_counter;
00175
00177 atomic<internal::Token> input_tokens;
00178
00180 internal::Token token_counter;
00181
00183 bool end_of_input;
00184
00186 void remove_filter( filter& filter_ );
00187
00189 void __TBB_EXPORTED_METHOD inject_token( task& self );
00190
00191 #if __TBB_EXCEPTIONS
00193 void clear_filters();
00194 #endif
00195 };
00196
00197 }
00198
00199 #endif