userId,movieId,tag,timestamp
15,1193,good plot,16234567
15,1193,classic,16234570
20,1200,funny,16234600
35,1200,boring,16234620
35,1193,emotional,16234625
==================================
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper that converts CSV records of the form {@code userId,movieId,tag,timestamp}
 * into {@code (movieId, tag)} pairs.
 *
 * <p>Blank lines, the header row, records with fewer than three comma-separated
 * fields, and records whose movie id or tag is empty after trimming are skipped
 * without emitting anything.
 */
public class TagsMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Leading text that identifies the CSV header row; matched case-insensitively. */
    private static final String HEADER_PREFIX = "userid";

    // Output holders reused across map() calls instead of allocating two Text
    // objects per record (the usual Hadoop idiom for emitted keys/values).
    private final Text movieIdOut = new Text();
    private final Text tagOut = new Text();

    /**
     * Emits {@code (movieId, tag)} for one input line.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one line of the CSV input
     * @param context sink for the emitted pair
     * @throws java.io.IOException  if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }

        // Skip header. regionMatches(ignoreCase = true, ...) compares character by
        // character and does not depend on the JVM default locale, unlike
        // toLowerCase(), which maps 'I' to a dotless 'ı' under a Turkish locale and
        // would then fail to recognise "userId".
        if (line.regionMatches(true, 0, HEADER_PREFIX, 0, HEADER_PREFIX.length())) {
            return;
        }

        // Limit of 4 keeps everything after the third comma together in parts[3].
        // NOTE(review): a plain split does not understand CSV quoting; a quoted tag
        // that itself contains a comma would be cut at that comma — confirm the
        // input never contains such tags.
        String[] parts = line.split(",", 4);
        if (parts.length < 3) {
            return;
        }

        String movieId = parts[1].trim();
        String tag = parts[2].trim();
        if (movieId.isEmpty() || tag.isEmpty()) {
            return;
        }

        movieIdOut.set(movieId);
        tagOut.set(tag);
        context.write(movieIdOut, tagOut);
    }
}
================================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.Iterator;
/**
 * Reducer that joins every tag received for a key into a single
 * {@code ", "}-separated string and writes it out under that same key.
 */
public class TagsReducer extends Reducer<Text, Text, Text, Text> {

    /**
     * Concatenates the incoming values in iteration order, separated by
     * {@code ", "}, and emits {@code (key, joinedValues)}.
     *
     * @param key     key shared by all of the supplied values
     * @param values  values to join
     * @param context sink for the single emitted pair
     * @throws java.io.IOException  if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        // Tracks whether a value has already been appended, so the separator is
        // placed only between values (never before the first or after the last).
        boolean needsSeparator = false;
        for (Text tag : values) {
            if (needsSeparator) {
                joined.append(", ");
            }
            joined.append(tag.toString());
            needsSeparator = true;
        }
        Text result = new Text(joined.toString());
        context.write(key, result);
    }
}
======================================
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Command-line entry point that configures and submits the
 * "Movie Tags Extraction" MapReduce job, wiring {@link TagsMapper} and
 * {@link TagsReducer} together with {@code Text} keys and values.
 */
public class TagsDriver {

    /**
     * Runs the job and terminates the JVM with its outcome.
     *
     * <p>Exits with {@code -1} when the argument count is wrong, {@code 0} when
     * the job completes successfully, and {@code 1} otherwise.
     *
     * @param args exactly two entries: the input path and the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TagsDriver <input> <output>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        // Job.getInstance is the supported factory; the Job(Configuration, String)
        // constructor is deprecated.
        Job job = Job.getInstance(conf, "Movie Tags Extraction");
        job.setJarByClass(TagsDriver.class);

        job.setMapperClass(TagsMapper.class);
        job.setReducerClass(TagsReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion(true) blocks until the job finishes and reports progress.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
No comments:
Post a Comment