本文最后更新于:2022-03-10T15:06:10+08:00
Hadoop序列化
1. 序列化的概念
序列化:序列化就是将内存中的对象转化成字节序列(或者其他数据传输协议)以便于持久化到磁盘和网络传输。
反序列化:反序列化就是将收到的字节序列(或者其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2. hadoop序列化背景
在MapReduce的过程中,Mapper和Reducer之间存在数据交换,他们很有可能是不在一台节点上的,所以需要利用网络传输的手段来传输数据。因此数据需要序列化。在之前的示例中,我们使用的是hadoop
writable类型的数据,那是Hadoop提供的可以序列化的基本类型。但是如果我们在键值对中想要使用我们自定义的类型,那么不作任何准备是不行的,因此需要完成序列化。
我们使用的是Hadoop的序列化,即让我们自定义的类实现序列化接口Writable。
这里并不使用java的序列化,因为Java的序列化是一个重量级的序列化框架(Serializable)。一个对象被序列化之后,会附带很多额外的信息,包括各种校验信息,Header,继承体系等等,不便于在网络中高效传输,所以Hadoop自己开发了一套序列化机制(Writable)。
Hadoop的序列化特点:
- 紧凑:高效使用存储空间
- 快速:读写数据的额外开销小
- 互操作:支持多语言的交互
自定义类实现序列化接口(Writable)
程序背景:我们希望统计用户的上行流量、下行流量以及流量总和,因此我们定义一个自定义类FlowBean来完成对应信息的记录,并完成序列化。
1.自定义类
这里我们仅实现一个自定义类记录流量,FlowBean。
具体实现序列化的步骤如下:
- 必须实现Writable接口
- 反序列化的时候,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法write
- 重写反序列化方法readFields
- 这里注意反序列化的顺序和序列化的顺序要完全一致
- 如果想要将结果显示在文件中,需要重写toString()方法
- 如果想要将自定义的对象放在key中传输,还需要实现WritableComparable接口。因为MapReduce框架中的Shuffle过程要求对key能够排序
因此,我们的自定义类实现如下(省略了setter和getter方法):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class FLowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow;
public FLowBean() { } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); }
@Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); }
@Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }
|
后续的Mapper和Reducer同样需要自定义实现来完成对应的逻辑。
2.Mapper
Mapper的输入输出:<LongWritable, Text>
-><Text, FlowBean>
Mapper的逻辑:将读入的每一行进行转化,转化成对应电话号码和流量描述的键值对。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class FlowMapper extends Mapper<LongWritable, Text, Text, FLowBean> {
private Text outK = new Text(); private FLowBean outV = new FLowBean(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FLowBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] infos = line.split("\t");
String phone = infos[1]; String upFlow = infos[infos.length - 3]; String downFlow = infos[infos.length - 2];
outK.set(phone); outV.setUpFlow(Long.parseLong(upFlow)); outV.setDownFlow(Long.parseLong(downFlow)); outV.setSumFlow();
context.write(outK, outV); } }
|
3.Reducer
Reducer的输入输出:<Text, FlowBean>
-><Text, FlowBean>
Reducer的逻辑:将电话号码相同的流量描述对象进行统计求和,每次reduce输出一个电话号码和一个流量描述总和。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class FLowReducer extends Reducer<Text, FLowBean, Text, FLowBean> { private FLowBean outV = new FLowBean();
@Override protected void reduce(Text key, Iterable<FLowBean> values, Reducer<Text, FLowBean, Text, FLowBean>.Context context) throws IOException, InterruptedException { long sumUpFlow = 0; long sumDownFlow = 0;
for (FLowBean value : values) { sumUpFlow += value.getUpFlow(); sumDownFlow += value.getDownFlow(); }
outV.setUpFlow(sumUpFlow); outV.setDownFlow(sumDownFlow); outV.setSumFlow();
context.write(key, outV); } }
|
4.Driver
Driver仍然是固定的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class); job.setReducerClass(FLowReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FLowBean.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(FLowBean.class);
FileInputFormat.setInputPaths(job, new Path("D:/input/inputflow")); FileOutputFormat.setOutputPath(job, new Path("D:/output/output"));
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1); } }
|