- 官网给出的介绍
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
The MapReduce framework consists of a single master ResourceManager, one worker NodeManager per cluster-node, and MRAppMaster per application (see YARN Architecture Guide).
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.
The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the workers, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.
Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java.
Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non JNI™ based).
- Inputs and Outputs
The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
Input and Output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
前言
MapReduce作为hadoop下的一个计算模块,那么相应具备Hadoop的一些特征;在日常对数据处理过程中并不直接使用mapreduce进行数据处理,直接用mapreduce处理数据并不是很方便,那么在哪里应用MapReduce呢?我们在使用hive SQL处理数据时那么底层就使用的是MapReduce进行数据的计算,那么就需要了解到MapReduce是什么,他是怎么用的,我们通常用来解决那些问题这是本篇展示的内容。
一、MapReduce是什么
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
MapReduce作业通常将输入数据集分成独立的块,这些任务由地图任务以完全并行的方式处理。框架对地图的输出进行排序,然后将其输入到reduce任务。通常,作业的输入和输出都存储在文件系统中。该框架负责安排任务,监视任务并重新执行失败的任务。
MapReduce框架由一个主资源管理器,每个群集节点一个工作器NodeManager和每个应用程序MRAppMaster组成
二、MapReduce该怎么用
在理解怎么用之前,首先明确它是怎么安装配置的
MapReduce配置
第一步:在IDEA的pom.xml上添加需要引入的依赖(如果不在ideal上开发或者开发好了忽略此步骤)
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<!-- 根据自己的版本进行依赖导入 -->
<version>2.7.7</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
第二步:Linux上Hadoop的安装文件下需要引入的配置
集群运行模式配置在mapred-site.xml
下引入该配置
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>jobtracker.thrift.address</name>
<value>0.0.0.0:9290</value>
</property>
<property>
<name>mapred.jobtracker.plugins</name>
<value>org.apache.hadoop.thriftfs.ThriftJobTrackerPlugin</value>
<description>Comma-separated list of jobtracker plug-ins to be activated.</description>
</property>
</configuration>
第三步:打jar包
ideal上打jar包过程:file -> project structure -> Artifacts ->add ->jar -> from modules dependencies ->选好类后OK
默认路径是:D:\IdealProject\bd20201\out\artifacts\strangewugit_jar
Build -> Build Artifacts -> build
MapReduce的使用
下面是官网给出的一个单词计数案例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MRWordCount {
public static void main(String[] args) throws Exception {
//1. 获取job实例
Configuration conf = new Configuration();
// conf.set("mapreduce.input.fileinputformat.split.minsize","12313123");
Job job = Job.getInstance(conf);
//2. 设置job运行的主类
job.setJarByClass(MRWordCount.class);
//3. 设置Mapper的类
job.setMapperClass(WCMapper.class);
//4. 设置Reducer的类
job.setReducerClass(WCReducer.class);
//5. 设置Mapper输出的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//6. 设置Reducer输出的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//7. 设置job的输入路径
FileInputFormat.setInputPaths(job, new Path(args[1]));
//8. 设置job的输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// mr读取文本文件时 keyIn是LongWritable类型
// 当前行在文件中的字节偏移量 offset
// valueIn是Text类型
// 文本中的每一行字符串
// keyOut Text 存储每个单词
// ValueOut LongWritable 存储单词出现的数量
public static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text k = new Text();
private LongWritable v = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1. 取出每行的字符串
String line = value.toString();
// 2. 切分成单词
String[] words = line.split("\\s+");
// 3. 用context将处理完的结果写出到框架
for (String word : words) {
word = word.replaceAll("\\W", "").toLowerCase();
if (!"".equals(word)) {
k.set(word);
context.write(k, v);
}
}
}
}
public static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
// MR框架会将Mapper输出的数据按照Key进行分组
// 在调用Reducer的reduce方法时
// 每次传入一个key 和这个key对应的所有Value
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable value : values) {
count++;
}
context.write(key, new LongWritable(count));
}
}
}
运行架包
[root@wq1 bin]# pwd
/opt/hadoop-2.7.7/bin
要运行的jar包 运行的主类 要做处理的文件位置 处理后输出的位置
[root@wq1 bin]# hadoop jar wc.jar MRWordCount /wordcount/input /wordcount/output
三、MapReduce的核心问题
首先我们要明确他的工作流程是什么样的,通常我们要面临哪些问题,通过参数调优使得机器能够达到很好的状态
MapReduce工作流程(官网给出的图片)
合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
切片 -> 分区partition->写入环形内存缓冲区->执行溢出写 排序sort—>合并combiner—>生成溢出写文件->归并merge——>reduce——>复制copy->归并merge->reduce
- InputFormat源码解析
- 找到你数据存储的目录。
- 开始遍历处理(规划切片)目录下的每一个文件
- 遍历第一个文件test.txt
- 获取文件大小fs.sizeOf(test.txt);
- 计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
- 默认情况下,切片大小=blocksize
开始切,形成第1个切片:test.txt—0:128M 第2个切片test.txt—128:256M 第3个切片test.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
将切片信息写到一个切片规划文件中
整个切片的核心过程在getSplit()方法中完成。
数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。
提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。
- 在向该缓冲区写入的过程中进行分区(partition)
也就是对于每个键值对来说,都增加了一个partition属性值,然后连同键值对一起序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组,默认大小为100M)。分区操作,这样就能把map任务处理的结果发送给指定的reducer去执行,从而达到负载均衡,避免数据倾斜。MapReduce提供默认的分区类(HashPartitioner)
- 当写入的数据量达到预先设置的阙值后(mapreduce.map.io.sort.spill.percent
默认0.80,或者80%)便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临时文件中,并在写入前根据key进行排序(sort)和合并(combine,可选操作)。
- 溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的目录中。
-
当整个map任务完成溢出写后
会对磁盘中这个map任务产生的所有临时文件(spill文件)进行归并(merge)操作生成最终的正式输出文件,此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。至此,map端shuffle过程结束。
- Reduce进程启动一些数据copy线程
通过HTTP方式请求MapTask所在的NodeManager以获取输出文件。NodeManager需要为分区文件运行reduce任务。并且reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。而每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。reduce任务有少量复制线程,因此能够并行取得map输出。默认线程数为5,但这个默认值可以通过mapreduce.reduce.shuffle.parallelcopies属性进行设置。
- Copy过来的数据会先放入内存缓冲区中
如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。与map端的溢写类似,在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。
- 在reduce阶段
reduce()方法的输入是所有的Key和它的Value迭代器。此阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于NodeManager也运行数据节点,所以第一个块副本将被写到本地磁盘。1、当reduce将所有的map上对应自己partition的数据下载完成后,reducetask真正进入reduce函数的计算阶段。由于reduce计算时同样是需要内存作为buffer,可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代码MergeManagerImpl.java:674行)来设置reduce的缓存。这个参数默认情况下为0,也就是说,reduce是全部从磁盘开始读处理数据。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。所以默认情况下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让reduce直接从缓存读数据,这样做就有点Spark Cache的感觉。
map task数量,partition数量,reduce task数量怎么确定
有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中.我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。
- map 数量
一个task的map数量由splitSize来决定的,也就是由块大小来决定
goalSize = totalSize / mapred.map.tasks
inSize = max {mapred.min.split.size, minSplitSize}
splitSize = max (minSize, min(goalSize, dfs.block.size))
- 分区数量
- 之前的日志分析项目,要求将每天的日志处理单独输出到一个文件中,可以利用分区来实现
- 假设自定义分区数为5,则
job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
job.setNumReduceTasks(2);会报错
job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
源码分析,可通过自定义分区方法实现分区
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,int numReduceTasks) {
//默认使用key的hash值与上int的最大值,避免出现数据溢 的情况
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
- reduce数量
一个task的reduce数量,由partition决定
job.setPartitionerClass(PartitionChoice.class);
job.setNumReduceTasks(3);
使用mapreduce进行单词计数的过程
MapReduce的几种排序方式
- 部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。
-
全排序:
如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型 文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的 并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的 文件。主要思路是使用一个分区来描述输出的全局排序。 -
辅助排序:(GroupingComparator分组)
Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至 在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务 在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖 于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
-
二次排序:
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
Shuffle 产生的意义是什么?
完整地从 map task 端拉取数据到 reduce 端;在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗;减少磁盘 IO 对 task 执行的影响;
每个 map task 都有一个内存缓冲区,存储着 map 的输出结果
当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task 结束后再对磁盘中这个 map task 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待 reduce task 来拉数据。
- 特殊情况:当数据量很小,达不到缓冲区阙值时,怎么处理?
不会有写临时文件到磁盘的操作,也不会有后面的合并,无reduce任务。
MapReduce 怎么实现 TopN?
实现WritableComparable接口
public class compares implements WritableComparable<compares> {
@Override
public int compareTo(FlowBean o) {
if (this.sumFlow > o.getSumFlow()){
return -1;
}else if (this.sumFlow < o.getSumFlow()){
return 1;
}else {
return 0;
}
}
}
//那么在map和reduce中再通过下面这方法实现topn
compares = new compares();
if (kBeans.size() > n){
kBeans.remove(kBeans.lastKey());
}
MapReduce调优
- 关于硬件方面的调优
内存+硬盘
硬件的主要配置有 :memory 当CPU不够的话 : 程序执行运算会慢一点,当内存不够的 : 会导致heap溢出,GC。因为在内存不够的时候,Yarn里面有一个动态的线程就会检测到 , 然后就给你杀死了 - Linux层面的一些调优
vim /etc/security/limits.conf
soft nofile 65535 单个用户可用的最大进程数量(软限制)
hard nofile 65535 单个用户可用的最大进程数量(硬限制)
soft nproc 65535 可打开的文件描述符的最大数(软限制)
hard nproc 65535 可打开的文件描述符的最大数(硬限制)
- NameNode上JVM的参数调优
- 配置方面的调优
core-site.xml 的调整
- Ipc.server.listen.queue.size 控制了服务端socket的监听队列长度
<property>
<name>Ipc.server.listen.queue.size</name>
<value>65535</value>
</property>
- o.file.buffer.size默认值4096(4K),作为hadoop的缓冲区,用于hdfs文件的读写。还有map的输出都用到这个缓冲区,较大的缓存能提高数据传输这是读写sequence file的buffer size,可减少I/O次数。如果集群比较大,建议把参数调到65535-131072
<property>
<name>Io.file.buffer.size</name>
<value>65535</value>
</property>
- Hdfs的回收站,一但操作事物,删掉了HDFS中的数据,可以找回。默认不开启
Value:1440(分钟)1天
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
hdfs-site.xml 配置文件的调整
- HDFS中的block块大小的设置
有时候我们场景中可能出现大量小文件,这时候我们可以适当的调小,比如16M(16777216)
<property>
<name>dfs.blockSize</name>
<value>134217728</value>
</property>
- 带宽:默认是1M(在balancer的时候,设置hdfs中数据移动速度)集群一般是千M路由器,所以尽量改大
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>1048576</value>
</property>
- 真正datanode数据保存路径,可以写多块硬盘。主要用来实现IO平衡,因此会显著改进磁盘IO性能
<property>
<name>dfs.datanode.data.dir</name>
<value>/sda,/sda2,/sda3</value>
</property>
- Namenode server RPC 的处理线程数,默认是10,namenode线程通过RPC的方式跟datanode通信,如果datanode数量太多时可能出现RPC timeout,提升网络速度或者提高这个值。但是thread数量增多也代表着namenode消耗的内存也随着增加
<property>
<name>Dfs.namenode.hadler.count</name>
<value>30</value>
</property>
mapred-site.xml
- 3 默认开启的reduce数量,多台机器分担一台机器的压力
<property>
<name>Mapreduce.job.reduces</name>
<value>3</value>
</property>
- 环形缓冲区所占内存大小默认100M
<property>
<name> Mapreduce.task.io.sort.mb</name>
<value>200</value>
</property>
- 环形缓冲区的阈值 默认的是0.8
<property>
<name>Mapreduce.map.sort.spill.percent</name>
<value>0.8</value>
</property>
- Reduce Task中合并小文件时,一次合并的文件数据,每次合并的时候选择最小的前10(默认值)进行合并。
<property>
<name>Mapreduce.task.io.sort.factor</name>
<value>50</value>
</property>
- map输出是否进行压缩,如果压缩就会多耗cpu,但是减少传输时间,如果不压缩,就需要较多的传输带宽。需要配 合mapreduce.map.output.compress.codec(指定压缩方式)
<property>
<name>Mapreduce.map.output.compress</name>
<value>true</value>
</property>
#压缩方式(牺牲CPU换IO和磁盘的方式)
<property>
<name>Mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
- Reduce shuffle阶段并行传输的数量。根据集群大小可调
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>20</value>
</property>
- Map和reduce是通过http传输的,这个是设置传输的并行数
<property>
<name>Mapreduce.tasktracker.http.threads</name>
<value>40</value>
</property>
- 容错性相关的参数
#每个Map Task最大重试次数,一旦重试参数超过该值,则认为 Map Task 运行失败默认4
<property>
<name>mapreduce.map.maxattempts</name>
<value>4</value>
</property>
#每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为 Map Task 运行失败,默认4
<property>
<name>mapreduce.reduce.maxattempts</name>
<value>4</value>
</property>
Yarn-site.xml配置
- 给nodemanager可用的物理内存(8GB)
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
</property>
- 单个任务可申请的最少内存 也就是RM 中每个容器请求的最小配置
<property>
<name>Yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>
- 单个任务可申请的最大内存
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>8192</value>
</property>
- Yarn这个节点可使用的虚拟CPU个数 默认是8.但我们通常配置成跟物理CPU个数一样
<property>
<name>Yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
</property>
重要的参数配置
#默认为 1024。如果 Map Task 实际使用的资源量超过该值,则会被强制杀死。可以适当的调
<property>
<name>mapreduce.map.memory.mb</name>
<value>1280</value>
</property>
# Map端启用JVM所占用的内存大小 (官方----->200M heap over/GC)
# -XX:-UseGCOverheadLimit(禁用GC,我们使用parNew+CMS方式)
<property>
<name>Mapreduce.map.java.opts</name>
<value>-Xmx1024m -XX:-UseGCOverheadLimit</value>
</property>
# reduce阶段,reduce端的内存分配多少 一个 Reduce Task 可使用的资源上限(单位:MB),默认为 1024。如果 Reduce Task 实际使用的资源量超过该值,则会被强制杀死
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>1280</value>
</property>
# reduce端启用JVM所占用的内存大小
#java的堆栈肯定要小于memory的,因为还要考虑非堆和其他,上面最大能分配多少,还取决于yarn的配置。
<property>
<name>Mapreduce.reduce.java.opts</name>
<value>-Xmx1024m -XX:-UseGCOverheadLimit</value>
</property>
#每个 Maptask 可用的最多 cpu core 数目, 默认值: 1
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>1</value>
</property>
#每个 Reducetask 可用最多 cpu core 数 默认值: 1
<property>
<name> mapreduce.reduce.cpu.vcores</name>
<value>1</value>
</property>
mapreduce shuffle 与spark shuffle的区别
由于Shuffle涉及到了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响到了整个程序的运行效率。
spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle。
1. HashShuffle
又分为普通机制和合并机制(consolidation),普通机制因为其会产生M * R个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer缓存从而将磁盘小文件的数量降低到Core * R个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。
SortShuffle
也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生多个磁盘小文件,最后合并这些小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类(比如reduceByKey)的shuffle算子的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。
- 区别
在MapReduce中
,map task必须将所有的数据都写入本地磁盘文件以后,才能启动reduce操作,来拉取数据。为什么?因为mapreduce要实现默认的根据key的排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。
但是Spark不需要
,spark默认情况下,是不会对数据进行排序的。因此ShuffleMapTask每写入一点数据,ResultTask就可以拉取一点数据,然后在本地执行我们定义的聚合函数和算子,进行计算。
spark这种机制的好处在于,速度比mapreduce快多了。但是也有一个问题,mapreduce提供的reduce,是可以处理每个key对应的value上的,很方便。但是spark中,由于这种实时拉取的机制,因此提供不了直接处理key对应的values的算子,只能通过groupByKey,先shuffle,产生一个MapPartitionsRDD,然后用map算子,来处理每个key对应的values,就没有mapreduce的计算模型那么方便。
mapreduce怎么避免数据倾斜
是什么:数据倾斜就是数据的key 的分化严重不均,造成一部分数据很多,一部分数据很少的局面
自己实现partition类,用key和value相加取hash值
public int getPartition(K key, V value,int numReduceTasks) {
return (((key).hashCode()+value.hashCode()) & Integer.MAX_VALUE) % numReduceTasks;
}
解决一些报错
ideal操作mapreduce报错
Could not locate executable null\bin\winutils.exe in the Hadoop binaries
点击进入下载winnuts
Exceptionin thread "main" java.lang.UnsatisfiedLinkError:org.apache.hadoop.util.NativeCrc32.nativeCo
把自己下载的hadoop.dll复制到C:\Windows\System32