Tuesday, 31 October 2017

SPARK SQL SAMPLE QUERY PROCESSING PROBLEM

The Transactions dataset should have the following attributes for each transaction:
TransID: unique sequential number (integer) from 1 to 5,000,000 (the file has 5M transactions)
CustID: References one of the customer IDs, i.e., from 1 to 50,000 (on Avg. a customer has 100 trans.)
TransTotal: random number (float) between 10 and 1000
TransNumItems: random number (integer) between 1 and 10
TransDesc: random text of characters of length between 20 and 50

--------------------------------------------------------------------------------------------

1) T1: Filter out (drop) the transactions from T whose total amount is less than $200
2) T2: Over T1, group the transactions by their number of items, and for each group
calculate the sum, the average, the min and the max of the total amounts.
3) Report back T2 to the client side.
4) T3: Over T1, group the transactions by customer ID, and for each group report the
customer ID, and the transactions’ count.
5) T4: Filter out (drop) the transactions from T whose total amount is less than $600
6) T5: Over T4, group the transactions by customer ID, and for each group report the
customer ID, and the transactions’ count.
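
The six steps above can be sketched in plain Python on a tiny in-memory sample. This only illustrates the intended semantics and is not the Spark job itself: the rows and the helper names (drop_below, stats_by_items, count_by_customer) are made up for the example. Note that "filter out (drop) the transactions whose total is less than $200" means T1 keeps the rows with a total of at least 200.

```python
from collections import defaultdict

# Hypothetical sample rows: (trans_id, cust_id, total, num_items).
T = [
    (1, 101, 150.0, 3),
    (2, 101, 250.0, 3),
    (3, 102, 700.0, 5),
    (4, 102, 650.0, 5),
    (5, 103, 900.0, 3),
]

def drop_below(rows, threshold):
    """Drop rows whose total is below the threshold (keep total >= threshold)."""
    return [r for r in rows if r[2] >= threshold]

def stats_by_items(rows):
    """Group by number of items; return {items: (sum, avg, min, max)} of totals."""
    groups = defaultdict(list)
    for _, _, total, items in rows:
        groups[items].append(total)
    return {k: (sum(v), sum(v) / len(v), min(v), max(v)) for k, v in groups.items()}

def count_by_customer(rows):
    """Group by customer ID; return {cust_id: transaction count}."""
    counts = defaultdict(int)
    for _, cust_id, _, _ in rows:
        counts[cust_id] += 1
    return dict(counts)

T1 = drop_below(T, 200)       # step 1
T2 = stats_by_items(T1)       # step 2
T3 = count_by_customer(T1)    # step 4
T4 = drop_below(T, 600)       # step 5
T5 = count_by_customer(T4)    # step 6

print(T2)  # step 3: report T2 back to the caller
print(T3)
print(T5)
```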


Queries in spark sql:


Project3:
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> import spark.implicits._
import spark.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

1)
scala> val input = sqlContext.read.json("t.json")
input: org.apache.spark.sql.DataFrame = [cid: bigint, desc: string ... 3 more fields]
scala> input.registerTempTable("trans")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> val result = sqlContext.sql("SELECT * from trans where ttotal < 200")
result: org.apache.spark.sql.DataFrame = [cid: bigint, desc: string ... 3 more fields]
scala> result.show()
+-----+-------------------+----+------+------+
| cid| desc| tid|titems|ttotal|
+-----+-------------------+----+------+------+
|10111|computeraccessories| 112| 3| 150|
|10111|computeraccessories| 114| 5| 125|
|10111|computeraccessories| 116| 4| 180|
|10114|computeraccessories| 118| 3| 180|
|10115|computeraccessories| 119| 5| 100|
|10112|computeraccessories|1114| 5| 50|
|10112|computeraccessories|1115| 2| 150|

2)
scala> val result = sqlContext.sql("SELECT * from trans where ttotal < 200")
result: org.apache.spark.sql.DataFrame = [cid: bigint, desc: string ... 3 more fields]
scala> result.groupBy("titems").sum("ttotal").show()
+------+-----------+
|titems|sum(ttotal)|
+------+-----------+
| 5| 275|
| 3| 330|
| 2| 150|
| 4| 180|
+------+-----------+

scala> result.groupBy("titems").min("ttotal").show()
+------+-----------+
|titems|min(ttotal)|
+------+-----------+
| 5| 50|
| 3| 150|
| 2| 150|
| 4| 180|

scala> result.groupBy("titems").max("ttotal").show()
+------+-----------+
|titems|max(ttotal)|
+------+-----------+
| 5| 125|
| 3| 180|
| 2| 150|
| 4| 180|

3)
The show() calls in step 2 report T2 back to the client.

4)
scala> result.groupBy("cid").count().show()
+-----+-----+
| cid|count|
+-----+-----+
|10112| 2|
|10114| 1|
|10111| 3|
|10115| 1|

5)
scala> val result3=result2.filter(result2("ttotal")<600)
result3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cid: bigint, desc: string ... 3 more fields]
scala> result3.show
+-----+-------------------+----+------+------+
| cid| desc| tid|titems|ttotal|
+-----+-------------------+----+------+------+
|10111|computeraccessories| 111| 5| 500|
|10111|computeraccessories| 112| 3| 150|
|10112|computeraccessories| 113| 4| 200|
|10111|computeraccessories| 114| 5| 125|
|10112|computeraccessories| 115| 5| 400|
|10111|computeraccessories| 116| 4| 180|
|10113|computeraccessories| 117| 5| 500|
|10114|computeraccessories| 118| 3| 180|
|10115|computeraccessories| 119| 5| 100|
|10114|computeraccessories|1111| 3| 500|
|10112|computeraccessories|1114| 5| 50|
|10112|computeraccessories|1115| 2| 150|

6)
scala> result3.groupBy("cid").count().show()
+-----+-----+
| cid|count|
+-----+-----+
|10112| 4|
|10114| 2|
|10111| 4|
|10113| 1|
|10115| 1|
+-----+-----+


Sunday, 13 August 2017

HADOOP STREAMING USING PYTHON

Step1:
/home/training/map.py

#!/usr/bin/python
import sys

# Emit one "<word>\t1" pair for every whitespace-separated word on stdin.
for myline in sys.stdin:
    myline = myline.strip()
    words = myline.split()
    for myword in words:
        print('%s\t%s' % (myword, 1))
Step2:

/home/training/red.py
#!/usr/bin/python
import sys

current_word = ""
current_count = 0
word = ""

# Hadoop sorts the mapper output by key, so equal words arrive on adjacent lines.
for myline in sys.stdin:
    myline = myline.strip()
    word, count = myline.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently ignore this line.
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word:
    print('%s\t%s' % (current_word, current_count))

         
Step3:
Create input.txt and copy it to HDFS (the job in Step4 reads it from /user/training/web/input.txt).

Step4:

[training@localhost ~]$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-*streaming*.jar -file /home/training/map.py    -mapper /home/training/map.py -file /home/training/red.py   -reducer /home/training/red.py -input /user/training/web/input.txt -output /user/training/web/stout
Step5:
[training@localhost ~]$ hadoop fs -cat  web/stout/part-00000
dd      1
rr      2
tt      1
ww      2



input we have given:

[training@localhost ~]$ cat input.txt
dd rr
rr tt
ww

ww
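
Before submitting the job, the mapper and reducer logic can be checked locally. The sketch below is a stand-in for the streaming pipeline, not the scripts themselves: it reimplements the map, sort (shuffle) and reduce phases as plain Python functions (map_words, reduce_counts and run_local are names invented here) and feeds in the input.txt content shown above.

```python
from itertools import groupby

def map_words(lines):
    """Map phase: emit (word, 1) for every whitespace-separated word."""
    for line in lines:
        for word in line.strip().split():
            yield word, 1

def reduce_counts(sorted_pairs):
    """Reduce phase: sum the counts of adjacent pairs that share a key."""
    for word, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)

def run_local(lines):
    """Map, sort by key (standing in for the shuffle), then reduce."""
    return list(reduce_counts(sorted(map_words(lines))))

# The lines of input.txt, including the blank line.
sample = ["dd rr", "rr tt", "ww", "", "ww"]
for word, count in run_local(sample):
    print("%s\t%s" % (word, count))
```

The printed pairs should agree with the part-00000 listing in Step5. On the command line, the same check is commonly done by piping the file through the mapper, sort, and the reducer.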

MapReduce program: traffic data analytics for accidents, alcohol consumption and overspeeding

TRAFFIC DATA ACCIDENT ANALYTICS


import org.apache.hadoop.fs.Path;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Adriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.out.printf(
          "Usage: %s [generic options] <input dir> <output dir>\n", getClass()
              .getSimpleName());
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }
    JobConf conf = new JobConf(getConf(), Adriver.class);
    conf.setJobName(this.getClass().getName());  
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    conf.setMapperClass(AMapper.class);
    conf.setReducerClass(AReducer.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    JobClient.runJob(conf);
    return 0;
  }
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Adriver(), args);
    System.exit(exitCode);
  }
}
AMAPPER:
import java.io.IOException;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class AMapper extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, Text> {
  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // Input record: area,accident,alcohol,time,age,speed
    String record = value.toString();
    String[] parts = record.split(",");
    // Key: area. Value: "accident,alcohol,speed".
    output.collect(new Text(parts[0]),
        new Text(parts[1] + "," + parts[2] + "," + parts[5]));
  }
}
Reducer
import java.io.IOException;
import java.util.Iterator;
//import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class AReducer extends MapReduceBase implements
    Reducer<Text, Text, Text, Text> {
  @Override
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    float sum = 0;    // number of records for this area
    float alsum = 0;  // records with alcohol == "yes"
    float spsum = 0;  // records with speed > 100
    while (values.hasNext()) {
      // Each value is "accident,alcohol,speed", e.g. "yes,yes,120".
      String record = values.next().toString();
      String[] parts = record.split(",");
      String al = parts[1];
      // trim() guards against trailing spaces in the input line.
      int sp = Integer.parseInt(parts[2].trim());
      sum = sum + 1;
      if (al.equals("yes")) {
        alsum = alsum + 1;
      }
      if (sp > 100) {
        spsum = spsum + 1;
      }
    }
    // reduce() is only called for keys with at least one value, so sum > 0.
    float alavg = alsum / sum;
    float spavg = spsum / sum;
    String res = String.valueOf(alavg) + "," + String.valueOf(spavg);
    output.collect(key, new Text(res));
  }
}
 input:
area,accident(yes/no),alcohol(yes/no),time,age,speed
area1,yes,yes,11,32,120      
area1,yes,yes,11,32,120
area2,no,yes,15,34,89
area3,yes,yes,16,36,58
area1,yes,yes,11,38,27
area1,no,yes,12,40,120
area2,yes,yes,6,16,120
area3,yes,no,18,44,100
area1,no,yes,21,2,80
area1,yes,yes,22,18,60
area2,yes,yes,22,23,110
area3,no,yes,22,23,120
area1,no,yes,18,38,100
area1,yes,no,21,40,110
area2,yes,no,22,16,77
area3,no,yes,22,38,88
area1,no,yes,22,40,115
area1,yes,no,16,16,99
area2,yes,no,11,38,88
area3,yes,yes,12,40,115
area1,no,yes,6,16,99
area1,yes,no,16,16,99
area2,yes,no,11,38,88
area3,yes,yes,12,40,115
area1,no,yes,6,16,99
area1,yes,yes,11,32,120
area1,yes,yes,11,32,120
area2,no,yes,15,34,89
area3,yes,yes,16,36,58
area1,yes,yes,11,38,27
area1,no,yes,12,40,120
area2,yes,yes,6,16,120
area3,yes,no,18,44,100
area1,no,yes,21,2,80
area1,yes,yes,22,18,60
area2,yes,yes,22,23,110
area3,no,yes,22,23,120
area1,no,yes,18,38,100
area1,yes,no,21,40,110
area2,yes,no,22,16,77
area3,no,yes,22,38,88
area1,no,yes,22,40,115
area1,yes,no,16,16,99
area2,yes,no,11,38,88
area3,yes,yes,12,40,115
area1,no,yes,6,16,99
area1,yes,no,16,16,99
area2,yes,no,11,38,88
area3,yes,yes,12,40,115
area1,no,yes,6,16,99
----------------------------------------------------------------------
output:
[training@localhost ~]$ hadoop fs -cat  web/newacop34/part-00000
area1   0.5714286,0.42857143
area2   0.4,0.4
area3   0.75,0.5
--------------------------------------------------------------
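
The reducer's arithmetic can be checked by hand on a few rows. The sketch below is a plain-Python stand-in, not the Hadoop job: area_ratios is a name invented here, and the five rows are a small hand-picked sample in the same column layout as the input above. For each area it reports the fraction of records with alcohol = yes and the fraction with speed above 100.

```python
from collections import defaultdict

def area_ratios(lines):
    """Return {area: (alcohol_ratio, overspeed_ratio)} for CSV records
    laid out as area,accident,alcohol,time,age,speed."""
    total = defaultdict(int)
    alcohol = defaultdict(int)
    overspeed = defaultdict(int)
    for line in lines:
        parts = [p.strip() for p in line.split(",")]
        area, alc, speed = parts[0], parts[2], int(parts[5])
        total[area] += 1
        if alc == "yes":
            alcohol[area] += 1
        if speed > 100:
            overspeed[area] += 1
    return {a: (alcohol[a] / total[a], overspeed[a] / total[a]) for a in total}

# Illustrative sample only (not the full input file).
sample = [
    "area1,yes,yes,11,32,120",
    "area1,no,yes,21,2,80",
    "area1,yes,no,21,40,110",
    "area1,yes,no,16,16,99",
    "area2,no,yes,15,34,89",
]
for area, (alc_ratio, speed_ratio) in sorted(area_ratios(sample).items()):
    print("%s\t%s,%s" % (area, alc_ratio, speed_ratio))
```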

                  

Thursday, 3 August 2017

PIG OPERATIONS

Pig Basic Operators
Operator
Description
Example
Arithmetic Operators
+, -, *, /, %, ?:
X = FOREACH A GENERATE f1, f2, f1%f2;
X = FOREACH A GENERATE f2, (f2==1?1:COUNT(B));
Boolean Operators
and, or, not
X = FILTER A BY (f1==8) OR (NOT (f2+f3 > f1));
Cast Operators
Casting from one datatype to another
B = FOREACH A GENERATE (int)$0 + 1;
B = FOREACH A GENERATE $0 + 1, $1 + 1.0;
Comparison Operators
==, !=, >, <, >=, <=, matches
X = FILTER A BY (f1 == 8);
X = FILTER A BY (f2 == 'apache');
X = FILTER A BY (f1 matches '.*apache.*');
Construction Operators
Used to construct tuple (), bag {} and map []
B = foreach A generate (name, age);
B = foreach A generate {(name, age)}, {name, age};
B = foreach A generate [name, gpa];
Dereference Operators
Dereference tuples (tuple.id or tuple.(id,…)), bags (bag.id or bag.(id,…)) and maps (map#'key')
X = FOREACH A GENERATE f2.t1,f2.t3 (dereferencing is used to retrieve two fields from tuple f2)
Disambiguate Operator
( :: ) used to identify field names after JOIN, COGROUP, CROSS, or FLATTEN operators
A = load 'data1' as (x, y);
B = load 'data2' as (x, y, z);
C = join A by x, B by x;
D = foreach C generate A::y;
Flatten Operator
Flatten un-nests tuples as well as bags
Consider a relation that has a tuple of the form (a, (b, c)). The expression GENERATE $0, flatten($1) will cause that tuple to become (a, b, c).
Null Operator
is null, is not null
X = FILTER A BY f1 is not null;
Sign Operators
+ -> has no effect, - -> changes the sign of a positive/negative number
A = LOAD 'data' as (x, y, z);
B = FOREACH A GENERATE -x, y;
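
The effect of FLATTEN described above can be mimicked in plain Python. This is a sketch of the semantics only, not Pig code: flatten_tuple and flatten_bag are hypothetical helpers, with a Python tuple standing in for a Pig tuple and a list of tuples standing in for a bag. Flattening a tuple splices its fields into the enclosing row; flattening a bag produces one output row per tuple in the bag.

```python
def flatten_tuple(row, index):
    """Splice the tuple at row[index] into the row: (a, (b, c)) -> (a, b, c)."""
    return row[:index] + tuple(row[index]) + row[index + 1:]

def flatten_bag(row, index):
    """Un-nest the bag at row[index]: one output row per tuple in the bag."""
    return [row[:index] + tuple(t) + row[index + 1:] for t in row[index]]

print(flatten_tuple(("a", ("b", "c")), 1))
print(flatten_bag(("a", [("b", "c"), ("d", "e")]), 1))
```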
Relational Operators
Operator
Description
Example
COGROUP/GROUP
Groups the data in one or more relations. The COGROUP operator groups together tuples that have the same group key (key field)
A = load 'student' AS (name:chararray,age:int,gpa:float);
B = GROUP A BY age;
CROSS
Computes the cross product of two or more relations
A: (1,2,3) (4,2,1)
B: (2,4) (8,9) (1,3)
X = CROSS A, B;
DUMP X;
(1,2,3,2,4)
(1,2,3,8,9)
(1,2,3,1,3)
(4,2,1,2,4)
(4,2,1,8,9)
(4,2,1,1,3)
DEFINE
Assigns an alias to a UDF or streaming command.
DEFINE CMD `perl PigStreaming.pl - nameMap` input(stdin using PigStreaming(',')) output(stdout using PigStreaming(','));
A = LOAD 'file';
B = STREAM A THROUGH CMD;
DISTINCT
Removes duplicate tuples in a relation.
A: (8,3,4) (1,2,3) (4,3,3) (4,3,3) (1,2,3)
X = DISTINCT A;
DUMP X;
(1,2,3)
(4,3,3)
(8,3,4)
FILTER
Selects tuples from a relation based on some condition.
A: (1,2,3) (4,5,6) (7,8,9) (4,3,3) (8,4,3)
X = FILTER A BY f3 == 3;
DUMP X;
(1,2,3)
(4,3,3)
(8,4,3)
FOREACH
Generates transformation of data for each row as specified
A: (1,2,3) (4,2,5) (8,3,6)
X = FOREACH A GENERATE a1, a2;
DUMP X;
(1,2)
(4,2)
(8,3)
IMPORT
Import macros defined in a separate file.
/* myscript.pig */
IMPORT 'my_macro.pig';
JOIN
Performs an inner join of two or more relations based on common field values.
A: (1,2) (4,5)
B: (1,3) (1,2) (4,7)
X = JOIN A BY a1, B BY b1;
DUMP X;
(1,2,1,3)
(1,2,1,2)
(4,5,4,7)
LOAD
Loads data from the file system.
A = LOAD 'myfile.txt';
A = LOAD 'myfile.txt' AS (f1:int, f2:int, f3:int);
MAPREDUCE
Executes native MapReduce jobs inside a Pig script.
A = LOAD 'WordcountInput.txt';
B = MAPREDUCE 'wordcount.jar' STORE A INTO 'inputDir' LOAD 'outputDir'
AS (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;
ORDER BY
Sorts a relation based on one or more fields.
A = LOAD 'mydata' AS (x: int, y: map[]);
B = ORDER A BY x;
SAMPLE
Selects a random data sample with the stated sample size.
Relation X will contain 1% of the data in relation A.
A = LOAD 'data' AS (f1:int,f2:int,f3:int);
X = SAMPLE A 0.01;
SPLIT
Partitions a relation into two or more relations based on some expression.
SPLIT input_var INTO output_var IF (field1 is not null), ignored_var IF (field1 is null);
STORE
Stores or saves results to the file system.
STORE A INTO 'myoutput' USING PigStorage ('*');
1*2*3
4*2*1
STREAM
Sends data to an external script or program
A = LOAD 'data';
B = STREAM A THROUGH `stream.pl -n 5`;
UNION
Computes the union of two or more relations. (Does not preserve the order of tuples)
A: (1,2,3) (4,2,1)
B: (2,4) (8,9) (1,3)
X = UNION A, B;
DUMP X;
(1,2,3)
(4,2,1)
(2,4)
(8,9)
(1,3)
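
As a cross-check of the CROSS, JOIN, DISTINCT and UNION examples above, the same results can be reproduced with plain Python lists of tuples. This only sketches the semantics; it is not how Pig executes these operators, and the helper names are made up here. Pig does not guarantee output order, so the order below simply follows the Python lists (DISTINCT is sorted to give a stable result).

```python
from itertools import product

def cross(a, b):
    """CROSS: every tuple of a concatenated with every tuple of b."""
    return [x + y for x, y in product(a, b)]

def join(a, b, a_key=0, b_key=0):
    """Inner JOIN on a[a_key] == b[b_key]; output is the concatenated tuple."""
    return [x + y for x in a for y in b if x[a_key] == y[b_key]]

def distinct(a):
    """DISTINCT: remove duplicate tuples (sorted for a stable order)."""
    return sorted(set(a))

def union(a, b):
    """UNION: all tuples of both relations, duplicates kept."""
    return a + b

A = [(1, 2, 3), (4, 2, 1)]
B = [(2, 4), (8, 9), (1, 3)]
print(cross(A, B))
print(join([(1, 2), (4, 5)], [(1, 3), (1, 2), (4, 7)]))
print(distinct([(8, 3, 4), (1, 2, 3), (4, 3, 3), (4, 3, 3), (1, 2, 3)]))
print(union(A, B))
```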
Functions
Function
Syntax
Description
AVG
AVG(expression)
Computes the average of the numeric values in a single-column bag.
CONCAT
CONCAT (expression, expression)
Concatenates two expressions of identical type.
COUNT
COUNT(expression)
Computes the number of elements in a bag, it ignores null.
COUNT_STAR
COUNT_STAR(expression)
Computes the number of elements in a bag, it includes null.
DIFF
DIFF (expression, expression)
Compares two fields in a tuple, any tuples that are in one bag but not the other are returned in a bag.
IsEmpty
IsEmpty(expression)
Checks if a bag or map is empty.
MAX
MAX(expression)
Computes the maximum of the numeric values or chararrays in a single-column bag
MIN
MIN(expression)
Computes the minimum of the numeric values or chararrays in a single-column bag.
SIZE
SIZE(expression)
Computes the number of elements based on any Pig data type. SIZE includes NULL values in the size computation
SUM
SUM(expression)
Computes the sum of the numeric values in a single-column bag.
TOKENIZE
TOKENIZE(expression [, 'field_delimiter'])
Splits a string and outputs a bag of words.
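
The difference between COUNT and COUNT_STAR, and the way aggregates such as AVG skip nulls, can be illustrated with a Python list standing in for a single-column bag, with None playing the role of null. The function names below are stand-ins invented for this sketch, not Pig APIs.

```python
def count(bag):
    """Like COUNT: number of elements, ignoring nulls."""
    return sum(1 for v in bag if v is not None)

def count_star(bag):
    """Like COUNT_STAR: number of elements, nulls included."""
    return len(bag)

def avg(bag):
    """Like AVG: mean of the non-null values, or None if there are none."""
    values = [v for v in bag if v is not None]
    return sum(values) / len(values) if values else None

bag = [4, None, 6, 2]
print(count(bag), count_star(bag), avg(bag))
```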
Load/Store Functions
Function
Syntax
Description
Handling Compression
A = load 'myinput.gz';
store A into 'myoutput.gz';
PigStorage and TextLoader support gzip and bzip compression for both read (load) and write (store). BinStorage does not support compression.
BinStorage
A = LOAD 'data' USING BinStorage();
Loads and stores data in machine-readable format.
JsonLoader, JsonStorage
A = load 'a.json' using JsonLoader();
Load or store JSON data.
PigDump
STORE X INTO 'output' USING PigDump();
Stores data in UTF-8 format.
PigStorage
A = LOAD 'student' USING PigStorage('\t') AS (name: chararray, age:int, gpa: float);
Loads and stores data as structured text files.
TextLoader
A = LOAD 'data' USING TextLoader();
Loads unstructured data in UTF-8 format.
Math Functions
Function
Syntax
Description
ABS
ABS(expression)
Returns the absolute value of an expression. If the result is not negative (x ≥ 0), the result is returned. If the result is negative (x < 0), the negation of the result is returned.
ACOS
ACOS(expression)
Returns the arc cosine of an expression.
ASIN
ASIN(expression)
Returns the arc sine of an expression.
ATAN
ATAN(expression)
Returns the arc tangent of an expression.
CBRT
CBRT(expression)
Returns the cube root of an expression.
CEIL
CEIL(expression)
Returns the value of an expression rounded up to the nearest integer. This function never decreases the result value.
COS
COS(expression)
Returns the trigonometric cosine of an expression.
COSH
COSH(expression)
Returns the hyperbolic cosine of an expression.
EXP
EXP(expression)
Returns Euler’s number e raised to the power of x.
FLOOR
FLOOR(expression)
Returns the value of an expression rounded down to the nearest integer. This function never increases the result value.
LOG
LOG(expression)
Returns the natural logarithm (base e) of an expression.
LOG10
LOG10(expression)
Returns the base 10 logarithm of an expression.
RANDOM
RANDOM( )
Returns a pseudo random number (type double) greater than or equal to 0.0 and less than 1.0.
ROUND
ROUND(expression)
Returns the value of an expression rounded to an integer (if the result type is float) or rounded to a long (if the result type is double).
SIN
SIN(expression)
Returns the sine of an expression.
SINH
SINH(expression)
Returns the hyperbolic sine of an expression.
SQRT
SQRT(expression)
Returns the positive square root of an expression.
TAN
TAN(expression)
Returns the trigonometric tangent of an angle.
TANH
TANH(expression)
Returns the hyperbolic tangent of an expression.
String Functions
Function
Syntax
Description
INDEXOF
INDEXOF(string, 'character', startIndex)
Returns the index of the first occurrence of a character in a string, searching forward from a start index.
LAST_INDEX_OF
LAST_INDEX_OF(expression)
Returns the index of the last occurrence of a character in a string, searching backward from a start index.
LCFIRST
LCFIRST(expression)
Converts the first character in a string to lower case.
LOWER
LOWER(expression)
Converts all characters in a string to lower case.
REGEX_EXTRACT
REGEX_EXTRACT (string, regex, index)
Performs regular expression matching and extracts the matched group defined by an index parameter. The function uses Java regular expression form.
REGEX_EXTRACT_ALL
REGEX_EXTRACT_ALL (string, regex)
Performs regular expression matching and extracts all matched groups. The function uses Java regular expression form.
REPLACE
REPLACE(string, 'oldChar', 'newChar');
Replaces existing characters in a string with new characters.
STRSPLIT
STRSPLIT(string, regex, limit)
Splits a string around matches of a given regular expression.
SUBSTRING
SUBSTRING(string, startIndex, stopIndex)
Returns a substring from a given string.
TRIM
TRIM(expression)
Returns a copy of a string with leading and trailing white space removed.
UCFIRST
UCFIRST(expression)
Returns a string with the first character converted to upper case.
UPPER
UPPER(expression)
Returns a string converted to upper case.
Tuple, Bag, Map Functions
Function
Syntax
Description
TOTUPLE
TOTUPLE(expression [, expression …])
Converts one or more expressions to type tuple.
TOBAG
TOBAG(expression [, expression …])
Converts one or more expressions to individual tuples which are then placed in a bag.
TOMAP
TOMAP(key-expression, value-expression [, key-expression, value-expression …])
Converts key/value expression pairs into a map. Needs an even number of expressions as parameters. The elements must comply with map type rules.
TOP
TOP(topN,column,relation)
Returns the top-n tuples from a bag of tuples.
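
What TOP computes can be sketched in Python with a list of tuples standing in for the bag. The helper name top and the sample bag are made up for this illustration, and the sketch assumes the column argument is a 0-based field index.

```python
import heapq

def top(n, column, bag):
    """Return the n tuples with the largest value in the given column."""
    return heapq.nlargest(n, bag, key=lambda t: t[column])

bag = [("a", 3), ("b", 7), ("c", 5)]
print(top(2, 1, bag))
```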
User Defined Functions (UDFs)
Pig provides extensive support for user defined functions (UDFs) as a way to specify custom processing. Pig UDFs can currently be implemented in several languages, including Java, Python, JavaScript and Ruby.
Registering UDFs
Registering Java UDFs:
--register_java_udf.pig
register 'your_path_to_piggybank/piggybank.jar';
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
Registering Python UDFs (The Python script must be in your current directory):
--register_python_udf.pig
register 'production.py' using jython as bballudfs;
players  = load 'baseball' as (name:chararray, team:chararray,
                pos:bag{t:(p:chararray)}, bat:map[]);
Writing UDFs
Java UDFs:
package myudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class UPPER extends EvalFunc<String>
{
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            String str = (String) input.get(0);
            return str.toUpperCase();
        } catch (Exception e) {
            throw new IOException("Caught exception processing input row ", e);
        }
    }
}
Python UDFs
# Square - square of a number of any data type
@outputSchemaFunction("squareSchema")  # Defines a script delegate function that defines the schema for this function depending upon the input type.
def square(num):
    return ((num)*(num))

@schemaFunction("squareSchema")  # Defines the delegate function; it is not registered to Pig.
def squareSchema(input):
    return input

# Percent - percentage
@outputSchema("percent:double")  # Defines the schema for a script UDF in a format that Pig understands and is able to parse.
def percent(num, total):
    return num * 100 / total
Data Types
SIMPLE TYPES
Type
Description
Example
int
Signed 32-bit integer
10
long
Signed 64-bit integer
Data: 10L or 10l
Display: 10L
float
32-bit floating point
Data: 10.5F or 10.5f or 10.5e2f or 10.5E2F
Display: 10.5F or 1050.0F
double
64-bit floating point
Data: 10.5 or 10.5e2 or 10.5E2
Display: 10.5 or 1050.0
chararray
Character array (string) in Unicode UTF-8 format
hello world
bytearray
Byte array (blob)
boolean
boolean
true/false (case insensitive)
Complex Types
Type
Description
Example
tuple
An ordered set of fields.
(19,2)
bag
A collection of tuples.
{(19,2), (18,1)}
map
A set of key value pairs.
[name#John,phone#5551212]

TABLE CREATION
CREATE A SAMPLE TABLE EMP(ENO,ENAME,SAL)


Hadoop Analytics

NewolympicData

  Alison Bartosik 21 United States 2004 08-29-04 Synchronized Swimming 0 0 2 2 Anastasiya Davydova 21 Russia 2004 0...