Analyzing Flight Data with the Hadoop File System
http://stat-computing.org/dataexpo/2009/the-data.html
hadoop@pw123-VirtualBox:~/dataexpo$ wget http://stat-computing.org/dataexpo/2009/1987.csv.bz2 (download the file)
hadoop@pw123-VirtualBox:~/dataexpo$ bzip2 -d 1987.csv.bz2
Decompress the archive (bzip2 -d removes the original .bz2 file after extracting).
hadoop@pw123-VirtualBox:~/dataexpo$ sed -e '1d' 1987.csv > 1987_temp.csv
Delete the first line (the CSV header) and write the result to 1987_temp.csv, then check it with head and rename it back to 1987.csv.
hadoop@pw123-VirtualBox:~/dataexpo$ head 1987_temp.csv
hadoop@pw123-VirtualBox:~/dataexpo$ mv 1987_temp.csv 1987.csv
The flight data is available for every year from 1987 through 2008, so I fetched all of those files.
hadoop@pw123-VirtualBox:~/dataexpo$ mkdir dataexpo
hadoop@pw123-VirtualBox:~/dataexpo$ cd dataexpo/
hadoop@pw123-VirtualBox:~/dataexpo$ gedit dataexpo.sh // gedit is a GUI editor, like Notepad on Windows, used here instead of vi
#!/bin/bash
for ((i=1987; i<=2008; i++)); do
    wget http://stat-computing.org/dataexpo/2009/$i.csv.bz2
    bzip2 -d $i.csv.bz2
    sed -e '1d' $i.csv > ${i}_temp.csv   # braces are required: $i_temp would be read as an (empty) variable named i_temp
    mv ${i}_temp.csv $i.csv
done
hadoop@pw123-VirtualBox:~/dataexpo$ chmod +x dataexpo.sh (make the script executable)
hadoop@pw123-VirtualBox:~/dataexpo$ ./dataexpo.sh // the downloads begin
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -put 1987.csv /user/hadoop/1987.csv
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -ls /user/hadoop/
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -rm -r /user/hadoop/1987out // remove any leftover output directory (-rm -r, since -rmdir only removes empty ones); the job fails when run from Eclipse if it already exists
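The -put above uploads only 1987.csv. Since the script downloaded every year, the remaining files could be uploaded the same way, e.g. with a loop in the same style as dataexpo.sh (a sketch; paths assume the /user/hadoop layout used above):

for ((i=1987; i<=2008; i++)); do
    hadoop fs -put $i.csv /user/hadoop/$i.csv
done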
[ Eclipse ]
Class AirlinePerformanceParser

package dataexpo;

import java.io.Serializable;

import org.apache.hadoop.io.Text;

public class AirlinePerformanceParser implements Serializable {

    private int year;                                // year
    private int month;                               // month
    private int arriveDelayTime;                     // arrival delay (minutes)
    private int departureDelayTime;                  // departure delay (minutes)
    private int distance;                            // distance flown
    private boolean arriveDelayAvailable = true;     // arrival delay value present
    private boolean departureDelayAvailable = true;  // departure delay value present
    private boolean distanceAvailable = true;        // distance value present
    private String uniqueCarrier;                    // carrier code

    public AirlinePerformanceParser(Text text) {
        try {
            String[] columns = text.toString().split(",");
            year = Integer.parseInt(columns[0]);
            month = Integer.parseInt(columns[1]);
            uniqueCarrier = columns[8];

            if (!columns[15].equals("NA")) {
                departureDelayTime = Integer.parseInt(columns[15]);
            } else {
                departureDelayAvailable = false;
            }

            if (!columns[14].equals("NA")) {
                arriveDelayTime = Integer.parseInt(columns[14]);
            } else {
                arriveDelayAvailable = false;
            }

            if (!columns[18].equals("NA")) {
                distance = Integer.parseInt(columns[18]);
            } else {
                distanceAvailable = false;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // getters
    public int getYear() { return year; }
    public int getMonth() { return month; }
    public int getArriveDelayTime() { return arriveDelayTime; }
    public int getDepartureDelayTime() { return departureDelayTime; }
    public int getDistance() { return distance; }
    public boolean isArriveDelayAvailable() { return arriveDelayAvailable; }
    public boolean isDepartureDelayAvailable() { return departureDelayAvailable; }
    public boolean isDistanceAvailable() { return distanceAvailable; }
    public String getUniqueCarrier() { return uniqueCarrier; }
}
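The column indices above follow the Data Expo 2009 layout: column 0 is Year, 1 is Month, 8 is UniqueCarrier, 14 is ArrDelay, 15 is DepDelay, and 18 is Distance, with "NA" marking missing values. The parser can be sanity-checked outside MapReduce with a scratch class like the following sketch (ParserCheck is not part of the original project, and the sample row values are illustrative only):

package dataexpo;

import org.apache.hadoop.io.Text;

// Scratch check of AirlinePerformanceParser; the sample row is made up for illustration.
public class ParserCheck {
    public static void main(String[] args) {
        Text sample = new Text(
            "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA");
        AirlinePerformanceParser p = new AirlinePerformanceParser(sample);
        System.out.println(p.getYear() + "," + p.getMonth());   // 1987,10
        System.out.println(p.getUniqueCarrier());               // PS
        System.out.println(p.getArriveDelayTime());             // 23
        System.out.println(p.getDistance());                    // 447
    }
}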
class ArrivalDelayCount

package dataexpo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author hadoop
 * Counts arrival delays per month.
 */
public class ArrivalDelayCount {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "ArrivalDelayCount");

        job.setJarByClass(ArrivalDelayCount.class);
        job.setMapperClass(ArrivalDelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/hadoop/1987.csv"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/hadoop/1987out"));

        job.waitForCompletion(true);
    }
}
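Because FileOutputFormat refuses to start a job whose output directory already exists (the reason for the hadoop fs -rm -r step above), the driver could also delete it programmatically before submission. A sketch using the standard FileSystem API, not part of the original code (add org.apache.hadoop.fs.FileSystem to the imports and place this in main() before waitForCompletion):

// Optional: remove a pre-existing output directory so reruns from Eclipse don't fail.
Path output = new Path("hdfs://localhost:9000/user/hadoop/1987out");
FileSystem fs = output.getFileSystem(conf);
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}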
class ArrivalDelayCountMapper

package dataexpo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ArrivalDelayCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text outkey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

        // key = "year,month"; emit 1 for every flight that arrived late
        outkey.set(parser.getYear() + "," + parser.getMonth());

        if (parser.getArriveDelayTime() > 0 && parser.isArriveDelayAvailable()) {
            context.write(outkey, one);
        }
    }
}
class DelayCountReducer

package dataexpo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DelayCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the 1s emitted by the mapper for this "year,month" key
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
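Since this reducer just sums counts, the operation is associative and the same class can double as a combiner, pre-aggregating on the map side to cut shuffle traffic. One extra line in the driver would enable it (an optional tweak, not in the original code):

// Optional: reuse the reducer as a combiner for map-side pre-aggregation.
job.setCombinerClass(DelayCountReducer.class);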
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -ls /user/hadoop
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -ls /user/hadoop/1987out
hadoop@pw123-VirtualBox:~/dataexpo$ hadoop fs -cat /user/hadoop/1987out/part-r-00000
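With the default TextOutputFormat, each line of part-r-00000 is the "year,month" key and the summed delay count separated by a tab, so the output has this shape (counts shown as placeholders, not actual results):

1987,10	<count>
1987,11	<count>
1987,12	<count>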