本文最后更新于:2022-02-22T16:49:10+08:00
Partition分区
自定义分区类需要继承Partitioner,重写其中的getPartiton方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class myPartitioner extends Partitioner<Text, FLowBean> { @Override public int getPartition(Text text, FLowBean fLowBean, int numPartitions) { String phone = text.toString();
int partition; if (phone.startsWith("136")) { partition = 0; } else if (phone.startsWith("137")) { partition = 1; } else if (phone.startsWith("138")) { partition = 2; } else if (phone.startsWith("139")) { partition = 3; } else { partition = 4; } return partition; } }
|
之后在驱动类中进行相应的设置
1 2 3
| job.setPartitionerClass(myPartitioner.class); job.setNumReduceTasks(5);
|
注意事项:
可以自由指定ReduceTask的数量,但是最好和getPartition的结果数保持一致
- 如果ReduceTask的数量 >
getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
- 如果ReduceTask的数量 <
getPartition的结果数,则会有一部分数据无处安放,会报错;
- 如果ReduceTask的数量 =
1,那么不管MapTask输出多少个分区文件,最终结果都交给一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;
- 在ReduceTask数量为1的时候,MapTask中根本不走我们自定义的Partitioner,而是走自己创建的一个内部类
返回的分区号必须从0开始,逐一累加
WritableComparable排序
将自定义类作为key的时候,应该实现WritableComparable接口,使得它能够进行排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class FlowBean implements WritableComparable<FlowBean> { @Override public int compareTo(FlowBean bean) { int result; if (this.sumFlow > bean.getSumFlow()) { result = -1; }else if (this.sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; } }
|