
Saturday, 30 August 2025

pigdemo-1

1. First create the data file emp111.txt in your LFS (local file system).

2. Move it to HDFS (a command sketch follows step 5 below).

3. Open the vi editor and type the Pig script:

4. vi pig1.pig


bag1= load 'emp111.txt' using PigStorage(',') as (eno:int,ename:chararray,sal:int);

bag2= filter bag1 by sal > 34000;

store bag2 into 'pigoutput11' using PigStorage(',');




5. Execute the script:

pig pig1.pig
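
A quick sketch of steps 2 and 5, assuming emp111.txt is in the current local directory and your HDFS home directory exists (the output directory name comes from the STORE statement above):

hdfs dfs -put emp111.txt .              # step 2: copy the file from the LFS into your HDFS home directory
pig pig1.pig                            # step 5: run the script in the default (MapReduce) mode
hdfs dfs -cat pigoutput11/part-*        # view the filtered records written by STORE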


-------------------------------------------------

Average salary per department (GROUP ... BY)

-----------------------------------------


bag1= load 'emp11.txt' using PigStorage(',') as
       (eno:int,ename:chararray,sal:int,dname:chararray);

bag2= group bag1 by dname;

bag3= foreach bag2 generate group, AVG(bag1.sal);

store bag3 into 'avgop2' using PigStorage(',');
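
To sanity-check the aggregation, the standard DUMP operator can be added to the script (or run from the Grunt shell) before storing, and the stored result can be read back from HDFS afterwards ('avgop2' is the directory named in the STORE statement):

grunt> dump bag3;

hdfs dfs -cat avgop2/part-*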


Friday, 29 August 2025

matrix ops (2x2 matrix multiplication; each input line is MatrixName,row,col,value)

A,0,0,1
A,0,1,2
A,1,0,3
A,1,1,4
B,0,0,5
B,0,1,6
B,1,0,7
B,1,1,8
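
For reference, these rows encode the two matrices below, so the job that follows should produce the product A x B shown on the right (plain 2x2 arithmetic):

A = | 1 2 |    B = | 5 6 |    A x B = | 1*5+2*7  1*6+2*8 | = | 19 22 |
    | 3 4 |        | 7 8 |            | 3*5+4*7  3*6+4*8 |   | 43 50 |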


======================================
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MatrixDriver <input> <output>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Matrix Multiplication");

        job.setJarByClass(MatrixDriver.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
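
The three classes can be compiled and packaged with the standard Hadoop tooling; a minimal sketch (the jar name matrix.jar is just an example):

javac -classpath $(hadoop classpath) MatrixDriver.java MatrixMapper.java MatrixReducer.java
jar cf matrix.jar *.class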
===================================================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MatrixMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Input format: MatrixName,i,j,value
        // Example: A,0,0,2
        String[] parts = value.toString().split(",");
        String matrix = parts[0];
        int i = Integer.parseInt(parts[1]);
        int j = Integer.parseInt(parts[2]);
        int val = Integer.parseInt(parts[3]);

        int N = 2;  // assume 2x2 matrices for simplicity

        if (matrix.equals("A")) {
            // A[i][k] goes to all C[i][j]
            for (int col = 0; col < N; col++) {
                context.write(new Text(i + "," + col),
                        new Text("A," + j + "," + val));
            }
        } else {
            // B[k][j] goes to all C[i][j]
            for (int row = 0; row < N; row++) {
                context.write(new Text(row + "," + j),
                        new Text("B," + i + "," + val));
            }
        }
    }
}
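
To see how the mapper spreads values, take the sample record A,0,1,2 (i=0, j=1, val=2): it is an A element, so it is sent to every cell of row 0 of the result, i.e. keys "0,0" and "0,1", each with value "A,1,2". Likewise B,1,0,7 (i=1, j=0) goes to keys "0,0" and "1,0" with value "B,1,7".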

================================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // For simplicity, assume max 10 values
        int[] Aval = new int[10];
        int[] Bval = new int[10];

        for (Text t : values) {
            String[] parts = t.toString().split(",");
            String m = parts[0];
            int index = Integer.parseInt(parts[1]);
            int val = Integer.parseInt(parts[2]);

            if (m.equals("A")) {
                Aval[index] = val;
            } else {
                Bval[index] = val;
            }
        }

        int sum = 0;
        for (int k = 0; k < 10; k++) {
            sum += Aval[k] * Bval[k];
        }

        context.write(key, new Text(Integer.toString(sum)));
    }
}
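
With the jar built as sketched earlier, the job can be run against the sample data roughly like this (matrix_input and matrix_output are example HDFS paths):

hadoop jar matrix.jar MatrixDriver matrix_input matrix_output
hdfs dfs -cat matrix_output/part-r-00000

For the 2x2 matrices above this should print (key and value separated by the tab emitted by the default TextOutputFormat):

0,0     19
0,1     22
1,0     43
1,1     50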

movie lens (collect all tags for each movieId)

userId,movieId,tag,timestamp
15,1193,good plot,16234567
15,1193,classic,16234570
20,1200,funny,16234600
35,1200,boring,16234620
35,1193,emotional,16234625


==================================


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TagsMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) return;

        // Skip header
        if (line.toLowerCase().startsWith("userid")) return;

        String[] parts = line.split(",", 4);
        if (parts.length < 3) return;

        String movieId = parts[1].trim();
        String tag = parts[2].trim();

        if (!movieId.isEmpty() && !tag.isEmpty()) {
            context.write(new Text(movieId), new Text(tag));
        }
    }
}
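
For example, the sample line 15,1193,good plot,16234567 is split (with a limit of 4, so only the first three commas matter) into userId 15, movieId 1193, tag "good plot" and the timestamp, and the mapper emits key 1193 with value good plot; the userId and timestamp are dropped.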

================================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.util.Iterator;

public class TagsReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        Iterator<Text> it = values.iterator();

        while (it.hasNext()) {
            sb.append(it.next().toString());
            if (it.hasNext()) sb.append(", ");
        }

        context.write(key, new Text(sb.toString()));
    }
}
======================================

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TagsDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TagsDriver <input> <output>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, "Movie Tags Extraction");
        job.setJarByClass(TagsDriver.class);

        job.setMapperClass(TagsMapper.class);
        job.setReducerClass(TagsReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
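
Run and check (tags.jar, tags_input and tags_output are example names; the order of tags within a line is not guaranteed, since MapReduce does not sort the values passed to a reducer):

hadoop jar tags.jar TagsDriver tags_input tags_output
hdfs dfs -cat tags_output/part-r-00000

For the sample data this should give something like:

1193    good plot, classic, emotional
1200    funny, boring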

WEATHER-1

Date,TempC,Humidity,WindKmph,PrecipMM
2025-08-20,36,55,12,0
2025-08-20,34,88,15,1
2025-08-21,28,60,45,0
2025-08-21,24,82,18,3
2025-08-22,6,70,8,0
2025-08-22,2,65,10,0
2025-08-23,30,50,10,22
2025-08-24,31,85,8,0




Classification rules (edit freely)

  1. PrecipMM >= 20 → Heavy Rain (severity 6)

  2. PrecipMM >= 2 → Rainy (severity 5)

  3. TempC >= 35 → Hot (severity 4)

  4. TempC <= 5 → Cold (severity 3)

  5. WindKmph >= 40 → Windy (severity 2)

  6. Humidity >= 80 → Humid (severity 1)

  7. else → Clear (severity 0)
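
Worked example: the row 2025-08-23,30,50,10,22 matches rule 1 first (PrecipMM 22 >= 20), so that reading is classified Heavy Rain with severity 6. The rules are checked top to bottom and only the first match counts; the reducer then keeps the highest-severity message seen for each date.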

========================================

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherConditionsMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) return;
        // Skip header if present
        if (line.toLowerCase().startsWith("date,")) return;

        String[] parts = line.split(",");
        if (parts.length < 5) return; // bad row

        String date = parts[0].trim();
        try {
            double tempC = Double.parseDouble(parts[1].trim());
            double humidity = Double.parseDouble(parts[2].trim());
            double wind = Double.parseDouble(parts[3].trim());
            double precip = Double.parseDouble(parts[4].trim());

            // Compute severity + message
            int severity; String message;
            if (precip >= 20) { severity = 6; message = "Heavy Rain"; }
            else if (precip >= 2) { severity = 5; message = "Rainy"; }
            else if (tempC >= 35) { severity = 4; message = "Hot"; }
            else if (tempC <= 5) { severity = 3; message = "Cold"; }
            else if (wind >= 40) { severity = 2; message = "Windy"; }
            else if (humidity >= 80) { severity = 1; message = "Humid"; }
            else { severity = 0; message = "Clear"; }

            // Emit: key = date, value = "severity|message"
            context.write(new Text(date), new Text(severity + "|" + message));
        } catch (NumberFormatException e) {
            // skip invalid numeric rows
        }
    }
}
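
For instance, the row 2025-08-21,24,82,18,3 falls into the Rainy branch (PrecipMM 3 >= 2), so the mapper emits key 2025-08-21 with value 5|Rainy; the severity is carried along so the reducer can compare all readings of the same day.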
==========================================
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherConditionsReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        int bestSeverity = -1;
        String bestMessage = "";

        for (Text t : values) {
            String[] kv = t.toString().split("\\|");
            if (kv.length != 2) continue;
            try {
                int sev = Integer.parseInt(kv[0]);
                String msg = kv[1];
                if (sev > bestSeverity) { bestSeverity = sev; bestMessage = msg; }
            } catch (NumberFormatException ignored) {}
        }

        // Output: Date \t Message
        if (bestSeverity >= 0) {
            context.write(key, new Text(bestMessage));
        }
    }
}
======================================
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeatherConditionsDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WeatherConditionsDriver <input> <output>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Daily Weather Conditions");
        job.setJarByClass(WeatherConditionsDriver.class);

        job.setMapperClass(WeatherConditionsMapper.class);
        job.setReducerClass(WeatherConditionsReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
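
Run and check (weather.jar, weather_input and weather_output are example names):

hadoop jar weather.jar WeatherConditionsDriver weather_input weather_output
hdfs dfs -cat weather_output/part-r-00000

Applying the rules to the WEATHER-1 sample data, the expected result is one worst-case condition per date:

2025-08-20    Hot
2025-08-21    Rainy
2025-08-22    Cold
2025-08-23    Heavy Rain
2025-08-24    Humid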
