无情 @ 2015-12-15 12:48:23
Hadoop MapReduce



1: Download and install the required tools on Windows (this was done on a 64-bit Windows system; on a 32-bit system the debugging steps may not work)


Server-side installation and configuration: http://www.ccblog.cn/62.htm


a) Install JDK 1.7 and configure its environment variables


b) Download hadoop-2.7.1 to your local disk and extract it: http://mirror.bit.edu.cn/apache/hadoop/common/hadoop-2.7.1/hadoop-2.7.1.tar.gz


c) Download the Windows helper binaries: https://codeload.github.com/SweetInk/hadoop-common-2.7.1-bin/zip/master

After downloading, copy winutils.exe from the archive into the bin directory of hadoop-2.7.1, and copy hadoop.dll into C:\Windows\System32\
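(A common symptom of a missing or wrong-bitness hadoop.dll is a java.lang.UnsatisfiedLinkError thrown from org.apache.hadoop.io.nativeio.NativeIO when the job starts, so double-check this step if submission crashes.)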


d) Configure the environment variables

Add the environment variable HADOOP_HOME=C:\hadoop-2.7.1\

Append to the Path environment variable: %HADOOP_HOME%\bin
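If you would rather not touch the system environment variables, the same effect can be had from code. A minimal sketch (my own suggestion, not part of the original setup): add this line at the top of main() in the WordCount class from section 3, assuming hadoop-2.7.1 was extracted to C:\hadoop-2.7.1 with winutils.exe already in its bin directory.

// Alternative to the HADOOP_HOME environment variable: tell the Hadoop
// client libraries where the Windows binaries live, before the Job is created.
System.setProperty("hadoop.home.dir", "C:\\hadoop-2.7.1");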



2: Install the Eclipse plugin

a) Copy the downloaded plugin hadoop-eclipse-plugin-2.7.1.jar into the plugins folder of your Eclipse installation directory. If you see the following view after restarting Eclipse, the Hadoop plugin was installed successfully:

b) Configure Map/Reduce Locations in Eclipse, as shown in the screenshot:
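Since the original screenshots are not reproduced here, the values for the New Hadoop Location dialog can be inferred from the addresses used later in this post (treat them as assumptions to adapt to your own cluster): Map/Reduce Master host 192.168.1.200, port 9001 (matching mapred.job.tracker below), and DFS Master host 192.168.1.200, port 9000 (matching the hdfs:// URLs in the code).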



3: Submit a job


Note: the JobTracker must be enabled on the server side. In mapred-site.xml, add the following:

  <property>
      <name>mapred.job.tracker</name>
      <value>192.168.1.200:9001</value>
  </property>
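Side note: mapred.job.tracker is the classic MRv1 JobTracker property. If your 2.7.1 cluster runs YARN instead of a JobTracker, the usual equivalent setting in mapred-site.xml is the snippet below; it is offered here only as an alternative for YARN clusters, not as part of the original setup.

  <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
  </property>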


Create the project. I used Maven; the relevant part of the pom.xml is below. (The first build is a bit slow because many dependency jars are pulled in.)

<dependencies>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.7</version>
        <scope>system</scope>
        <systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.1</version>
    </dependency>
</dependencies>
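For completeness, the fragment above is assumed to sit inside a standard pom.xml skeleton along these lines (groupId, artifactId, and version are placeholders of my choosing):

<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId><!-- placeholder -->
    <artifactId>hadoop-wordcount-demo</artifactId><!-- placeholder -->
    <version>1.0-SNAPSHOT</version>
    <!-- the <dependencies> block shown above goes here -->
</project>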


The code is as follows:


import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    /** Mapper: tokenizes each line of input and emits (word, 1) for every token. */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one  = new IntWritable(1);
        private Text                     word = new Text();
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    /** Reducer (also used as the combiner): sums the counts emitted for each word. */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
        
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.1.200:9000/demo/input/file1.txt")); // file1.txt must be uploaded to HDFS in advance
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.200:9000/demo/output/count")); // fails if the count directory already exists
        boolean isSuccess = job.waitForCompletion(true); // true means the job succeeded
        System.out.println(isSuccess);
    }
}
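As the comments note, the input file must already exist in HDFS and the output directory must not. A minimal way to prepare both from the server's shell, using the paths hard-coded above (the paths are the post's; the commands are standard HDFS CLI):

hadoop fs -mkdir -p /demo/input
hadoop fs -put file1.txt /demo/input/
hadoop fs -rm -r /demo/output/count
hadoop fs -cat /demo/output/count/part-r-00000

Run the -rm line only if a previous run left output behind; the -cat line prints the word counts once the job finishes.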