Date,TempC,Humidity,WindKmph,PrecipMM2025-08-20,36,55,12,02025-08-20,34,88,15,12025-08-21,28,60,45,02025-08-21,24,82,18,32025-08-22,6,70,8,02025-08-22,2,65,10,02025-08-23,30,50,10,222025-08-24,31,85,8,0
Classification rules (edit freely)
PrecipMM >= 20
→ Heavy Rain (severity 6)PrecipMM >= 2
→ Rainy (severity 5)TempC >= 35
→ Hot (severity 4)TempC <= 5
→ Cold (severity 3)WindKmph >= 40
→ Windy (severity 2)Humidity >= 80
→ Humid (severity 1)else → Clear (severity 0)
========================================
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WeatherConditionsMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
String line = value.toString().trim();
if (line.isEmpty()) return;
// Skip header if present
if (line.toLowerCase().startsWith("date,")) return;
String[] parts = line.split(",");
if (parts.length < 5) return; // bad row
String date = parts[0].trim();
try {
double tempC = Double.parseDouble(parts[1].trim());
double humidity = Double.parseDouble(parts[2].trim());
double wind = Double.parseDouble(parts[3].trim());
double precip = Double.parseDouble(parts[4].trim());
// Compute severity + message
int severity; String message;
if (precip >= 20) { severity = 6; message = "Heavy Rain"; }
else if (precip >= 2) { severity = 5; message = "Rainy"; }
else if (tempC >= 35) { severity = 4; message = "Hot"; }
else if (tempC <= 5) { severity = 3; message = "Cold"; }
else if (wind >= 40) { severity = 2; message = "Windy"; }
else if (humidity >= 80) { severity = 1; message = "Humid"; }
else { severity = 0; message = "Clear"; }
// Emit: key = date, value = "severity|message"
context.write(new Text(date), new Text(severity + "|" + message));
} catch (NumberFormatException e) {
// skip invalid numeric rows
}
}
}
==========================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WeatherConditionsReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
int bestSeverity = -1;
String bestMessage = "";
for (Text t : values) {
String[] kv = t.toString().split("\\|");
if (kv.length != 2) continue;
try {
int sev = Integer.parseInt(kv[0]);
String msg = kv[1];
if (sev > bestSeverity) { bestSeverity = sev; bestMessage = msg; }
} catch (NumberFormatException ignored) {}
}
// Output: Date \t Message
if (bestSeverity >= 0) {
context.write(key, new Text(bestMessage));
}
}
}
======================================
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WeatherConditionsDriver {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WeatherConditionsDriver <input> <output>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = new Job(conf, "Daily Weather Conditions");
job.setJarByClass(WeatherConditionsDriver.class);
job.setMapperClass(WeatherConditionsMapper.class);
job.setReducerClass(WeatherConditionsReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
No comments:
Post a Comment