Hadoop学习笔记-MapReduce(2)-MapReduce序列化

Hadoop序列化

1. 序列化的概念

序列化:序列化就是将内存中的对象转化成字节序列(或者其他数据传输协议)以便于持久化到磁盘和网络传输。

反序列化:反序列化就是将收到的字节序列(或者其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2. hadoop序列化背景

在MapReduce的过程中,Mapper和Reducer之间存在数据交换,他们很有可能是不在一台节点上的,所以需要利用网络传输的手段来传输数据。因此数据需要序列化。在之前的示例中,我们使用的是hadoop writable类型的数据,那是Hadoop提供的可以序列化的基本类型。但是如果我们在键值对中想要使用我们自定义的类型,那么不作任何准备是不行的,因此需要完成序列化。

我们使用的是Hadoop的序列化,即让我们自定义的类实现序列化接口Writable。

这里并不使用java的序列化,因为Java的序列化是一个重量级的序列化框架(Serializable)。一个对象被序列化之后,会附带很多额外的信息,包括各种校验信息,Header,继承体系等等,不便于在网络中高效传输,所以Hadoop自己开发了一套序列化机制(Writable)。

Hadoop的序列化特点:

  • 紧凑:高效使用存储空间
  • 快速:读写数据的额外开销小
  • 互操作:支持多语言的交互

自定义类实现序列化接口(Writable)

程序背景:我们希望统计用户的上行流量、下行流量以及流量总和,因此我们定义一个自定义类FlowBean来完成对应信息的记录,并完成序列化。

1.自定义类

这里我们仅实现一个自定义类记录流量,FlowBean。

具体实现序列化的步骤如下:

  1. 必须实现Writable接口
  2. 反序列化的时候,需要反射调用空参构造函数,所以必须有空参构造
  3. 重写序列化方法write
  4. 重写反序列化方法readFields
  5. 这里注意反序列化的顺序和序列化的顺序要完全一致
  6. 如果想要将结果显示在文件中,需要重写toString()方法
  7. 如果想要将自定义的对象放在key中传输,还需要实现WritableComparable接口。因为MapReduce框架中的Shuffle过程要求对key能够排序

因此,我们的自定义类实现如下(省略了setter和getter方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class FLowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;

// 空参构造
public FLowBean() {
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}

后续的Mapper和Reducer同样需要自定义实现来完成对应的逻辑。

2.Mapper

Mapper的输入输出:<LongWritable, Text>-><Text, FlowBean>

Mapper的逻辑:将读入的每一行进行转化,转化成对应电话号码和流量描述的键值对。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FlowMapper extends Mapper<LongWritable, Text, Text, FLowBean> {

private Text outK = new Text();
private FLowBean outV = new FLowBean();

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FLowBean>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] infos = line.split("\t");

String phone = infos[1];
String upFlow = infos[infos.length - 3];
String downFlow = infos[infos.length - 2];

outK.set(phone);
outV.setUpFlow(Long.parseLong(upFlow));
outV.setDownFlow(Long.parseLong(downFlow));
outV.setSumFlow();

context.write(outK, outV);
}
}

3.Reducer

Reducer的输入输出:<Text, FlowBean>-><Text, FlowBean>

Reducer的逻辑:将电话号码相同的流量描述对象进行统计求和,每次reduce输出一个电话号码和一个流量描述总和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FLowReducer extends Reducer<Text, FLowBean, Text, FLowBean> {
private FLowBean outV = new FLowBean();

@Override
protected void reduce(Text key, Iterable<FLowBean> values, Reducer<Text, FLowBean, Text, FLowBean>.Context context) throws IOException, InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;

for (FLowBean value : values) {
sumUpFlow += value.getUpFlow();
sumDownFlow += value.getDownFlow();
}

outV.setUpFlow(sumUpFlow);
outV.setDownFlow(sumDownFlow);
outV.setSumFlow();

context.write(key, outV);
}
}

4.Driver

Driver仍然是固定的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(FlowDriver.class);

job.setMapperClass(FlowMapper.class);
job.setReducerClass(FLowReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FLowBean.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FLowBean.class);

FileInputFormat.setInputPaths(job, new Path("D:/input/inputflow"));
FileOutputFormat.setOutputPath(job, new Path("D:/output/output"));

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}

Hadoop学习笔记-MapReduce(2)-MapReduce序列化
http://example.com/2022/02/20/Hadoop学习笔记-MapReduce-2-MapReduce序列化/
作者
EverNorif
发布于
2022年2月20日
许可协议