
Using Hadoop - 1: Analyzing Flight Data

by 인디코더 2018. 11. 12.

Analyzing flight data with the Hadoop File System


http://stat-computing.org/dataexpo/2009/the-data.html



hadoop@pw123-VirtualBox:~/dataexpo$ wget http://stat-computing.org/dataexpo/2009/1987.csv.bz2          (download the file)
hadoop@pw123-VirtualBox:~/dataexpo$ bzip2 -d 1987.csv.bz2

Decompress the archive; bzip2 -d removes the original .bz2 file and leaves 1987.csv.

hadoop@pw123-VirtualBox:~/dataexpo$ sed -e '1d' 1987.csv > 1987_temp.csv

Delete the header (first) line and save the result as 1987_temp.csv, check it, then rename it back:

hadoop@pw123-VirtualBox:~/dataexpo$ head 1987_temp.csv
hadoop@pw123-VirtualBox:~/dataexpo$ mv 1987_temp.csv 1987.csv


The flight data covers the years 1987 through 2008.

So I downloaded all of those files with a small script.

hadoop@pw123-VirtualBox:~/dataexpo$ mkdir dataexpo
hadoop@pw123-VirtualBox:~/dataexpo$ cd dataexpo/
hadoop@pw123-VirtualBox:~/dataexpo$ gedit dataexpo.sh            // gedit: a graphical text editor (like Notepad on Windows), used here instead of vi

#!/bin/bash
# Download each year's file, decompress it, and strip the header line.
for ((i=1987; i<=2008; i++)); do
    wget http://stat-computing.org/dataexpo/2009/$i.csv.bz2
    bzip2 -d $i.csv.bz2
    sed -e '1d' $i.csv > ${i}_temp.csv      # ${i} (not $i_temp) so the year expands correctly
    mv ${i}_temp.csv $i.csv
done


hadoop@pw123-VirtualBox:~/dataexpo$ chmod +x dataexpo.sh
hadoop@pw123-VirtualBox:~/dataexpo$ ./dataexpo.sh             // the downloads start


hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -put 1987.csv /user/hadoop/1987.csv
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -ls /user/hadoop/
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -rm -r /user/hadoop/1987out            // remove any previous output directory, since the job fails when run from Eclipse if the output path already exists

[ Eclipse ]

Class AirlinePerformanceParser 
 
 
 
package dataexpo;
 
import java.io.Serializable;
 
import org.apache.hadoop.io.Text;
 
public class AirlinePerformanceParser implements Serializable{
    private int year;                                // year
    private int month;                               // month
    private int arriveDelayTime;                     // arrival delay time (minutes)
    private int departureDelayTime;                  // departure delay time (minutes)
    private int distance;                            // flight distance
    private boolean arriveDelayAvailable = true;     // arrival delay value present
    private boolean departureDelayAvailable = true;  // departure delay value present
    private boolean distanceAvailable = true;        // distance value present
    private String uniqueCarrier;                    // carrier code
    
    public AirlinePerformanceParser(Text text) {
        try {
            String[] columns = text.toString().split(",");
            year = Integer.parseInt(columns[0]);
            month = Integer.parseInt(columns[1]);
            uniqueCarrier = columns[8];
            if(!columns[15].equals("NA")) {
                departureDelayTime = Integer.parseInt(columns[15]);
            }else {
                departureDelayAvailable = false;
            }
            if(!columns[14].equals("NA")) {
                arriveDelayTime = Integer.parseInt(columns[14]);
            } else {
                arriveDelayAvailable = false;
            }
            if(!columns[18].equals("NA")) {
                distance = Integer.parseInt(columns[18]);
            } else {
                distanceAvailable = false;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //getter
 
    public int getYear() {
        return year;
    }
 
    public int getMonth() {
        return month;
    }
 
    public int getArriveDelayTime() {
        return arriveDelayTime;
    }
 
    public int getDepartureDelayTime() {
        return departureDelayTime;
    }
 
    public int getDistance() {
        return distance;
    }
 
    public boolean isArriveDelayAvailable() {
        return arriveDelayAvailable;
    }
 
    public boolean isDepartureDelayAvailable() {
        return departureDelayAvailable;
    }
 
    public boolean isDistanceAvailable() {
        return distanceAvailable;
    }
 
    public String getUniqueCarrier() {
        return uniqueCarrier;
    }
}
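For a quick sanity check of the parser, here is a minimal, hypothetical smoke test (not part of the original post). The sample record is purely illustrative; it follows the DataExpo column layout, where column 8 is UniqueCarrier, 14 is ArrDelay, 15 is DepDelay, and 18 is Distance.

package dataexpo;

import org.apache.hadoop.io.Text;

// Hypothetical smoke test for AirlinePerformanceParser (illustrative only).
public class ParserSmokeTest {
    public static void main(String[] args) {
        // Illustrative record in DataExpo column order:
        // Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,
        // UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,
        // ArrDelay,DepDelay,Origin,Dest,Distance,...
        Text line = new Text("1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA");
        AirlinePerformanceParser parser = new AirlinePerformanceParser(line);
        // Expected to print: 1987,10 carrier=PS arrDelay=23 depDelay=11 distance=447
        System.out.println(parser.getYear() + "," + parser.getMonth()
                + " carrier=" + parser.getUniqueCarrier()
                + " arrDelay=" + parser.getArriveDelayTime()
                + " depDelay=" + parser.getDepartureDelayTime()
                + " distance=" + parser.getDistance());
    }
}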
 



class ArrivalDelayCount
 
package dataexpo;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 
/**
 * @author hadoop
 *    Counts the number of arrival-delayed flights per month
 */
public class ArrivalDelayCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf,"ArrivalDelayCount");
        job.setJarByClass(ArrivalDelayCount.class);
        job.setMapperClass(ArrivalDelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, 
                new Path("hdfs://localhost:9000/user/hadoop/1987.csv"));
        FileOutputFormat.setOutputPath(job, 
                new Path("hdfs://localhost:9000/user/hadoop/1987out"));
        job.waitForCompletion(true);
        
    }
 
}
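Since a MapReduce job refuses to start if its output directory already exists (the reason for the hadoop fs -rm -r step above), the driver can also clear the directory programmatically. A minimal sketch, assuming it goes in main() right before the existing FileOutputFormat.setOutputPath call, with org.apache.hadoop.fs.FileSystem added to the imports:

// Delete a leftover output directory so repeated runs from Eclipse do not fail.
Path outputPath = new Path("hdfs://localhost:9000/user/hadoop/1987out");
FileSystem fs = outputPath.getFileSystem(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);    // true = delete recursively
}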


class ArrivalDelayCountMapper 
 
package dataexpo;
 
import java.io.IOException;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class ArrivalDelayCountMapper 
        extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text outkey = new Text();
    @Override
    protected void map(LongWritable key, Text value, 
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);
        outkey.set(parser.getYear() + "," + parser.getMonth());
        if(parser.getArriveDelayTime() > 0 && parser.isArriveDelayAvailable()) {
            context.write(outkey, one);
        }
    }
    
}


class DelayCountReducer
 
package dataexpo;
 
import java.io.IOException;
 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result = new IntWritable();
 
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)  throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
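Because DelayCountReducer does nothing but sum IntWritable counts, the same class can also serve as a combiner so that counts are pre-aggregated on the map side before the shuffle. This is an optional tweak, not something the post itself applies; the sketch below shows the single line to add to the ArrivalDelayCount driver:

// Optional: pre-aggregate (year,month) counts on the map side;
// valid because summation is associative and commutative.
job.setCombinerClass(DelayCountReducer.class);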


 



hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -ls /user/hadoop
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -ls /user/hadoop/1987out
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -cat /user/hadoop/1987out/part-r-00000


