A,0,0,1
A,0,1,2
A,1,0,3
A,1,1,4
B,0,0,5
B,0,1,6
B,1,0,7
B,1,1,8
======================================
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the MapReduce matrix-multiplication job.
 *
 * <p>Usage: {@code MatrixDriver <input> <output>}. The input directory holds
 * lines of the form {@code MatrixName,i,j,value} (see MatrixMapper); the
 * output directory receives one {@code i,j <tab> value} line per result cell.
 */
public class MatrixDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MatrixDriver <input> <output>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration, String) ctor.
        Job job = Job.getInstance(conf, "Matrix Multiplication");
        job.setJarByClass(MatrixDriver.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        // Declare map output types explicitly; they happen to match the final
        // output types (Text/Text), but being explicit avoids surprises if the
        // reducer output types ever diverge.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
===================================================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MatrixMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// Input format: MatrixName,i,j,value
// Example: A,0,0,2
String[] parts = value.toString().split(",");
String matrix = parts[0];
int i = Integer.parseInt(parts[1]);
int j = Integer.parseInt(parts[2]);
int val = Integer.parseInt(parts[3]);
int N = 2; // assume 2x2 matrices for simplicity
if (matrix.equals("A")) {
// A[i][k] goes to all C[i][j]
for (int col = 0; col < N; col++) {
context.write(new Text(i + "," + col),
new Text("A," + j + "," + val));
}
} else {
// B[k][j] goes to all C[i][j]
for (int row = 0; row < N; row++) {
context.write(new Text(row + "," + j),
new Text("B," + i + "," + val));
}
}
}
}
================================================
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// For simplicity, assume max 10 values
int[] Aval = new int[10];
int[] Bval = new int[10];
for (Text t : values) {
String[] parts = t.toString().split(",");
String m = parts[0];
int index = Integer.parseInt(parts[1]);
int val = Integer.parseInt(parts[2]);
if (m.equals("A")) {
Aval[index] = val;
} else {
Bval[index] = val;
}
}
int sum = 0;
for (int k = 0; k < 10; k++) {
sum += Aval[k] * Bval[k];
}
context.write(key, new Text(Integer.toString(sum)));
}
}
No comments:
Post a Comment