User:Statsrick/PIG code

  • Create a macro variable that you can use in your code
%declare CURR_DATE `date +%Y-%m-%d\ %H:%M:%S`;
B = foreach A generate '$CURR_DATE' as dt:chararray;
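A self-contained sketch of the same pattern; the input path, schema, and output path here are hypothetical:

%declare CURR_DATE `date +%Y-%m-%d\ %H:%M:%S`;
-- hypothetical input; stamp every row with the run timestamp and store it
A = LOAD 'input_data' AS (id:chararray, value:long);
B = FOREACH A GENERATE id, value, '$CURR_DATE' AS dt:chararray;
STORE B INTO 'output_with_run_timestamp' USING PigStorage();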


  • Emulate a SQL window function (OVER) in Pig without the Over UDF
select s, min(i) over (partition by s) from T
is done in Pig as:
A = load 'T';
B = group A by s;
C = foreach B generate flatten(A), MIN(A.i) as min;
D = foreach C generate A::s, min;
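The same group/flatten pattern works for any aggregate; for example, attaching a per-group row count to every row (the schema is spelled out here only for illustration):

A = load 'T' as (s:chararray, i:int);
B = group A by s;
C = foreach B generate flatten(A), COUNT(A) as cnt;
D = foreach C generate A::s, A::i, cnt;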


  • Windowing functions in Pig with the piggybank Over and Stitch UDFs
http://pig.apache.org/docs/r0.12.0/api/org/apache/pig/piggybank/evaluation/Over.html#Over()

PIG:

DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch();
F = foreach (group E4 by domain)  {
  GENERATE FLATTEN(Stitch(E4, Over(E4.visits,'avg(long)',-1,-1) ));
};
DUMP F;

same as SQL:

create table F as
select startq,url_hash,dom,R,F,
avg(visits) over (partition by domain) as avg_by_domain
from DD;


PIG:

DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch();
E = group DD by (startq,dom);
F = foreach E {
    F1 = order DD by pageviews desc;
    GENERATE FLATTEN(Stitch(F1, Over(F1,'row_number'))) as (startq,url_hash,dom,pageviews,visits,rn);
};
DUMP F;

same as SQL:

create table F as
select startq,url_hash,dom,pageviews,visits,
row_number() over (partition by startq,dom order by pageviews desc) as rn
from DD;


PIG:

DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch();
E = group DD by (startq,dom);
F = foreach E {
    F1 = order DD by pageviews desc;
    GENERATE FLATTEN(Stitch(F1, Over(F1,'ntile',-1,-1,5))) as (startq,url_hash,dom,pageviews,visits,nt);
};
DUMP F;

same as SQL:

create table F as
select startq,url_hash,dom,pageviews,visits,
ntile(5) over (partition by startq,dom order by pageviews desc) as nt
from DD;


  • Date time conversions
--http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
REGISTER /home/hadoop/pig/pig-0.12.0.jar;
REGISTER /home/hadoop/pig/lib/piggybank.jar;
A = LOAD 's3://hearstlogfiles/google/NetworkBackfillImpressions_271283/2014/09/24/NetworkBackfillImpressions_271283_20140924_00.gz' USING PigStorage(',');
B = LIMIT A 20;
C = STREAM B THROUGH `tail -n +2`;
D = FOREACH C GENERATE
CONCAT(CONCAT(SUBSTRING($0, 0,10),' '),SUBSTRING($0, 11,19) ) as dt_string:chararray,
ToDate(CONCAT(CONCAT(SUBSTRING($0, 0,10),' '),SUBSTRING($0, 11,19)), 'yyyy-MM-dd HH:mm:ss') AS (dt:datetime);
E = FOREACH D GENERATE
dt_string,
dt,
SubtractDuration(dt,(chararray)'PT4H') AS (dt1:datetime);
DUMP E;
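To format a datetime back into a chararray, the built-in ToString accepts the same SimpleDateFormat patterns; a short sketch continuing from E above:

F = FOREACH E GENERATE dt_string, ToString(dt1, 'yyyy-MM-dd HH:mm:ss') AS dt1_string:chararray;
DUMP F;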
  • Pig versus SQL

SQL Query

insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join (
  select name, ipaddr
  from users join clicks on (users.name = clicks.user)
  where value > 0
) using (ipaddr)
group by dma;

The Pig Latin for this will look like:

Users                = load 'users' as (name, age, ipaddr);
Clicks               = load 'clicks' as (user, url, value);
ValuableClicks       = filter Clicks by value > 0;
UserClicks           = join Users by name, ValuableClicks by user;
Geoinfo              = load 'geoinfo' as (ipaddr, dma);
UserGeo              = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA                = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';
  • Read in a few lines and filter with matching
rmf /omnioutput
A = LOAD 's3://hearsticrossing/Clicks/logdate=2014-03-10/hour=10/minutes=45/part-000000.gz' USING org.apache.pig.piggybank.storage.CSVExcelStorage('\t', 'YES_MULTILINE');
B = FILTER A BY ($24 matches '.*N.*');
X = LIMIT B 100;
STORE X INTO '/omnioutput' USING PigStorage();
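The matches operator takes a full Java regular expression, so a case-insensitive variant of the same filter only needs an inline flag; a minimal sketch against the same column:

B = FILTER A BY ($24 matches '(?i).*n.*');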
  • Ingest a few columns of Omniture data, group it, and put it into HDFS (to list: hadoop fs -ls /omnioutput) (to view: hadoop fs -cat /omnioutput/part-r-00000)
--pig
REGISTER /home/hadoop/lib/pig/pig.jar;
REGISTER /home/hadoop/lib/pig/piggybank.jar;
rmf /omnioutput
A = LOAD 's3://hearstlogfiles/npweb/01-hearstsfgate_2014-02-19.tsv.gz' USING PigStorage('\t');
T = FILTER A BY ($475 IS NULL) AND ($373 IS NOT NULL) AND ($375 IS NOT NULL);
B = FOREACH T GENERATE
(chararray)$371 as pubdt:chararray,
(chararray)$372 as title:chararray,
(chararray)$373 as article_id:chararray,
(chararray)$375 as author:chararray,
(chararray)CONCAT((chararray)$121,(chararray)$122) as hid:chararray,
(chararray)CONCAT((chararray)$460,(chararray)$461) as vid:chararray;
C = GROUP B BY (pubdt,title,article_id,author) PARALLEL 18;
--DESCRIBE C;
D = FOREACH C GENERATE FLATTEN(group),COUNT(B.hid) as pageviews,COUNT(B.vid) as visits PARALLEL 18;
STORE D INTO '/omnioutput' USING PigStorage();
  • Read line-by-line with REGEX_EXTRACT_ALL

data

122.161.182.200 - Joe [21/Jul/2009:13:14:17 -0700] "GET /rss.pl HTTP/1.1" 200 35942 "-" "IE/4.0 (compatible; MSIE 7.0; Windows NT 6.0;  Trident/4.0; SLCC1; .NET CLR 2.0.50727; 
.NET CLR 3.5.21022; InfoPath.2; .NET CLR 3.5.30729; .NET CLR 3.0.30618; OfficeLiveConnector.1.3; OfficeLivePatch.1.3; MSOffice 12)"

pig script

raw_logs = LOAD 'access_log' AS (line:chararray);  -- example path; each log record is loaded as one chararray line
logs_base = FOREACH raw_logs GENERATE FLATTEN (
REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"') ) AS 
(remoteAddr:chararray, remoteLogname:chararray, user:chararray,  time:chararray, request:chararray, status:int, bytes_string:chararray, referrer:chararray, browser:chararray);
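A typical follow-up is to aggregate on the extracted fields, for example counting requests per HTTP status (illustrative names only):

by_status = GROUP logs_base BY status;
status_counts = FOREACH by_status GENERATE group AS status, COUNT(logs_base) AS requests;
DUMP status_counts;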
  • Ingest iCrossing data into S3
--pig
REGISTER /home/hadoop/lib/pig/pig.jar;
REGISTER /home/hadoop/lib/pig/piggybank.jar;
A = LOAD 's3://hearsticrossing/Clicks/logdate=2014-03-10/hour=10/minutes=45/part-000000.gz' USING org.apache.pig.piggybank.storage.CSVExcelStorage('\t', 'YES_MULTILINE');
B = FOREACH A GENERATE
REGEX_EXTRACT($0, '([0-9]+)', 1) as client_id:int,
REGEX_EXTRACT($1, '([0-9]+)', 1) as domain_id:int,
CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT(
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($2),0,4), '([0-9]+)', 1),'-') ,
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($2),5,7), '([0-9]+)', 1),'-')),
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($2),8,10), '([0-9]+)', 1),' ')),
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($2),11,13), '([0-9]+)', 1),':')),
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($2),14,16), '([0-9]+)', 1),':')),
REGEX_EXTRACT( SUBSTRING(TRIM($2),17,19), '([0-9]+)', 1)) as log_time_gmt:chararray,
(chararray)$3 as fpc_id:chararray,
(chararray)$4 as permanent_id:chararray,
(chararray)$5 as referrer_domain:chararray,
(chararray)$6 as referrer:chararray,
(chararray)$7 as document_domain:chararray,
(chararray)$8 as document_url:chararray,
(chararray)$9 as traffic_source:chararray,
(chararray)$10 as country_code:chararray,
(chararray)$11 as city:chararray,
(chararray)$12 as region:chararray,
(chararray)$13 as ua_device_type:chararray,
(chararray)$14 as article_author:chararray,
(chararray)$15 as article_id:chararray,
(chararray)$16 as article_title:chararray,
(chararray)$17 as channel:chararray,
REGEX_EXTRACT($18, '([0-9]+)', 1) as days_since_last_published:int,
REGEX_EXTRACT($19, '([0-9]+)', 1) as ga_id:int,
(chararray)$20 as search_term:chararray,
(chararray)$21 as keywords:chararray,
(chararray)$22 as page_name:chararray,
(chararray)$23 as page_type:chararray,
CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT(
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($24),0,4), '([0-9]+)', 1),'-') ,
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($24),5,7), '([0-9]+)', 1),'-')),
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($24),8,10), '([0-9]+)', 1),' ')),
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($24),11,13), '([0-9]+)', 1),':')),
(chararray)CONCAT(REGEX_EXTRACT( SUBSTRING(TRIM($24),14,16), '([0-9]+)', 1),':')),
REGEX_EXTRACT( SUBSTRING(TRIM($24),17,19), '([0-9]+)', 1)) as publish_date_gmt:chararray,
(chararray)$25 as site_heir:chararray,
(chararray)$26 as site_name:chararray,
(chararray)$27 as site_catalyst_cookie:chararray,
(chararray)$28 as site_catalyst_fpc:chararray
;
STORE B INTO 's3://hearsticrossing/Clicks_clean_pig/' USING PigStorage();
  • Log file read-in (parse sendmail logs and compute per-message delivery delay)
REGISTER file:/home/hadoop/lib/pig/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT(); 
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT(); 
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT();
%default YEAR `date +%Y`;
RAW_LOGS = LOAD '$INPUT' as (line:chararray);
SRC = FOREACH RAW_LOGS GENERATE
FLATTEN(
EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+from=<([^>]+)>,\\s+size=(\\d+),\\s+class=(\\d+),\\s+nrcpts=(\\d+),\\s+msgid=<([^>]+)>.*relay=(\\S+)'))
AS (
month: chararray,
day: chararray,
time: chararray, 
mailserver: chararray,
pid: chararray,
sendmailid: chararray,
src: chararray,
size: chararray,
classnumber: chararray,
nrcpts: chararray,
msgid: chararray,
relay: chararray
);
T1 = FOREACH SRC GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp;
FILTER_T1 = FILTER T1 BY NOT sendmailid IS NULL;
DUMP FILTER_T1;
R1 = FOREACH FILTER_T1 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt;
DUMP R1;
 -- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds 
toEpoch1 = FOREACH R1 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 as epoch:long;
DUMP toEpoch1;
DEST = FOREACH RAW_LOGS GENERATE
FLATTEN( 
EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+to=<([^>]+)>,\\s+delay=([^,]+),\\s+xdelay=([^,]+),.*relay=(\\S+)\\s+\\[\\S+\\],\\s+dsn=\\S+,\\s+stat=(.*)')
)
AS (
month: chararray,
day: chararray,
time: chararray,
mailserver: chararray,
pid: chararray,
sendmailid: chararray,
dest: chararray, 
delay: chararray,
xdelay: chararray,
relay: chararray,
stat: chararray
);
T2 = FOREACH DEST GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp, dest, stat;
FILTER_T2 = FILTER T2 BY NOT sendmailid IS NULL;
R2 = FOREACH FILTER_T2 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt, dest, stat;
 -- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch2 = FOREACH R2 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 AS epoch:long, dest, stat;
R3 = JOIN toEpoch1 BY sendmailid, toEpoch2 BY sendmailid;
R4 = FOREACH R3 GENERATE $0, $5 - $2, $6, $7;  -- sendmailid, delivery delay in seconds, dest, stat
R5 = ORDER R4 BY $1 DESC;
STORE R5 INTO '$OUTPUT';


Back to Rick's Library