{"id":1708,"date":"2021-01-31T11:54:37","date_gmt":"2021-01-31T03:54:37","guid":{"rendered":"https:\/\/www.specialwu.com\/?p=1708"},"modified":"2021-01-31T11:54:37","modified_gmt":"2021-01-31T03:54:37","slug":"mapreduce%e6%ba%90%e7%a0%81","status":"publish","type":"post","link":"http:\/\/www.specialwu.com\/?p=1708","title":{"rendered":"Mapreduce\u6e90\u7801"},"content":{"rendered":"<div id=\"ez-toc-container\" class=\"ez-toc-v2_0_61 ez-toc-wrap-center counter-hierarchy ez-toc-counter ez-toc-custom ez-toc-container-direction\">\n<div class=\"ez-toc-title-container\">\n<p class=\"ez-toc-title \" >\u76ee\u5f55<\/p>\n<span class=\"ez-toc-title-toggle\"><a href=\"#\" class=\"ez-toc-pull-right ez-toc-btn ez-toc-btn-xs ez-toc-btn-default ez-toc-toggle\" aria-label=\"Toggle Table of Content\"><span class=\"ez-toc-js-icon-con\"><span class=\"\"><span class=\"eztoc-hide\" style=\"display:none;\">Toggle<\/span><span class=\"ez-toc-icon-toggle-span\"><svg style=\"fill: #000000;color:#000000\" xmlns=\"http:\/\/www.w3.org\/2000\/svg\" class=\"list-377408\" width=\"20px\" height=\"20px\" viewBox=\"0 0 24 24\" fill=\"none\"><path d=\"M6 6H4v2h2V6zm14 0H8v2h12V6zM4 11h2v2H4v-2zm16 0H8v2h12v-2zM4 16h2v2H4v-2zm16 0H8v2h12v-2z\" fill=\"currentColor\"><\/path><\/svg><svg style=\"fill: #000000;color:#000000\" class=\"arrow-unsorted-368013\" xmlns=\"http:\/\/www.w3.org\/2000\/svg\" width=\"10px\" height=\"10px\" viewBox=\"0 0 24 24\" version=\"1.2\" baseProfile=\"tiny\"><path d=\"M18.2 9.3l-6.2-6.3-6.2 6.3c-.2.2-.3.4-.3.7s.1.5.3.7c.2.2.4.3.7.3h11c.3 0 .5-.1.7-.3.2-.2.3-.5.3-.7s-.1-.5-.3-.7zM5.8 14.7l6.2 6.3 6.2-6.3c.2-.2.3-.5.3-.7s-.1-.5-.3-.7c-.2-.2-.4-.3-.7-.3h-11c-.3 0-.5.1-.7.3-.2.2-.3.5-.3.7s.1.5.3.7z\"\/><\/svg><\/span><\/span><\/span><\/a><\/span><\/div>\n<nav><ul class='ez-toc-list ez-toc-list-level-1 ' ><li class='ez-toc-page-1 ez-toc-heading-level-1'><a class=\"ez-toc-link ez-toc-heading-1\" 
href=\"http:\/\/www.specialwu.com\/?p=1708\/#package_orgapachehadoopmapreduceliboutput\" title=\"package org.apache.hadoop.mapreduce.lib.output\">package org.apache.hadoop.mapreduce.lib.output<\/a><ul class='ez-toc-list-level-2' ><li class='ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-2\" href=\"http:\/\/www.specialwu.com\/?p=1708\/#getSplits%E6%BA%90%E7%A0%81\" title=\"getSplits\u6e90\u7801\">getSplits\u6e90\u7801<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-3\" href=\"http:\/\/www.specialwu.com\/?p=1708\/#FileInputFormat%E6%BA%90%E7%A0%81\" title=\"FileInputFormat\u6e90\u7801\">FileInputFormat\u6e90\u7801<\/a><\/li><li class='ez-toc-page-1 ez-toc-heading-level-2'><a class=\"ez-toc-link ez-toc-heading-4\" href=\"http:\/\/www.specialwu.com\/?p=1708\/#FileOutputFormat%E6%BA%90%E7%A0%81\" title=\"FileOutputFormat\u6e90\u7801\">FileOutputFormat\u6e90\u7801<\/a><\/li><\/ul><\/li><\/ul><\/nav><\/div>\n<h1><span class=\"ez-toc-section\" id=\"package_orgapachehadoopmapreduceliboutput\"><\/span>package org.apache.hadoop.mapreduce.lib.output<span class=\"ez-toc-section-end\"><\/span><\/h1>\n<h2><span class=\"ez-toc-section\" id=\"getSplits%E6%BA%90%E7%A0%81\"><\/span>getSplits\u6e90\u7801<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<pre><code class=\"language-java line-numbers\">\/** \n   * Generate the list of files and make them into FileSplits.\n   * @param job the job context\n   * @throws IOException\n   *\/\n  public List&lt;InputSplit&gt; getSplits(JobContext job) throws IOException {\n    \/\/\u542f\u52a8\u4e00\u4e2aStopWatch\u8ba1\u65f6\uff0c\u7528\u6765\u7edf\u8ba1getSplits\u7684\u8017\u65f6\n    StopWatch sw = new StopWatch().start();\n\n    \/\/\u83b7\u53d6\u5207\u7247\u7684\u6700\u5c0f\u5927\u5c0f\n    \/\/       1                    1                        1\n    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));\n\n    
\/\/\u83b7\u53d6\u5207\u7247\u7684\u6700\u5927\u5927\u5c0f\n    \/\/ 9223372036854775807\n    long maxSize = getMaxSplitSize(job);\n\n    \/\/ generate splits\n    \/\/ \u521b\u5efa\u4e00\u4e2aList\u7528\u6765\u4fdd\u5b58\u5206\u7247\u4fe1\u606f\u7684\u7ed3\u679c\n    List&lt;InputSplit&gt; splits = new ArrayList&lt;InputSplit&gt;();\n    \/\/ \u901a\u8fc7job\u4e2d\u7684\u8f93\u5165\u8def\u5f84\u83b7\u53d6 job\u9700\u8981\u5904\u7406\u7684\u6240\u6709\u6587\u4ef6\u7684\u8be6\u60c5\n    List&lt;FileStatus&gt; files = listStatus(job);\n    \/\/ \u904d\u5386\u6bcf\u4e2a\u6587\u4ef6 \u8ba1\u7b97\u5206\u7247\u7684\u6570\u91cf\n    for (FileStatus file: files) {\n      \/\/ \u83b7\u53d6\u6587\u4ef6\u8def\u5f84\n      Path path = file.getPath();\n      \/\/ \u83b7\u53d6\u6587\u4ef6\u957f\u5ea6\n      long length = file.getLen();\n      \/\/ \u5982\u679c\u6587\u4ef6\u957f\u5ea6\u4e0d\u4e3a\u96f6\n      if (length != 0) {\n        \/\/\u58f0\u660e\u6570\u7ec4\u53d8\u91cf \u6570\u7ec4\u4e2d\u5b58\u653e\u7684\u662f \u5757\u4fe1\u606f\n        BlockLocation[] blkLocations;\n        \/\/ \u5224\u65adfileStatus\u5bf9\u8c61\u662f\u5426\u662fLocatedFileStatus\u7c7b\u578b\n        if (file instanceof LocatedFileStatus) {\n          \/\/ \u5982\u679c\u662f\uff0c\u5c31\u5f3a\u8f6c\u6210LocatedFileStatus,\n          \/\/ \u5e76\u53d6\u51fa\u5757\u4fe1\u606f\u8d4b\u503c\u7ed9\u4e0a\u9762\u58f0\u660e\u7684\u6570\u7ec4\n          blkLocations = ((LocatedFileStatus) file).getBlockLocations();\n        } else {\n          \/\/ \u5982\u679c \u6587\u4ef6\u72b6\u6001\u4e2d\u4e0d\u5305\u542b\u5757\u4fe1\u606f\uff0c\n          \/\/ \u5c31\u76f4\u63a5\u901a\u8fc7FS\u5bf9\u8c61\u83b7\u53d6\u6587\u4ef6\u7684\u5757\u4fe1\u606f\u3002\n          FileSystem fs = path.getFileSystem(job.getConfiguration());\n          blkLocations = fs.getFileBlockLocations(file, 0, length);\n        }\n\n        \/\/ \u5224\u65ad\u662f\u5426\u8fdb\u884c\u5207\u7247\n        if (isSplitable(job, path)) {\n          \/\/ 
\u62ff\u5230\u5757\u5927\u5c0f 134217728\n          long blockSize = file.getBlockSize();\n          \/\/   134217728                   134217728     1     9223372036854775807\n          \/\/ \u9ed8\u8ba4\u7684\u5206\u7247\u5927\u5c0f\u548c\u5757\u5927\u5c0f\u76f8\u540c\n          long splitSize = computeSplitSize(blockSize, minSize, maxSize);\n          \/\/bytesRemaining\u5b58\u50a8\u8fd8\u6ca1\u6709\u5206\u914d\u5206\u7247\u7684\u5269\u4f59\u957f\u5ea6\n          \/\/ \u5f00\u59cb\u65f6 \u5c06\u6587\u4ef6\u957f\u5ea6\u8d4b\u503c\u7ed9\u5269\u4f59\u5927\u5c0f\n          long bytesRemaining = length;\n          \/\/               209715200        134217728\n          \/\/               200M      \/       128M   =1.56 &gt; 1.1\n          \/\/               72     \/ 128 &lt; 1.1\n          while (((double) bytesRemaining)\/splitSize &gt; SPLIT_SLOP) {\n            \/\/ \u83b7\u53d6\u5757\u7d22\u5f15\n            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);\n            \/\/\u521b\u5efa\u5206\u7247\u4fe1\u606f\uff0c\u5e76\u4e14\u6dfb\u52a0\u5230List\u4e2d               0                    128M\n            splits.add(makeSplit(path, length-bytesRemaining, splitSize,\n                        blkLocations[blkIndex].getHosts(),\n                        blkLocations[blkIndex].getCachedHosts()));\n            \/\/\u4ece\u5269\u4f59\u5927\u5c0f\u4e2d\u51cf\u53bb\u5206\u7247\u5927\u5c0f\n            bytesRemaining -= splitSize;\n          }\n\n          if (bytesRemaining != 0) {\n            \/\/ \u5c06\u6700\u540e\u4e00\u6bb5\u6570\u636e\u521b\u5efa\u4e00\u4e2a\u5206\u7247\u4fe1\u606f \u52a0\u5165\u5230\u5206\u7247\u7ed3\u679c\u4e2d\n            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);\n            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,\n                       blkLocations[blkIndex].getHosts(),\n                       blkLocations[blkIndex].getCachedHosts()));\n          }\n        } else { \/\/ 
not splitable\n          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),\n                      blkLocations[0].getCachedHosts()));\n        }\n      } else { \n        \/\/Create empty hosts array for zero length files\n        \/\/ \u5bf9\u4e8e\u6587\u4ef6\u957f\u5ea6\u4e3a0\u7684\u7a7a\u6587\u4ef6\uff0c\u521b\u5efa\u4e00\u4e2a\u7a7a\u7684\u5206\u7247\u4fe1\u606f\n        splits.add(makeSplit(path, 0, length, new String[0]));\n      }\n    }\n    \/\/ Save the number of input files for metrics\/loadgen\n    \/\/ \u5c06\u8f93\u5165\u6587\u4ef6\u7684\u6570\u91cf\u4fdd\u5b58\u5728Configuration\n    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());\n    sw.stop();\n    if (LOG.isDebugEnabled()) {\n      LOG.debug(\"Total # of splits generated by getSplits: \" + splits.size()\n          + \", TimeTaken: \" + sw.now(TimeUnit.MILLISECONDS));\n    }\n    return splits;\n  }\n<\/code><\/pre>\n<h2><span class=\"ez-toc-section\" id=\"FileInputFormat%E6%BA%90%E7%A0%81\"><\/span>FileInputFormat\u6e90\u7801<span class=\"ez-toc-section-end\"><\/span><\/h2>\n<pre><code class=\"language-java line-numbers\">\/**\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements.  See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership.  The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License.  
You may obtain a copy of the License at\n *\n *     http:\/\/www.apache.org\/licenses\/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n *\/\n\npackage org.apache.hadoop.mapreduce.lib.output;\n\n\/*import com.google.common.collect.Lists;*\/\nimport com.clearspring.analytics.util.Lists;\nimport org.apache.commons.logging.Log;\nimport org.apache.commons.logging.LogFactory;\nimport org.apache.hadoop.classification.InterfaceAudience;\nimport org.apache.hadoop.classification.InterfaceStability;\nimport org.apache.hadoop.conf.Configuration;\nimport org.apache.hadoop.fs.*;\nimport org.apache.hadoop.mapred.LocatedFileStatusFetcher;\nimport org.apache.hadoop.mapreduce.*;\nimport org.apache.hadoop.mapreduce.lib.input.FileSplit;\nimport org.apache.hadoop.mapreduce.lib.input.InvalidInputException;\nimport org.apache.hadoop.mapreduce.security.TokenCache;\nimport org.apache.hadoop.util.ReflectionUtils;\nimport org.apache.hadoop.util.StopWatch;\nimport org.apache.hadoop.util.StringUtils;\n\nimport java.io.IOException;\nimport java.util.ArrayList;\nimport java.util.List;\nimport java.util.concurrent.TimeUnit;\n\n\/** \n * A base class for file-based {@link InputFormat}s.\n * \n * &lt;p&gt;&lt;code&gt;FileInputFormat&lt;\/code&gt; is the base class for all file-based \n * &lt;code&gt;InputFormat&lt;\/code&gt;s. 
This provides a generic implementation of\n * {@link #getSplits(JobContext)}.\n * Subclasses of &lt;code&gt;FileInputFormat&lt;\/code&gt; can also override the \n * {@link #isSplitable(JobContext, Path)} method to ensure input-files are\n * not split-up and are processed as a whole by {@link Mapper}s.\n *\/\n@InterfaceAudience.Public\n@InterfaceStability.Stable\npublic abstract class FileInputFormat&lt;K, V&gt; extends InputFormat&lt;K, V&gt; {\n  public static final String INPUT_DIR = \n    \"mapreduce.input.fileinputformat.inputdir\";\n  public static final String SPLIT_MAXSIZE = \n    \"mapreduce.input.fileinputformat.split.maxsize\";\n  public static final String SPLIT_MINSIZE = \n    \"mapreduce.input.fileinputformat.split.minsize\";\n  public static final String PATHFILTER_CLASS = \n    \"mapreduce.input.pathFilter.class\";\n  public static final String NUM_INPUT_FILES =\n    \"mapreduce.input.fileinputformat.numinputfiles\";\n  public static final String INPUT_DIR_RECURSIVE =\n    \"mapreduce.input.fileinputformat.input.dir.recursive\";\n  public static final String LIST_STATUS_NUM_THREADS =\n      \"mapreduce.input.fileinputformat.list-status.num-threads\";\n  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;\n\n  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);\n\n  private static final double SPLIT_SLOP = 1.1;   \/\/ 10% slop\n\n  @Deprecated\n  public static enum Counter { \n    BYTES_READ\n  }\n\n  private static final PathFilter hiddenFileFilter = new PathFilter(){\n      public boolean accept(Path p){\n        String name = p.getName(); \n        return !name.startsWith(\"_\") &amp;&amp; !name.startsWith(\".\"); \n      }\n    }; \n\n  \/**\n   * Proxy PathFilter that accepts a path only if all filters given in the\n   * constructor do. 
Used by the listPaths() to apply the built-in\n   * hiddenFileFilter together with a user provided one (if any).\n   *\/\n  private static class MultiPathFilter implements PathFilter {\n    private List&lt;PathFilter&gt; filters;\n\n    public MultiPathFilter(List&lt;PathFilter&gt; filters) {\n      this.filters = filters;\n    }\n\n    public boolean accept(Path path) {\n      for (PathFilter filter : filters) {\n        if (!filter.accept(path)) {\n          return false;\n        }\n      }\n      return true;\n    }\n  }\n\n  \/**\n   * @param job\n   *          the job to modify\n   * @param inputDirRecursive\n   *\/\n  public static void setInputDirRecursive(Job job,\n      boolean inputDirRecursive) {\n    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,\n        inputDirRecursive);\n  }\n\n  \/**\n   * @param job\n   *          the job to look at.\n   * @return should the files to be read recursively?\n   *\/\n  public static boolean getInputDirRecursive(JobContext job) {\n    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,\n        false);\n  }\n\n  \/**\n   * Get the lower bound on split size imposed by the format.\n   * @return the number of bytes of the minimal split for this format\n   *\/\n  protected long getFormatMinSplitSize() {\n    return 1;\n  }\n\n  \/**\n   * Is the given filename splitable? 
Usually, true, but if the file is\n   * stream compressed, it will not be.\n   * \n   * &lt;code&gt;FileInputFormat&lt;\/code&gt; implementations can override this and return\n   * &lt;code&gt;false&lt;\/code&gt; to ensure that individual input files are never split-up\n   * so that {@link Mapper}s process entire files.\n   * \n   * @param context the job context\n   * @param filename the file name to check\n   * @return is this file splitable?\n   *\/\n  protected boolean isSplitable(JobContext context, Path filename) {\n    return true;\n  }\n\n  \/**\n   * Set a PathFilter to be applied to the input paths for the map-reduce job.\n   * @param job the job to modify\n   * @param filter the PathFilter class use for filtering the input paths.\n   *\/\n  public static void setInputPathFilter(Job job,\n                                        Class&lt;? extends PathFilter&gt; filter) {\n    job.getConfiguration().setClass(PATHFILTER_CLASS, filter, \n                                    PathFilter.class);\n  }\n\n  \/**\n   * Set the minimum input split size\n   * @param job the job to modify\n   * @param size the minimum size\n   *\/\n  public static void setMinInputSplitSize(Job job,\n                                          long size) {\n    job.getConfiguration().setLong(SPLIT_MINSIZE, size);\n  }\n\n  \/**\n   * Get the minimum split size\n   * @param job the job\n   * @return the minimum number of bytes that can be in a split\n   *\/\n  public static long getMinSplitSize(JobContext job) {\n    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);\n  }\n\n  \/**\n   * Set the maximum split size\n   * @param job the job to modify\n   * @param size the maximum split size\n   *\/\n  public static void setMaxInputSplitSize(Job job,\n                                          long size) {\n    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);\n  }\n\n  \/**\n   * Get the maximum split size.\n   * @param context the job to look at.\n   * @return the maximum number 
of bytes a split can include\n   *\/\n  public static long getMaxSplitSize(JobContext context) {\n    return context.getConfiguration().getLong(SPLIT_MAXSIZE, \n                                              Long.MAX_VALUE);\n  }\n\n  \/**\n   * Get a PathFilter instance of the filter set for the input paths.\n   *\n   * @return the PathFilter instance set for the job, NULL if none has been set.\n   *\/\n  public static PathFilter getInputPathFilter(JobContext context) {\n    Configuration conf = context.getConfiguration();\n    Class&lt;?&gt; filterClass = conf.getClass(PATHFILTER_CLASS, null,\n        PathFilter.class);\n    return (filterClass != null) ?\n        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;\n  }\n\n  \/** List input directories.\n   * Subclasses may override to, e.g., select only files matching a regular\n   * expression. \n   * \n   * @param job the job to list input paths for\n   * @return array of FileStatus objects\n   * @throws IOException if zero items.\n   *\/\n  protected List&lt;FileStatus&gt; listStatus(JobContext job\n                                        ) throws IOException {\n    Path[] dirs = getInputPaths(job);\n    if (dirs.length == 0) {\n      throw new IOException(\"No input paths specified in job\");\n    }\n\n    \/\/ get tokens for all the required FileSystems..\n    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, \n                                        job.getConfiguration());\n\n    \/\/ Whether we need to recursive look into the directory structure\n    boolean recursive = getInputDirRecursive(job);\n\n    \/\/ creates a MultiPathFilter with the hiddenFileFilter and the\n    \/\/ user provided one (if any).\n    List&lt;PathFilter&gt; filters = new ArrayList&lt;PathFilter&gt;();\n    filters.add(hiddenFileFilter);\n    PathFilter jobFilter = getInputPathFilter(job);\n    if (jobFilter != null) {\n      filters.add(jobFilter);\n    }\n    PathFilter inputFilter = new 
MultiPathFilter(filters);\n\n    List&lt;FileStatus&gt; result = null;\n\n    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,\n        DEFAULT_LIST_STATUS_NUM_THREADS);\n    StopWatch sw = new StopWatch().start();\n    if (numThreads == 1) {\n      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);\n    } else {\n      Iterable&lt;FileStatus&gt; locatedFiles = null;\n      try {\n        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(\n            job.getConfiguration(), dirs, recursive, inputFilter, true);\n        locatedFiles = locatedFileStatusFetcher.getFileStatuses();\n      } catch (InterruptedException e) {\n        throw new IOException(\"Interrupted while getting file statuses\");\n      }\n      result = Lists.newArrayList(locatedFiles);\n    }\n\n    sw.stop();\n    if (LOG.isDebugEnabled()) {\n      LOG.debug(\"Time taken to get FileStatuses: \"\n          + sw.now(TimeUnit.MILLISECONDS));\n    }\n    LOG.info(\"Total input paths to process : \" + result.size()); \n    return result;\n  }\n\n  private List&lt;FileStatus&gt; singleThreadedListStatus(JobContext job, Path[] dirs,\n      PathFilter inputFilter, boolean recursive) throws IOException {\n    List&lt;FileStatus&gt; result = new ArrayList&lt;FileStatus&gt;();\n    List&lt;IOException&gt; errors = new ArrayList&lt;IOException&gt;();\n    for (int i=0; i &lt; dirs.length; ++i) {\n      Path p = dirs[i];\n      FileSystem fs = p.getFileSystem(job.getConfiguration()); \n      FileStatus[] matches = fs.globStatus(p, inputFilter);\n      if (matches == null) {\n        errors.add(new IOException(\"Input path does not exist: \" + p));\n      } else if (matches.length == 0) {\n        errors.add(new IOException(\"Input Pattern \" + p + \" matches 0 files\"));\n      } else {\n        for (FileStatus globStat: matches) {\n          if (globStat.isDirectory()) {\n            RemoteIterator&lt;LocatedFileStatus&gt; iter =\n       
         fs.listLocatedStatus(globStat.getPath());\n            while (iter.hasNext()) {\n              LocatedFileStatus stat = iter.next();\n              if (inputFilter.accept(stat.getPath())) {\n                if (recursive &amp;&amp; stat.isDirectory()) {\n                  addInputPathRecursively(result, fs, stat.getPath(),\n                      inputFilter);\n                } else {\n                  result.add(stat);\n                }\n              }\n            }\n          } else {\n            result.add(globStat);\n          }\n        }\n      }\n    }\n\n    if (!errors.isEmpty()) {\n      throw new InvalidInputException(errors);\n    }\n    return result;\n  }\n\n  \/**\n   * Add files in the input path recursively into the results.\n   * @param result\n   *          The List to store all files.\n   * @param fs\n   *          The FileSystem.\n   * @param path\n   *          The input path.\n   * @param inputFilter\n   *          The input filter that can be used to filter files\/dirs. \n   * @throws IOException\n   *\/\n  protected void addInputPathRecursively(List&lt;FileStatus&gt; result,\n      FileSystem fs, Path path, PathFilter inputFilter) \n      throws IOException {\n    RemoteIterator&lt;LocatedFileStatus&gt; iter = fs.listLocatedStatus(path);\n    while (iter.hasNext()) {\n      LocatedFileStatus stat = iter.next();\n      if (inputFilter.accept(stat.getPath())) {\n        if (stat.isDirectory()) {\n          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);\n        } else {\n          result.add(stat);\n        }\n      }\n    }\n  }\n\n\n  \/**\n   * A factory that makes the split for this class. It can be overridden\n   * by sub-classes to make sub-types\n   *\/\n  protected FileSplit makeSplit(Path file, long start, long length,\n                                String[] hosts) {\n    return new FileSplit(file, start, length, hosts);\n  }\n\n  \/**\n   * A factory that makes the split for this class. 
It can be overridden\n   * by sub-classes to make sub-types\n   *\/\n  protected FileSplit makeSplit(Path file, long start, long length, \n                                String[] hosts, String[] inMemoryHosts) {\n    return new FileSplit(file, start, length, hosts, inMemoryHosts);\n  }\n\n  \/** \n   * Generate the list of files and make them into FileSplits.\n   * @param job the job context\n   * @throws IOException\n   *\/\n  public List&lt;InputSplit&gt; getSplits(JobContext job) throws IOException {\n    \/\/\u542f\u52a8\u4e00\u4e2aStopWatch\u8ba1\u65f6\uff0c\u7528\u6765\u7edf\u8ba1getSplits\u7684\u8017\u65f6\n    StopWatch sw = new StopWatch().start();\n\n    \/\/\u83b7\u53d6\u5207\u7247\u7684\u6700\u5c0f\u5927\u5c0f\n    \/\/       1                    1                        1\n    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));\n\n    \/\/\u83b7\u53d6\u5207\u7247\u7684\u6700\u5927\u5927\u5c0f\n    \/\/ 9223372036854775807\n    long maxSize = getMaxSplitSize(job);\n\n    \/\/ generate splits\n    \/\/ \u521b\u5efa\u4e00\u4e2aList\u7528\u6765\u4fdd\u5b58\u5206\u7247\u4fe1\u606f\u7684\u7ed3\u679c\n    List&lt;InputSplit&gt; splits = new ArrayList&lt;InputSplit&gt;();\n    \/\/ \u901a\u8fc7job\u4e2d\u7684\u8f93\u5165\u8def\u5f84\u83b7\u53d6 job\u9700\u8981\u5904\u7406\u7684\u6240\u6709\u6587\u4ef6\u7684\u8be6\u60c5\n    List&lt;FileStatus&gt; files = listStatus(job);\n    \/\/ \u904d\u5386\u6bcf\u4e2a\u6587\u4ef6 \u8ba1\u7b97\u5206\u7247\u7684\u6570\u91cf\n    for (FileStatus file: files) {\n      \/\/ \u83b7\u53d6\u6587\u4ef6\u8def\u5f84\n      Path path = file.getPath();\n      \/\/ \u83b7\u53d6\u6587\u4ef6\u957f\u5ea6\n      long length = file.getLen();\n      \/\/ \u5982\u679c\u6587\u4ef6\u957f\u5ea6\u4e0d\u4e3a\u96f6\n      if (length != 0) {\n        \/\/\u58f0\u660e\u6570\u7ec4\u53d8\u91cf \u6570\u7ec4\u4e2d\u5b58\u653e\u7684\u662f \u5757\u4fe1\u606f\n        BlockLocation[] blkLocations;\n        \/\/ 
\u5224\u65adfileStatus\u5bf9\u8c61\u662f\u5426\u662fLocatedFileStatus\u7c7b\u578b\n        if (file instanceof LocatedFileStatus) {\n          \/\/ \u5982\u679c\u662f\uff0c\u5c31\u5f3a\u8f6c\u6210LocatedFileStatus,\n          \/\/ \u5e76\u53d6\u51fa\u5757\u4fe1\u606f\u8d4b\u503c\u7ed9\u4e0a\u9762\u58f0\u660e\u7684\u6570\u7ec4\n          blkLocations = ((LocatedFileStatus) file).getBlockLocations();\n        } else {\n          \/\/ \u5982\u679c \u6587\u4ef6\u72b6\u6001\u4e2d\u4e0d\u5305\u542b\u5757\u4fe1\u606f\uff0c\n          \/\/ \u5c31\u76f4\u63a5\u901a\u8fc7FS\u5bf9\u8c61\u83b7\u53d6\u6587\u4ef6\u7684\u5757\u4fe1\u606f\u3002\n          FileSystem fs = path.getFileSystem(job.getConfiguration());\n          blkLocations = fs.getFileBlockLocations(file, 0, length);\n        }\n\n        \/\/ \u5224\u65ad\u662f\u5426\u8fdb\u884c\u5207\u7247\n        if (isSplitable(job, path)) {\n          \/\/ \u62ff\u5230\u5757\u5927\u5c0f 134217728\n          long blockSize = file.getBlockSize();\n          \/\/   134217728                   134217728     1     9223372036854775807\n          \/\/ \u9ed8\u8ba4\u7684\u5206\u7247\u5927\u5c0f\u548c\u5757\u5927\u5c0f\u76f8\u540c\n          long splitSize = computeSplitSize(blockSize, minSize, maxSize);\n          \/\/bytesRemaining\u5b58\u50a8\u8fd8\u6ca1\u6709\u5206\u914d\u5206\u7247\u7684\u5269\u4f59\u957f\u5ea6\n          \/\/ \u5f00\u59cb\u65f6 \u5c06\u6587\u4ef6\u957f\u5ea6\u8d4b\u503c\u7ed9\u5269\u4f59\u5927\u5c0f\n          long bytesRemaining = length;\n          \/\/               209715200        134217728\n          \/\/               200M      \/       128M   =1.56 &gt; 1.1\n          \/\/               72     \/ 128 &lt; 1.1\n          while (((double) bytesRemaining)\/splitSize &gt; SPLIT_SLOP) {\n            \/\/ \u83b7\u53d6\u5757\u7d22\u5f15\n            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);\n            \/\/\u521b\u5efa\u5206\u7247\u4fe1\u606f\uff0c\u5e76\u4e14\u6dfb\u52a0\u5230List\u4e2d 
              0                    128M\n            splits.add(makeSplit(path, length-bytesRemaining, splitSize,\n                        blkLocations[blkIndex].getHosts(),\n                        blkLocations[blkIndex].getCachedHosts()));\n            \/\/\u4ece\u5269\u4f59\u5927\u5c0f\u4e2d\u51cf\u53bb\u5206\u7247\u5927\u5c0f\n            bytesRemaining -= splitSize;\n          }\n\n          if (bytesRemaining != 0) {\n            \/\/ \u5c06\u6700\u540e\u4e00\u6bb5\u6570\u636e\u521b\u5efa\u4e00\u4e2a\u5206\u7247\u4fe1\u606f \u52a0\u5165\u5230\u5206\u7247\u7ed3\u679c\u4e2d\n            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);\n            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,\n                       blkLocations[blkIndex].getHosts(),\n                       blkLocations[blkIndex].getCachedHosts()));\n          }\n        } else { \/\/ not splitable\n          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),\n                      blkLocations[0].getCachedHosts()));\n        }\n      } else { \n        \/\/Create empty hosts array for zero length files\n        \/\/ \u5bf9\u4e8e\u6587\u4ef6\u957f\u5ea6\u4e3a0\u7684\u7a7a\u6587\u4ef6\uff0c\u521b\u5efa\u4e00\u4e2a\u7a7a\u7684\u5206\u7247\u4fe1\u606f\n        splits.add(makeSplit(path, 0, length, new String[0]));\n      }\n    }\n    \/\/ Save the number of input files for metrics\/loadgen\n    \/\/ \u5c06\u8f93\u5165\u6587\u4ef6\u7684\u6570\u91cf\u4fdd\u5b58\u5728Configuration\n    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());\n    sw.stop();\n    if (LOG.isDebugEnabled()) {\n      LOG.debug(\"Total # of splits generated by getSplits: \" + splits.size()\n          + \", TimeTaken: \" + sw.now(TimeUnit.MILLISECONDS));\n    }\n    return splits;\n  }\n\n  protected long computeSplitSize(long blockSize, long minSize,\n                                  long maxSize) {\n    return Math.max(minSize, Math.min(maxSize, 
blockSize));\n  }\n\n  protected int getBlockIndex(BlockLocation[] blkLocations, \n                              long offset) {\n    for (int i = 0 ; i &lt; blkLocations.length; i++) {\n      \/\/ is the offset inside this block?\n      if ((blkLocations[i].getOffset() &lt;= offset) &amp;&amp;\n          (offset &lt; blkLocations[i].getOffset() + blkLocations[i].getLength())){\n        return i;\n      }\n    }\n    BlockLocation last = blkLocations[blkLocations.length -1];\n    long fileLength = last.getOffset() + last.getLength() -1;\n    throw new IllegalArgumentException(\"Offset \" + offset + \n                                       \" is outside of file (0..\" +\n                                       fileLength + \")\");\n  }\n\n  \/**\n   * Sets the given comma separated paths as the list of inputs \n   * for the map-reduce job.\n   * \n   * @param job the job\n   * @param commaSeparatedPaths Comma separated paths to be set as \n   *        the list of inputs for the map-reduce job.\n   *\/\n  public static void setInputPaths(Job job, \n                                   String commaSeparatedPaths\n                                   ) throws IOException {\n    setInputPaths(job, StringUtils.stringToPath(\n                        getPathStrings(commaSeparatedPaths)));\n  }\n\n  \/**\n   * Add the given comma separated paths to the list of inputs for\n   *  the map-reduce job.\n   * \n   * @param job The job to modify\n   * @param commaSeparatedPaths Comma separated paths to be added to\n   *        the list of inputs for the map-reduce job.\n   *\/\n  public static void addInputPaths(Job job, \n                                   String commaSeparatedPaths\n                                   ) throws IOException {\n    for (String str : getPathStrings(commaSeparatedPaths)) {\n      addInputPath(job, new Path(str));\n    }\n  }\n\n  \/**\n   * Set the array of {@link Path}s as the list of inputs\n   * for the map-reduce job.\n   * \n   * @param job The job to 
modify \n   * @param inputPaths the {@link Path}s of the input directories\/files \n   * for the map-reduce job.\n   *\/ \n  public static void setInputPaths(Job job, \n                                   Path... inputPaths) throws IOException {\n    Configuration conf = job.getConfiguration();\n    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);\n    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));\n    for(int i = 1; i &lt; inputPaths.length;i++) {\n      str.append(StringUtils.COMMA_STR);\n      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);\n      str.append(StringUtils.escapeString(path.toString()));\n    }\n    conf.set(INPUT_DIR, str.toString());\n  }\n\n  \/**\n   * Add a {@link Path} to the list of inputs for the map-reduce job.\n   * \n   * @param job The {@link Job} to modify\n   * @param path {@link Path} to be added to the list of inputs for \n   *            the map-reduce job.\n   *\/\n  public static void addInputPath(Job job, \n                                  Path path) throws IOException {\n    Configuration conf = job.getConfiguration();\n    path = path.getFileSystem(conf).makeQualified(path);\n    String dirStr = StringUtils.escapeString(path.toString());\n    String dirs = conf.get(INPUT_DIR);\n    conf.set(INPUT_DIR, dirs == null ? 
dirStr : dirs + \",\" + dirStr);\n  }\n\n  \/\/ This method escapes commas in the glob pattern of the given paths.\n  private static String[] getPathStrings(String commaSeparatedPaths) {\n    int length = commaSeparatedPaths.length();\n    int curlyOpen = 0;\n    int pathStart = 0;\n    boolean globPattern = false;\n    List&lt;String&gt; pathStrings = new ArrayList&lt;String&gt;();\n\n    for (int i=0; i&lt;length; i++) {\n      char ch = commaSeparatedPaths.charAt(i);\n      switch(ch) {\n        case '{' : {\n          curlyOpen++;\n          if (!globPattern) {\n            globPattern = true;\n          }\n          break;\n        }\n        case '}' : {\n          curlyOpen--;\n          if (curlyOpen == 0 &amp;&amp; globPattern) {\n            globPattern = false;\n          }\n          break;\n        }\n        case ',' : {\n          if (!globPattern) {\n            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));\n            pathStart = i + 1 ;\n          }\n          break;\n        }\n        default:\n          continue; \/\/ nothing special to do for this character\n      }\n    }\n    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));\n\n    return pathStrings.toArray(new String[0]);\n  }\n\n  \/**\n   * Get the list of input {@link Path}s for the map-reduce job.\n   * \n   * @param context The job\n   * @return the list of input {@link Path}s for the map-reduce job.\n   *\/\n  public static Path[] getInputPaths(JobContext context) {\n    String dirs = context.getConfiguration().get(INPUT_DIR, \"\");\n    String [] list = StringUtils.split(dirs);\n    Path[] result = new Path[list.length];\n    for (int i = 0; i &lt; list.length; i++) {\n      result[i] = new Path(StringUtils.unEscapeString(list[i]));\n    }\n    return result;\n  }\n\n}\n<\/code><\/pre>\n<h2><span class=\"ez-toc-section\" id=\"FileOutputFormat%E6%BA%90%E7%A0%81\"><\/span>FileOutputFormat\u6e90\u7801<span 
class=\"ez-toc-section-end\"><\/span><\/h2>\n<pre><code class=\"language-java line-numbers\">package org.apache.hadoop.mapreduce.lib.output;\n\nimport org.apache.hadoop.classification.InterfaceAudience;\nimport org.apache.hadoop.classification.InterfaceStability;\nimport org.apache.hadoop.conf.Configuration;\nimport org.apache.hadoop.fs.FileSystem;\nimport org.apache.hadoop.fs.Path;\nimport org.apache.hadoop.io.compress.CompressionCodec;\nimport org.apache.hadoop.mapred.FileAlreadyExistsException;\nimport org.apache.hadoop.mapred.InvalidJobConfException;\nimport org.apache.hadoop.mapreduce.*;\nimport org.apache.hadoop.mapreduce.security.TokenCache;\n\nimport java.io.IOException;\nimport java.text.NumberFormat;\n\n\/** A base class for {@link OutputFormat}s that write to {@link FileSystem}s.*\/\n@InterfaceAudience.Public\n@InterfaceStability.Stable\npublic abstract class FileOutputFormat&lt;K, V&gt; extends OutputFormat&lt;K, V&gt; {\n\n  \/** Construct output file names so that, when an output directory listing is\n   * sorted lexicographically, positions correspond to output partitions.*\/\n  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();\n  protected static final String BASE_OUTPUT_NAME = \"mapreduce.output.basename\";\n  protected static final String PART = \"part\";\n  static {\n    NUMBER_FORMAT.setMinimumIntegerDigits(5);\n    NUMBER_FORMAT.setGroupingUsed(false);\n  }\n  private FileOutputCommitter committer = null;\npublic static final String COMPRESS =\"mapreduce.output.fileoutputformat.compress\";\npublic static final String COMPRESS_CODEC = \n\"mapreduce.output.fileoutputformat.compress.codec\";\npublic static final String COMPRESS_TYPE = \"mapreduce.output.fileoutputformat.compress.type\";\npublic static final String OUTDIR = \"mapreduce.output.fileoutputformat.outputdir\";\n\n  @Deprecated\n  public static enum Counter {\n    BYTES_WRITTEN\n  }\n\n  \/**\n   * Set whether the output of the job is compressed.\n   * @param 
job the job to modify\n   * @param compress should the output of the job be compressed?\n   *\/\n  public static void setCompressOutput(Job job, boolean compress) {\n    job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);\n  }\n\n  \/**\n   * Is the job output compressed?\n   * @param job the Job to look in\n   * @return &lt;code&gt;true&lt;\/code&gt; if the job output should be compressed,\n   *         &lt;code&gt;false&lt;\/code&gt; otherwise\n   *\/\n  public static boolean getCompressOutput(JobContext job) {\n    return job.getConfiguration().getBoolean(\n      FileOutputFormat.COMPRESS, false);\n  }\n\n  \/**\n   * Set the {@link CompressionCodec} to be used to compress job outputs.\n   * @param job the job to modify\n   * @param codecClass the {@link CompressionCodec} to be used to\n   *                   compress the job outputs\n   *\/\n  public static void \n  setOutputCompressorClass(Job job, \n                           Class&lt;? extends CompressionCodec&gt; codecClass) {\n    setCompressOutput(job, true);\n    job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC, \n                                    codecClass, \n                                    CompressionCodec.class);\n  }\n\n  \/**\n   * Get the {@link CompressionCodec} for compressing the job outputs.\n   * @param job the {@link Job} to look in\n   * @param defaultValue the {@link CompressionCodec} to return if not set\n   * @return the {@link CompressionCodec} to be used to compress the \n   *         job outputs\n   * @throws IllegalArgumentException if the class was specified, but not found\n   *\/\n  public static Class&lt;? extends CompressionCodec&gt; \n  getOutputCompressorClass(JobContext job, \n                               Class&lt;? extends CompressionCodec&gt; defaultValue) {\n    Class&lt;? 
extends CompressionCodec&gt; codecClass = defaultValue;\n    Configuration conf = job.getConfiguration();\n    String name = conf.get(FileOutputFormat.COMPRESS_CODEC);\n    if (name != null) {\n      try {\n        codecClass = \n            conf.getClassByName(name).asSubclass(CompressionCodec.class);\n      } catch (ClassNotFoundException e) {\n        throw new IllegalArgumentException(\"Compression codec \" + name + \n                                           \" was not found.\", e);\n      }\n    }\n    return codecClass;\n  }\n\n  public abstract RecordWriter&lt;K, V&gt; \n     getRecordWriter(TaskAttemptContext job\n                     ) throws IOException, InterruptedException;\n\n  public void checkOutputSpecs(JobContext job\n                               ) throws FileAlreadyExistsException, IOException{\n    \/\/ Ensure that the output directory is set and not already there\n    Path outDir = getOutputPath(job);\n    if (outDir == null) {\n      throw new InvalidJobConfException(\"Output directory not set.\");\n    }\n\n    \/\/ get delegation token for outDir's file system\n    TokenCache.obtainTokensForNamenodes(job.getCredentials(),\n        new Path[] { outDir }, job.getConfiguration());\n\n    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {\n      outDir.getFileSystem(job.getConfiguration()).delete(outDir,true);\n\/\/      throw new FileAlreadyExistsException(\"Output directory \" + outDir +\n\/\/                                           \" already exists\");\n\/\/      System.out.println(\"\u8f93\u51fa\u76ee\u5f55\u5df2\u5b58\u5728\uff0c\u6211\u77e5\u9053\uff0c\u6211\u4e0d\u7ba1\");\n    }\n  }\n\n  \/**\n   * Set the {@link Path} of the output directory for the map-reduce job.\n   *\n   * @param job The job to modify\n   * @param outputDir the {@link Path} of the output directory for \n   * the map-reduce job.\n   *\/\n  public static void setOutputPath(Job job, Path outputDir) {\n    try {\n      outputDir = 
outputDir.getFileSystem(job.getConfiguration()).makeQualified(\n          outputDir);\n    } catch (IOException e) {\n        \/\/ Throw the IOException as a RuntimeException to be compatible with MR1\n        throw new RuntimeException(e);\n    }\n    job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());\n  }\n\n  \/**\n   * Get the {@link Path} to the output directory for the map-reduce job.\n   * \n   * @return the {@link Path} to the output directory for the map-reduce job.\n   * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)\n   *\/\n  public static Path getOutputPath(JobContext job) {\n    String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);\n    return name == null ? null: new Path(name);\n  }\n\n  \/**\n   *  Get the {@link Path} to the task's temporary output directory \n   *  for the map-reduce job\n   *  \n   * &lt;b id=\"SideEffectFiles\"&gt;Tasks' Side-Effect Files&lt;\/b&gt;\n   * \n   * &lt;p&gt;Some applications need to create\/write-to side-files, which differ from\n   * the actual job-outputs.\n   * \n   * &lt;p&gt;In such cases there could be issues with 2 instances of the same TIP \n   * (running simultaneously e.g. speculative tasks) trying to open\/write-to the\n   * same file (path) on HDFS. Hence the application-writer will have to pick \n   * unique names per task-attempt (e.g. using the attemptid, say \n   * &lt;tt&gt;attempt_200709221812_0001_m_000000_0&lt;\/tt&gt;), not just per TIP.&lt;\/p&gt; \n   * \n   * &lt;p&gt;To get around this the Map-Reduce framework helps the application-writer \n   * out by maintaining a special \n   * &lt;tt&gt;${mapreduce.output.fileoutputformat.outputdir}\/_temporary\/_${taskid}&lt;\/tt&gt; \n   * sub-directory for each task-attempt on HDFS where the output of the \n   * task-attempt goes. 
On successful completion of the task-attempt the files \n   * in the &lt;tt&gt;${mapreduce.output.fileoutputformat.outputdir}\/_temporary\/_${taskid}&lt;\/tt&gt; (only) \n   * are &lt;i&gt;promoted&lt;\/i&gt; to &lt;tt&gt;${mapreduce.output.fileoutputformat.outputdir}&lt;\/tt&gt;. Of course, the \n   * framework discards the sub-directory of unsuccessful task-attempts. This \n   * is completely transparent to the application.&lt;\/p&gt;\n   * \n   * &lt;p&gt;The application-writer can take advantage of this by creating any \n   * side-files required in a work directory during execution \n   * of his task i.e. via \n   * {@link #getWorkOutputPath(TaskInputOutputContext)}, and\n   * the framework will move them out similarly - thus she doesn't have to pick \n   * unique paths per task-attempt.&lt;\/p&gt;\n   * \n   * &lt;p&gt;The entire discussion holds true for maps of jobs with \n   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, \n   * goes directly to HDFS.&lt;\/p&gt; \n   * \n   * @return the {@link Path} to the task's temporary output directory \n   * for the map-reduce job.\n   *\/\n  public static Path getWorkOutputPath(TaskInputOutputContext&lt;?,?,?,?&gt; context\n                                       ) throws IOException, \n                                               InterruptedException {\n    FileOutputCommitter committer = (FileOutputCommitter) \n      context.getOutputCommitter();\n    return committer.getWorkPath();\n  }\n\n  \/**\n   * Helper function to generate a {@link Path} for a file that is unique for\n   * the task within the job output directory.\n   *\n   * &lt;p&gt;The path can be used to create custom files from within the map and\n   * reduce tasks. The path name will be unique for each task. 
The path parent\n   * will be the job output directory.&lt;\/p&gt;\n   *\n   * &lt;p&gt;This method uses the {@link #getUniqueFile} method to make the file name\n   * unique for the task.&lt;\/p&gt;\n   *\n   * @param context the context for the task.\n   * @param name the name for the file.\n   * @param extension the extension for the file\n   * @return a unique path across all tasks of the job.\n   *\/\n  public  static Path getPathForWorkFile(TaskInputOutputContext&lt;?,?,?,?&gt; context, \n                                 String name,\n                                 String extension\n                                ) throws IOException, InterruptedException {\n    return new Path(getWorkOutputPath(context),\n                    getUniqueFile(context, name, extension));\n  }\n\n  \/**\n   * Generate a unique filename, based on the task id, name, and extension\n   * @param context the task that is calling this\n   * @param name the base filename\n   * @param extension the filename extension\n   * @return a string like $name-[mrsct]-$id$extension\n   *\/\n  public synchronized static String getUniqueFile(TaskAttemptContext context,\n                                                  String name,\n                                                  String extension) {\n    TaskID taskId = context.getTaskAttemptID().getTaskID();\n    int partition = taskId.getId();\n    StringBuilder result = new StringBuilder();\n    result.append(name);\n    result.append('-');\n    result.append(\n        TaskID.getRepresentingCharacter(taskId.getTaskType()));\n    result.append('-');\n    result.append(NUMBER_FORMAT.format(partition));\n    result.append(extension);\n    return result.toString();\n  }\n\n  \/**\n   * Get the default path and filename for the output format.\n   * @param context the task context\n   * @param extension an extension to add to the filename\n   * @return a full path 
$output\/_temporary\/$taskid\/part-[mr]-$id\n   * @throws IOException\n   *\/\n  public Path getDefaultWorkFile(TaskAttemptContext context,\n                                 String extension) throws IOException{\n    FileOutputCommitter committer = \n      (FileOutputCommitter) getOutputCommitter(context);\n    return new Path(committer.getWorkPath(), getUniqueFile(context, \n      getOutputName(context), extension));\n  }\n\n  \/**\n   * Get the base output name for the output file.\n   *\/\n  protected static String getOutputName(JobContext job) {\n    return job.getConfiguration().get(BASE_OUTPUT_NAME, PART);\n  }\n\n  \/**\n   * Set the base output name for output file to be created.\n   *\/\n  protected static void setOutputName(JobContext job, String name) {\n    job.getConfiguration().set(BASE_OUTPUT_NAME, name);\n  }\n\n  public synchronized \n     OutputCommitter getOutputCommitter(TaskAttemptContext context\n                                        ) throws IOException {\n    if (committer == null) {\n      Path output = getOutputPath(context);\n      committer = new FileOutputCommitter(output, context);\n    }\n    return committer;\n  }\n}\n\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>package org.apache.hadoop.mapreduce.lib.output getSplit 
[&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[7],"tags":[],"_links":{"self":[{"href":"http:\/\/www.specialwu.com\/index.php?rest_route=\/wp\/v2\/posts\/1708"}],"collection":[{"href":"http:\/\/www.specialwu.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/www.specialwu.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/www.specialwu.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/www.specialwu.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1708"}],"version-history":[{"count":1,"href":"http:\/\/www.specialwu.com\/index.php?rest_route=\/wp\/v2\/posts\/1708\/revisions"}],"predecessor-version":[{"id":1709,"href":"http:\/\/www.specialwu.com\/index.php?rest_route=\/wp\/v2\/posts\/1708\/revisions\/1709"}],"wp:attachment":[{"href":"http:\/\/www.specialwu.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1708"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/www.specialwu.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1708"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/www.specialwu.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1708"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}