parallel_reduce.h

/*
    Copyright 2005-2009 Intel Corporation.  All Rights Reserved.

    The source code contained or described herein and all documents related
    to the source code ("Material") are owned by Intel Corporation or its
    suppliers or licensors.  Title to the Material remains with Intel
    Corporation or its suppliers and licensors.  The Material is protected
    by worldwide copyright laws and treaty provisions.  No part of the
    Material may be used, copied, reproduced, modified, published, uploaded,
    posted, transmitted, distributed, or disclosed in any way without
    Intel's prior express written permission.

    No license under any patent, copyright, trade secret or other
    intellectual property right is granted to or conferred upon you by
    disclosure or delivery of the Materials, either expressly, by
    implication, inducement, estoppel or otherwise.  Any license under such
    intellectual property rights must be express and approved by Intel in
    writing.
*/

#ifndef __TBB_parallel_reduce_H
#define __TBB_parallel_reduce_H

#include "task.h"
#include "aligned_space.h"
#include "partitioner.h"
#include <new>

namespace tbb {

namespace internal {

    void __TBB_EXPORTED_FUNC itt_store_pointer_with_release_v3( void* dst, void* src );

    void* __TBB_EXPORTED_FUNC itt_load_pointer_with_acquire_v3( const void* src );

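    // The two helpers below publish and read a pointer to a Body with release/acquire
    // semantics.  The affinity-aware reduction further down uses them: a completed left
    // child publishes its Body pointer into the parent finish task, and a right child
    // reads it to decide whether it must create its own Body.  When TBB_USE_THREADING_TOOLS
    // is defined, the ITT-instrumented variants are used so that threading analysis tools
    // can observe the synchronization.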
    template<typename T> inline void parallel_reduce_store_body( T*& dst, T* src ) {
#if TBB_USE_THREADING_TOOLS
        itt_store_pointer_with_release_v3(&dst,src);
#else
        __TBB_store_with_release(dst,src);
#endif /* TBB_USE_THREADING_TOOLS */
    }

    template<typename T> inline T* parallel_reduce_load_body( T*& src ) {
#if TBB_USE_THREADING_TOOLS
        return static_cast<T*>(itt_load_pointer_with_acquire_v3(&src));
#else
        return __TBB_load_with_acquire(src);
#endif /* TBB_USE_THREADING_TOOLS */
    }

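    //! Task type used to combine the partial results of parallel_reduce.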
    template<typename Body>
    class finish_reduce: public task {
        Body* const my_body;
        bool has_right_zombie;
        aligned_space<Body,1> zombie_space;
        finish_reduce( Body* body ) :
            my_body(body),
            has_right_zombie(false)
        {
        }
        task* execute() {
            if( has_right_zombie ) {
                // Right child was stolen.
                Body* s = zombie_space.begin();
                my_body->join( *s );
                s->~Body();
            }
            return NULL;
        }
        template<typename Range,typename Body_, typename Partitioner>
        friend class start_reduce;
    };

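    //! Task type used to split the work of parallel_reduce.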
    template<typename Range, typename Body, typename Partitioner>
    class start_reduce: public task {
        typedef finish_reduce<Body> finish_type;
        Body* my_body;
        Range my_range;
        typename Partitioner::partition_type my_partition;
        /*override*/ task* execute();
        template<typename Body_>
        friend class finish_reduce;

        start_reduce( const Range& range, Body* body, const Partitioner &partitioner ) :
            my_body(body),
            my_range(range),
            my_partition(partitioner)
        {
        }

        start_reduce( start_reduce& parent, split ) :
            my_body(parent.my_body),
            my_range(parent.my_range,split()),
            my_partition(parent.my_partition,split())
        {
        }

        /*override*/ void note_affinity( affinity_id /*id*/ ) {
            finish_type* p = static_cast<finish_type*>(parent());
            Body* body = new( p->zombie_space.begin() ) Body(*my_body,split());
            p->has_right_zombie = true;
            my_body = body;
        }
    public:
#if !__TBB_EXCEPTIONS
        static void run( const Range& range, Body& body, const Partitioner &partitioner ) {
            if( !range.empty() )
                task::spawn_root_and_wait( *new(task::allocate_root()) start_reduce(range,&body,partitioner) );
        }
#else /* __TBB_EXCEPTIONS */
        static void run( const Range& range, Body& body, const Partitioner &partitioner ) {
            // A bound context prevents exceptions thrown by the body from affecting nested or sibling algorithms,
            // and lets users handle those exceptions safely by wrapping the call to parallel_reduce in a try-block.
            if( !range.empty() ) {
                task_group_context context;
                task::spawn_root_and_wait( *new(task::allocate_root(context)) start_reduce(range,&body,partitioner) );
            }
        }
        static void run( const Range& range, Body& body, const Partitioner &partitioner, task_group_context& context ) {
            if( !range.empty() )
                task::spawn_root_and_wait( *new(task::allocate_root(context)) start_reduce(range,&body,partitioner) );
        }
#endif /* __TBB_EXCEPTIONS */
    };

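    // start_reduce::execute either runs the body on the whole range (when the range is no
    // longer divisible or the partitioner says so), or splits: it allocates a finish_reduce
    // continuation, recycles itself as the continuation's left child, and spawns a new right
    // child built with the splitting constructor.  The right child keeps sharing the left
    // child's Body unless it is stolen; only then does note_affinity() above construct a
    // separate Body in the continuation's zombie_space, which the continuation later joins
    // and destroys.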
    template<typename Range, typename Body, typename Partitioner>
    task* start_reduce<Range,Body,Partitioner>::execute() {
        if( !my_range.is_divisible() || my_partition.should_execute_range(*this) ) {
            (*my_body)( my_range );
            return NULL;
        } else {
            finish_type& c = *new( allocate_continuation()) finish_type(my_body);
            recycle_as_child_of(c);
            c.set_ref_count(2);
            start_reduce& b = *new( c.allocate_child() ) start_reduce(*this,split());
            spawn(b);
            return this;
        }
    }

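    //! 0 if root task, 1 if a left child, 2 if a right child.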
    typedef char reduction_context;

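    //! Task type used to combine the partial results of parallel_reduce with an affinity_partitioner.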
    template<typename Body>
    class finish_reduce_with_affinity: public task {
        Body* my_body;
        bool has_right_zombie;
        const reduction_context my_context;
        aligned_space<Body,1> zombie_space;
        finish_reduce_with_affinity( char context ) :
            my_body(NULL),
            has_right_zombie(false),
            my_context(context)
        {
        }
        task* execute() {
            if( has_right_zombie ) {
                // Right child was stolen.
                Body* s = zombie_space.begin();
                my_body->join( *s );
                s->~Body();
            }
            if( my_context==1 )
                parallel_reduce_store_body( static_cast<finish_reduce_with_affinity*>(parent())->my_body, my_body );
            return NULL;
        }
        template<typename Range,typename Body_>
        friend class start_reduce_with_affinity;
    };

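    //! Task type used to split the work of parallel_reduce with an affinity_partitioner.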
    template<typename Range, typename Body>
    class start_reduce_with_affinity: public task {
        typedef finish_reduce_with_affinity<Body> finish_type;
        Body* my_body;
        Range my_range;
        typename affinity_partitioner::partition_type my_partition;
        reduction_context my_context;
        /*override*/ task* execute();
        template<typename Body_>
        friend class finish_reduce_with_affinity;

        start_reduce_with_affinity( const Range& range, Body* body, affinity_partitioner& partitioner ) :
            my_body(body),
            my_range(range),
            my_partition(partitioner),
            my_context(0)
        {
        }

        start_reduce_with_affinity( start_reduce_with_affinity& parent, split ) :
            my_body(parent.my_body),
            my_range(parent.my_range,split()),
            my_partition(parent.my_partition,split()),
            my_context(2)
        {
            my_partition.set_affinity(*this);
            parent.my_context = 1;
        }
        /*override*/ void note_affinity( affinity_id id ) {
            my_partition.note_affinity( id );
        }

    public:
        static void run( const Range& range, Body& body, affinity_partitioner& partitioner ) {
            if( !range.empty() ) {
#if !__TBB_EXCEPTIONS || TBB_JOIN_OUTER_TASK_GROUP
                task::spawn_root_and_wait( *new(task::allocate_root()) start_reduce_with_affinity(range,&body,partitioner) );
#else
                // A bound context prevents exceptions thrown by the body from affecting nested or sibling algorithms,
                // and lets users handle those exceptions safely by wrapping the call to parallel_reduce in a try-block.
                task_group_context context;
                task::spawn_root_and_wait( *new(task::allocate_root(context)) start_reduce_with_affinity(range,&body,partitioner) );
#endif /* __TBB_EXCEPTIONS && !TBB_JOIN_OUTER_TASK_GROUP */
            }
        }
#if __TBB_EXCEPTIONS
        static void run( const Range& range, Body& body, affinity_partitioner& partitioner, task_group_context& context ) {
            if( !range.empty() )
                task::spawn_root_and_wait( *new(task::allocate_root(context)) start_reduce_with_affinity(range,&body,partitioner) );
        }
#endif /* __TBB_EXCEPTIONS */
    };

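    // In the affinity-aware version the right child's Body is created lazily.  A left child
    // (my_context==1) publishes its Body pointer into the parent finish task when it completes,
    // using the release/acquire helpers above.  When a right child (my_context==2) starts and
    // sees that the pointer has not been published yet, the siblings are running concurrently,
    // so it constructs a split copy of the Body in the parent's zombie_space for the parent to
    // join later; otherwise it simply keeps accumulating into the Body it already shares with
    // its left sibling and no join is needed.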
    template<typename Range, typename Body>
    task* start_reduce_with_affinity<Range,Body>::execute() {
        if( my_context==2 ) {
            finish_type* p = static_cast<finish_type*>(parent());
            if( !parallel_reduce_load_body(p->my_body) ) {
                my_body = new( p->zombie_space.begin() ) Body(*my_body,split());
                p->has_right_zombie = true;
            }
        }
        if( !my_range.is_divisible() || my_partition.should_execute_range(*this) ) {
            (*my_body)( my_range );
            if( my_context==1 )
                parallel_reduce_store_body(static_cast<finish_type*>(parent())->my_body, my_body );
            return my_partition.continue_after_execute_range(*this);
        } else {
            finish_type& c = *new( allocate_continuation()) finish_type(my_context);
            recycle_as_child_of(c);
            c.set_ref_count(2);
            bool delay = my_partition.decide_whether_to_delay();
            start_reduce_with_affinity& b = *new( c.allocate_child() ) start_reduce_with_affinity(*this,split());
            my_partition.spawn_or_delay(delay,*this,b);
            return this;
        }
    }

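    //! Adaptor that implements the Body concept (splitting constructor, operator(), join)
    //! on top of a user-supplied identity value, accumulation functor, and reduction functor,
    //! so the functional form of parallel_reduce below can reuse the same task machinery.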
    template<typename Range, typename Value, typename RealBody, typename Reduction>
    class lambda_reduce_body {

//FIXME: decide if my_real_body, my_reduction, and identity_element should be copied or referenced
//       (might require some performance measurements)

        const Value&     identity_element;
        const RealBody&  my_real_body;
        const Reduction& my_reduction;
        Value            my_value;
    public:
        lambda_reduce_body( const Value& identity, const RealBody& body, const Reduction& reduction )
            : identity_element(identity)
            , my_real_body(body)
            , my_reduction(reduction)
            , my_value(identity)
        { }
        lambda_reduce_body( const lambda_reduce_body& other )
            : identity_element(other.identity_element)
            , my_real_body(other.my_real_body)
            , my_reduction(other.my_reduction)
            , my_value(other.my_value)
        { }
        lambda_reduce_body( lambda_reduce_body& other, tbb::split )
            : identity_element(other.identity_element)
            , my_real_body(other.my_real_body)
            , my_reduction(other.my_reduction)
            , my_value(other.identity_element)
        { }
        void operator()(Range& range) {
            my_value = my_real_body(range, const_cast<const Value&>(my_value));
        }
        void join( lambda_reduce_body& rhs ) {
            my_value = my_reduction(const_cast<const Value&>(my_value), const_cast<const Value&>(rhs.my_value));
        }
        Value result() const {
            return my_value;
        }
    };

} // namespace internal

// Requirements on Range concept are documented in blocked_range.h

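/** Requirements on the Body type for the imperative form of parallel_reduce:
    - Body::Body( Body&, split )        Splitting constructor; the new copy must be able to
                                        accumulate results concurrently with the original.
    - Body::~Body()                     Destructor.
    - void Body::operator()( Range& r ) Accumulate the result for subrange r.
    - void Body::join( Body& rhs )      Merge the result of rhs into this, where rhs was
                                        created from this by the splitting constructor.
**/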


template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const simple_partitioner& partitioner = simple_partitioner() ) {
    internal::start_reduce<Range,Body,simple_partitioner>::run( range, body, partitioner );
}
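/** A minimal usage sketch of the imperative form: summing an array of floats with the default
    (simple) partitioner.  SumBody and ParallelSum are illustrative names, assuming the usual
    "tbb/" include prefix and blocked_range from blocked_range.h.
    \code
    #include "tbb/parallel_reduce.h"
    #include "tbb/blocked_range.h"

    struct SumBody {
        float my_sum;
        SumBody() : my_sum(0) {}
        SumBody( SumBody&, tbb::split ) : my_sum(0) {}      // splitting constructor
        void operator()( const tbb::blocked_range<const float*>& r ) {
            float sum = my_sum;
            for( const float* p=r.begin(); p!=r.end(); ++p )
                sum += *p;                                  // accumulate this subrange
            my_sum = sum;
        }
        void join( SumBody& rhs ) { my_sum += rhs.my_sum; } // merge a stolen partial result
    };

    float ParallelSum( const float* first, const float* last ) {
        SumBody body;
        tbb::parallel_reduce( tbb::blocked_range<const float*>(first,last), body );
        return body.my_sum;
    }
    \endcode **/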


template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const auto_partitioner& partitioner ) {
    internal::start_reduce<Range,Body,auto_partitioner>::run( range, body, partitioner );
}


template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, affinity_partitioner& partitioner ) {
    internal::start_reduce_with_affinity<Range,Body>::run( range, body, partitioner );
}

#if __TBB_EXCEPTIONS

template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const simple_partitioner& partitioner, task_group_context& context ) {
    internal::start_reduce<Range,Body,simple_partitioner>::run( range, body, partitioner, context );
}


template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, const auto_partitioner& partitioner, task_group_context& context ) {
    internal::start_reduce<Range,Body,auto_partitioner>::run( range, body, partitioner, context );
}


template<typename Range, typename Body>
void parallel_reduce( const Range& range, Body& body, affinity_partitioner& partitioner, task_group_context& context ) {
    internal::start_reduce_with_affinity<Range,Body>::run( range, body, partitioner, context );
}
#endif /* __TBB_EXCEPTIONS */
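/** Sketch of passing an explicit task_group_context to the overloads above (available only when
    __TBB_EXCEPTIONS is enabled).  The reduction then belongs to the caller-supplied group, so an
    exception thrown by the body, or a call to root_context.cancel_group_execution(), cancels just
    that group.  The names are illustrative and SumBody is the type from the sketch above.
    \code
    float ParallelSumWithContext( const float* first, const float* last ) {
        tbb::task_group_context root_context;
        SumBody body;
        tbb::parallel_reduce( tbb::blocked_range<const float*>(first,last), body,
                              tbb::auto_partitioner(), root_context );
        return body.my_sum;
    }
    \endcode **/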


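/** Requirements on the arguments of the functional (lambda-friendly) form of parallel_reduce below:
    - identity   Identity value of the reduction; used as the initial value of each partial result.
    - real_body  Callable as Value real_body( const Range& r, const Value& x ); accumulates
                 subrange r into x and returns the new running value.
    - reduction  Callable as Value reduction( const Value& x, const Value& y ); combines two
                 partial results.
**/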

template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const simple_partitioner& partitioner = simple_partitioner() ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,simple_partitioner>
                          ::run(range, body, partitioner );
    return body.result();
}
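/** A minimal usage sketch of the functional form, written with plain function objects so it also
    compiles without C++0x lambdas.  SumRange and ParallelSum2 are illustrative names.
    \code
    #include "tbb/parallel_reduce.h"
    #include "tbb/blocked_range.h"
    #include <functional>   // std::plus

    struct SumRange {
        // Accumulate one subrange into the running value and return the result.
        float operator()( const tbb::blocked_range<const float*>& r, float running ) const {
            for( const float* p=r.begin(); p!=r.end(); ++p )
                running += *p;
            return running;
        }
    };

    float ParallelSum2( const float* first, const float* last ) {
        return tbb::parallel_reduce( tbb::blocked_range<const float*>(first,last),
                                     0.f,                   // identity value
                                     SumRange(),            // per-subrange accumulation
                                     std::plus<float>() );  // combine partial results
    }
    \endcode **/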


template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const auto_partitioner& partitioner ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,auto_partitioner>
                          ::run( range, body, partitioner );
    return body.result();
}


template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       affinity_partitioner& partitioner ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce_with_affinity<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction> >
                                        ::run( range, body, partitioner );
    return body.result();
}

#if __TBB_EXCEPTIONS

template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const simple_partitioner& partitioner, task_group_context& context ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,simple_partitioner>
                          ::run( range, body, partitioner, context );
    return body.result();
}


template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       const auto_partitioner& partitioner, task_group_context& context ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction>,auto_partitioner>
                          ::run( range, body, partitioner, context );
    return body.result();
}


template<typename Range, typename Value, typename RealBody, typename Reduction>
Value parallel_reduce( const Range& range, const Value& identity, const RealBody& real_body, const Reduction& reduction,
                       affinity_partitioner& partitioner, task_group_context& context ) {
    internal::lambda_reduce_body<Range,Value,RealBody,Reduction> body(identity, real_body, reduction);
    internal::start_reduce_with_affinity<Range,internal::lambda_reduce_body<Range,Value,RealBody,Reduction> >
                                        ::run( range, body, partitioner, context );
    return body.result();
}
#endif /* __TBB_EXCEPTIONS */


} // namespace tbb

#endif /* __TBB_parallel_reduce_H */

