Java8——Stream并行流(二)

并行虽好,但是一定要注意正确使用,特别是对于我们自己传入的函数,并行模式下不会保证函数的线程安全,因此这要求我们的函数具有天然的线程安全性,如果使用的算法改变了某些共享对象的属性状态,并且我们自己没有保证并行安全,那么就可能造成数据异常!

2. 正确使用

  并行虽好,但是一定要注意正确使用,特别是对于我们自己传入的函数,并行模式下不会保证函数的线程安全,因此这要求我们的函数具有天然的线程安全性,如果使用的算法改变了某些共享对象的属性状态,并且我们自己没有保证并行安全,那么就可能造成数据异常!另外,在reduce方法中,并行流操作需要保证初始值与其他之操作之后的值不会改变,并且运算需要满足结合律。

  下面是两个复杂案例:

/**
 1. @author lx
 */
public class ParallelErr {

    /**
     * 求总和,long普通变量在串行模式下会丢失数据
     */
    @Test
    public void test() {
        Accumulator accumulator = new Accumulator();

        IntStream.rangeClosed(1, 10000).forEach(accumulator::add);
        System.out.println(accumulator);
    }

    /**
     * 求总和,long普通变量在并行模式下会丢失数据
     */
    @Test
    public void test1() {
        Accumulator accumulator = new Accumulator();
        IntStream.rangeClosed(1, 10000).parallel().forEach(accumulator::add);
        System.out.println(accumulator);
    }

    /**
     * 求总和,使用LongAdder累加器,这是一个线程安全的累加器
     */
    @Test
    public void test2() {
        LongAdder longAdder = new LongAdder();
        IntStream.rangeClosed(1, 10000).parallel().forEach(longAdder::add);
        System.out.println(longAdder);
    }


    public static class Accumulator {
        public static int total = 0;

        public void add(int value) {
            total += value;
        }

        @Override
        public String toString() {
            return "" + total;
        }
    }

    /**
     * 要求:计算n的阶乘,然后将结果乘以5后返回
     */
    @Test
    public void test4() {
        System.out.println("安全的串行");

        //5的阶乘再乘以5,然后返回
        //串行模式下使用reduce没问题
        System.out.println(IntStream.rangeClosed(1, 5).reduce(5, (x, y) -> x * y));


        System.out.println("错误的并行");

        //如果仅仅改成并行操作,那么就会出问题,最终计算结果为375000
        //因为并行操作首先会将[1,5]之间的数拆分成为五组数据分别与5相乘,得到:5、10、15、20、25
        //然后将结果继续汇总相乘:5*10=50、20*25=500、15 ,然后继续汇总相乘 15*500=7500、50,最后一次汇总:50*7500=375000
        System.out.println(IntStream.rangeClosed(1, 5).parallel().reduce(5, (x, y) -> {
            System.out.println("x:" + x + " y: " + y + " -> " + (x * y));
            return x * y;
        }));

        System.out.println("collectingAndThen改进");

        //改成并行时,首先我们要保证初始值与其他之操作之后的值不会改变,因此这里的初始值只能是1,然后拆分相乘之后我们可以得到:1、2、3、4、5
        //随后会将结果继续汇总相乘:1*2=2、4*5=20、3 ,然后继续汇总相乘 30*3=60、2,最后一次汇总:60*2=120
        //这样我们就首先把5的阶乘求得了,最后是一个乘以5的操作,这一步我们必须保证不能并行
        //因此使用collectingAndThen方法在最后调用乘以5即可,collectingAndThen可以保证最后一步是串行的
        Integer collect = IntStream.rangeClosed(1, 5).parallel().boxed().collect(Collectors.collectingAndThen(Collectors.reducing(1, (x, y) -> x * y), x -> x * 5));
        System.out.println(collect);


        System.out.println("自定义收集器改进");
        //当前我们也可以自定义收集器,同样满足我们的要求,而且性能更好

        //完整版如下
        System.out.println(IntStream.rangeClosed(1, 5).parallel().boxed().collect(Collector.of(new Supplier<List<Integer>>() {

            /**
             * 返回累加器函数
             */
            @Override
            public List<Integer> get() {
                return new ArrayList<>();
            }
        }, new BiConsumer<List<Integer>, Integer>() {
            /**
             * 处理元素函数,添加到集合中
             */
            @Override
            public void accept(List<Integer> integers, Integer integer) {
                integers.add(integer);
            }
        }, new BinaryOperator<List<Integer>>() {
            /**
             * 结果并行汇总函数
             * 我们将两个集合的第一个元素相乘,然后替换第一个元素,最终结果就是1*2*3*4*5=120
             */
            @Override
            public List<Integer> apply(List<Integer> integers, List<Integer> integers2) {
                Integer integer = integers.get(0);
                Integer integer1 = integers2.get(0);
                integers.add(0, integer * integer1);
                return integers;
            }
        }, new Function<List<Integer>, Integer>() {
            /**
             * 返回最终结果函数
             * 最后乘以5返回即可
             */
            @Override
            public Integer apply(List<Integer> integers) {
                return integers.get(0) * 5;
            }
        })));


        //自定义收集器简化之后,这里也能看出来自定义收集器功能的强大
        System.out.println(IntStream.rangeClosed(1, 5).parallel().boxed().collect(Collector.of(ArrayList::new, List::add, (integers, integers2) -> {
            integers.add(0, integers.get(0) * integers2.get(0));
            return integers;
        }, (Function<List<Integer>, Integer>) integers -> integers.get(0) * 5)));
    }
}

  除了我们自己要保证安全之外,并行流而并不一定适用与所有操作,通常在生产环境使用并行流之前,我们需要进行性能测试。
  一般来说,影响并行化性能的有这么几个因素:

  1. 源数据数量:只有在数据足够多的时候,并行化效果才会更好,因为并行操作涉及到新的线程的启动、任务划分等并行化操作,会消耗额外的时间。
  2. 源数据结构:原数据的结构对数据的拆分操作有非常重要的影响。支持随机存取的结构比如ArrayList、数组或IntStream.range等结构拆分非常容易。HashSet、TreeSet涉及到复杂树结构,不容易被分解,但是可以通过分解树形结构勉强分解。而LinkedList、Streams.iterate 、BufferedReader.lines等数据结构,难以选择分解的点位,不知道数据量,可能需要花费O(N)时间统计数量然后分解,性能极差。
  3. 装箱和拆箱:如果处理基本类型的原数据,那么尽量使用特性化的流,因为没有中间的拆装箱操作,大量数据情况下能够节省很多时间。
  4. 单个元素流水线操作耗时:一个元素在流水线上处理时间越长,并行化带来的收益越高。
  5. 合并操作耗时:并行化的最后步骤会将结果合并,如果合并操作(比如Collector中的combiner方法)是一个非常耗时的操作,那么可能造成性能反而不如串行流。
  6. 处理器线程数量:如果是单核单线程,那么没必要并行化,因为一定不能并行,只能是并发,带来的收益一般不大,处理器可用线程数量越多,并行化收益越高!

  • 发表于 2020-09-16 17:07
  • 阅读 ( 16 )

0 条评论

请先 登录 后评论
NX小编
NX小编

973 篇文章

作家榜 »

  1. NX小编 973 文章
  2. 58沈剑 309 文章
  3. 奈学教育 130 文章
  4. 李希沅 | 奈学教育 25 文章
  5. 江帅帅 | 奈学教育 24 文章
  6. 林淮川 | 奈学教育 12 文章
  7. 科技热点 10 文章
  8. 邱鹏超 2 文章