网约车大数据综合项目——基于MapReduce的数据清洗
网约车大数据综合项目——基于MapReduce的数据清洗
第1关:网约车撤销订单数据清洗
package traffic.step1.mapreduce;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * Utility class that hands out a single shared JDBC connection to the MySQL
 * database configured below.
 */
public class DBHelper {
    /********** Begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://127.0.0.1:3306/trafficdb?useUnicode=true&characterEncoding=UTF-8";
    // Database user name.
    private static final String username = "root";
    // Database password: set when the database was installed, so it differs per installation.
    private static final String password = "123123";
    // Cached connection; opened lazily by getConnection().
    private static Connection conn = null;

    static {
        try {
            // Load the JDBC driver class by name.
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Returns the shared connection, opening it on first use and re-opening it
     * when the cached one has been closed.
     *
     * <p>The method is synchronized so that concurrent callers cannot open two
     * connections at once.
     *
     * @return the shared connection, or {@code null} when no connection has
     *         ever been opened successfully (the failure is printed to stderr)
     */
    public static synchronized Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = DriverManager.getConnection(url, username, password);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
    /********** End **********/
}
package traffic.step1.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
public class TrafficMap extends Mapper {
Map addressMap = new HashMap();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
/********* Begin **********/
//连接mysql
Connection connection = DBHelper.getConnection();
try {
Statement statement = connection.createStatement();
String sql = "select * from t_address";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
String address_code = resultSet.getString(1);
String address_name = resultSet.getString(2);
addressMap.put(address_code, address_name);
}
} catch (SQLException e) {
e.printStackTrace();
}
/********* End **********/
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/********* Begin **********/
String[] values = value.toString().split(",",8);
boolean flag=true;
//判断字符是否为空
for (String val : values){
if (val.equals("")){
flag=false;
}
}
//判断字符列表长度是否为8
if (values.length!=8){
flag=false;
}
//判断订单时间与是否为2019年3月7日
if (!values[3].startsWith("20190307")||!values[4].startsWith("20190307")){
flag=false;
}
if (flag){
if (values[7].equals("null")){
values[7]="未知";
}
DateFormat df1 = new SimpleDateFormat("yyyyMMddHHmmss");
DateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
values[3]=df2.format(df1.parse(values[3]));
values[4]=df2.format(df1.parse(values[4]));
} catch (Exception e) {
System.out.println(e.getMessage());
}
String districtname="";
if (addressMap.containsKey(values[1])){
districtname=addressMap.get(values[1].trim());
}else {
districtname="未知";
}
String result = "";
for (int i=0;i
if (i==7){
result = result + values[i];
}else{
if (i==1){
result = result + values[i] + "|"+districtname + "|";
}else{
result = result + values[i] + "|";
}
}
}
//通过订单号来查重
context.write(new Text(values[2]),new Text(result));
}
/********* End **********/
}
}
@Override
protected void reduce(Text key, Iterable
/********** Begin **********/
//相同订单只保留第一行
int num=0;
String result="";
for (Text val:values){
if (num==0){
result=val.toString();
num++;
context.write(NullWritable.get(),new Text(result));
}
}
/********** End **********/
}
}
/**
 * Configures and runs the cleaning job, replacing any previous output directory.
 *
 * @param args command-line arguments (not used)
 * @throws IllegalStateException when the job does not complete successfully
 * @throws Exception             when job setup or submission fails
 */
public static void main(String[] args) throws Exception {
    /********** Begin **********/
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(TrafficJob.class);
    job.setMapperClass(TrafficMap.class);
    job.setReducerClass(TrafficReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    // Delete the output directory first if it already exists.
    String outputpath = "/root/files";
    Path path = new Path(outputpath);
    FileSystem fileSystem = path.getFileSystem(conf);
    if (fileSystem.exists(path)) {
        fileSystem.delete(path, true);
    }

    FileInputFormat.addInputPath(job, new Path("/data/workspace/myshixun/data/ProvOrderCancel/*/"));
    FileOutputFormat.setOutputPath(job, path);

    // Surface a failed job instead of returning as if it had succeeded.
    if (!job.waitForCompletion(true)) {
        throw new IllegalStateException("MapReduce job did not complete successfully");
    }
    /********** End **********/
}
}
/********** Begin **********/
private static final String driver = "com.mysql.jdbc.Driver";
private static final String url = "jdbc:mysql://localhost:3306/trafficdb?useUnicode=true&characterEncoding=UTF-8";
// Database user name.
private static final String username = "root";
// Database password: set when the database was installed, so it differs per installation.
private static final String password = "123123";
// Cached connection; opened lazily by getConnection().
private static Connection conn = null;

static {
    try {
        // Load the JDBC driver class by name.
        Class.forName(driver);
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

/**
 * Returns the shared connection, opening it on first use and re-opening it
 * when the cached one has been closed.
 *
 * <p>The method is synchronized so that concurrent callers cannot open two
 * connections at once.
 *
 * @return the shared connection, or {@code null} when no connection has
 *         ever been opened successfully (the failure is printed to stderr)
 */
public static synchronized Connection getConnection() {
    try {
        if (conn == null || conn.isClosed()) {
            conn = DriverManager.getConnection(url, username, password);
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return conn;
}
/********** End **********/
}
Map
/********** Begin **********/
//链接数据库,将查询数据放入addressMap中
Connection connection = DBHelper.getConnection();
try {
Statement statement = connection.createStatement();
String sql = "select * from t_address";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
String address_code = resultSet.getString(1);
String address_name = resultSet.getString(2);
addressMap.put(address_code, address_name);
}
} catch (SQLException e) {
e.printStackTrace();
}
/********** End **********/
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/********** Begin **********/
//切割长度为14的字符列表
String[] val = value.toString().split(",",14);
String vals="";
//去除乘客便签(passengernote)、加密字段(encrypt_c)和支付类型(faretype)
for(int i=0;i
if (i==5||i==12||i==13){
}else if (i==11){
vals = vals + val[i];
}else {
vals = vals + val[i] + ",";
}
}
String[] values= vals.split(",",11);
//如果 flag 为 flase则清楚掉,true 返回清洗后数据
boolean flag=true;
if (values.length!=11){
flag=false;
}
//判断字段是否为空
for (String va : values){
if (!flag){break;}
if (va.equals("")){
flag=false;
}
}
try {
//设置经纬度格式
values[6]=values[6].substring(0,3)+"."+values[6].substring(3,values[6].trim().length());
values[7]=values[7].substring(0,2)+"."+values[7].substring(2,values[7].trim().length());
values[9]=values[9].substring(0,3)+"."+values[9].substring(3,values[9].trim().length());
values[10]=values[10].substring(0,2)+"."+values[10].substring(2,values[10].trim().length());
} catch (Exception e) {
flag=false;
}
//判断订单时间是否为2019年3月7日
if (!values[3].startsWith("20190307")||!values[4].startsWith("20190307")){
flag=false;
}
if (flag){
//日期转换
DateFormat df1 = new SimpleDateFormat("yyyyMMddHHmmss");
DateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
values[3]=df2.format(df1.parse(values[3]));
values[4]=df2.format(df1.parse(values[4]));
} catch (Exception e) {
System.out.println(e.getMessage());
}
//行政区设置
String districtname="";
if (addressMap.containsKey(values[1])){
districtname=addressMap.get(values[1].trim());
}else {
districtname="未知";
}
//返回结果
String result = "";
for (int i=0;i
if (i==10){
result = result + values[i];
}else{
if (i==1){
//添加行政区名称 districtname
result = result + values[i] + "\t"+districtname + "\t";
}else {
result = result + values[i] + "\t";
}
}
}
//根据订单去重
context.write(new Text(values[2]),new Text(result));
}
/********** End **********/
}
}
@Override
protected void reduce(Text key, Iterable
/********** Begin **********/
//相同订单 id(orderid)只保留第一行
int num=0;
String result="";
for (Text val:values){
if (num==0){
result=val.toString();
num++;
context.write(NullWritable.get(),new Text(result));
}
}
/********** End **********/
}
}
/**
 * Configures and runs the cleaning job, replacing any previous output directory.
 *
 * @param args command-line arguments (not used)
 * @throws IllegalStateException when the job does not complete successfully
 * @throws Exception             when job setup or submission fails
 */
public static void main(String[] args) throws Exception {
    /********** Begin **********/
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(TrafficJob.class);
    job.setMapperClass(TrafficMap.class);
    job.setReducerClass(TrafficReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    // Delete the output directory first if it already exists.
    String outputpath = "/root/files1";
    Path path = new Path(outputpath);
    FileSystem fileSystem = path.getFileSystem(conf);
    if (fileSystem.exists(path)) {
        fileSystem.delete(path, true);
    }

    FileInputFormat.addInputPath(job, new Path("/data/workspace/myshixun/data/ProvOrderCreate/*/"));
    FileOutputFormat.setOutputPath(job, path);

    // Surface a failed job instead of returning as if it had succeeded.
    if (!job.waitForCompletion(true)) {
        throw new IllegalStateException("MapReduce job did not complete successfully");
    }
    /********** End **********/
}
}
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章由自研大数据AI生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,如有不准确之处,请联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多家搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!
