import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the one-pass matrix multiplication C = A x B.
 *
 * <p>Each input line has the form {@code MatrixName,i,j,Value}. An element
 * A[i][k] is emitted once for every column j of B under the key {@code "i,j"};
 * an element B[k][j] is emitted once for every row i of A under the key
 * {@code "i,j"}. All values needed to compute C[i][j] therefore share one key.
 *
 * <p>The number of rows of A and columns of B are read from the job
 * configuration properties {@code matrix.m} and {@code matrix.p}; when those
 * properties are absent the compiled-in defaults below are used.
 */
public class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Default matrix dimensions (m, n, p)
    private static final int m = 2; // Rows in matrix A
    private static final int n = 2; // Columns in matrix A / Rows in matrix B (not needed by the mapper)
    private static final int p = 2; // Columns in matrix B

    // Effective dimensions for this task; overwritten in setup().
    private int rowsInA = m;
    private int colsInB = p;

    /**
     * Loads the matrix dimensions from the job configuration, falling back to
     * the default constants when a property is not set.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        rowsInA = context.getConfiguration().getInt("matrix.m", m);
        colsInB = context.getConfiguration().getInt("matrix.p", p);
    }

    /**
     * Replicates one matrix element to every output cell that depends on it.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one line of input in the form {@code MatrixName,i,j,Value}
     * @param context sink for the emitted {@code ("i,j", "A|B,k,value")} pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // Blank lines carry no data.
        }

        // Input format: "MatrixName,i,j,Value"
        String[] tokens = line.split(",");
        if (tokens.length != 4) {
            // Not a matrix record (e.g. a separator line): skip it instead of failing the task.
            context.getCounter("MatrixMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        String matrixName = tokens[0].trim(); // "A" or "B"
        int row;
        int col;
        double val;
        try {
            row = Integer.parseInt(tokens[1].trim()); // i or k
            col = Integer.parseInt(tokens[2].trim()); // k or j
            val = Double.parseDouble(tokens[3].trim()); // A[i][k] or B[k][j]
        } catch (NumberFormatException e) {
            context.getCounter("MatrixMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        if (matrixName.equals("A")) {
            // Emit (i, j) as key and (A, k, A[i][k]) as value for all j in 1 to p.
            // The value does not depend on j, so it is built once.
            Text outValue = new Text("A," + col + "," + val);
            for (int j = 1; j <= colsInB; j++) {
                context.write(new Text(row + "," + j), outValue);
            }
        } else if (matrixName.equals("B")) {
            // Emit (i, j) as key and (B, k, B[k][j]) as value for all i in 1 to m.
            // The value does not depend on i, so it is built once.
            Text outValue = new Text("B," + row + "," + val);
            for (int i = 1; i <= rowsInA; i++) {
                context.write(new Text(i + "," + col), outValue);
            }
        }
    }
}
----------------
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer for the one-pass matrix multiplication C = A x B.
 *
 * <p>For a key {@code "i,j"} it receives values of the form {@code "A,k,val"}
 * and {@code "B,k,val"}, pairs them up by {@code k}, and emits the sum of the
 * products as C[i][j].
 */
public class MatrixReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    /**
     * Computes one cell of the product matrix.
     *
     * @param key     the output cell, formatted as {@code "i,j"}
     * @param values  tagged elements in the form {@code "A,k,val"} or {@code "B,k,val"}
     * @param context sink for the emitted {@code (key, C[i][j])} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Elements of row i of A and column j of B, indexed by k.
        Map<Integer, Double> aMap = new HashMap<Integer, Double>();
        Map<Integer, Double> bMap = new HashMap<Integer, Double>();

        // Iterate through all values for the given key (i, j)
        for (Text value : values) {
            String[] tokens = value.toString().split(",");
            String matrixName = tokens[0]; // "A" or "B"
            int k = Integer.parseInt(tokens[1]); // k index
            double val = Double.parseDouble(tokens[2]); // A[i][k] or B[k][j]
            if (matrixName.equals("A")) {
                aMap.put(k, val); // Store A[i][k]
            } else if (matrixName.equals("B")) {
                bMap.put(k, val); // Store B[k][j]
            }
        }

        // Compute the dot product for C[i][j]
        double result = 0.0;
        for (Map.Entry<Integer, Double> aEntry : aMap.entrySet()) {
            Double bVal = bMap.get(aEntry.getKey());
            if (bVal == null) {
                // No B[k][j] was received for this k: treat it as zero rather
                // than failing with a NullPointerException on unboxing.
                continue;
            }
            result += aEntry.getValue() * bVal; // Sum(A[i][k] * B[k][j])
        }

        // Emit the result as (i, j), C[i][j]
        context.write(key, new DoubleWritable(result));
    }
}
----------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver that configures and submits the matrix multiplication MapReduce job.
 *
 * <p>Usage: {@code MatrixMultiplication <input path> <output path>}
 */
public class MatrixMultiplication {
    /**
     * Builds the job from the two command-line paths, runs it, and exits with
     * status 0 on success or 1 on failure (-1 when the arguments are wrong).
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be created or submitted
     */
    public static void main(String[] args) throws Exception {
        // Check input arguments
        if (args.length != 2) {
            System.err.println("Usage: MatrixMultiplication <input path> <output path>");
            System.exit(-1);
        }

        // Create a Hadoop job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Matrix Multiplication");
        job.setJarByClass(MatrixMultiplication.class);

        // Set Mapper and Reducer classes
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        // The mapper emits (Text, Text) while the reducer emits (Text, DoubleWritable).
        // Map output types default to the job output types, so they must be declared
        // explicitly; otherwise the map phase fails with a value type mismatch.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set final (reducer) output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Set input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
A,1,1,1.0
A,1,2,2.0
A,2,1,3.0
A,2,2,4.0
------------------------------------
B,1,1,5.0
B,1,2,6.0
B,2,1,7.0
B,2,2,8.0
----------------------------
No comments:
Post a Comment