任务5.1筛选日志文件并生成序列化文件
编写代码
添加import
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
添加Mapper类
public static class SelectDataMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Reused output objects to avoid a per-record allocation.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Emits (member_name, login_time) for records whose login time contains
     * "2016-01" or "2016-02"; all other records are dropped.
     * Each input value is one CSV line: member_name,login_time.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] val = value.toString().split(",");
        // Fix: guard against blank/malformed lines, which previously threw
        // ArrayIndexOutOfBoundsException on val[1] and killed the task.
        if (val.length < 2) {
            return;
        }
        if (val[1].contains("2016-01") || val[1].contains("2016-02")) {
            outKey.set(val[0]);
            outValue.set(val[1]);
            context.write(outKey, outValue);
        }
    }
}
// NOTE(review): duplicated re-listing of SelectDataMapper.map from the block
// above, shown here without the enclosing class header; kept verbatim.
@Override
// Emits (member_name, login_time) for January/February 2016 records only.
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,Text,Text>.Context context)
throws IOException, InterruptedException {
// Input line format: member_name,login_time (comma-separated).
String[] val=value.toString().split(",");
// Keep only logins from 2016-01 or 2016-02.
if(val[1].contains("2016-01") || val[1].contains("2016-02")){
context.write(new Text(val[0]),new Text(val[1]));
}
}
}
添加Driver代码
/**
 * Driver for the map-only SelectData job: filters the login log down to
 * January/February 2016 and writes the result as a sequence file.
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    // NOTE(review): 9864 is normally the DataNode HTTP port; the NameNode RPC
    // port is usually 8020/9000 — confirm against the cluster configuration.
    conf.set("fs.defaultFS", "hdfs://master:9864");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        // Fallback paths; replace "myname" with your own name (pinyin).
        otherArgs = new String[] { "/user/myname/user_login.txt", "/user/myname/output_SelectData" };
    }
    Job job = Job.getInstance(conf, "selectdata");
    job.setJarByClass(SelectData.class);
    job.setMapperClass(SelectDataMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);           // plain-text input
    job.setOutputFormatClass(SequenceFileOutputFormat.class); // sequence-file output
    job.setNumReduceTasks(0);                                 // map-only job
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    // Remove any stale output directory so the job can be rerun.
    FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    // Fix: exit with status 0 on success, 1 on failure. The original printed
    // "-1" to stderr on success and never set the process exit status.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// NOTE(review): duplicated re-listing of the SelectData driver body from the
// block above, shown here without the main(...) signature; kept verbatim.
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9864");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
otherArgs = new String[] { "/user/myname/user_login.txt", "/user/myname/output_SelectData" };
} // replace "myname" with your own name (pinyin)
Job job = Job.getInstance(conf, "selectdata");
job.setJarByClass(SelectData.class);
job.setMapperClass(SelectDataMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);// set the input format
job.setOutputFormatClass(SequenceFileOutputFormat.class);// set the output format
job.setNumReduceTasks(0);// map-only job: zero reducers
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.err.println(job.waitForCompletion(true) ? -1 : 1);
}
任务5.2 Hadoop Java API读取序列化日志文件
获取测试数据
FileSystemAPI实例
S1_ListDir列出文件夹
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class ListDir {
    /** Lists the direct children of /user/limm/ and labels each as Dir/File/Other. */
    public static void main(String[] args) throws IOException {
        // Configuration pointing at the target HDFS instance.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Directory whose entries are listed.
        Path path = new Path("/user/limm/");
        FileStatus[] fileStatuses = fs.listStatus(path);
        for (FileStatus file : fileStatuses) {
            if (file.isDirectory()) {
                System.out.println("Dir:" + file.getPath().toString());
            } else if (file.isFile()) {
                System.out.println("File:" + file.getPath().toString());
            } else {
                // Neither a regular file nor a directory (e.g. a symlink).
                System.out.println("Other:" + file.getPath().toString());
            }
        }
        // Fix: release the FileSystem handle — every sibling example in this
        // document closes it, but the original ListDir leaked it.
        fs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of ListDir from the section above; this
// re-listing omits the `import java.io.IOException;` line of the first copy.
public class ListDir {
// Lists the direct children of /user/limm/ and labels each entry.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Directory to list
Path path=new Path("/user/limm/");
// Fetch the directory entries
FileStatus[] fileStatuses=fs.listStatus(path);
// Walk the entries
for (FileStatus file : fileStatuses) {
// Distinguish directories from regular files
if(file.isDirectory()){
System.out.println("Dir:" +file.getPath().toString());
}
else if (file.isFile())
{
System.out.println("File:" +file.getPath().toString());
}
else
{
System.out.println("Other:" +file.getPath().toString());
}
}
}
}
S2_CreateDir创建目录
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateDir {
    /** Creates the HDFS directory /user/myname/temp (replace "myname" with your own). */
    public static void main(String[] args) throws IOException {
        // Point the configuration at the target HDFS instance.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Directory to create.
        Path dir = new Path("/user/myname/temp");
        fs.mkdirs(dir);
        // Release the file-system handle.
        fs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of CreateDir from the section above; this
// re-listing omits the `import java.io.IOException;` line of the first copy.
public class CreateDir {
// Creates the HDFS directory /user/myname/temp.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Directory to create
Path path=new Path("/user/myname/temp"); // replace "myname" with your own name
// Call mkdirs to create the directory
fs.mkdirs(path);
// Close the file-system handle
fs.close();
}
}
S3_CopyToLocal下载文件
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyToLocal {
    /** Downloads /user/myname/user_login.txt from HDFS into the local D:/tmp directory. */
    public static void main(String[] args) throws IOException {
        // HDFS connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Source on HDFS and destination on the local disk.
        Path src = new Path("/user/myname/user_login.txt");
        Path dst = new Path("D:/tmp");
        // delSrc=false keeps the HDFS copy; useRawLocalFileSystem=true writes
        // the plain file without a companion .crc checksum file.
        fs.copyToLocalFile(false, src, dst, true);
        fs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of CopyToLocal from the section above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class CopyToLocal {
// Downloads /user/myname/user_login.txt from HDFS to local D:/tmp.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://master:9864/");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Source path on HDFS and destination path on the local disk
Path fromPath=new Path("/user/myname/user_login.txt");
Path toPath=new Path("D:/tmp");
// Call copyToLocalFile to download the file
fs.copyToLocalFile(false, fromPath, toPath, true);
// Close the file system
fs.close();
}
}
S4_CopyFromLocal上传文件
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyFromLocal {
    /** Uploads the local file D:/tmp/user_login.txt to HDFS as /user/myname/temp/user_log.txt. */
    public static void main(String[] args) throws IOException {
        // HDFS connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem fileSystem = FileSystem.get(conf);
        // Local source and HDFS destination.
        Path src = new Path("D:/tmp/user_login.txt");
        Path dst = new Path("/user/myname/temp/user_log.txt");
        fileSystem.copyFromLocalFile(src, dst);
        // Release the file-system handle.
        fileSystem.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of CopyFromLocal from the section above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class CopyFromLocal {
// Uploads D:/tmp/user_login.txt to HDFS as /user/myname/temp/user_log.txt.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Get the file system
FileSystem fileSystem = FileSystem.get(conf);
//FileSystem fileSystem = FileSystem.get(URI.create("hdfs://home.hddly.cn:9864/"),conf,"myname");
// Local source path and HDFS destination path
Path fromPath = new Path("D:/tmp/user_login.txt");
Path toPath = new Path("/user/myname/temp/user_log.txt");
// Call copyFromLocalFile to upload the file
fileSystem.copyFromLocalFile(fromPath,toPath);
// Close the file system
fileSystem.close();
}
}
S5_CatFile读写文件
public class CatFile {
    /**
     * Copies /user/myname/temp/user_log.txt to new_user_log.txt on HDFS,
     * line by line, reading and writing as UTF-8.
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Source file and the new file to create.
        Path path = new Path("/user/myname/temp/user_log.txt");
        Path newPath = new Path("/user/myname/temp/new_user_log.txt");
        // Remove any previous copy so create() starts fresh.
        fs.delete(newPath, true);
        // Fix: try-with-resources closes every stream even when an I/O error
        // occurs mid-copy (the original leaked all four on exception).
        try (FSDataInputStream is = fs.open(path);
             FSDataOutputStream os = fs.create(newPath);
             BufferedReader br = new BufferedReader(new InputStreamReader(is, "utf-8"));
             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, "utf-8"))) {
            String line;
            while ((line = br.readLine()) != null) {
                bw.write(line);
                bw.newLine();
            }
        } finally {
            // Release the file-system handle in all cases.
            fs.close();
        }
    }
}
// NOTE(review): duplicated re-listing of CatFile.main from the block above,
// shown here without the enclosing class header; kept verbatim.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Path of the file to read
Path path=new Path("/user/myname/temp/user_log.txt");
// Path of the new file to create
Path newPath=new Path("/user/myname/temp/new_user_log.txt");
fs.delete(newPath,true);
FSDataOutputStream os=fs.create(newPath);
// Open an input stream on the source file
FSDataInputStream is=fs.open(path);
// Read the source line by line and write it into the new file
BufferedReader br=new BufferedReader(new InputStreamReader(is,"utf-8"));
BufferedWriter bw=new BufferedWriter(new OutputStreamWriter(os,"utf-8"));
String line="";
while((line=br.readLine())!=null){
bw.write(line);
bw.newLine();
}
// Close the data streams
bw.close();
os.close();
br.close();
is.close();
// Close the file system
fs.close();
}
}
S6_ListFile列出文件
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class ListFile {
    /** Prints the full paths of the regular files directly under /user/myname. */
    public static void main(String[] args) throws IOException {
        // HDFS connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Only regular files are printed; sub-directories are skipped.
        for (FileStatus entry : fs.listStatus(new Path("/user/myname"))) {
            if (entry.isFile()) {
                System.out.println(entry.getPath().toString());
            }
        }
        // Release the file-system handle.
        fs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of ListFile from the section above; this
// re-listing omits the `import java.io.IOException;` line of the first copy.
public class ListFile {
// Prints the paths of the regular files directly under /user/myname.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864/");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Directory to list
Path path=new Path("/user/myname");
// Fetch the directory entries
FileStatus[] fileStatuses=fs.listStatus(path);
// Walk the entries
for (FileStatus file : fileStatuses) {
// Keep only regular files
if(file.isFile()){
System.out.println(file.getPath().toString());
}
}
// Close the file system
fs.close();
}
}
S7_DelFile删除文件
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class DelFile {
    /** Deletes /user/myname/temp/user_log.txt from HDFS. */
    public static void main(String[] args) throws IOException {
        // HDFS connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
        FileSystem fs = FileSystem.get(conf);
        // Target file; recursive=true is harmless for a plain file.
        Path target = new Path("/user/myname/temp/user_log.txt");
        fs.delete(target, true);
        // Release the file-system handle.
        fs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of DelFile from the section above; this
// re-listing omits the `import java.io.IOException;` line of the first copy.
public class DelFile {
// Deletes /user/myname/temp/user_log.txt from HDFS.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Path of the file to delete
Path path=new Path("/user/myname/temp/user_log.txt");
// Delete the file (recursive flag is harmless for a plain file)
fs.delete(path, true);
// Close the file system
fs.close();
}
}
S8_DelPath删除目录
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class DelPath {
    /** Recursively deletes the HDFS directory /user/myname/temp/ and its contents. */
    public static void main(String[] args) throws IOException {
        // HDFS connection settings.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
        FileSystem fs = FileSystem.get(conf);
        // Directory to remove; recursive=true deletes everything beneath it.
        Path target = new Path("/user/myname/temp/");
        fs.delete(target, true);
        // Release the file-system handle.
        fs.close();
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// NOTE(review): duplicated re-listing of DelPath from the section above; this
// re-listing omits the `import java.io.IOException;` line of the first copy.
public class DelPath {
// Recursively deletes the HDFS directory /user/myname/temp/.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
// Get the file system
FileSystem fs=FileSystem.get(conf);
// Directory to delete
Path path=new Path("/user/myname/temp/");
// Delete the directory recursively
fs.delete(path, true);
// Close the file system
fs.close();
}
}
读取序列化文件
DownloadFile
读序列化文件
package chap5_selectdata;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class DownloadFile {
    /**
     * Reads the Text/Text sequence file produced by the SelectData job and
     * appends its records to a local tab-separated text file.
     */
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://c31:9864/");
        FileSystem fs = FileSystem.get(conf);
        // Fix: try-with-resources guarantees the reader and writer are closed
        // even if an I/O error occurs mid-copy (the original leaked both).
        // NOTE(review): append=true accumulates records across runs — confirm
        // that duplicated output on rerun is intended.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                new Path("/user/limm/output_SelectData/part-m-00000"), conf);
             BufferedWriter out = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream("d:\\tmp\\selectdata.txt", true)))) {
            // Key/value types must match the ones written by the map-only job.
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                out.write(key.toString() + "\t" + value.toString() + "\r\n");
            }
        }
        System.out.println("end");
    }
}
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
// NOTE(review): duplicated re-listing of DownloadFile from the section above;
// this re-listing omits the `package chap5_selectdata;` line of the first copy.
public class DownloadFile {
// Reads the Text/Text sequence file and appends it to a local text file.
public static void main(String[] args) throws IOException {
// Build the configuration
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://c31:9864/");
// Get the file system
FileSystem fs = FileSystem.get(conf);
// Open a SequenceFile.Reader on the job output
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
new Path("/user/limm/output_SelectData/part-m-00000"), conf);
// Key/value types used by the sequence file
Text key = new Text();
Text value = new Text();
// append=true: records accumulate across runs
BufferedWriter out = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream("d:\\tmp\\selectdata.txt", true)));
while (reader.next(key, value)) {
out.write(key.toString() + "\t" + value.toString() + "\r\n");
}
out.close();
reader.close();
System.out.println("end");
}
}
任务5.3优化日志文件统计程序
MR实现日志按月份统计
源码实现
自定义类型MemberLogTime
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite MapReduce key: (member_name, logTime). Serialized via the
 * Writable protocol and ordered by name, then by login time.
 */
public class MemberLogTime implements WritableComparable<MemberLogTime> {
    // Member (user) name — first field of the composite key.
    private String member_name;
    // Login time string (e.g. "2016-01-05") — second field of the key.
    private String logTime;

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public MemberLogTime() {
    }

    public MemberLogTime(String member_name, String logTime) {
        this.member_name = member_name;
        this.logTime = logTime;
    }

    public String getMember_name() {
        return member_name;
    }

    public void setMember_name(String member_name) {
        this.member_name = member_name;
    }

    public String getLogTime() {
        return logTime;
    }

    public void setLogTime(String logTime) {
        this.logTime = logTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in the exact order write() emits them.
        this.member_name = in.readUTF();
        this.logTime = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(member_name);
        out.writeUTF(logTime);
    }

    /**
     * Fix: compare logTime as a tie-breaker. The original compared only
     * member_name, so during the MR sort/group phase every record of one
     * member — across different login times — collapsed into a single reduce
     * group, merging counts that the month partitioner and counters treat as
     * distinct keys.
     */
    @Override
    public int compareTo(MemberLogTime o) {
        int byName = this.member_name.compareTo(o.member_name);
        return byName != 0 ? byName : this.logTime.compareTo(o.logTime);
    }

    // equals/hashCode added so the key obeys the Comparable contract
    // (compareTo consistent with equals) and hashes correctly in maps.
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MemberLogTime)) {
            return false;
        }
        MemberLogTime other = (MemberLogTime) obj;
        return (member_name == null ? other.member_name == null : member_name.equals(other.member_name))
                && (logTime == null ? other.logTime == null : logTime.equals(other.logTime));
    }

    @Override
    public int hashCode() {
        int h = member_name == null ? 0 : member_name.hashCode();
        return 31 * h + (logTime == null ? 0 : logTime.hashCode());
    }

    @Override
    public String toString() {
        return this.member_name + "," + this.logTime;
    }
}
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// NOTE(review): duplicated re-listing of MemberLogTime from the section above;
// this re-listing omits the `import java.io.DataInput;` line of the first copy.
public class MemberLogTime implements WritableComparable<MemberLogTime>{
// Member (user) name — first field of the composite key.
private String member_name;
// Login time string (e.g. "2016-01-05") — second field of the key.
private String logTime;
// No-arg constructor required by Hadoop's Writable deserialization.
public MemberLogTime() {
}
public MemberLogTime(String member_name,String logTime){
this.member_name=member_name;
this.logTime=logTime;
}
public String getMember_name() {
return member_name;
}
public void setMember_name(String member_name) {
this.member_name = member_name;
}
public String getLogTime() {
return logTime;
}
public void setLogTime(String logTime) {
this.logTime = logTime;
}
@Override
public void readFields(DataInput in) throws IOException {
// Fields must be read in the exact order write() emits them.
this.member_name=in.readUTF();
this.logTime=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(member_name);
out.writeUTF(logTime);
}
// NOTE(review): compares only member_name — keys with the same member but
// different logTime compare equal during sort/group, so their counts may be
// merged. Confirm whether logTime should be a tie-breaker here.
@Override
public int compareTo(MemberLogTime o) {
return this.getMember_name().compareTo(o.getMember_name());
}
@Override
public String toString() {
return this.member_name+","+this.logTime;
}
}
LogCountMapper实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
    /** Job counters tracking how many input records fall in each month. */
    enum LogCounter {
        January,
        February
    }

    // Reusable output key and the constant count of 1.
    private MemberLogTime outKey = new MemberLogTime();
    private IntWritable one = new IntWritable(1);

    /** Turns each (member, loginTime) record into ((member, loginTime), 1). */
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        String name = key.toString();
        String time = value.toString();
        // Tally the month this record belongs to.
        if (time.contains("2016-01")) {
            context.getCounter(LogCounter.January).increment(1);
        } else if (time.contains("2016-02")) {
            context.getCounter(LogCounter.February).increment(1);
        }
        outKey.setMember_name(name);
        outKey.setLogTime(time);
        context.write(outKey, one);
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// NOTE(review): duplicated re-listing of LogCountMapper from the section above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
// Reusable output key and the constant count of 1.
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
// Job counters tracking how many input records fall in each month.
enum LogCounter{
January,
February
}
// Emits ((member, loginTime), 1) for every input record.
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);; // stray second ';' is an empty statement
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);; // stray second ';' is an empty statement
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt, one);
}
}
LogCountCombiner实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
    /** Map-side (local) aggregation: sums the 1-counts emitted for each key. */
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> value, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : value) {
            total += count.get();
        }
        context.write(key, new IntWritable(total));
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): duplicated re-listing of LogCountCombiner from the section above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
// Map-side (local) aggregation: sums the 1-counts emitted for each key.
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
LogCountPartitioner实现
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
    /**
     * Routes January-2016 keys to partition 0 and all other keys to
     * partition 1, folded by the actual number of reducers.
     */
    @Override
    public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
        boolean january = key.getLogTime().contains("2016-01");
        return (january ? 0 : 1) % numPartitions;
    }
}
import org.apache.hadoop.mapreduce.Partitioner;
// NOTE(review): duplicated re-listing of LogCountPartitioner from the section
// above; this re-listing omits the IntWritable import of the first copy.
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
// January-2016 keys go to partition 0, everything else to partition 1
// (both folded by the actual reducer count).
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0%numPartitions; // 0 % n is always 0
}else{
return 1%numPartitions;
}
}
}
LogCountReducer实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
    /** Sums the counts per key and tallies how many output rows belong to each month. */
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> value, Context context)
            throws IOException, InterruptedException {
        String time = key.getLogTime();
        // Count the result rows per month in a custom counter group.
        if (time.contains("2016-01")) {
            context.getCounter("OutputCounter", "JanuaryResult").increment(1);
        } else if (time.contains("2016-02")) {
            context.getCounter("OutputCounter", "FebruaryResult").increment(1);
        }
        int total = 0;
        for (IntWritable count : value) {
            total += count.get();
        }
        context.write(key, new IntWritable(total));
    }
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): duplicated re-listing of LogCountReducer from the section above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
// Sums the counts per key and tallies output rows per month.
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);; // stray second ';' is an empty statement
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
LogCount驱动类实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class LogCount {
    /**
     * Driver: counts logins per (member, login time) key from the sequence
     * file produced by SelectData, split across two reducers by month.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // NOTE(review): 9864 is normally the DataNode HTTP port; confirm the
        // NameNode RPC port (usually 8020/9000) against the cluster config.
        conf.set("fs.defaultFS", "hdfs://home.hddly.cn:9864");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fallback paths; replace "myname" with your own name (pinyin).
            otherArgs = new String[] {
                    "/user/myname/output_SelectData/part-m-00000",
                    "/user/myname/output_MonthData" };
        }
        Job job = Job.getInstance(conf, "logcount");
        job.setJarByClass(LogCount.class);
        job.setMapperClass(LogCountMapper.class);
        job.setReducerClass(LogCountReducer.class);
        job.setCombinerClass(LogCountCombiner.class);
        job.setPartitionerClass(LogCountPartitioner.class);
        job.setNumReduceTasks(2); // one reducer per month partition
        job.setOutputKeyClass(MemberLogTime.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Delete stale output so the job can be rerun.
        FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Fix: exit with status 0 on success, 1 on failure. The original
        // printed "-1" to stderr on success and never set the exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
任务5.4 Eclipse提交日志文件统计程序
MR实现日志按月份统计2
源码实现
自定义类型MemberLogTime
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Composite MapReduce key (member_name, logTime), serialized via the Writable
// protocol. Re-listed here for task 5.4.
public class MemberLogTime implements WritableComparable<MemberLogTime>{
// Member (user) name — first field of the composite key.
private String member_name;
// Login time string (e.g. "2016-01-05") — second field of the key.
private String logTime;
// No-arg constructor required by Hadoop's Writable deserialization.
public MemberLogTime() {
}
public MemberLogTime(String member_name,String logTime){
this.member_name=member_name;
this.logTime=logTime;
}
public String getMember_name() {
return member_name;
}
public void setMember_name(String member_name) {
this.member_name = member_name;
}
public String getLogTime() {
return logTime;
}
public void setLogTime(String logTime) {
this.logTime = logTime;
}
@Override
public void readFields(DataInput in) throws IOException {
// Fields must be read in the exact order write() emits them.
this.member_name=in.readUTF();
this.logTime=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(member_name);
out.writeUTF(logTime);
}
// NOTE(review): compares only member_name — keys with the same member but
// different logTime compare equal during sort/group, so their counts may be
// merged. Confirm whether logTime should be a tie-breaker here.
@Override
public int compareTo(MemberLogTime o) {
return this.getMember_name().compareTo(o.getMember_name());
}
@Override
public String toString() {
return this.member_name+","+this.logTime;
}
}
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// NOTE(review): duplicated re-listing of MemberLogTime from the block above;
// this re-listing omits the `import java.io.DataInput;` line of the first copy.
public class MemberLogTime implements WritableComparable<MemberLogTime>{
// Member (user) name — first field of the composite key.
private String member_name;
// Login time string — second field of the key.
private String logTime;
// No-arg constructor required by Hadoop's Writable deserialization.
public MemberLogTime() {
}
public MemberLogTime(String member_name,String logTime){
this.member_name=member_name;
this.logTime=logTime;
}
public String getMember_name() {
return member_name;
}
public void setMember_name(String member_name) {
this.member_name = member_name;
}
public String getLogTime() {
return logTime;
}
public void setLogTime(String logTime) {
this.logTime = logTime;
}
@Override
public void readFields(DataInput in) throws IOException {
this.member_name=in.readUTF();
this.logTime=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(member_name);
out.writeUTF(logTime);
}
// NOTE(review): compares only member_name (see note on the first copy).
@Override
public int compareTo(MemberLogTime o) {
return this.getMember_name().compareTo(o.getMember_name());
}
@Override
public String toString() {
return this.member_name+","+this.logTime;
}
}
LogCountMapper实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Emits ((member, loginTime), 1) per input record. Re-listed here for task 5.4.
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
// Reusable output key and the constant count of 1.
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
// Job counters tracking how many input records fall in each month.
enum LogCounter{
January,
February
}
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);; // stray second ';' is an empty statement
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);; // stray second ';' is an empty statement
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt, one);
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// NOTE(review): duplicated re-listing of LogCountMapper from the block above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {
// Reusable output key and the constant count of 1.
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
// Job counters tracking how many input records fall in each month.
enum LogCounter{
January,
February
}
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);; // stray second ';' is an empty statement
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);; // stray second ';' is an empty statement
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt, one);
}
}
LogCountCombiner实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Map-side (local) aggregation: sums the 1-counts per key. Re-listed for task 5.4.
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): duplicated re-listing of LogCountCombiner from the block above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class LogCountCombiner extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
// Map-side (local) aggregation: sums the 1-counts per key.
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
LogCountPartitioner实现
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
// Routes January-2016 keys to partition 0, everything else to partition 1.
// Re-listed here for task 5.4.
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0%numPartitions; // 0 % n is always 0
}else{
return 1%numPartitions;
}
}
}
import org.apache.hadoop.mapreduce.Partitioner;
// NOTE(review): duplicated re-listing of LogCountPartitioner from the block
// above; this re-listing omits the IntWritable import of the first copy.
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
// January-2016 keys go to partition 0, everything else to partition 1.
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0%numPartitions; // 0 % n is always 0
}else{
return 1%numPartitions;
}
}
}
LogCountReducer实现
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// Sums the counts per key and tallies output rows per month. Re-listed for task 5.4.
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
// Count the result rows per month in a custom counter group.
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);; // stray second ';' is an empty statement
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): duplicated re-listing of LogCountReducer from the block above;
// this re-listing omits the `import java.io.IOException;` line of the first copy.
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {
// Sums the counts per key and tallies output rows per month.
@Override
protected void reduce(MemberLogTime key, Iterable<IntWritable> value,
Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable>.Context context)
throws IOException, InterruptedException {
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);; // stray second ';' is an empty statement
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter", "FebruaryResult").increment(1);
}
int sum=0;
for (IntWritable val : value) {
sum+=val.get();
}
context.write(key, new IntWritable(sum));
}
}
LogCount驱动类实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class LogCount extends Configured implements Tool {
    public static void main(String[] args) {
        // NOTE(review): "/uer" looks like a typo for "/user" — confirm the HDFS
        // layout before running; values left unchanged here.
        String[] myArgs = {
            "/uer/myname/output_SelectData/part-m-00000",
            "/uer/myname/output_logcount"
        };
        try {
            ToolRunner.run(LogCount.getMyConfiguration(), new LogCount(), myArgs);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configures and submits the "logcount" job: reads the pre-filtered log
     * records (SequenceFile), counts entries per member/month, and writes text
     * output split into January and February partitions (two reducers).
     *
     * @param args args[0] = input path, args[1] = output path
     * @return 0 on success, 1 on failure (ToolRunner uses this as exit code)
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = LogCount.getMyConfiguration();
        Job job = Job.getInstance(conf, "logcount");
        job.setJarByClass(LogCount.class);
        job.setMapperClass(LogCountMapper.class);
        job.setReducerClass(LogCountReducer.class);
        job.setCombinerClass(LogCountCombiner.class);
        job.setPartitionerClass(LogCountPartitioner.class);
        job.setNumReduceTasks(2); // one reducer per month; see LogCountPartitioner
        job.setOutputKeyClass(MemberLogTime.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Delete a stale output directory so reruns do not fail.
        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Fixed: the Tool contract maps the return value to the process exit
        // code, so success must be 0. The original returned -1 on success and
        // 1 on failure, i.e. it always reported failure.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Builds the cluster configuration for remote (cross-platform) job submission. */
    public static Configuration getMyConfiguration() {
        Configuration conf = new Configuration();
        conf.setBoolean("mapreduce.app-submission.cross-platform", true);
        // NOTE(review): 9864 is the DataNode HTTP port in Hadoop 3; the NameNode
        // RPC port is normally 8020 or 9000 — verify against core-site.xml.
        conf.set("fs.defaultFS", "hdfs://master:9864");  // NameNode address
        conf.set("mapreduce.framework.name", "yarn");    // run on YARN
        String resourcenode = "master";
        conf.set("yarn.resourcemanager.address", resourcenode + ":8032");           // ResourceManager
        conf.set("yarn.resourcemanager.scheduler.address", resourcenode + ":8030"); // scheduler
        conf.set("mapreduce.jobhistory.address", resourcenode + ":10020");
        conf.set("mapreduce.job.jar", JarUtil.jar(LogCount.class));
        return conf;
    }
}
JarUtil类实现
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
public class JarUtil {
    /**
     * Packages the compiled classes next to {@code cls} into a jar named
     * "&lt;fully.qualified.ClassName&gt;.jar" in the working directory and
     * returns that jar file name.
     */
    public static String jar(Class<?> cls) {
        String outputJar = cls.getName() + ".jar";
        // Resolve the classpath root of cls, step up one level, then descend
        // into "bin/" (Eclipse-style project layout — TODO confirm for other IDEs).
        String input = cls.getClassLoader().getResource("").getFile();
        input = input.substring(0, input.length() - 1);        // drop trailing '/'
        input = input.substring(0, input.lastIndexOf("/") + 1); // parent directory
        input = input + "bin/";
        jar(input, outputJar);
        return outputJar;
    }

    /** Creates outputFileName and recursively adds everything under inputFileName. */
    private static void jar(String inputFileName, String outputFileName) {
        // try-with-resources closes the jar stream on all paths. The original
        // finally block called out.close() unconditionally and would throw an
        // NPE if the FileOutputStream constructor itself had failed.
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(outputFileName))) {
            jar(out, new File(inputFileName), "");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Recursively writes file f (or its children) into the jar under entry name base. */
    private static void jar(JarOutputStream out, File f, String base) throws Exception {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children == null) {
                return; // unreadable directory — nothing to add
            }
            base = base.length() == 0 ? "" : base + "/"; // jar entries use forward slashes
            for (File child : children) {
                jar(out, child, base + child.getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            // try-with-resources fixes the original's leak: the input stream
            // was only closed when the copy loop completed without error.
            try (FileInputStream in = new FileInputStream(f)) {
                byte[] buffer = new byte[1024];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
            }
        }
    }
}
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
public class JarUtil {
    /**
     * Builds "&lt;fully.qualified.ClassName&gt;.jar" in the working directory
     * from the compiled classes next to {@code cls} and returns the jar name.
     */
    public static String jar(Class<?> cls) {
        String outputJar = cls.getName() + ".jar";
        // Locate the classpath root, move up one directory, append "bin/"
        // (Eclipse-style output folder — TODO confirm for other build layouts).
        String input = cls.getClassLoader().getResource("").getFile();
        input = input.substring(0, input.length() - 1);        // strip trailing '/'
        input = input.substring(0, input.lastIndexOf("/") + 1); // parent directory
        input = input + "bin/";
        jar(input, outputJar);
        return outputJar;
    }

    /** Opens outputFileName as a jar stream and adds inputFileName recursively. */
    private static void jar(String inputFileName, String outputFileName) {
        // try-with-resources replaces the original finally block, which called
        // out.close() even when out was still null (constructor failure → NPE).
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(outputFileName))) {
            jar(out, new File(inputFileName), "");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Recursively writes file f (or its children) into the jar under entry name base. */
    private static void jar(JarOutputStream out, File f, String base) throws Exception {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children == null) {
                return; // directory could not be listed
            }
            base = base.length() == 0 ? "" : base + "/"; // jar entries use forward slashes
            for (File child : children) {
                jar(out, child, base + child.getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            // Fixed leak: the original closed the input stream only on the
            // success path; try-with-resources closes it on all paths.
            try (FileInputStream in = new FileInputStream(f)) {
                byte[] buffer = new byte[1024];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
            }
        }
    }
}
任务5.5拓展任务
中文词频统计
环境准备
jieba包下载
wget 'https://archiva-maven-storage-prod.oss-cn-beijing.aliyuncs.com/repository/central/com/huaban/jieba-analysis/1.0.2/jieba-analysis-1.0.2.jar?Expires=1651050773&OSSAccessKeyId=LTAIfU51SusnnfCC&Signature=cOgPvS81TiysluB%2FbBW94BXIvXY%3D'
scp到所有从机
scp_workers.sh
#!/bin/bash
# Push the Hadoop configuration, the common jars, and the NTP config to
# every host listed in the Hadoop workers file (scp over port 8022).
for w in $(cat /usr/local/hadoop-3.3.1/etc/hadoop/workers)
do
scp -P 8022 /usr/local/hadoop-3.3.1/etc/hadoop/* $w:/usr/local/hadoop-3.3.1/etc/hadoop/
scp -P 8022 /usr/local/hadoop-3.3.1/share/hadoop/common/* $w:/usr/local/hadoop-3.3.1/share/hadoop/common/
scp -P 8022 /etc/ntp.conf $w:/etc/
done
# Distribute config, common jars, and ntp.conf to each worker (scp, port 8022).
for w in $(cat /usr/local/hadoop-3.3.1/etc/hadoop/workers)
do
scp -P 8022 /usr/local/hadoop-3.3.1/etc/hadoop/* $w:/usr/local/hadoop-3.3.1/etc/hadoop/
scp -P 8022 /usr/local/hadoop-3.3.1/share/hadoop/common/* $w:/usr/local/hadoop-3.3.1/share/hadoop/common/
scp -P 8022 /etc/ntp.conf $w:/etc/
done
其它参考
任务拓展
How Many Maps?(官网)
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128MB, you’ll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
实训1统计全球每年月的最高气温和最低气温
常见问题
SequenceFile.Reader过时
//获取Option实例,新方法
SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(new Path("/user/root/JanFeb/part-m-00000"));
//获取Reader实例
SequenceFile.Reader reader1 = new SequenceFile.Reader(conf, pathOption);
SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(new Path("/user/root/JanFeb/part-m-00000"));
//获取Reader实例
SequenceFile.Reader reader1 = new SequenceFile.Reader(conf, pathOption);
Cannot allocate containers as requested resource is greater than maximum allowed allocation
在eclipse中运行LogCountRun报错
Invalid resource request! Cannot allocate containers as requested resource is greater than maximum allowed allocation. Requested resource type=[memory-mb], Requested resource=<memory:1536, vCores:1>, maximum allowed allocation=<memory:1024, vCores:2>, please note that maximum allowed allocation is calculated by scheduler based on maximum resource of registered NodeManagers, which might be less than configured maximum allocation=<memory:1024, vCores:4>
Could not find or load main class
org.apache.hadoop.mapreduce.v2.app.MRAppMaster
org.apache.hadoop.mapreduce.v2.app.MRAppMaster
提示处理
Please check whether your <HADOOP_HOME>/etc/hadoop/mapred-site.xml contains the below configuration:
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${full path of your hadoop distribution directory}</value>
</property>
处理
使用hadoop classpath查找路径
[root@master hadoop]# hadoop classpath
/usr/local/hadoop-3.3.1/etc/hadoop:/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/common/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*:/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn:/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn/*
[root@master hadoop]#
/usr/local/hadoop-3.3.1/etc/hadoop:/usr/local/hadoop-3.3.1/share/hadoop/common/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/common/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/hdfs/*:/usr/local/hadoop-3.3.1/share/hadoop/mapreduce/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn:/usr/local/hadoop-3.3.1/share/hadoop/yarn/lib/*:/usr/local/hadoop-3.3.1/share/hadoop/yarn/*
[root@master hadoop]#
org.apache.hadoop.mapreduce.v2.app.MRAppMaster:
Error starting MRAppMaster
Error starting MRAppMaster
错误日志
错误日志:2023-05-23 01:18:44,500 ERROR [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster java.lang.UnsupportedClassVersionError: train3_musiccount/S1_MusicSelectData$SelectDataMapper has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
Not enough documents for more than one split! Consider setting mongo.input.split_size to a lower value.
连接Mongodb库查数据报错:
查看日志:/usr/local/hadoop-3.3.1/logs/userlogs/application_1649938751428_0002/container_1649938751428_0002_01_000402/syslog
2022-04-14 21:03:08,216 INFO [main] org.mongodb.driver.connection: Opened connection [connectionId{localValue:2, serverValue:877}] to home.hddly.cn:57017
2022-04-14 21:03:09,770 ERROR [main] com.mongodb.hadoop.input.MongoRecordReader: Exception reading next key/val from mongo: Query failed with error code 51173 and error mes
sage 'error processing query: ns=pythondb.news_dataTree: $and
Sort: {}
Proj: {}
planner returned error :: caused by :: When using min()/max() a hint of which index to use must be provided' on server home.hddly.cn:57017
2022-04-14 21:03:09,770 ERROR [main] com.mongodb.hadoop.input.MongoRecordReader: Exception reading next key/val from mongo: Query failed with error code 51173 and error mes
sage 'error processing query: ns=pythondb.news_dataTree: $and
Sort: {}
Proj: {}
planner returned error :: caused by :: When using min()/max() a hint of which index to use must be provided' on server home.hddly.cn:57017
使用yarn方式运行LogCountRun,报slf4j错
错误信息
[2023-03-30 07:34:40.108]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
使用yarn方式运行LogCountRun,报prelaunch.err错
错误信息
[2023-03-30 07:34:40.108]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
错误分析
发现错误信息:2023-03-30 10:41:22,178 ERROR [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Error starting MRAppMaster java.lang.UnsupportedClassVersionError: chap5_logcount/LogCountMapper has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:756)