
任务5.1 筛选日志文件并生成序列化文件


编写代码

添加import

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

添加Mapper类

public static class SelectDataMapper extends Mapper<LongWritable, Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,Text,Text>.Context context)
throws IOException, InterruptedException {
String[] val=value.toString().split(",");
if(val[1].contains("2016-01") || val[1].contains("2016-02")){
context.write(new Text(val[0]),new Text(val[1]));
}
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,Text,Text>.Context context)
throws IOException, InterruptedException {
String[] val=value.toString().split(",");
if(val[1].contains("2016-01") || val[1].contains("2016-02")){
context.write(new Text(val[0]),new Text(val[1]));
}
}
}

添加Driver代码

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9864");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
otherArgs = new String[] { "/user/myname/user_login.txt", "/user/myname/output_SelectData" };
} // myname要改为自已的姓名拼音
Job job = Job.getInstance(conf, "selectdata");
job.setJarByClass(SelectData.class);
job.setMapperClass(SelectDataMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);// 设置输入格式
job.setOutputFormatClass(SequenceFileOutputFormat.class);// 设置输出格式
job.setNumReduceTasks(0);// 设置Reducer任务数为0
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.err.println(job.waitForCompletion(true) ? -1 : 1);
}
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9864");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
otherArgs = new String[] { "/user/myname/user_login.txt", "/user/myname/output_SelectData" };
} // myname要改为自已的姓名拼音
Job job = Job.getInstance(conf, "selectdata");
job.setJarByClass(SelectData.class);
job.setMapperClass(SelectDataMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);// 设置输入格式
job.setOutputFormatClass(SequenceFileOutputFormat.class);// 设置输出格式
job.setNumReduceTasks(0);// 设置Reducer任务数为0
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.err.println(job.waitForCompletion(true) ? -1 : 1);
}

任务5.2 Hadoop Java API 读取序列化日志文件

获取测试数据

FileSystem API 实例

S1_ListDir列出文件夹

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Lists the entries directly under an HDFS directory and labels each one
 * as a directory, a regular file, or another entry type.
 */
public class ListDir {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // NOTE(review): 9864 is the DataNode HTTP port; fs.defaultFS usually
        // uses the NameNode RPC port (8020/9000) — verify for this cluster.
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem fs = FileSystem.get(conf);
        try {
            Path path = new Path("/user/limm/");
            // One status entry per direct child of the directory.
            FileStatus[] fileStatuses = fs.listStatus(path);
            for (FileStatus file : fileStatuses) {
                if (file.isDirectory()) {
                    System.out.println("Dir:" + file.getPath().toString());
                } else if (file.isFile()) {
                    System.out.println("File:" + file.getPath().toString());
                } else {
                    System.out.println("Other:" + file.getPath().toString());
                }
            }
        } finally {
            // Fixed: the original never closed the FileSystem handle.
            fs.close();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Lists the entries under an HDFS directory, labelling each as a directory,
 * a plain file, or another entry type. (Like the original snippet, this
 * variant does not close the FileSystem handle.)
 */
public class ListDir {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem hdfs = FileSystem.get(configuration);
        // Walk the direct children of the directory and print one line each.
        for (FileStatus entry : hdfs.listStatus(new Path("/user/limm/"))) {
            if (entry.isDirectory()) {
                System.out.println("Dir:" + entry.getPath().toString());
            } else if (entry.isFile()) {
                System.out.println("File:" + entry.getPath().toString());
            } else {
                System.out.println("Other:" + entry.getPath().toString());
            }
        }
    }
}

S2_CreateDir创建目录

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Creates a directory on HDFS via the FileSystem API.
 */
public class CreateDir {
    public static void main(String[] args) throws IOException {
        // Point the client at the cluster configured in fs.defaultFS.
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem hdfs = FileSystem.get(configuration);
        // Target directory; replace "myname" with your own name.
        Path target = new Path("/user/myname/temp");
        hdfs.mkdirs(target);
        hdfs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Creates a directory on HDFS via the FileSystem API.
public class CreateDir {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf=new Configuration();
// NOTE(review): 9864 is the default DataNode HTTP port; fs.defaultFS
// usually uses the NameNode RPC port (8020/9000) — verify.
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Obtain the file-system handle.
FileSystem fs=FileSystem.get(conf);
// Directory to create.
Path path=new Path("/user/myname/temp"); // replace "myname" with your own name
// mkdirs creates the directory (and any missing parents).
fs.mkdirs(path);
// Release the file-system handle.
fs.close();
}
}

S3_CopyToLocal下载文件

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Downloads a file from HDFS to the local file system.
 */
public class CopyToLocal {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://master:9864/");
        FileSystem hdfs = FileSystem.get(configuration);
        // Source on HDFS and destination directory on the local disk.
        Path source = new Path("/user/myname/user_login.txt");
        Path destination = new Path("D:/tmp");
        // delSrc=false keeps the HDFS copy; the final flag writes through the
        // raw local file system (no .crc checksum file next to the download).
        hdfs.copyToLocalFile(false, source, destination, true);
        hdfs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Downloads a file from HDFS to the local file system.
public class CopyToLocal {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9864/");
// Obtain the file-system handle.
FileSystem fs=FileSystem.get(conf);
// HDFS source path and local destination directory.
Path fromPath=new Path("/user/myname/user_login.txt");
Path toPath=new Path("D:/tmp");
// delSrc=false keeps the HDFS copy; last flag uses the raw local FS (no .crc file).
fs.copyToLocalFile(false, fromPath, toPath, true);
// Release the file-system handle.
fs.close();
}
}

S4_CopyFromLocal上传文件

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Uploads a local file to HDFS.
 */
public class CopyFromLocal {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem hdfs = FileSystem.get(configuration);
        // Alternative from the course material: connect as an explicit user.
        //FileSystem fileSystem = FileSystem.get(URI.create("hdfs://home.hddly.cn:9864/"),conf,"myname");
        // Local source file and HDFS destination path.
        Path localSource = new Path("D:/tmp/user_login.txt");
        Path hdfsTarget = new Path("/user/myname/temp/user_log.txt");
        hdfs.copyFromLocalFile(localSource, hdfsTarget);
        hdfs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Uploads a local file to HDFS.
public class CopyFromLocal {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Obtain the file-system handle.
FileSystem fileSystem = FileSystem.get(conf);
// Alternative: connect as an explicit user.
//FileSystem fileSystem = FileSystem.get(URI.create("hdfs://home.hddly.cn:9864/"),conf,"myname");
// Local source path and HDFS destination path.
Path fromPath = new Path("D:/tmp/user_login.txt");
Path toPath = new Path("/user/myname/temp/user_log.txt");
// Upload the file.
fileSystem.copyFromLocalFile(fromPath,toPath);
// Release the file-system handle.
fileSystem.close();
}
}

S5_CatFile读写文件

public class CatFile {
public static void main(String[] args) throws IOException {
//获取配置
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
//获取文件系统
FileSystem fs=FileSystem.get(conf);
//声明查看的路径
Path path=new Path("/user/myname/temp/user_log.txt");
//创建新文件
Path newPath=new Path("/user/myname/temp/new_user_log.txt");
fs.delete(newPath,true);
FSDataOutputStream os=fs.create(newPath);
//获取指定文件的数据字节流
FSDataInputStream is=fs.open(path);
//读取文件内容并写入到新文件
BufferedReader br=new BufferedReader(new InputStreamReader(is,"utf-8"));
BufferedWriter bw=new BufferedWriter(new OutputStreamWriter(os,"utf-8"));
String line="";
while((line=br.readLine())!=null){
bw.write(line);
bw.newLine();
}
//关闭数据字节流
bw.close();
os.close();
br.close();
is.close();
//关闭文件系统
fs.close();
}
}
// Copies a text file on HDFS to a new HDFS file line by line.
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Obtain the file-system handle.
FileSystem fs=FileSystem.get(conf);
// File to read.
Path path=new Path("/user/myname/temp/user_log.txt");
// File to create (delete any stale copy first).
Path newPath=new Path("/user/myname/temp/new_user_log.txt");
fs.delete(newPath,true);
FSDataOutputStream os=fs.create(newPath);
// Open a byte stream on the source file.
FSDataInputStream is=fs.open(path);
// Read the source line by line and write each line to the new file.
BufferedReader br=new BufferedReader(new InputStreamReader(is,"utf-8"));
BufferedWriter bw=new BufferedWriter(new OutputStreamWriter(os,"utf-8"));
String line="";
while((line=br.readLine())!=null){
bw.write(line);
bw.newLine();
}
// Close the data streams.
// NOTE(review): none of these close() calls run if an exception is thrown
// above — try-with-resources would be safer.
bw.close();
os.close();
br.close();
is.close();
// Release the file-system handle.
fs.close();
}
}

S6_ListFile列出文件

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Prints every plain file (not directories) directly under an HDFS path.
 */
public class ListFile {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem hdfs = FileSystem.get(configuration);
        // One status entry per direct child of the directory.
        FileStatus[] entries = hdfs.listStatus(new Path("/user/myname"));
        for (FileStatus entry : entries) {
            // Report only plain files; skip directories and other entry types.
            if (entry.isFile()) {
                System.out.println(entry.getPath().toString());
            }
        }
        hdfs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Lists the plain files (not directories) directly under an HDFS path.
public class ListFile {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Obtain the file-system handle.
FileSystem fs=FileSystem.get(conf);
// Directory to list.
Path path=new Path("/user/myname");
// One status entry per direct child of the directory.
FileStatus[] fileStatuses=fs.listStatus(path);
// Print only plain files.
for (FileStatus file : fileStatuses) {
if(file.isFile()){
System.out.println(file.getPath().toString());
}
}
// Release the file-system handle.
fs.close();
}
}

S7_DelFile删除文件

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Deletes a single file from HDFS.
 */
public class DelFile {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
        FileSystem hdfs = FileSystem.get(configuration);
        // recursive=true is harmless for a plain file (required for directories).
        hdfs.delete(new Path("/user/myname/temp/user_log.txt"), true);
        hdfs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Deletes a single file from HDFS.
public class DelFile {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
// Obtain the file-system handle.
FileSystem fs=FileSystem.get(conf);
// File to delete.
Path path=new Path("/user/myname/temp/user_log.txt");
// recursive=true is harmless for a plain file.
fs.delete(path, true);
// Release the file-system handle.
fs.close();
}
}

S8_DelPath删除目录

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Recursively deletes a directory (and its contents) from HDFS.
 */
public class DelPath {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
        FileSystem hdfs = FileSystem.get(configuration);
        // recursive=true removes the directory together with everything inside.
        hdfs.delete(new Path("/user/myname/temp/"), true);
        hdfs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Recursively deletes a directory (and its contents) from HDFS.
public class DelPath {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
// Obtain the file-system handle.
FileSystem fs=FileSystem.get(conf);
// Directory to delete.
Path path=new Path("/user/myname/temp/");
// recursive=true removes the directory together with everything inside.
fs.delete(path, true);
// Release the file-system handle.
fs.close();
}
}

读取序列化文件

DownloadFile
读序列化文件
读序列化文件

package chap5_selectdata;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
/**
 * Reads the SequenceFile produced by the SelectData job and appends its
 * (key, value) records to a local text file, one tab-separated pair per line.
 */
public class DownloadFile {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // NOTE(review): 9864 is the DataNode HTTP port; fs.defaultFS usually
        // uses the NameNode RPC port (8020/9000) — verify.
        conf.set("fs.defaultFS", "hdfs://c31:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Fixed: try-with-resources closes reader and writer even when an
        // I/O error is thrown mid-loop (the original leaked both then).
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                new Path("/user/limm/output_SelectData/part-m-00000"), conf);
             BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream("d:\\tmp\\selectdata.txt", true)))) {
            // Key/value instances are reused by reader.next().
            Text key = new Text();
            Text value = new Text();
            // NOTE(review): the output stream is opened in append mode, so
            // re-running accumulates duplicate lines — confirm that is intended.
            while (reader.next(key, value)) {
                out.write(key.toString() + "\t" + value.toString() + "\r\n");
            }
        } finally {
            // Fixed: release the FileSystem handle (the original never did).
            fs.close();
        }
        System.out.println("end");
    }
}
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
// Reads the SequenceFile written by the SelectData job and appends each
// (key, value) pair to a local text file, one tab-separated pair per line.
public class DownloadFile {
public static void main(String[] args) throws IOException {
// Build the client configuration.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://c31:9864/");
// Obtain the file-system handle.
FileSystem fs = FileSystem.get(conf);
// Open a reader on the sequence file.
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
new Path("/user/limm/output_SelectData/part-m-00000"), conf);
// Reused key/value instances matching the file's key/value types.
Text key = new Text();
Text value = new Text();
// NOTE(review): append mode — re-running adds duplicate lines; confirm intended.
BufferedWriter out = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream("d:\\tmp\\selectdata.txt", true)));
while (reader.next(key, value)) {
out.write(key.toString() + "\t" + value.toString() + "\r\n");
}
out.close();
reader.close();
System.out.println("end");
}
}

任务5.3 优化日志文件统计程序

MR实现日志按月份统计

源码实现


自定义类型MemberLogTime

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite MapReduce key: (member name, log time). Serialized with
 * writeUTF/readUTF in field order.
 */
public class MemberLogTime implements WritableComparable<MemberLogTime> {
    private String member_name;
    private String logTime;

    /** No-arg constructor required by Hadoop serialization. */
    public MemberLogTime() {
    }

    public MemberLogTime(String member_name, String logTime) {
        this.member_name = member_name;
        this.logTime = logTime;
    }

    public String getMember_name() {
        return member_name;
    }

    public void setMember_name(String member_name) {
        this.member_name = member_name;
    }

    public String getLogTime() {
        return logTime;
    }

    public void setLogTime(String logTime) {
        this.logTime = logTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in the exact order write() emits them.
        this.member_name = in.readUTF();
        this.logTime = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(member_name);
        out.writeUTF(logTime);
    }

    @Override
    public int compareTo(MemberLogTime o) {
        // Fixed: compare by name THEN log time. The original compared by name
        // only, so keys of the same member in different months sorted as equal
        // and were merged into a single reduce group.
        int byName = this.member_name.compareTo(o.member_name);
        return byName != 0 ? byName : this.logTime.compareTo(o.logTime);
    }

    // equals/hashCode added to stay consistent with compareTo.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MemberLogTime)) {
            return false;
        }
        MemberLogTime other = (MemberLogTime) obj;
        return member_name.equals(other.member_name) && logTime.equals(other.logTime);
    }

    @Override
    public int hashCode() {
        return 31 * member_name.hashCode() + logTime.hashCode();
    }

    @Override
    public String toString() {
        return this.member_name + "," + this.logTime;
    }
}
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Composite MapReduce key (member name, log time), serialized with
// writeUTF/readUTF in field order.
public class MemberLogTime implements WritableComparable<MemberLogTime>{
private String member_name;
private String logTime;
// No-arg constructor required by Hadoop serialization.
public MemberLogTime() {
}
public MemberLogTime(String member_name,String logTime){
this.member_name=member_name;
this.logTime=logTime;
}
public String getMember_name() {
return member_name;
}
public void setMember_name(String member_name) {
this.member_name = member_name;
}
public String getLogTime() {
return logTime;
}
public void setLogTime(String logTime) {
this.logTime = logTime;
}
@Override
public void readFields(DataInput in) throws IOException {
// Field order must match write().
this.member_name=in.readUTF();
this.logTime=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(member_name);
out.writeUTF(logTime);
}
@Override
public int compareTo(MemberLogTime o) {
// NOTE(review): compares by member_name only, so keys with the same name
// but different logTime sort as equal and share one reduce group —
// confirm this is intended.
return this.getMember_name().compareTo(o.getMember_name());
}
@Override
public String toString() {
return this.member_name+","+this.logTime;
}
}

LogCountMapper实现

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Maps (member name, log time) text pairs to (MemberLogTime, 1) and tallies
 * January/February 2016 input records via job counters.
 */
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
    // Reused output key/value objects (standard Hadoop object-reuse pattern).
    private MemberLogTime outKey = new MemberLogTime();
    private IntWritable one = new IntWritable(1);

    // Job counters, one per tracked month.
    enum LogCounter {
        January,
        February
    }

    @Override
    protected void map(Text key, Text value,
            Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String name = key.toString();
        String time = value.toString();
        // Track how many input records fall in each month.
        if (time.contains("2016-01")) {
            context.getCounter(LogCounter.January).increment(1);
        } else if (time.contains("2016-02")) {
            context.getCounter(LogCounter.February).increment(1);
        }
        outKey.setMember_name(name);
        outKey.setLogTime(time);
        context.write(outKey, one);
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Emits (member+logTime composite key, 1) per input record and counts
// January/February 2016 records with job counters.
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
// Reused output key/value instances (Hadoop object-reuse pattern).
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
// Job counters, one per tracked month.
enum LogCounter{
January,
February
}
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
// Count input records per month (the stray second ';' is a no-op).
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);;
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);;
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt, one);
}
}

LogCountCombiner实现

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Combiner: locally pre-sums the mapper's 1-counts per key before the
 * shuffle. Summation is associative and commutative, so it is safe to run
 * zero or more times; same contract as the reducer.
 */
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
            Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : value) {
            total += count.get();
        }
        context.write(key, new IntWritable(total));
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Combiner: pre-sums the mapper's 1-counts per key locally before the
// shuffle; associative/commutative, so safe to run zero or more times.
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Sum all partial counts for this key.
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}

LogCountPartitioner实现

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes January-2016 keys to partition 0 and every other key to
 * partition 1 (modulo the configured number of partitions).
 */
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
    @Override
    public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
        // Month index 0 for January, 1 otherwise; 0 % n is always 0, so this
        // is behaviorally identical to the original branch pair.
        int monthIndex = key.getLogTime().contains("2016-01") ? 0 : 1;
        return monthIndex % numPartitions;
    }
}
import org.apache.hadoop.mapreduce.Partitioner;
// Sends January-2016 keys to partition 0 and all other keys to partition
// 1 % numPartitions (0 % numPartitions is always 0).
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0%numPartitions;
}else{
return 1%numPartitions;
}
}
}

LogCountReducer实现

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Sums login counts per (member, month) key and tallies how many output
 * records belong to January vs February via the "OutputCounter" group.
 */
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
            Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // One output-record tick per month bucket.
        String time = key.getLogTime();
        if (time.contains("2016-01")) {
            context.getCounter("OutputCounter", "JanuaryResult").increment(1);
        } else if (time.contains("2016-02")) {
            context.getCounter("OutputCounter", "FebruaryResult").increment(1);
        }
        int total = 0;
        for (IntWritable count : value) {
            total += count.get();
        }
        context.write(key, new IntWritable(total));
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Sums the counts per (member, month) key and tallies output records per
// month in the "OutputCounter" counter group.
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Count one output record per month bucket (the stray second ';' is a no-op).
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);;
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}
// Sum all partial counts for this key.
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}

LogCount驱动类实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver for the per-member/month login count job: custom composite key
 * (MemberLogTime), combiner, and a partitioner that splits January and
 * February output into separate reduce partitions.
 */
public class LogCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // NOTE(review): 9864 is the DataNode HTTP port; fs.defaultFS usually
        // uses the NameNode RPC port (8020/9000) — verify.
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Default paths; replace "myname" with your own name in pinyin.
            otherArgs = new String[] { "/user/myname/output_SelectData/part-m-00000",
                    "/user/myname/output_MonthData" };
        }
        Job job = Job.getInstance(conf, "logcount");
        job.setJarByClass(LogCount.class);
        job.setMapperClass(LogCountMapper.class);
        job.setReducerClass(LogCountReducer.class);
        job.setCombinerClass(LogCountCombiner.class);
        job.setPartitionerClass(LogCountPartitioner.class);
        job.setNumReduceTasks(2); // one partition per tracked month
        job.setOutputKeyClass(MemberLogTime.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Remove a stale output directory so the job can be re-run.
        FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Fixed: exit 0 on success / 1 on failure (the original printed -1
        // to stderr on success).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

任务5.4 Eclipse 提交日志文件统计程序

MR实现日志按月份统计2

源码实现


自定义类型MemberLogTime

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite MapReduce key: (member name, log time). Serialized with
 * writeUTF/readUTF in field order.
 */
public class MemberLogTime implements WritableComparable<MemberLogTime> {
    private String member_name;
    private String logTime;

    /** No-arg constructor required by Hadoop serialization. */
    public MemberLogTime() {
    }

    public MemberLogTime(String member_name, String logTime) {
        this.member_name = member_name;
        this.logTime = logTime;
    }

    public String getMember_name() {
        return member_name;
    }

    public void setMember_name(String member_name) {
        this.member_name = member_name;
    }

    public String getLogTime() {
        return logTime;
    }

    public void setLogTime(String logTime) {
        this.logTime = logTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in the exact order write() emits them.
        this.member_name = in.readUTF();
        this.logTime = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(member_name);
        out.writeUTF(logTime);
    }

    @Override
    public int compareTo(MemberLogTime o) {
        // Fixed: compare by name THEN log time. The original compared by name
        // only, so keys of the same member in different months sorted as equal
        // and were merged into a single reduce group.
        int byName = this.member_name.compareTo(o.member_name);
        return byName != 0 ? byName : this.logTime.compareTo(o.logTime);
    }

    // equals/hashCode added to stay consistent with compareTo.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MemberLogTime)) {
            return false;
        }
        MemberLogTime other = (MemberLogTime) obj;
        return member_name.equals(other.member_name) && logTime.equals(other.logTime);
    }

    @Override
    public int hashCode() {
        return 31 * member_name.hashCode() + logTime.hashCode();
    }

    @Override
    public String toString() {
        return this.member_name + "," + this.logTime;
    }
}
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Composite MapReduce key (member name, log time), serialized with
// writeUTF/readUTF in field order.
public class MemberLogTime implements WritableComparable<MemberLogTime>{
private String member_name;
private String logTime;
// No-arg constructor required by Hadoop serialization.
public MemberLogTime() {
}
public MemberLogTime(String member_name,String logTime){
this.member_name=member_name;
this.logTime=logTime;
}
public String getMember_name() {
return member_name;
}
public void setMember_name(String member_name) {
this.member_name = member_name;
}
public String getLogTime() {
return logTime;
}
public void setLogTime(String logTime) {
this.logTime = logTime;
}
@Override
public void readFields(DataInput in) throws IOException {
// Field order must match write().
this.member_name=in.readUTF();
this.logTime=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(member_name);
out.writeUTF(logTime);
}
@Override
public int compareTo(MemberLogTime o) {
// NOTE(review): compares by member_name only, so keys with the same name
// but different logTime sort as equal and share one reduce group —
// confirm this is intended.
return this.getMember_name().compareTo(o.getMember_name());
}
@Override
public String toString() {
return this.member_name+","+this.logTime;
}
}

LogCountMapper实现

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Emits (member+logTime composite key, 1) per input record and counts
// January/February 2016 records with job counters.
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
// Reused output key/value instances (Hadoop object-reuse pattern).
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
// Job counters, one per tracked month.
enum LogCounter{
January,
February
}
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
// Count input records per month (the stray second ';' is a no-op).
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);;
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);;
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt, one);
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Emits (member+logTime composite key, 1) per input record and counts
// January/February 2016 records with job counters.
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
// Reused output key/value instances (Hadoop object-reuse pattern).
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
// Job counters, one per tracked month.
enum LogCounter{
January,
February
}
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
// Count input records per month (the stray second ';' is a no-op).
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);;
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);;
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt, one);
}
}

LogCountCombiner实现

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Combiner: pre-sums the mapper's 1-counts per key locally before the
// shuffle; associative/commutative, so safe to run zero or more times.
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Sum all partial counts for this key.
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Combiner: pre-sums the mapper's 1-counts per key locally before the
// shuffle; associative/commutative, so safe to run zero or more times.
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Sum all partial counts for this key.
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}

LogCountPartitioner实现

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
// Sends January-2016 keys to partition 0 and all other keys to partition
// 1 % numPartitions (0 % numPartitions is always 0).
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0%numPartitions;
}else{
return 1%numPartitions;
}
}
}
import org.apache.hadoop.mapreduce.Partitioner;
// Sends January-2016 keys to partition 0 and all other keys to partition
// 1 % numPartitions (0 % numPartitions is always 0).
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0%numPartitions;
}else{
return 1%numPartitions;
}
}
}

LogCountReducer实现

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Sums the counts per (member, month) key and tallies output records per
// month in the "OutputCounter" counter group.
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Count one output record per month bucket (the stray second ';' is a no-op).
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);;
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}
// Sum all partial counts for this key.
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Sums the counts per (member, month) key and tallies output records per
// month in the "OutputCounter" counter group.
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Count one output record per month bucket (the stray second ';' is a no-op).
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);;
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}
// Sum all partial counts for this key.
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}

LogCount驱动类实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class LogCount extends Configured implements Tool {
    public static void main(String[] args) {
        // NOTE(review): "/uer/..." looks like a typo for "/user/..." — confirm
        // against the actual HDFS layout before changing it.
        String[] myArgs = {
            "/uer/myname/output_SelectData/part-m-00000",
            "/uer/myname/output_logcount"
        };
        try {
            ToolRunner.run(LogCount.getMyConfiguration(), new LogCount(), myArgs);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configures and runs the log-count job.
     * args[0] = input sequence file, args[1] = output directory (deleted first
     * so reruns do not fail on an existing path).
     *
     * @return 0 on success, 1 on failure (standard Tool exit-code convention)
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = LogCount.getMyConfiguration();
        Job job = Job.getInstance(conf, "logcount");
        job.setJarByClass(LogCount.class);
        job.setMapperClass(LogCountMapper.class);
        job.setReducerClass(LogCountReducer.class);
        job.setCombinerClass(LogCountCombiner.class);
        job.setPartitionerClass(LogCountPartitioner.class);
        job.setNumReduceTasks(2); // one reducer per month, see LogCountPartitioner
        job.setOutputKeyClass(MemberLogTime.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // BUG FIX: the original returned -1 on success and 1 on failure; the
        // Tool contract expects 0 for success and non-zero for failure.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static Configuration getMyConfiguration() {
        // Build a client-side configuration for cross-platform YARN submission.
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.app-submission.cross-platform", true);
        // NOTE(review): 9864 is normally the datanode HTTP port; the namenode
        // RPC port is typically 8020/9000 — verify fs.defaultFS against core-site.xml.
        conf.set("fs.defaultFS", "hdfs://master:9864"); // 指定namenode
        conf.set("mapreduce.framework.name", "yarn");   // 指定使用yarn框架
        String resourcenode = "master";
        conf.set("yarn.resourcemanager.address", resourcenode + ":8032");          // resourcemanager
        conf.set("yarn.resourcemanager.scheduler.address", resourcenode + ":8030"); // scheduler
        conf.set("mapreduce.jobhistory.address", resourcenode + ":10020");
        // Package the locally compiled classes so YARN can ship the job jar.
        conf.set("mapreduce.job.jar", JarUtil.jar(LogCount.class));
        return conf;
    }
}

JarUtil类实现

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
public class JarUtil {
    /**
     * Packs the compiled classes that live beside {@code cls}'s classpath root
     * (in an Eclipse-style sibling "bin/" directory — verify for other IDEs)
     * into a jar named after the class.
     *
     * @return the jar file name that was written (class name + ".jar")
     */
    public static String jar(Class<?> cls) {
        String outputJar = cls.getName() + ".jar";
        String input = cls.getClassLoader().getResource("").getFile();
        input = input.substring(0, input.length() - 1);         // drop trailing '/'
        input = input.substring(0, input.lastIndexOf("/") + 1); // parent directory
        input = input + "bin/";
        jar(input, outputJar);
        return outputJar;
    }

    /** Creates outputFileName and recursively jars everything under inputFileName. */
    private static void jar(String inputFileName, String outputFileName) {
        // BUG FIX: the original closed 'out' in a finally block without a null
        // check, so a failed FileOutputStream open caused a NullPointerException.
        // try-with-resources closes the stream safely in every case.
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(outputFileName))) {
            jar(out, new File(inputFileName), "");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Adds file f (or, recursively, a directory's contents) under entry name base. */
    private static void jar(JarOutputStream out, File f, String base) throws Exception {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            base = base.length() == 0 ? "" : base + "/"; // jar entries always use forward slashes
            for (int i = 0; i < fl.length; i++) {
                jar(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            // BUG FIX: the original leaked the FileInputStream when a read or
            // write threw; try-with-resources guarantees it is closed.
            try (FileInputStream in = new FileInputStream(f)) {
                byte[] buffer = new byte[1024];
                int n = in.read(buffer);
                while (n != -1) {
                    out.write(buffer, 0, n);
                    n = in.read(buffer);
                }
            }
        }
    }
}
import java.io.File; // FIX: this duplicate listing omitted the File import it uses
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
public class JarUtil {
    /**
     * Packs the compiled classes that live beside {@code cls}'s classpath root
     * (in an Eclipse-style sibling "bin/" directory — verify for other IDEs)
     * into a jar named after the class.
     *
     * @return the jar file name that was written (class name + ".jar")
     */
    public static String jar(Class<?> cls) {
        String outputJar = cls.getName() + ".jar";
        String input = cls.getClassLoader().getResource("").getFile();
        input = input.substring(0, input.length() - 1);         // drop trailing '/'
        input = input.substring(0, input.lastIndexOf("/") + 1); // parent directory
        input = input + "bin/";
        jar(input, outputJar);
        return outputJar;
    }

    /** Creates outputFileName and recursively jars everything under inputFileName. */
    private static void jar(String inputFileName, String outputFileName) {
        // BUG FIX: the original closed 'out' in a finally block without a null
        // check, so a failed FileOutputStream open caused a NullPointerException.
        // try-with-resources closes the stream safely in every case.
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(outputFileName))) {
            jar(out, new File(inputFileName), "");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Adds file f (or, recursively, a directory's contents) under entry name base. */
    private static void jar(JarOutputStream out, File f, String base) throws Exception {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            base = base.length() == 0 ? "" : base + "/"; // jar entries always use forward slashes
            for (int i = 0; i < fl.length; i++) {
                jar(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            // BUG FIX: the original leaked the FileInputStream when a read or
            // write threw; try-with-resources guarantees it is closed.
            try (FileInputStream in = new FileInputStream(f)) {
                byte[] buffer = new byte[1024];
                int n = in.read(buffer);
                while (n != -1) {
                    out.write(buffer, 0, n);
                    n = in.read(buffer);
                }
            }
        }
    }
}

任务5.5拓展任务

中文词频统计

环境准备

jieba包下载

wget https://archiva-maven-storage-prod.oss-cn-beijing.aliyuncs.com/repository/central/com/huaban/jieba-analysis/1.0.2/jieba-analysis-1.0.2.jar?Expires=1651050773&OSSAccessKeyId=LTAIfU51SusnnfCC&Signature=cOgPvS81TiysluB%2FbBW94BXIvXY%3D

scp到所有从机

scp_workers.sh

#!/bin/bash
# Sync the local Hadoop config, the common jars, and /etc/ntp.conf to every
# host listed in the cluster's workers file (sshd listens on port 8022).
for w in $(cat /usr/local/hadoop-3.3.1/etc/hadoop/workers); do
  scp -P 8022 /usr/local/hadoop-3.3.1/etc/hadoop/* $w:/usr/local/hadoop-3.3.1/etc/hadoop/
  scp -P 8022 /usr/local/hadoop-3.3.1/share/hadoop/common/* $w:/usr/local/hadoop-3.3.1/share/hadoop/common/
  scp -P 8022 /etc/ntp.conf $w:/etc/
done
# Copy the Hadoop config directory, the common jars, and /etc/ntp.conf to each
# worker host from the workers file (ssh on port 8022).
hosts=$(cat /usr/local/hadoop-3.3.1/etc/hadoop/workers)
for w in $hosts; do
  scp -P 8022 /usr/local/hadoop-3.3.1/etc/hadoop/* $w:/usr/local/hadoop-3.3.1/etc/hadoop/
  scp -P 8022 /usr/local/hadoop-3.3.1/share/hadoop/common/* $w:/usr/local/hadoop-3.3.1/share/hadoop/common/
  scp -P 8022 /etc/ntp.conf $w:/etc/
done

其它参考

任务拓展

How Many Maps?(官网)

The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.

实训1 统计全球各年每月的最高气温和最低气温

常见问题

SequenceFile.Reader过时

//获取Option实例,新方法
SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(new Path("/user/root/JanFeb/part-m-00000"));
//获取Reader实例
SequenceFile.Reader reader1 = new SequenceFile.Reader(conf, pathOption);
SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(new Path("/user/root/JanFeb/part-m-00000"));
//获取Reader实例
SequenceFile.Reader reader1 = new SequenceFile.Reader(conf, pathOption);

Cannot allocate containers as requested resource is greater than maximum allowed allocation

在eclipse中运行LogCountRun报错

Invalid resource request! Cannot allocate containers as requested resource is greater than maximum allowed allocation. Requested resource type=[memory-mb], Requested resource=<memory:1536, vCores:1>, maximum allowed allocation=<memory:1024, vCores:2>, please note that maximum allowed allocation is calculated by scheduler based on maximum resource of registered NodeManagers, which might be less than configured maximum allocation=<memory:1024, vCores:4>


Could not find or load main class
org.apache.hadoop.mapreduce.v2.app.MRAppMaster”
org.apache.hadoop.mapreduce.v2.app.MRAppMaster”

提示处理

Please check whether your <HADOOP_HOME>/etc/hadoop/mapred-site.xml contains the below configuration:
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>

处理

使用hadoop classpath查找路径

[root@master hadoop]# hadoop classpath
/usr/local/hadoop-3.3.1/etc/hadoop:/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/common/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*:/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn:/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn/*
[root@master hadoop]#
/usr/local/hadoop-3.3.1/etc/hadoop:/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/common/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*:/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn:/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn/*
[root@master hadoop]#

org.apache.hadoop.mapreduce.v2.app.MRAppMaster:
Error starting MRAppMaster
Error starting MRAppMaster

错误日志

错误日志:2023-05-23 01:18:44,500 ERROR [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster java.lang.UnsupportedClassVersionError: train3_musiccount/S1_MusicSelectData$SelectDataMapper has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

Not enough documents for more than one split! Consider setting mongo.input.split_size to a lower value.

连接Mongodb库查数据报错:

查看日志:/usr/local/hadoop-3.3.1/logs/userlogs/application_1649938751428_0002/container_1649938751428_0002_01_000402/syslog

2022-04-14 21:03:08,216 INFO [main] org.mongodb.driver.connection: Opened connection [connectionId{localValue:2, serverValue:877}] to home.hddly.cn:57017
2022-04-14 21:03:09,770 ERROR [main] com.mongodb.hadoop.input.MongoRecordReader: Exception reading next key/val from mongo: Query failed with error code 51173 and error mes
sage 'error processing query: ns=pythondb.news_dataTree: $and
Sort: {}
Proj: {}
planner returned error :: caused by :: When using min()/max() a hint of which index to use must be provided' on server home.hddly.cn:57017
2022-04-14 21:03:09,770 ERROR [main] com.mongodb.hadoop.input.MongoRecordReader: Exception reading next key/val from mongo: Query failed with error code 51173 and error mes
sage 'error processing query: ns=pythondb.news_dataTree: $and
Sort: {}
Proj: {}
planner returned error :: caused by :: When using min()/max() a hint of which index to use must be provided' on server home.hddly.cn:57017

使用yarn方式运行LogCountRun,报slf4j错

错误信息

[2023-03-30 07:34:40.108]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

使用yarn方式运行LogCountRun,报prelaunch.err错

错误信息

[2023-03-30 07:34:40.108]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

错误分析

发现错误信息:2023-03-30 10:41:22,178 ERROR [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster java.lang.UnsupportedClassVersionError: chap5_logcount/LogCountMapper has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
