• 8.5 在实践中设计并发代码
    • 8.5.1 并行实现:std::for_each
    • 8.5.2 并行实现:std::find
    • 8.5.3 并行实现:std::partial_sum

    8.5 在实践中设计并发代码

    为一个特殊的任务设计并发代码时,需要根据任务本身来考虑之前提到的问题。为了展示以上的注意事项如何应用,我们将看一下在C++标准库中三个标准函数的并行实现。当遇到问题时,这里的例子可以作为很好的参照。有较大的并发任务进行辅助下,我们也将实现一些函数。

    我主要演示这些实现使用的技术,不过可能这些技术并不是最先进的;更多优秀的实现可以更好的利用硬件并发,不过这些实现可能需要到与并行算法相关的学术文献,或者是多线程的专家库中(比如:Inter的TBB[4])才能看到。

    并行版的std::for_each可以看作为能最直观体现并行概念,就让我们从并行版的std::for_each开始吧!

    8.5.1 并行实现:std::for_each

    std::for_each的原理很简单:其对某个范围中的元素,依次调用用户提供的函数。并行和串行调用的最大区别就是函数的调用顺序。std::for_each是对范围中的第一个元素调用用户函数,接着是第二个,以此类推,在并行实现中对于每个元素的处理顺序就不能保证了,并且它们可能(我们希望如此)被并发处理。

    为了实现这个函数的并行版本,需要对每个线程上处理的元素进行划分。事先知道元素数量,所以可以处理前对数据进行划分(详见8.1.1节)。假设只有并行任务运行,就可以使用std::thread::hardware_concurrency()来决定线程的数量。同样,这些元素都能被独立的处理,所以可以使用连续的数据块来避免伪共享(详见8.2.3节)。

    这里的算法有点类似于并行版的std::accumulate(详见8.4.1节),不过比起计算每一个元素的加和,这里对每个元素仅仅使用了一个指定功能的函数。因为不需要返回结果,但想要将异常传递给调用者,就需要使用std::packaged_taskstd::future机制对线程中的异常进行转移。这里展示一个样本实现。

    清单8.7 并行版std::for_each

    1. template<typename Iterator,typename Func>
    2. void parallel_for_each(Iterator first,Iterator last,Func f)
    3. {
    4. unsigned long const length=std::distance(first,last);
    5. if(!length)
    6. return;
    7. unsigned long const min_per_thread=25;
    8. unsigned long const max_threads=
    9. (length+min_per_thread-1)/min_per_thread;
    10. unsigned long const hardware_threads=
    11. std::thread::hardware_concurrency();
    12. unsigned long const num_threads=
    13. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
    14. unsigned long const block_size=length/num_threads;
    15. std::vector<std::future<void> > futures(num_threads-1); // 1
    16. std::vector<std::thread> threads(num_threads-1);
    17. join_threads joiner(threads);
    18. Iterator block_start=first;
    19. for(unsigned long i=0;i<(num_threads-1);++i)
    20. {
    21. Iterator block_end=block_start;
    22. std::advance(block_end,block_size);
    23. std::packaged_task<void(void)> task( // 2
    24. [=]()
    25. {
    26. std::for_each(block_start,block_end,f);
    27. });
    28. futures[i]=task.get_future();
    29. threads[i]=std::thread(std::move(task)); // 3
    30. block_start=block_end;
    31. }
    32. std::for_each(block_start,last,f);
    33. for(unsigned long i=0;i<(num_threads-1);++i)
    34. {
    35. futures[i].get(); // 4
    36. }
    37. }

    代码结构与清单8.4的差不多。最重要的不同在于期望值向量对std::future<void>类型①变量进行存储,因为工作线程不会返回值,并且简单的Lambda函数会对block_start到block_end上的任务②执行f函数,为了避免传入线程的构造函数③。工作线程不需要返回一个值时,调用futures[i].get()④只是提供检索工作线程异常的方法;如果不想把异常传递出去,就可以省略这一步。

    实现并行std::accumulate的时候,使用std::async会简化代码;同样,parallel_for_each也可以使用std::async。实现如下所示。

    清单8.8 使用std::async实现std::for_each

    1. template<typename Iterator,typename Func>
    2. void parallel_for_each(Iterator first,Iterator last,Func f)
    3. {
    4. unsigned long const length=std::distance(first,last);
    5. if(!length)
    6. return;
    7. unsigned long const min_per_thread=25;
    8. if(length<(2*min_per_thread))
    9. {
    10. std::for_each(first,last,f); // 1
    11. }
    12. else
    13. {
    14. Iterator const mid_point=first+length/2;
    15. std::future<void> first_half= // 2
    16. std::async(&parallel_for_each<Iterator,Func>,
    17. first,mid_point,f);
    18. parallel_for_each(mid_point,last,f); // 3
    19. first_half.get(); // 4
    20. }
    21. }

    和基于std::async的parallel_accumulate(清单8.5)一样,在运行时对数据进行迭代划分,而非在执行前划分好,因为不知道需要使用多少个线程。像之前一样,将每一级的数据分成两部分,异步执行另外一部分②,剩下的部分就不能再进行划分了,所以直接运行这一部分③;这样就可以直接对std::for_each①进行使用了。再次使用std::asyncstd::future的get()成员函数④来提供对异常的传播。

    回到算法,函数需要对每一个元素执行同样的操作(这样的操作有很多种,初学者可能会想到std::countstd::replace),一个稍微复杂一些的例子就是使用std::find

    8.5.2 并行实现:std::find

    接下来是std::find算法,因为不需要对数据元素做任何处理的算法。比如,当第一个元素就满足查找标准,就没有必要对其他元素进行搜索了。将会看到,算法属性对于性能具有很大的影响,并且对并行实现的设计有着直接的影响。这个算法是一个很特别的例子,数据访问模式都会对代码的设计产生影响(详见8.3.2节)。该类中的另一些算法包括std::equalstd::any_of

    当你和妻子或者搭档,在一个纪念盒中找寻一张老照片,当找到这张照片时,就不会再看另外的照片了。不过,你需要让其他人知道你已经找到照片了(比如,大喊一声“找到了!”),这样其他人就会停止搜索了。很多算法的特性就是要对每一个元素进行处理,所以它们没有办法像std::find一样,一旦找到合适数据就停止执行。因此,你需要设计代码对其进行使用——当得到想要的答案就中断其他任务的执行,所以不能等待线程处理对剩下的元素进行处理。

    如果不中断其他线程,那么串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候,停止搜索并返回。如果系统能支持四个并发线程,那么每个线程就可以对总数据量的1/4进行检查,并且在我们的实现只需要单核完成的1/4的时间,就能完成对所有元素的查找。如果匹配的元素在第一个1/4块中,串行算法将会返回第一个,因为算法不需要对剩下的元素进行处理了。

    中断其他线程的一个办法就是使用原子变量作为一个标识,处理过每一个元素后就对这个标识进行检查。如果标识被设置,就有线程找到了匹配元素,所以算法就可以停止并返回了。用这种方式来中断线程,就可以将那些没有处理的数据保持原样,并且在更多的情况下,相较于串行方式,性能能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍每个线程的运行。

    如何返回值和传播异常呢?现在有两个选择。可以使用一个期望值数组,使用std::packaged_task来转移值和异常,主线程上对返回值和异常进行处理;或者使用std::promise对工作线程上的最终结果直接进行设置。这完全依赖于想怎么样处理工作线程上的异常。如果想停止第一个异常(即使还没有对所有元素进行处理),就可以使用std::promise对异常和最终值进行设置。另外,如果想让其他工作线程继续查找,可以使用std::packaged_task来存储所有的异常,当线程没有找到匹配元素时,异常将再次抛出。

    这种情况下,我会选择std::promise,因为其行为和std::find更为接近。这里需要注意一下搜索的元素是不是在提供的搜索范围内。因此,在所有线程结束前,获取期望值上的结果。如果被期望值阻塞住,所要查找的值不在范围内,就会持续的等待下去。实现代码如下。

    清单8.9 并行find算法实现

    1. template<typename Iterator,typename MatchType>
    2. Iterator parallel_find(Iterator first,Iterator last,MatchType match)
    3. {
    4. struct find_element // 1
    5. {
    6. void operator()(Iterator begin,Iterator end,
    7. MatchType match,
    8. std::promise<Iterator>* result,
    9. std::atomic<bool>* done_flag)
    10. {
    11. try
    12. {
    13. for(;(begin!=end) && !done_flag->load();++begin) // 2
    14. {
    15. if(*begin==match)
    16. {
    17. result->set_value(begin); // 3
    18. done_flag->store(true); // 4
    19. return;
    20. }
    21. }
    22. }
    23. catch(...) // 5
    24. {
    25. try
    26. {
    27. result->set_exception(std::current_exception()); // 6
    28. done_flag->store(true);
    29. }
    30. catch(...) // 7
    31. {}
    32. }
    33. }
    34. };
    35. unsigned long const length=std::distance(first,last);
    36. if(!length)
    37. return last;
    38. unsigned long const min_per_thread=25;
    39. unsigned long const max_threads=
    40. (length+min_per_thread-1)/min_per_thread;
    41. unsigned long const hardware_threads=
    42. std::thread::hardware_concurrency();
    43. unsigned long const num_threads=
    44. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
    45. unsigned long const block_size=length/num_threads;
    46. std::promise<Iterator> result; // 8
    47. std::atomic<bool> done_flag(false); // 9
    48. std::vector<std::thread> threads(num_threads-1);
    49. { // 10
    50. join_threads joiner(threads);
    51. Iterator block_start=first;
    52. for(unsigned long i=0;i<(num_threads-1);++i)
    53. {
    54. Iterator block_end=block_start;
    55. std::advance(block_end,block_size);
    56. threads[i]=std::thread(find_element(), // 11
    57. block_start,block_end,match,
    58. &result,&done_flag);
    59. block_start=block_end;
    60. }
    61. find_element()(block_start,last,match,&result,&done_flag); // 12
    62. }
    63. if(!done_flag.load()) //13
    64. {
    65. return last;
    66. }
    67. return result.get_future().get(); // 14
    68. }

    清单8.9中的函数主体与之前的例子相似。这次,由find_element类①的函数调用操作实现,来完成查找工作的。循环通过在给定数据块中的元素,检查每一步上的标识②。如果匹配的元素被找到,就将最终的结果设置到承诺值③当中,并且在返回前对done_flag④进行设置。

    如果有一个异常被抛出,就会被通用处理代码⑤捕获,并且在承诺值⑥尝中试存储前,对done_flag进行设置。如果对应promise已经被设置,设置在承诺值上的值可能会抛出一个异常,所以这里⑦发生的任何异常,都可以捕获并丢弃。

    当线程调用find_element查询一个值,或者抛出一个异常时,如果其他线程看到done_flag被设置,那么其他线程将会终止。如果多线程同时找到匹配值或抛出异常,它们将会对承诺值产生竞争。不过,这是良性的条件竞争;因为,成功的竞争者会作为“第一个”返回线程,因此这个结果可以接受。

    回到parallel_find函数本身,其拥有用来停止搜索的承诺值⑧和标识⑨;随着对范围内元素的查找⑪,承诺值和标识会传递到新线程中。主线程也使用find_element来对剩下的元素进行查找⑫。像之前提到的,需要在全部线程结束前,对结果进行检查,因为结果可能是任意位置上的匹配元素。这里将“启动-汇入”代码放在一个块中⑩,所有线程都会在找到匹配元素时⑬进行汇入。如果找到匹配元素,就可以调用std::future<Iterator>(来自承诺值⑭)的成员函数get()来获取返回值或异常。

    不过,假设使用硬件上所有可用的的并发线程,或使用其他机制对线程上的任务进行提前划分。就像之前一样,可以使用std::async,以及递归数据划分的方式来简化实现(同时使用C++标准库中提供的自动缩放工具)。使用std::async的parallel_find实现如下所示。

    清单8.10 使用std::async实现的并行find算法

    1. template<typename Iterator,typename MatchType> // 1
    2. Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
    3. std::atomic<bool>& done)
    4. {
    5. try
    6. {
    7. unsigned long const length=std::distance(first,last);
    8. unsigned long const min_per_thread=25; // 2
    9. if(length<(2*min_per_thread)) // 3
    10. {
    11. for(;(first!=last) && !done.load();++first) // 4
    12. {
    13. if(*first==match)
    14. {
    15. done=true; // 5
    16. return first;
    17. }
    18. }
    19. return last; // 6
    20. }
    21. else
    22. {
    23. Iterator const mid_point=first+(length/2); // 7
    24. std::future<Iterator> async_result=
    25. std::async(&parallel_find_impl<Iterator,MatchType>, // 8
    26. mid_point,last,match,std::ref(done));
    27. Iterator const direct_result=
    28. parallel_find_impl(first,mid_point,match,done); // 9
    29. return (direct_result==mid_point)?
    30. async_result.get():direct_result; // 10
    31. }
    32. }
    33. catch(...)
    34. {
    35. done=true; // 11
    36. throw;
    37. }
    38. }
    39. template<typename Iterator,typename MatchType>
    40. Iterator parallel_find(Iterator first,Iterator last,MatchType match)
    41. {
    42. std::atomic<bool> done(false);
    43. return parallel_find_impl(first,last,match,done); // 12
    44. }

    如果想要在找到匹配项时结束,就需要在线程之间设置一个标识来表明匹配项已经被找到。因此,需要将这个标识递归的传递。通过函数①的方式来实现是最简单的办法,只需要增加一个参数——一个done标识的引用,这个表示通过程序的主入口点传入⑫。

    核心实现和之前的代码一样。通常函数的实现中,会让单个线程处理最少的数据项②;如果数据块大小不足于分成两半,就要让当前线程完成所有的工作③。实际算法在一个简单的循环当中(给定范围),直到在循环到指定范围中的最后一个,或找到匹配项,并对标识进行设置④。如果找到匹配项,标识done就会在返回前进行设置⑤。无论是因为已经查找到最后一个,还是因为其他线程对done进行了设置,都会停止查找。如果没有找到,会将最后一个元素last进行返回⑥。

    如果给定范围可以进行划分,首先要在st::async在对第二部分进行查找⑧前,要找数据中点⑦,而且需要使用std::ref将done以引用的方式传递。同时,可以通过对第一部分直接进行递归查找。两部分都是异步的,并且在原始范围过大时,直接递归查找的部分可能会再细化。

    如果直接查找返回的是mid_point,这就意味着没有找到匹配项,所以就要从异步查找中获取结果。如果在另一半中没有匹配项的话,返回的结果就一定是last,这个值的返回就代表了没有找到匹配的元素⑩。如果“异步”调用被延迟(非真正的异步),实际上这里会运行get();这种情况下,如果对下半部分的元素搜索成功,就不会执行对上半部分元素的搜索了。如果异步查找真实的运行在其他线程上么async_result变量的析构函数将会等待该线程完成,所以这里不会有线程泄露。

    像之前一样,std::async可以用来提供“异常-安全”和“异常-传播”特性。如果直接递归抛出异常,期望值的析构函数就能让异步执行的线程提前结束;如果异步调用抛出异常,这个异常将会通过对get()成员函数的调用进行传播⑩。使用try/catch块只能捕捉在done发生的异常,并且当有异常抛出⑪时,所有线程都能很快的终止运行。不过,不使用try/catch的实现依旧没问题,不同的就是要等待所有线程的工作是否完成。

    实现中一个重要的特性,就是不能保证所有数据都能被std::find串行处理。其他并行算法可以借鉴这个特性,因为要让一个算法并行起来这是必须具有的特性。如果有顺序问题,元素就不能并发的处理了。如果每个元素独立,虽然对于parallel_for_each不是很重要,不过对于parallel_find,即使在开始部分已经找到了匹配元素,也有可能返回范围中最后一个元素;如果在知道结果的前提下,这样的结果会让人很惊讶。

    OK,现在你已经使用了并行化的std::find。如在本节开始说的那样,其他相似算法不需要对每一个数据元素进行处理,并且同样的技术可以使用到这些类似的算法上去。我们将在第9章中看到“中断线程”的问题。

    为了完成我们的并行“三重奏”,我们将换一个角度来看一下std::partial_sum。对于这个算法,没有太多的文献可参考,不过让这个算法并行起来是一件很有趣的事。

    8.5.3 并行实现:std::partial_sum

    std::partial_sum会计算给定范围中的每个元素,并用计算后的结果将原始序列中的值替换掉。比如,有一个序列[1,2,3,4,5],在执行该算法后会成为:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。让这样一个算法并行起来会很有趣,因为这里不能讲任务分块,对每一块进行独立的计算。比如,原始序列中的第一个元素需要加到后面的一个元素中去。

    确定某个范围部分和的一种的方式,就是在独立块中计算部分和,然后将第一块中最后的元素的值,与下一块中的所有元素进行相加,依次类推。如果有个序列[1,2,3,4,5,6,7,8,9],然后将其分为三块,那么在第一次计算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后将6(第一块的最后一个元素)加到第二个块中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再将第二块的最后一个元素21加到第三块中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。

    将原始数据分割成块,加上之前块的部分和就能够并行了。如果每个块中的末尾元素都是第一个被更新的,那么块中其他的元素就能被其他线程所更新,同时另一个线程对下一块进行更新,等等。当处理的元素比处理核心的个数多的时候,这样完成工作没问题,因为每一个核芯在每一个阶段都有合适的数据可以进行处理。

    如果有很多的处理器(就是要比处理的元素个数多),之前的方式就无法正常工作了。如果还是将工作划分给每个处理器,第一步就没必要去做了。这种情况下,传递结果就意味着让处理器进行等待,这时需要给这些处于等待中的处理器一些工作。所以,可以采用完全不同的方式来处理这个问题。比起将数据块中的最后一个元素的结果向后面的元素块传递,可以对部分结果进行传播:第一次与相邻的元素(距离为1)相加和(和之前一样),之后和距离为2的元素相加,后来和距离为4的元素相加,以此类推。比如:初始序列为[1,2,3,4,5,6,7,8,9],第一次后为[1,3,5,7,9,11,13,15,17],第二次后为[1,3,6,10,14,18, 22,26,30],下一次就要隔4个元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8个元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],这就是最终的结果。虽然,比起第一种方法多了很多步骤,不过在可并发平台下,这种方法提高了并行的可行性;每个处理器可在每一步中处理一个数据项。

    总体来说,当有N个操作时(每步使用一个处理器)第二种方法需要log(N)[底为2]步;在本节中,N就相当于数据链表的长度。比起第一种,每个线程对分配块做N/k个操作,然后在做N/k次结果传递(这里的k是线程的数量)。因此,第一种方法的时间复杂度为O(N),不过第二种方法的时间复杂度为Q(Nlog(N))。当数据量和处理器数量相近时,第二种方法需要每个处理器上log(N)个操作,第一种方法中每个处理器上执行的操作数会随着k的增加而增多,因为需要对结果进行传递。对于处理单元较少的情况,第一种方法会比较合适;对于大规模并行系统,第二种方法比较合适。

    不管怎么样,先将效率问题放一边,让我们来看一些代码。下面清单实现的,就是第一种方法。

    清单8.11 使用划分的方式来并行的计算部分和

    1. template<typename Iterator>
    2. void parallel_partial_sum(Iterator first,Iterator last)
    3. {
    4. typedef typename Iterator::value_type value_type;
    5. struct process_chunk // 1
    6. {
    7. void operator()(Iterator begin,Iterator last,
    8. std::future<value_type>* previous_end_value,
    9. std::promise<value_type>* end_value)
    10. {
    11. try
    12. {
    13. Iterator end=last;
    14. ++end;
    15. std::partial_sum(begin,end,begin); // 2
    16. if(previous_end_value) // 3
    17. {
    18. value_type& addend=previous_end_value->get(); // 4
    19. *last+=addend; // 5
    20. if(end_value)
    21. {
    22. end_value->set_value(*last); // 6
    23. }
    24. std::for_each(begin,last,[addend](value_type& item) // 7
    25. {
    26. item+=addend;
    27. });
    28. }
    29. else if(end_value)
    30. {
    31. end_value->set_value(*last); // 8
    32. }
    33. }
    34. catch(...) // 9
    35. {
    36. if(end_value)
    37. {
    38. end_value->set_exception(std::current_exception()); // 10
    39. }
    40. else
    41. {
    42. throw; // 11
    43. }
    44. }
    45. }
    46. };
    47. unsigned long const length=std::distance(first,last);
    48. if(!length)
    49. return last;
    50. unsigned long const min_per_thread=25; // 12
    51. unsigned long const max_threads=
    52. (length+min_per_thread-1)/min_per_thread;
    53. unsigned long const hardware_threads=
    54. std::thread::hardware_concurrency();
    55. unsigned long const num_threads=
    56. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
    57. unsigned long const block_size=length/num_threads;
    58. typedef typename Iterator::value_type value_type;
    59. std::vector<std::thread> threads(num_threads-1); // 13
    60. std::vector<std::promise<value_type> >
    61. end_values(num_threads-1); // 14
    62. std::vector<std::future<value_type> >
    63. previous_end_values; // 15
    64. previous_end_values.reserve(num_threads-1); // 16
    65. join_threads joiner(threads);
    66. Iterator block_start=first;
    67. for(unsigned long i=0;i<(num_threads-1);++i)
    68. {
    69. Iterator block_last=block_start;
    70. std::advance(block_last,block_size-1); // 17
    71. threads[i]=std::thread(process_chunk(), // 18
    72. block_start,block_last,
    73. (i!=0)?&previous_end_values[i-1]:0,
    74. &end_values[i]);
    75. block_start=block_last;
    76. ++block_start; // 19
    77. previous_end_values.push_back(end_values[i].get_future()); // 20
    78. }
    79. Iterator final_element=block_start;
    80. std::advance(final_element,std::distance(block_start,last)-1); // 21
    81. process_chunk()(block_start,final_element, // 22
    82. (num_threads>1)?&previous_end_values.back():0,
    83. 0);
    84. }

    这个实现中使用的结构体和之前算法中的一样,将问题进行分块解决,每个线程处理最小的数据块⑫。其中,有一组线程⑬和一组承诺值⑭,用来存储每块中的最后一个值;并且实现中还有一组future⑮,用来对前一块中的最后一个值进行检索。可以为期望值⑯做些储备,以避免生成新线程时,再分配内存。

    主循环和之前一样,不过这次是让迭代器指向了每个数据块的最后一个元素,而不是作为一个普通值传递到最后⑰,这样就方便向其他块传递当前块的最后一个元素了。实际处理是在process_chunk函数对象中完成,这个结构体看上去不是很长;当前块的开始和结束迭代器和前块中最后一个值的期望值一起,作为参数进行传递,并且承诺值用来保留当前范围内最后一个值的原始值⑱。

    生成新的线程后,就对开始块的ID进行更新,别忘了传递最后一个元素⑲,并且将当前块的最后一个元素存储到期望值,上面的数据将在循环中再次使用到⑳。

    处理最后一个数据块前,需要获取之前数据块中最后一个元素的迭代器(21),这样就可以将其作为参数传入process_chunk(22)中了。std::partial_sum不会返回一个值,所以在最后一个数据块被处理后,就不用再做任何事情了。当所有线程的操作完成时,求部分和的操作也就算完成了。

    OK,现在来看一下process_chunk函数对象①。对于整块的处理是始于对std::partial_sum的调用,包括对于最后一个值的处理②,不过需要要知道当前块是否是第一块③。如果当前块不是第一块,就会有一个previous_end_value值从前面的块传过来,所以需要等待这个值的产生④。为了将算法最大程度的并行,首先需要对最后一个元素进行更新⑤,这样就能将这个值传递给下一个数据块(如果有下一个数据块的话)⑥。当完成这个操作,就可以使用std::for_each和简单的Lambda函数⑦对剩余的数据项进行更新。

    如果previous_end_value值为空,当前数据块就是第一个数据块,所以只需要为下一个数据块更新end_value⑧(如果有下一个数据块的话——当前数据块可能是唯一的数据块)。

    最后,如果有任意一个操作抛出异常,就可以将其捕获⑨,并且存入承诺值⑩,如果下一个数据块尝试获取前一个数据块的最后一个值④时,异常会再次抛出。处理最后一个数据块时,异常会重新抛出⑪,因为抛出动作一定会在主线程上进行。

    因为线程间需要同步,这里的代码就不容易使用std::async重写。任务等待会让线程中途去执行其他的任务,所以所有的任务必须同时执行。

    基于块以传递末尾元素值的方法就介绍到这里,让我们来看一下第二种计算方式。

    实现以2的幂级数为距离部分和算法

    第二种算法通过增加距离的方式,让更多的处理器充分发挥作用。这种情况下,没有进一步同步的必要了,因为所有中间结果都直接传递到下一个处理器上去了。不过,实际中我们很少见到,单个处理器处理对一定数量的元素执行同一条指令,这种方式称为单指令-多数据流(SIMD)。因此,代码必须能处理通用情况,并且需要在每步上对线程进行显式同步。

    完成这种功能的一种方式是使用栅栏(barrier)——一种同步机制:只有所有线程都到达栅栏处,才能进行之后的操作;先到达的线程必须等待未到达的线程。C++11标准库没有直接提供这样的工具,所以需要自行设计一个。

    试想游乐场中的过山车。如果有适量的游客在等待,那么过山车管理员就要保证,在过山车启动前,每一个位置都得坐一个游客。栅栏的工作原理也一样:你已经知道了“座位”的数量,线程就是要等待所有“座位”都坐满。当等待线程够数,它们可以继续运行;这时,栅栏会重置,并且会让下一拨线程开始等待。通常,会在循环中这样做,当同一个线程再次到达栅栏处,它会再次等待。这种方法是为了让线程同步,所以不会有线程在其他未完成的情况下,就去完成下一个任务。如果有线程提前执行,这样的算法就是一场灾难,因为提前出发的线程可能会修改要被其他线程使用到的数据,后面线程获取到的数据就不是正确数据了。

    下面的代码就简单的实现了一个栅栏。

    清单8.12 简单的栅栏类

    1. class barrier
    2. {
    3. unsigned const count;
    4. std::atomic<unsigned> spaces;
    5. std::atomic<unsigned> generation;
    6. public:
    7. explicit barrier(unsigned count_): // 1
    8. count(count_),spaces(count),generation(0)
    9. {}
    10. void wait()
    11. {
    12. unsigned const my_generation=generation; // 2
    13. if(!--spaces) // 3
    14. {
    15. spaces=count; // 4
    16. ++generation; // 5
    17. }
    18. else
    19. {
    20. while(generation==my_generation) // 6
    21. std::this_thread::yield(); // 7
    22. }
    23. }
    24. };

    这个实现中,用一定数量的“座位”构造了一个barrier①,这个数量将会存储count变量中。起初,栅栏中的spaces与count数量相当。当有线程都在等待时,spaces的数量就会减少③。当spaces的数量减到0时,spaces的值将会重置为count④,并且generation变量会增加,以向线程发出信号,让这些等待线程能够继续运行⑤。如果spaces没有到达0,那么线程会继续等待。这个实现使用了一个简单的自旋锁⑥,对generation的检查会在wait()开始的时候进行②。因为generation只会在所有线程都到达栅栏的时候更新⑤,在等待的时候使用yield()⑦就不会让CPU处于忙等待的状态。

    这个实现比较“简单”的真实意义:使用自旋等待的情况下,如果让线程等待很长时间就不会很理想,并且如果超过count数量的线程对wait()进行调用,这个实现就没有办法工作了。如果想要很好的处理这样的情况,必须使用一个更加健壮(更加复杂)的实现。我依旧坚持对原子变量操作顺序的一致性,因为这会让事情更加简单,不过有时还是需要放松这样的约束。全局同步对于大规模并行架构来说是消耗巨大的,因为相关处理器会穿梭于存储栅栏状态的缓存行中(可见8.2.2中对乒乓缓存的讨论),所以需要格外的小心,来确保使用的是最佳同步方法。如果支持并发技术规范扩展,这里就可以使用std::experimental::barrier,如第4章所述。

    不论怎么样,这些都需要考虑到,需要有固定数量的线程执行同步循环。好吧,大多数情况下线程数量都是固定的。代码起始部分的几个数据项,只需要几步就能得到其最终值。这就意味着,无论是让所有线程循环处理范围内的所有元素,还是让栅栏来同步线程,都会递减count的值。我会选择后者,因为其能避免线程做不必要的工作,仅仅是等待最终步骤完成。

    要将count改为一个原子变量,这样在多线程对其进行更新的时候,就不需要添加额外的同步:

    1. std::atomic<unsigned> count;

    初始化保持不变,不过当spaces的值被重置后,需要显式的对count进行load()操作:

    1. spaces=count.load();

    这就是要对wait()函数的改动;现在需要一个新的成员函数来递减count。这个函数命名为done_waiting(),因为当一个线程完成其工作,并在等待的时候,才能对其进行调用它:

    1. void done_waiting()
    2. {
    3. --count; // 1
    4. if(!--spaces) // 2
    5. {
    6. spaces=count.load(); // 3
    7. ++generation;
    8. }
    9. }

    实现中,首先要减少count①,所以下一次spaces将会被重置为一个较小的数。然后,需要递减spaces的值②。如果不做这些操作,有些线程将会持续等待,因为spaces被旧的count初始化,大于期望值。一组当中最后一个线程需要对计数器进行重置,并且递增generation的值③,就像在wait()里面做的那样。最重要的区别:最后一个线程不需要等待。当最后一个线程结束,整个等待也就随之结束!

    现在就准备开始写部分和的第二个实现吧。每一步中的每一个线程都在栅栏出调用wait(),来保证线程所处步骤一致,并且当所有线程都结束,最后一个线程会调用done_waiting()来减少count的值。如果使用两个缓存对原始数据进行保存,栅栏也可以提供你所需要的同步。每一步中,线程都会从原始数据或是缓存中读取数据,并且将新值写入对应位置。如果有线程先从原始数据处获取数据,下一步就从缓存上获取数据(或相反)。这就能保证在读与写都是由独立线程完成,并不存在条件竞争。当线程结束等待循环,就能保证正确的值最终被写入到原始数据当中。下面的代码就是这样的实现。

    清单8.13 通过两两更新对的方式实现partial_sum

    1. struct barrier
    2. {
    3. std::atomic<unsigned> count;
    4. std::atomic<unsigned> spaces;
    5. std::atomic<unsigned> generation;
    6. barrier(unsigned count_):
    7. count(count_),spaces(count_),generation(0)
    8. {}
    9. void wait()
    10. {
    11. unsigned const gen=generation.load();
    12. if(!--spaces)
    13. {
    14. spaces=count.load();
    15. ++generation;
    16. }
    17. else
    18. {
    19. while(generation.load()==gen)
    20. {
    21. std::this_thread::yield();
    22. }
    23. }
    24. }
    25. void done_waiting()
    26. {
    27. --count;
    28. if(!--spaces)
    29. {
    30. spaces=count.load();
    31. ++generation;
    32. }
    33. }
    34. };
    35. template<typename Iterator>
    36. void parallel_partial_sum(Iterator first,Iterator last)
    37. {
    38. typedef typename Iterator::value_type value_type;
    39. struct process_element // 1
    40. {
    41. void operator()(Iterator first,Iterator last,
    42. std::vector<value_type>& buffer,
    43. unsigned i,barrier& b)
    44. {
    45. value_type& ith_element=*(first+i);
    46. bool update_source=false;
    47. for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
    48. {
    49. value_type const& source=(step%2)? // 2
    50. buffer[i]:ith_element;
    51. value_type& dest=(step%2)?
    52. ith_element:buffer[i];
    53. value_type const& addend=(step%2)? // 3
    54. buffer[i-stride]:*(first+i-stride);
    55. dest=source+addend; // 4
    56. update_source=!(step%2);
    57. b.wait(); // 5
    58. }
    59. if(update_source) // 6
    60. {
    61. ith_element=buffer[i];
    62. }
    63. b.done_waiting(); // 7
    64. }
    65. };
    66. unsigned long const length=std::distance(first,last);
    67. if(length<=1)
    68. return;
    69. std::vector<value_type> buffer(length);
    70. barrier b(length);
    71. std::vector<std::thread> threads(length-1); // 8
    72. join_threads joiner(threads);
    73. Iterator block_start=first;
    74. for(unsigned long i=0;i<(length-1);++i)
    75. {
    76. threads[i]=std::thread(process_element(),first,last, // 9
    77. std::ref(buffer),i,std::ref(b));
    78. }
    79. process_element()(first,last,buffer,length-1,b); // 10
    80. }

    代码的整体结构应该不用说了。process_element类有函数调用操作可以用来做具体的工作①,就是运行一组线程⑨,并将线程存储到vector中⑧,同样还需要在主线程中对其进行调用⑩。这里与之前最大的区别就是,线程的数量是根据列表中的数据量来定的,而不是根据std::thread::hardware_concurrency。如之前所说,除非使用的是一个大规模并行的机器,因为这上面的线程都十分廉价(虽然这样的方式并不是很好),还能为我们展示了其整体结构。这个结构在有较少线程的时候,每一个线程只能处理源数据中的部分数据,当没有足够的线程支持该结构时,效率要比传递算法低。

    不管怎样,主要的工作都是调用process_element的函数操作符来完成的。每一步,都会从原始数据或缓存中获取第i个元素②,并且将获取到的元素加到指定stride的元素中去③,如果从原始数据开始读取的元素,加和后的数需要存储在缓存中④。然后,在开始下一步前,会在栅栏处等待⑤。当stride超出了给定数据的范围,当最终结果已经存在缓存中时,就需要更新原始数据中的数据,同样这也意味着本次加和结束。最后,在调用栅栏中的done_waiting()函数⑦。

    注意这个解决方案并不是异常安全的。如果某个线程在process_element执行时抛出一个异常,就会终止整个应用。可以使用一个std::promise来存储异常,就像在清单8.9中parallel_find的实现,或仅使用一个被互斥量保护的std::exception_ptr即可。

    总结下这三个例子。希望其能保证我们了解8.1、8.2、8.3和8.4节中提到的设计考量,并且证明了这些技术在真实的代码中,需要承担哪些责任。


    [4] http://threadingbuildingblocks.org/