Search This Blog

Tuesday, 7 March 2017

pig notes

PIG
Pig was originally created at Yahoo! to answer a similar need to Hive
  • Many developers did not have the Java and/or MapReduce knowledge required to write standard MapReduce programs
  • But still needed to query data

Pig is a dataflow language
  • Language is called PigLatin.
  • Relatively simple syntax.
  • Under the covers, PigLatin scripts are turned into MapReduce jobs and executed on the cluster.
  • Installation of Pig requires no modification to the cluster
  • The Pig interpreter runs on the client machine
  • Turns PigLatin into standard Java MapReduce jobs, which are then submitted to the JobTracker
  • There is (currently) no shared metadata, so no need for a shared metastore of any kind.

Pig Concepts:
  • In Pig, a single element of data is an atom
  • A collection of atoms – such as a row, or a partial row – is a tuple
  • Tuples are collected together into bags
  • Typically, a PigLatin script starts by loading one or more datasets into bags, and then creates new bags by modifying those it already has existed.

Pig Features
  • Pig supports many features which allow developers to perform sophisticated data analysis without writing Java MapReduce code
Joining datasets
Grouping data
Referring to elements by position rather than name
Useful for datasets with many elements
Loading non-delimited data using a custom SerDe
Creation of user-defined functions, written in Java
And more


A Sample Pig Script:

  • Here, we load a file into a bag called emps
  • Then we create a new bag called rich which contains just those
  • records where the salary portion is greater than 100000
  • Finally, we write the contents of the srtd bag to a new directory in HDFS
  • By default, the data will be written in tab-separated format Alternatively, to write the contents of a bag to the screen, say

Let’s look at a simple example by writing the program to calculate the maximum recorded temperature by year for the weather dataset in Pig Latin. The complete program is only a few lines long:

-- max_temp.pig: Finds the maximum temperature by year

grunt>records = LOAD 'input/ncdc/micro-tab/sample.txt'
AS (year:chararray, temperature:int, quality:int);

grunt> filtered_records = FILTER records BY temperature != 9999 AND
 (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);

grunt>grouped_records = GROUP filtered_records BY year;

grunt>max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);

grunt>DUMP max_temp;

For simplicity, the program assumes that the input is tab-delimited text, with each line
having just year, temperature, and quality fields.

This line describes the input data we want to process. The year:chararray notation describes the field’s name and type; a chararray is like a Java string, and an int is like a Java int. The LOAD
operator takes a URI argument; here we are just using a local file, but we could refer to an HDFS URI. The AS clause (which is optional) gives the fields names to make it convenient to refer to them in subsequent statements.

A tuple is just like a row of data in a database table, with multiple fields in a particular order. In this example, the LOAD function produces a set of (year,temperature, quality) tuples that are present in the input file. We write a relation with one tuple per line, where tuples are represented as comma-separated items in parentheses:
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)

Relations are given names, or aliases, so they can be referred to. This relation is given
the records alias. We can examine the contents of an alias using the DUMP operator:
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

We can also see the structure of a relation—the relation’s schema—using the
DESCRIBE operator on the relation’s alias:

grunt> DESCRIBE records;
records: {year: chararray,temperature: int,quality: int}

The second statement removes records that have a missing temperature (indicated by
a value of 9999) or an unsatisfactory quality reading. For this small dataset, no records
are filtered out:
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);

grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
The third statement uses the GROUP function to group the records relation by the
year field. Let’s use DUMP to see what it produces:

grunt> grouped_records = GROUP filtered_records BY year;
grunt> DUMP grouped_records;
(1949,{(1949,111,1),(1949,78,1)})
(1950,{(1950,0,1),(1950,22,1),(1950,-11,1)})

We now have two rows, or tuples, one for each year in the input data. The first field in
each tuple is the field being grouped by (the year), and the second field is a bag of tuples
for that year. A bag is just an unordered collection of tuples, which in Pig Latin is represented using curly braces.

By grouping the data in this way, we have created a row per year, so now all that remains is to find the maximum temperature for the tuples in each bag. Before we do this, let’s
understand the structure of the grouped_records relation:

grunt> DESCRIBE grouped_records;
grouped_records: {group: chararray,filtered_records: {year: chararray,temperature: int,quality: int}}

This tells us that the grouping field is given the alias group by Pig, and the second field
is the same structure as the filtered_records relation that was being grouped. With
this information, we can try the fourth transformation:

grunt> max_temp = FOREACH grouped_records GENERATE group,
>> MAX(filtered_records.temperature);

FOREACH processes every row to generate a derived set of rows, using a GENERATE
clause to define the fields in each derived row. In this example, the first field is
group, which is just the year. The second field is a little more complex.
The filtered_records.temperature reference is to the temperature field of the
filtered_records bag in the grouped_records relation. MAX is a built-in function for
calculating the maximum value of fields in a bag. In this case, it calculates the maximum
temperature for the fields in each filtered_records bag. Let’s check the result:
grunt> DUMP max_temp;
(1949,111)
(1950,22)
So we’ve successfully calculated the maximum temperature for each year.

With the ILLUSTRATE operator, Pig provides a tool for generating a reasonably complete
and concise dataset. Here is the output from running ILLUSTRATE
grunt> ILLUSTRATE max_temp;
-------------------------------------------------------------------------------
| records | year:chararray | temperature:int | quality:int |
-------------------------------------------------------------------------------
| | 1949 | 78 | 1 |
| | 1949 | 111 | 1 |
| | 1949 | 9999 | 1 |
-------------------------------------------------------------------------------
----------------------------------------------------------------------------------------
| filtered_records | year:chararray | temperature:int | quality:int |
----------------------------------------------------------------------------------------
| | 1949 | 78 | 1 |
| | 1949 | 111 | 1 |
----------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------
| grouped_records | group:chararray | filtered_records:bag{:tuple(year:chararray, |
temperature:int,quality:int)} |
--------------------------------------------------------------------------------------------
| | 1949 | {(1949, 78, 1), (1949, 111, 1)} |
--------------------------------------------------------------------------------------------
---------------------------------------------------
| max_temp | group:chararray | :int |
---------------------------------------------------
| | 1949 | 111 |
---------------------------------------------------

 Pig Latin relational operators:

Category                                 Operator                                            Description
Loading and storing                LOAD                                                   Loads data from the
filesystem or other storage into a relation

STORE                                                 Saves a relation to the
                                                            filesystem or other storage

DUMP                                                  Prints a relation to the
                                                            console

Filtering                                   FILTER                                                             Removes unwanted rows
                                                                                                            from a relation

DISTINCT                                             Removes duplicate rows
                                                            from a relation

FOREACH...GENERATE                        Adds or removes fields from
                                                            a relation

MAPREDUCE                                       Runs a MapReduce job using
                                                            a relation as input
STREAM                                              Transforms a relation using
                                                            an external program


SAMPLE                                               Selects a random sample of a
                                                            relation

Grouping and joining              JOIN                                                     Joins two or more relations

COGROUP                                           Groups the data in two or
                                                            more relations

GROUP                                                Groups the data in a single
                                                            relation

CROSS                                                             Creates the cross-product of
                                                            two or more relations
           
Sorting                                     ORDER                                                Sorts a relation by one or
                                                                                                            more fields

LIMIT                                                   Limits the size of a relation to
a maximum number of tuples

Combining and splitting          UNION                                                 Combines two or more
                                                                                                            relations into one

SPLIT                                                    Splits a relation into two or
                                                            more relations

Pig Latin diagnostic operators:
Operator                                            Description
DESCRIBE                                            Prints a relation’s schema

EXPLAIN                                               Prints the logical and physical plans

ILLUSTRATE                                         Shows a sample execution of the logical plan, using a generated subset of the input

Category                                 Command                                          Description
Hadoop Filesystem
cat                                       Prints the contents of one or more files
   
    cd                                        Changes the current directory
  
   copyFromLocal                     Copies a local file or directory to a Hadoop filesystem

 copyToLocal                         Copies a file or directory on a Hadoop filesystem to the local filesystem

cp                                          Copies a file or directory to another directory
 
   fs                                          Accesses Hadoop’s filesystem shell
 
  ls                                            Lists files

 mkdir                          Creates a new directory
   
mv                                           Moves a file or directory to another directory

pwd                                           Prints the path of the current working directory

rm                                             Deletes a file or directory

rmf                                           Forcibly deletes a file or directory (does not fail if the file or directory does not exist)

                  
Hadoop MapReduce
 
  kill                                         Kills a MapReduce job

Utility                                        exec                                       Runs a script in a new Grunt shell

 Help                                       Shows the available commands and options

 quit                                         Exits the interpreter
 run                                          Runs a script within the existing Grunt shell

 set                                          Sets Pig options and MapReduce job properties

sh                                            Run a shell command from within Grunt

The filesystem commands can operate on files or directories in any Hadoop filesystem, and they are very similar to the hadoop fs commands (which is not surprising, as both are simple wrappers around the Hadoop FileSystem interface). You can access all of
the Hadoop filesystem shell commands using Pig’s fs command. For example, fs -ls will show a file listing, and fs -help will show help on all the available commands.


Types

Pig has four numeric types:

Int
Long
Float
double,
which are identical to theirava counterparts. There is also a bytearray type, like Java’s byte array type for representing blob of binary data, and chararray, which, like java.lang.String, represents textual data in UTF-16 format, although it can be loaded or stored in UTF-8 format.
Pig does not have types corresponding to Java’s boolean,6 byte, short, or char primitive types.  These are all easily represented using Pig’s int type, or chararray for char.

The numeric, textual, and binary types are simple atomic types. Pig Latin also has three complex types for representing nested structures: tuple, bag, and map.

Category                     Type                Description                                         Literal example

Numeric                      int                    32-bit signed integer                          1

long                 64-bit signed integer                          1L

float                 32-bit floating-point number              1.0F

double             64-bit floating-point number              1.0

Text                             chararray        Character array in UTF-16 format     'a'
Binary                          bytearray        Byte array                                           Not supported

Complex                      tuple                Sequence of fields of any type            (1,'pomegranate')

Bag                  An unordered collection
of tuples, possibly with
             duplicates                               {(1,'pomegranate'),(2)}

map                 A set of key-value pairs.
 Keys must be character arrays;
values may be any type          ['a'#'pomegranate']

Schemas
A relation in Pig may have an associated schema, which gives the fields in the relation names and types. We’ve seen how an AS clause in a LOAD statement is used to attach a schema to a relation:

grunt> env = LOAD '/user/training/climate’
AS (year:int, temp:int, city:chararray);
grunt> DESCRIBE env;
records: {year: int, temp: int, city:chararray}

Pig is more flexible in defining schema. First we can load data and then we can decide which data type is suitable for the field names, where as in SQL data types can be declared before the data is loaded into the system. Pig is designed for analyzing plain input files with no
Associated type information, so it is quite natural to choose types for fields later than you would with an RDBMS.

It’s also possible to omit type declarations completely,:

grunt> env = LOAD '/home/training/climate'  AS (year, temp, city);
grunt> DESCRIBE env;
records: {year: bytearray, temp: bytearray, city: bytearray}

In this case, we have specified only the names of the fields in the schema as  year, temp, and city. The types default to bytearray, the most general type, representing a binary string.
You don’t need to specify types for every field; you can leave some to default to byte
array, as we have done for year in this declaration:
grunt> env = LOAD '/home/training/climate'  
 AS (year, temp: int, city:chararray);
grunt> DESCRIBE env;
records: {year: bytearray, temp: int, city:chararray}

However, if you specify a schema in this way, you do need to specify every field. Also, there’s no way to specify the type of a field without specifying the name. On the other hand, the schema is entirely optional and can be omitted by not specifying an AS clause:
grunt> env = LOAD '/home/training/climate' ;
grunt> DESCRIBE env;
Schema for records unknown.

Fields in a relation with no schema can be referenced only using positional notation:
$0 refers to the first field in a relation, $1 to the second, and so on. Their types default
to bytearray.

grunt> env1 = FOREACH env GENERATE $0, $1, $2;
grunt> DUMP env1;
(1949,11,Banglore)
(1950,22,Chennai)
(1950,-11,Delhi)
(1949,42,Mumbai)
(1949,29,Pune)
grunt> DESCRIBE env1;
env1: {bytearray,bytearray,bytearray}

Although it can be convenient to not to have assigning types for fields, doing so can improve the clarity and efficiency of Pig Latin programs, and is generally recommended.

Validation and nulls
An SQL database will enforce the constraints in a table’s schema at load time: for
example, trying to load a string into a column that is declared to be a numeric type will
fail. In Pig, if the value cannot be cast to the type declared in the schema, then it will substitute a null value. Let’s see how this works if we have the following input for the
weather data, which has an “ram” character in place of an integer:

1949    11        Banglore
1950    22        Chennai
1950    -11       Delhi
1949    ram     Mumbai
1949    29        Pune

Pig handles the corrupt line by producing a null for the offending value, which is displayed
as the absence of a value when dumped to screen (and also when saved using STORE):

grunt> currepted_env = LOAD  '/user/training/currepted_climate'
 AS (year:chararray, temp:int, city:chararray);
grunt> DUMP currepted_env;
(1949,11,Banglore)
(1950,22,Chennai)
(1950,-11,Delhi)
(1949, ,Mumbai)
(1949,29,Pune)


Pig produces a warning for the invalid field (not shown here), but does not halt its
processing. For large datasets, it is very common to have corrupt, invalid, or merely
unexpected data, and it is generally infeasible to incrementally fix every unparsable
record. Instead, we can pull out all of the invalid records in one go, so we can take
action on them, perhaps by fixing our program or by filtering them out.

grunt> corrupt_records = FILTER records BY temperature is null;
grunt> DUMP corrupt_records;
(1950,,1)
Note the use of the is null operator, which is analogous to SQL. In practice, we would
include more information from the original record, such as an identifier and the value
that could not be parsed, to help our analysis of the bad data.
We can find the number of corrupt records using the following idiom for counting the
number of rows in a relation:
grunt> grouped = GROUP corrupt_records ALL;
grunt> all_grouped = FOREACH grouped GENERATE group, COUNT(corrupt_records);
grunt> DUMP all_grouped;
(all,1)
Another useful technique is to use the SPLIT operator to partition the data into “good”
and “bad” relations, which can then be analyzed separately:

grunt> SPLIT currepted_env INTO good_records IF temp is not null,
>> bad_records IF temp is null;
grunt> DUMP good_records;
(1949,11,Banglore)
(1950,22,Chennai)
(1950,-11,Delhi)
(1949,29,Pune)
grunt> DUMP bad_records;
(1949, ,Mumbai)


Choosing Between Pig and Hive:
Typically, organizations wanting an abstraction on top of standard MapReduce will choose to use either Hive or Pig
Which one is chosen depends on the skillset of the target users
Those with an SQL background will naturally gravitate towards
Hive
Those who do not know SQL will often choose Pig
Each has strengths and weaknesses; it is worth spending some time investigating each so you can make an informed decision

Some organizations are now choosing to use both

No comments:

Post a Comment

Hadoop Analytics

NewolympicData

  Alison Bartosik 21 United States 2004 08-29-04 Synchronized Swimming 0 0 2 2 Anastasiya Davydova 21 Russia 2004 0...