第二部分,函数式数据处理(第4章到第7章)

第4章,引入流

流简介

流到底是什么呢?简短的定义就是“从支持数据处理操作的源生成的元素序列”。流操作有两个重要的特点:流水线,内部迭代。

~流是什么,工作中需要用到它,那么就应该知道它是什么。

流与集合

粗略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。 相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。这对编程有很大的好处。这个思想就是用户仅仅从流中提取需要的值,而这些值——在用户看不见的地方——只会按需生成。这是一种生产者-消费者的关系。从另一角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值。

~流与集合的区别。

流操作

使用流

总而言之,流的使用一般包括三件事: 1、一个数据源(如集合)来执行一个查询; 2、一个中间操作链,形成一条流的流水线; 3、一个终端操作,执行流水线,并能生成结果。

~知道就行。

第5章,使用流

筛选和切片

1、谓词筛选

  menu.stream()
      .filter(Dish::isVegetarian)//
      .collect(toList());

2、筛选各异的元素

  numbers.stream()
          .filter(i -> i % 2 == 0)
          .distinct()//
          .forEach(System.out::println);

3、截断流

  menu.stream()
      .filter(d -> d.getCalories() > 300)
      .limit(3)//
      .collect(toList());

4、跳过元素

  menu.stream()
      .filter(d -> d.getCalories() > 300)
      .skip(2)//
      .collect(toList());

~筛选和切片例子,记录在这里,便于以后使用。

映射

对流中每一个元素应用函数

  menu.stream()
      .map(Dish::getName)//
      .map(String::length)//
      .collect(toList());

~列举的例子,方便以后使用。

流的扁平化

  words.stream()
      .map(w -> w.split(""))
      .flatMap(Arrays::stream)//
      .distinct()
      .collect(toList());

使用flatMap方法的效果是,各个数组并不是分别映射成一个流,而是映射成流的内容。所有使用map(Arrays::stream)时生成的单个流都被合并起来,即扁平化为一个流。

一言以蔽之,flatMap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

~将各个生成流扁平化为单个流。

查找与匹配

Stream API通过allMatch、anyMatch、noneMatch、findMatch和findAny方法来处理数据集中的某些元素是否匹配一个给定的属性。

  if(menu.stream().anyMatch(Dish::isVegetarian)){}

  Optional<Dish> dish =
                    menu.stream()
                        .filter(Dish::isVegetarian)
                        .findAny();

~例子

归约

元素求和

reduce接受两个参数: 1、一个初始值,这里是0; 2、一个BinaryOperator来将两个元素结合起来产生一个新值。

  int sum = numbers.stream().reduce(0,(a,b) -> a+b);

~记录在这里,

最大值和最小值

  Optional<Integer> max = numbers.stream().reduce(Integer::max);

  Optional<Integer> min = numbers.stream().reduce(Integer::min);

~例子,便于以后直接参考。

数值流

映射到数值流

  int calories = menu.stream()
                      .mapToInt(Dish::getCalories)//
                      .sum();

转换回对象流

  IntStream intStream = menu.stream()
                          .mapToInt(Dish::getCalories);
  Stream<Integer> stream = intStream.boxed();//

数值范围

  IntStream evenNumbers = IntStream
                      .rangeClosed(1,100)//range(1,100)
                      .filter(n -> n % 2 == 0);

构建流

1、由值构建流

  Stream<String> stream = Stream.of("Lambdas","in","action");
  Stream<String> emptyStream = Stream.empty();

2、由数组创建流

  int[] numbers = {1,2,3,4,5};
  int sum = Stream.stream(numbers).sum();

3、由文件生成流

  Stream<String> lines =
   Files.lines(Paths.get("data.txt"),Charset.defaultCharset());

4、由函数生成流:创建无限流 由iterate和generate产生的流会用给定的函数按需创建值,因此可以无穷无尽地计算下去。一般来说,应该使用limit来对这种流加以限制,以避免打印无穷多个值。

  Stream.iterate(0,n->n+2)
        .limit(10)
        .forEach(System.out::println);

与iterate方法类似,generate方法也可以让你按需生成一个无限流。但generate不是依次对每个新生成的值应用函数的。它接受一个Supplier类型的Lambda提供新的值。</u>``

  Stream.generate(Math::random)
        .limit(10)
        .forEach(System.out::println);

第6章,用流收集数据

规约和汇总

查询流中的最大值和最小值

  Comparator<Dish> dishColoriesComparator =
      Comparator.comparingInt(Dish::getCalories);//比较器
  Optional<Dish> mostCalorieDish =
      menu.stream()
          .collect(maxBy(dishColoriesComparator));

汇总

  //平均值
  double avgCalories =
      menu.stream().collect(averagingInt(Dish::getCalories));
  //统计值:数量,总和,平均值,最大值,最小值。
  IntSummaryStatices menuStatistics =
      menu.stream().collect(summarizingInt(Dish::getCalories));

连接字符串

  String shortMenu =
   menu.stream().map(Dish::getName).collect(joining(", "));

广义的归约汇总

事实上,我们已经讨论的所有收集器,都是一个可以用reducing工厂方法定义的归约过程的特殊情况而已。Collectors.reducing工厂方法是所有这些特殊情况的一般化。

  int totalCalories =
     menu.stream()
     .collect(reducing(0,Dish::getCalories,(i,j) -> i+j));

它有三个参数: 第一个参数是归约操作的起始值,也是流中没有元素时的返回值,所以很显然对于数值和而言0是一个合适的值。 第二个参数就是一个函数。 第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值。

~知道这个有帮助。

收集与归约

语义问题在于,reduce方法旨在把两个值结合起来生成一个新值,它是一个不可变的归约。与此相反,collect方法的设计就是要改变容器,从而累积要输出的结果。以错误的语义使用reduce方法还会造成一个实际问题:这个归约过程不能并行工作,因为由多个线程并发修改同一个数据结构可能会破坏List本身。在这种情况下,如果你想要线程安全,就需要每次分配一个新的List,而对象分配又会影响性能。这就是collect方法特别适合表达可变容器上的归约的原因,更关键的是它合适并行操作。

~大多数的时候都要考虑使用collect方法,只有在求和,求最大值,最小值,平均值可以考虑使用reduce方法。

分组

  Map<Dish.type,List<Dish>> dishesByType =
    menu.stream().collect(
          groupingBy(Dish::getType)
        );

多级分组

  Map<Dish.type,Map<CaloricLevel,List<Dish>>> dishesByTypeCaloricLevel =
    menu.stream().collect(
            groupingBy(Dish::getType,
              groupingBy(dish -> {
                if(dish.getCalories() <= 400 return CaloricLevel.DIET);
                else if(dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                else return CaloricLevel.FAT;
              })
            )
          );

按子组收集数据

传递给第一个groupingBy的第二个参数可以是任何类型,而不一定是另一个groupingBy,如下,可以传递counting收集器作为groupingBy收集器的第二个参数。

  Map<Dish.type,Long> typesCount =
    menu.stream().collect(
          groupingBy(Dish::getType,counting())//
        );

把收集器的结果转换为另一种类型

  Map<Dish.type,Dish> mostCaloricByType =
    menu.stream().collect(
          groupingBy(Dish::getType,
            collectingAndThen(
              maxBy(comparingInt(Dish::getCalories)),
              Optional::get
            ))
        );

分区

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称为分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组。

Map<Boolean,List<Dish>> partitionedMenu =
            menu.stream().collect(
              partitioningBy(Dish::isVegetarian));

第7章,并行数据处理与性能

并行流

在现实中,对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。

  Stream.iterate(1L,i -> i + 1)
        .limit(n)//n=1000000
        .parallel()
        .reduce(0L,Long::sum);

高效使用并行流

高效使用并行流的建议:

分支/合并框架

使用RecursiveTask

要把任务提交到这个池,必须创建RecursiveTask的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当然它可能会更新其他非局部机构)。要定义RecursiveTask,只需实现它唯一的抽象方法compute:

  protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便在拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

  if(任务足够小或不可分){
    顺序计算该任务
  } else {
    将任务分成两个任务
    递归调用本方法拆分每个子任务等待所有子任务完成
    合并每个子任务的结果
  }

~任务拆分

使用分支/合并框架的最佳做法

~使用ForkJoinPool时,这几点很重要。

工作窃取

不幸的是,实际中,每个子任务所花的时间可能天差万别,要么是因为划分策略效率低,要么是有不可预知的原因,比如磁盘访问慢,或者需要和外部服务协调执行。

分支/合并框架工程用一种称为工作窃取的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他线程还很忙。这时,这个线程并没有闲下来,而是随机选择一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

~需要了解。