网约车大数据综合项目——基于MapReduce的数据清洗

2024-07-06 1147阅读

网约车大数据综合项目——基于MapReduce的数据清洗

第1关:网约车撤销订单数据清洗

package traffic.step1.mapreduce;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * Utility class that hands out one lazily created JDBC connection to the
 * {@code trafficdb} MySQL database.
 *
 * <p>The connection is cached in a static field and returned to every caller, so
 * callers share the same {@link Connection} instance.
 *
 * <p>NOTE(review): the credentials below are hard-coded for the exercise environment;
 * confirm they match the local MySQL installation.
 */
public class DBHelper {
    /********** Begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://127.0.0.1:3306/trafficdb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root"; // database user name
    private static final String password = "123123"; // database password; chosen at install time, so it differs per machine
    private static Connection conn = null; // cached connection, created on first use

    static {
        // Load the driver class once so DriverManager can locate it.
        try {
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Returns the cached connection, opening it on first use and re-opening it when the
     * cached one has been closed in the meantime.
     *
     * @return an open connection, or {@code null} if the connection could not be
     *         established (the stack trace is printed in that case)
     */
    public static Connection getConnection() {
        if (!isOpen(conn)) {
            // Drop a stale handle so a failed reconnect is reported as null, not as a dead connection.
            conn = null;
            try {
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return conn;
    }

    /** Tells whether {@code candidate} is non-null and not reported as closed. */
    private static boolean isOpen(Connection candidate) {
        if (candidate == null) {
            return false;
        }
        try {
            return !candidate.isClosed();
        } catch (SQLException e) {
            // A connection that cannot even report its state is treated as unusable.
            return false;
        }
    }
    /********** End **********/
}
package traffic.step1.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
public class TrafficMap extends Mapper  {
    Map addressMap = new HashMap();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /********* Begin **********/
        //连接mysql
         Connection connection = DBHelper.getConnection();
        try {
            Statement statement = connection.createStatement();
            String sql = "select * from t_address";
            ResultSet resultSet = statement.executeQuery(sql);
            while (resultSet.next()) {
                String address_code = resultSet.getString(1);
                String address_name = resultSet.getString(2);
                addressMap.put(address_code, address_name);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        /********* End **********/
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /********* Begin **********/
        String[] values = value.toString().split(",",8);
        boolean flag=true;
        //判断字符是否为空
        for (String val : values){
            if (val.equals("")){
                flag=false;
            }
        }
         //判断字符列表长度是否为8
         if (values.length!=8){
             flag=false;
         }
        //判断订单时间与是否为2019年3月7日
        if (!values[3].startsWith("20190307")||!values[4].startsWith("20190307")){
            flag=false;
        }
        if (flag){
            if (values[7].equals("null")){
                values[7]="未知";
            }
            DateFormat df1 = new SimpleDateFormat("yyyyMMddHHmmss");
            DateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                values[3]=df2.format(df1.parse(values[3]));
                values[4]=df2.format(df1.parse(values[4]));
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            String districtname="";
            if (addressMap.containsKey(values[1])){
                districtname=addressMap.get(values[1].trim());
            }else {
                districtname="未知";
            }
            String result = "";
            for (int i=0;i
                if (i==7){
                    result = result + values[i];
                }else{
                    if (i==1){
                        result = result +  values[i] + "|"+districtname + "|";
                    }else{
                        result = result + values[i] + "|";
                    }
                }
            }
            //通过订单号来查重
            context.write(new Text(values[2]),new Text(result));
        }
    /********* End **********/
    }
}

    @Override
    protected void reduce(Text key, Iterable
       /********** Begin **********/
       //相同订单只保留第一行
        int num=0;
        String result="";
        for (Text val:values){
            if (num==0){
                result=val.toString();
                num++;
                context.write(NullWritable.get(),new Text(result));
            }
        }
        /********** End **********/
    }
}

    /**
     * Configures and runs the cleaning job for the order-cancellation data.
     *
     * <p>Input is read from {@code /data/workspace/myshixun/data/ProvOrderCancel/*&#47;} and the
     * result is written to {@code /root/files}, which is deleted first if it already exists.
     *
     * @param args unused
     * @throws IllegalStateException if the job does not complete successfully
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        /********** Begin **********/
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TrafficJob.class);
        job.setMapperClass(TrafficMap.class);
        job.setReducerClass(TrafficReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // The job refuses to start if the output directory exists, so remove it first.
        String outputpath = "/root/files";
        Path path = new Path(outputpath);
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
        FileInputFormat.addInputPath(job, new Path("/data/workspace/myshixun/data/ProvOrderCancel/*/"));
        FileOutputFormat.setOutputPath(job, path);
        // Surface a failed job instead of returning as if it had succeeded.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("MapReduce job failed; see the job log for details");
        }
        /********** End **********/

    }
}

    /********** Begin **********/
    // JDBC driver class name and connection settings for the trafficdb database.
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/trafficdb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root";// database user name
    private static final String password = "123123";// database password: chosen when the database was installed, so it differs per machine
    private static Connection conn = null; // cached database connection, filled in by getConnection()
    // Load the driver class once when this class is initialized; a failure is only printed.
    static {
        try {
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    /**
     * Returns the cached connection, opening it on first use and re-opening it when the
     * cached one has been closed in the meantime.
     *
     * @return an open connection, or {@code null} if the connection could not be
     *         established (the stack trace is printed in that case)
     */
    public static Connection getConnection() {
        if (!isOpen(conn)) {
            // Drop a stale handle so a failed reconnect is reported as null, not as a dead connection.
            conn = null;
            try {
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return conn;
    }

    /** Tells whether {@code candidate} is non-null and not reported as closed. */
    private static boolean isOpen(Connection candidate) {
        if (candidate == null) {
            return false;
        }
        try {
            return !candidate.isClosed();
        } catch (SQLException e) {
            // A connection that cannot even report its state is treated as unusable.
            return false;
        }
    }
    /********** End **********/
} 

    Map
        /********** Begin **********/
        //链接数据库,将查询数据放入addressMap中
        Connection connection = DBHelper.getConnection();
        try {
            Statement statement = connection.createStatement();
            String sql = "select * from t_address";
            ResultSet resultSet = statement.executeQuery(sql);
            while (resultSet.next()) {
                String address_code = resultSet.getString(1);
                String address_name = resultSet.getString(2);
                addressMap.put(address_code, address_name);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        /********** End **********/
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /********** Begin **********/
        //切割长度为14的字符列表
        String[] val = value.toString().split(",",14);
        String vals="";
        //去除乘客便签(passengernote)、加密字段(encrypt_c)和支付类型(faretype)
        for(int i=0;i
            if (i==5||i==12||i==13){
            }else if (i==11){
                vals = vals + val[i];
            }else {
                vals = vals + val[i] + ",";
            }
        }
        String[] values= vals.split(",",11);
        //如果 flag 为 flase则清楚掉,true 返回清洗后数据
        boolean flag=true;
        if (values.length!=11){
            flag=false;
        }
        //判断字段是否为空
        for (String va : values){
            if (!flag){break;}
            if (va.equals("")){
                flag=false;
            }
        }
        try {
            //设置经纬度格式
            values[6]=values[6].substring(0,3)+"."+values[6].substring(3,values[6].trim().length());
            values[7]=values[7].substring(0,2)+"."+values[7].substring(2,values[7].trim().length());
            values[9]=values[9].substring(0,3)+"."+values[9].substring(3,values[9].trim().length());
            values[10]=values[10].substring(0,2)+"."+values[10].substring(2,values[10].trim().length());
        } catch (Exception e) {
            flag=false;
        }
        //判断订单时间是否为2019年3月7日
        if (!values[3].startsWith("20190307")||!values[4].startsWith("20190307")){
            flag=false;
        }
        if (flag){
            //日期转换
            DateFormat df1 = new SimpleDateFormat("yyyyMMddHHmmss");
            DateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                values[3]=df2.format(df1.parse(values[3]));
                values[4]=df2.format(df1.parse(values[4]));
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            //行政区设置
            String districtname="";
            if (addressMap.containsKey(values[1])){
                districtname=addressMap.get(values[1].trim());
            }else {
                districtname="未知";
            }
            //返回结果
            String result = "";
            for (int i=0;i
                if (i==10){
                    result = result + values[i];
                }else{
                    if (i==1){
                        //添加行政区名称 districtname
                        result = result + values[i] + "\t"+districtname + "\t";
                    }else {
                        result = result + values[i] + "\t";
                    }
                }
            }
            //根据订单去重
            context.write(new Text(values[2]),new Text(result));
        }
    /********** End **********/
    }
}

    @Override
    protected void reduce(Text key, Iterable
        /********** Begin **********/
        //相同订单 id(orderid)只保留第一行
        int num=0;
        String result="";
        for (Text val:values){
            if (num==0){
                result=val.toString();
                num++;
                context.write(NullWritable.get(),new Text(result));
            }
        }
        /********** End **********/
    }
}

    /**
     * Configures and runs the cleaning job for the order-creation data.
     *
     * <p>Input is read from {@code /data/workspace/myshixun/data/ProvOrderCreate/*&#47;} and the
     * result is written to {@code /root/files1}, which is deleted first if it already exists.
     *
     * @param args unused
     * @throws IllegalStateException if the job does not complete successfully
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        /********** Begin **********/
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TrafficJob.class);
        job.setMapperClass(TrafficMap.class);
        job.setReducerClass(TrafficReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // If the output directory already exists, delete it first.
        String outputpath = "/root/files1";
        Path path = new Path(outputpath);
        FileSystem fileSystem = path.getFileSystem(conf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
        FileInputFormat.addInputPath(job, new Path("/data/workspace/myshixun/data/ProvOrderCreate/*/"));
        FileOutputFormat.setOutputPath(job, path);
        // Surface a failed job instead of returning as if it had succeeded.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("MapReduce job failed; see the job log for details");
        }

        /********** End **********/
    }
}
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章由自研大数据AI生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确的地方请联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多家搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]