PIG
Pig was originally created at Yahoo! to answer a similar need to Hive
- Many developers did not have the Java and/or MapReduce knowledge required to write standard MapReduce programs
- But still needed to query data
Pig is a dataflow language
- The language is called PigLatin
- It has a relatively simple syntax
- Under the covers, PigLatin scripts are turned into MapReduce jobs and executed on the cluster
Installation of Pig requires no modification to the cluster
- The Pig interpreter runs on the client machine
- It turns PigLatin into standard Java MapReduce jobs, which are then submitted to the JobTracker
- There is (currently) no shared metadata, so no need for a shared metastore of any kind
Pig Concepts:
- In Pig, a single element of data is an atom
- A collection of atoms – such as a row, or a partial row – is a tuple
- Tuples are collected together into bags
- Typically, a PigLatin script starts by loading one or more datasets into bags, and then creates new bags by modifying those that already exist
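To make these terms concrete, here is a small illustration (the values are invented for this example); tuples are written in parentheses and bags in curly braces:
'hadoop'                        -- an atom: a single value
(1949,111,1)                    -- a tuple: an ordered collection of atoms
{(1949,111,1),(1949,78,1)}      -- a bag: an unordered collection of tuples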
Pig Features
- Pig supports many features which allow developers to perform sophisticated data analysis without writing Java MapReduce code
  – Joining datasets
  – Grouping data
  – Referring to elements by position rather than name
    – Useful for datasets with many elements
  – Loading non-delimited data using a custom load function
  – Creation of user-defined functions written in Java (sketched below)
  – And more
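As a sketch of the last two points, the snippet below registers a hypothetical jar (myudfs.jar) containing a Java UDF named UPPER and applies it to a loaded dataset; the jar name, the UDF, and the input path are assumptions made for illustration:
REGISTER myudfs.jar;                                      -- hypothetical jar holding the compiled Java UDF
names = LOAD 'input/names.txt' AS (name:chararray);       -- assumed one-column input file
upper_names = FOREACH names GENERATE myudfs.UPPER(name);  -- call the UDF by its fully qualified name
DUMP upper_names;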
A Sample Pig Script:
- Here, we load a file into a bag called emps
- Then we create a new bag called rich which contains just those records where the salary portion is greater than 100000
- Next, we sort the rich bag, storing the result in a bag called srtd
- Finally, we write the contents of the srtd bag to a new directory in HDFS
- By default, the data will be written in tab-separated format
- Alternatively, to write the contents of a bag to the screen, use the DUMP operator
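Since the script itself is not reproduced here, the following is a sketch of what it might look like; the input path, field names, and sort order are assumptions for illustration:
emps = LOAD 'people.txt' AS (id:int, name:chararray, salary:double);
rich = FILTER emps BY salary > 100000;
srtd = ORDER rich BY salary DESC;
STORE srtd INTO 'rich_people';
-- To write the result to the screen instead of HDFS: DUMP srtd;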
Let’s look at a simple example by writing a program in Pig Latin to calculate the maximum recorded temperature by year for the weather dataset. The complete program is only a few lines long:
-- max_temp.pig: Finds the maximum temperature by year
grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>> AS (year:chararray, temperature:int, quality:int);
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grunt> grouped_records = GROUP filtered_records BY year;
grunt> max_temp = FOREACH grouped_records GENERATE group,
>> MAX(filtered_records.temperature);
grunt> DUMP max_temp;
For simplicity, the program assumes that the input is tab-delimited text, with each line having just year, temperature, and quality fields. The LOAD statement describes the input data we want to process. The year:chararray notation describes the field’s name and type; a chararray is like a Java String, and an int is like a Java int. The LOAD operator takes a URI argument; here we are just using a local file, but we could refer to an HDFS URI. The AS clause (which is optional) gives the fields names to make it convenient to refer to them in subsequent statements.
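For example (the NameNode host and path below are placeholders, not taken from the text above), loading the same data from HDFS rather than the local filesystem might look like this:
records = LOAD 'hdfs://namenode/user/training/sample.txt'
    AS (year:chararray, temperature:int, quality:int);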
A tuple is just like a row of data in a database table, with multiple fields in a particular order. In this example, the LOAD function produces a set of (year, temperature, quality) tuples that are present in the input file. We write a relation with one tuple per line, where tuples are represented as comma-separated items in parentheses:
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
Relations are given names, or aliases, so they can be referred to. This relation is given the records alias. We can examine the contents of an alias using the DUMP operator:
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
We can also see the structure of a relation (the relation’s schema) using the DESCRIBE operator on the relation’s alias:
grunt> DESCRIBE records;
records: {year: chararray,temperature: int,quality: int}
The second statement removes records that have a missing temperature (indicated by a value of 9999) or an unsatisfactory quality reading. For this small dataset, no records are filtered out:
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
The third statement uses the GROUP operator to group the filtered_records relation by the year field. Let’s use DUMP to see what it produces:
grunt> grouped_records = GROUP filtered_records BY year;
grunt> DUMP grouped_records;
(1949,{(1949,111,1),(1949,78,1)})
(1950,{(1950,0,1),(1950,22,1),(1950,-11,1)})
We now have two rows, or tuples, one for each year in the input data. The first field in each tuple is the field being grouped by (the year), and the second field is a bag of tuples for that year. A bag is just an unordered collection of tuples, which in Pig Latin is represented using curly braces. By grouping the data in this way, we have created a row per year, so now all that remains is to find the maximum temperature for the tuples in each bag. Before we do this, let’s understand the structure of the grouped_records relation:
grunt> DESCRIBE grouped_records;
grouped_records: {group: chararray,filtered_records: {year: chararray,temperature: int,quality: int}}
This tells us that the grouping field is given the alias group by Pig, and the second field has the same structure as the filtered_records relation that was being grouped. With this information, we can try the fourth transformation:
grunt> max_temp = FOREACH grouped_records GENERATE group,
>> MAX(filtered_records.temperature);
FOREACH processes every row to generate a derived set of rows, using a GENERATE clause to define the fields in each derived row. In this example, the first field is group, which is just the year. The second field is a little more complex. The filtered_records.temperature reference is to the temperature field of the filtered_records bag in the grouped_records relation. MAX is a built-in function for calculating the maximum value of fields in a bag. In this case, it calculates the maximum temperature for the fields in each filtered_records bag. Let’s check the result:
grunt> DUMP max_temp;
(1949,111)
(1950,22)
So we’ve successfully calculated the maximum temperature for each year.
With the ILLUSTRATE operator, Pig provides a tool for generating a reasonably complete and concise sample dataset. Here is the output from running ILLUSTRATE:
grunt> ILLUSTRATE max_temp;
-------------------------------------------------------------------------------
| records | year:chararray | temperature:int | quality:int |
-------------------------------------------------------------------------------
| | 1949 | 78 | 1 |
| | 1949 | 111 | 1 |
| | 1949 | 9999 | 1 |
-------------------------------------------------------------------------------
----------------------------------------------------------------------------------------
| filtered_records | year:chararray | temperature:int | quality:int |
----------------------------------------------------------------------------------------
| | 1949 | 78 | 1 |
| | 1949 | 111 | 1 |
----------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------
| grouped_records | group:chararray | filtered_records:bag{:tuple(year:chararray,temperature:int,quality:int)} |
--------------------------------------------------------------------------------------------
|                 | 1949            | {(1949, 78, 1), (1949, 111, 1)}                                          |
--------------------------------------------------------------------------------------------
---------------------------------------------------
| max_temp | group:chararray | :int |
---------------------------------------------------
| | 1949 | 111 |
---------------------------------------------------
Pig Latin relational operators:
Category                 Operator            Description
Loading and storing      LOAD                Loads data from the filesystem or other storage into a relation
                         STORE               Saves a relation to the filesystem or other storage
                         DUMP                Prints a relation to the console
Filtering                FILTER              Removes unwanted rows from a relation
                         DISTINCT            Removes duplicate rows from a relation
                         FOREACH...GENERATE  Adds or removes fields from a relation
                         MAPREDUCE           Runs a MapReduce job using a relation as input
                         STREAM              Transforms a relation using an external program
                         SAMPLE              Selects a random sample of a relation
Grouping and joining     JOIN                Joins two or more relations
                         COGROUP             Groups the data in two or more relations
                         GROUP               Groups the data in a single relation
                         CROSS               Creates the cross-product of two or more relations
Sorting                  ORDER               Sorts a relation by one or more fields
                         LIMIT               Limits the size of a relation to a maximum number of tuples
Combining and splitting  UNION               Combines two or more relations into one
                         SPLIT               Splits a relation into two or more relations
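Most of these operators follow the same pattern as the ones used in the walkthrough above. As a sketch of a few that were not shown (the relations A and B, their fields, and the input paths are assumptions for illustration):
grunt> A = LOAD 'input/people.txt' AS (id:int, name:chararray);
grunt> B = LOAD 'input/depts.txt' AS (id:int, dept:chararray);
grunt> joined = JOIN A BY id, B BY id;    -- join the two relations on the id field
grunt> sorted = ORDER joined BY name;     -- sort by one field
grunt> top5 = LIMIT sorted 5;             -- keep at most five tuples
grunt> DUMP top5;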
Pig Latin diagnostic operators:
Operator     Description
DESCRIBE     Prints a relation’s schema
EXPLAIN      Prints the logical and physical plans
ILLUSTRATE   Shows a sample execution of the logical plan, using a generated subset of the input
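DESCRIBE and ILLUSTRATE were shown in the walkthrough above; EXPLAIN can be run on any alias in the same way, for example:
grunt> EXPLAIN max_temp;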
Pig Latin commands:
Category              Command        Description
Hadoop filesystem     cat            Prints the contents of one or more files
                      cd             Changes the current directory
                      copyFromLocal  Copies a local file or directory to a Hadoop filesystem
                      copyToLocal    Copies a file or directory on a Hadoop filesystem to the local filesystem
                      cp             Copies a file or directory to another directory
                      fs             Accesses Hadoop’s filesystem shell
                      ls             Lists files
                      mkdir          Creates a new directory
                      mv             Moves a file or directory to another directory
                      pwd            Prints the path of the current working directory
                      rm             Deletes a file or directory
                      rmf            Forcibly deletes a file or directory (does not fail if the file or directory does not exist)
Hadoop MapReduce      kill           Kills a MapReduce job
Utility               exec           Runs a script in a new Grunt shell
                      help           Shows the available commands and options
                      quit           Exits the interpreter
                      run            Runs a script within the existing Grunt shell
                      set            Sets Pig options and MapReduce job properties
                      sh             Runs a shell command from within Grunt
The filesystem commands can operate on files or directories in any Hadoop filesystem, and they are very similar to the hadoop fs commands (which is not surprising, as both are simple wrappers around the Hadoop FileSystem interface). You can access all of the Hadoop filesystem shell commands using Pig’s fs command. For example, fs -ls will show a file listing, and fs -help will show help on all the available commands.
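A couple of examples from the Grunt shell (the paths are placeholders chosen for illustration):
grunt> fs -ls /user/training
grunt> fs -mkdir /user/training/output
grunt> sh date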
Types
Pig has four numeric types: int, long, float, and double, which are identical to their Java counterparts. There is also a bytearray type, like Java’s byte array type, for representing a blob of binary data, and chararray, which, like java.lang.String, represents textual data in UTF-16 format, although it can be loaded or stored in UTF-8 format.
Pig does not have types corresponding to Java’s boolean, byte, short, or char primitive types. These are all easily represented using Pig’s int type, or chararray for char.
The numeric, textual, and binary types are simple atomic types. Pig
Latin also has three complex types for representing nested structures: tuple,
bag, and map.
Category  Type       Description                                                   Literal example
Numeric   int        32-bit signed integer                                         1
          long       64-bit signed integer                                         1L
          float      32-bit floating-point number                                  1.0F
          double     64-bit floating-point number                                  1.0
Text      chararray  Character array in UTF-16 format                              'a'
Binary    bytearray  Byte array                                                    Not supported
Complex   tuple      Sequence of fields of any type                                (1,'pomegranate')
          bag        An unordered collection of tuples, possibly with duplicates   {(1,'pomegranate'),(2)}
          map        A set of key-value pairs; keys must be character arrays,      ['a'#'pomegranate']
                     values may be any type
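As a sketch of how the complex types can be declared in a schema (the file name and field layout here are assumptions for illustration):
grunt> A = LOAD 'input/complex.txt'
>> AS (t:tuple(x:int, y:chararray), b:bag{T:tuple(x:int)}, m:map[]);
grunt> DESCRIBE A;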
Schemas
A relation in Pig may have an associated schema, which gives the fields
in the relation names and types. We’ve seen how an AS clause in a LOAD
statement is used to attach a schema to a relation:
grunt> env = LOAD '/user/training/climate'
>> AS (year:int, temp:int, city:chararray);
grunt> DESCRIBE env;
env: {year: int, temp: int, city: chararray}
Pig is more flexible about schemas than SQL. In Pig we can load the data first and then decide which type suits each field, whereas in SQL the types must be declared before the data is loaded into the system. Pig is designed for analyzing plain input files with no associated type information, so it is quite natural to choose types for fields later than you would with an RDBMS. It is also possible to omit type declarations completely:
grunt> env = LOAD '/home/training/climate' AS (year, temp, city);
grunt> DESCRIBE env;
env: {year: bytearray, temp: bytearray, city: bytearray}
In this case, we have specified only the names of the fields in the
schema as year, temp, and city. The types default to bytearray, the most general type, representing a binary string.
You don’t need to specify types for every field; you can leave some to default to bytearray, as we have done for year in this declaration:
grunt> env = LOAD '/home/training/climate' AS (year, temp:int, city:chararray);
grunt> DESCRIBE env;
env: {year: bytearray, temp: int, city: chararray}
However, if you specify a schema in this way, you do need to specify
every field. Also, there’s no way to specify the type of a field without
specifying the name. On the other hand, the schema is entirely optional and can
be omitted by not specifying an AS clause:
grunt> env = LOAD '/home/training/climate';
grunt> DESCRIBE env;
Schema for env unknown.
Fields in a relation with no schema can be referenced only using positional notation: $0 refers to the first field in a relation, $1 to the second, and so on. Their types default to bytearray.
grunt> env1 = FOREACH env GENERATE $0, $1, $2;
grunt> DUMP env1;
(1949,11,Banglore)
(1950,22,Chennai)
(1950,-11,Delhi)
(1949,42,Mumbai)
(1949,29,Pune)
grunt> DESCRIBE env1;
env1: {bytearray,bytearray,bytearray}
Although it can be convenient not to assign types to fields, doing so can improve the clarity and efficiency of Pig Latin programs, and is generally recommended.
Validation and nulls
An SQL database will enforce the constraints in a table’s schema at load time: for example, trying to load a string into a column that is declared to be a numeric type will fail. In Pig, if the value cannot be cast to the type declared in the schema, it will substitute a null value. Let’s see how this works if we have the following input for the weather data, which has the string "ram" in place of an integer:
1949 11 Banglore
1950 22 Chennai
1950 -11 Delhi
1949 ram Mumbai
1949 29 Pune
Pig handles the corrupt line by producing a null for the offending value, which is displayed as the absence of a value when dumped to screen (and also when saved using STORE):
grunt> currepted_env = LOAD '/user/training/currepted_climate'
>> AS (year:chararray, temp:int, city:chararray);
grunt> DUMP currepted_env;
grunt> DUMP currepted_env;
(1949,11,Banglore)
(1950,22,Chennai)
(1950,-11,Delhi)
(1949,,Mumbai)
(1949,29,Pune)
Pig produces a warning for the invalid field (not shown here), but does not halt its processing. For large datasets, it is very common to have corrupt, invalid, or merely unexpected data, and it is generally infeasible to incrementally fix every unparsable record. Instead, we can pull out all of the invalid records in one go, so we can take action on them, perhaps by fixing our program or by filtering them out.
grunt> corrupt_records = FILTER currepted_env BY temp is null;
grunt> DUMP corrupt_records;
(1949,,Mumbai)
Note the use of the is null operator, which is analogous to SQL. In practice, we would include more information from the original record, such as an identifier and the value that could not be parsed, to help our analysis of the bad data.
We can find the number of corrupt records using the following idiom for counting the number of rows in a relation:
grunt> grouped = GROUP corrupt_records ALL;
grunt> all_grouped = FOREACH grouped GENERATE group,
COUNT(corrupt_records);
grunt> DUMP all_grouped;
(all,1)
Another useful technique is to use the SPLIT operator to partition the data into “good” and “bad” relations, which can then be analyzed separately:
grunt> SPLIT currepted_env INTO good_records IF temp is not null,
>> bad_records IF temp is null;
grunt> DUMP good_records;
(1949,11,Banglore)
(1950,22,Chennai)
(1950,-11,Delhi)
(1949,29,Pune)
grunt> DUMP bad_records;
(1949,,Mumbai)
Choosing Between Pig and Hive:
- Typically, organizations wanting an abstraction on top of standard MapReduce will choose to use either Hive or Pig
- Which one is chosen depends on the skillset of the target users
  – Those with an SQL background will naturally gravitate towards Hive
  – Those who do not know SQL will often choose Pig
- Each has strengths and weaknesses; it is worth spending some time investigating each so you can make an informed decision
- Some organizations are now choosing to use both