User:Statsrick/PIG code
- Create a macro variable (Pig parameter) that you can use in your code
%declare CURR_DATE `date +%Y-%m-%d\ %H:%M:%S`;
B = foreach A generate '$CURR_DATE' as dt:chararray;
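For context, a minimal end-to-end sketch of the same idea; the input path, schema, and output path below are placeholders, not part of the snippet above:
%declare CURR_DATE `date +%Y-%m-%d\ %H:%M:%S`;
-- placeholder input; any relation works here
A = LOAD 'input_data' USING PigStorage('\t') AS (id:chararray, val:long);
-- stamp every record with the time the script was launched
B = FOREACH A GENERATE id, val, '$CURR_DATE' as load_ts:chararray;
STORE B INTO 'output_with_ts' USING PigStorage('\t');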
- Windowing in Pig without Over (works when the window only partitions, with no ordering)
{code}
select s, min(i) over (partition by s) from T
{code}
is done in Pig as:
{code}
A = load 'T' as (s, i);
B = group A by s;
C = foreach B generate flatten(A), MIN(A.i) as min;
D = foreach C generate A::s, min;
{code}
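The same group/flatten trick covers other aggregates; a minimal sketch (schema assumed, as above) that attaches a per-group row count instead of a minimum:
{code}
A = load 'T' as (s, i);
B = group A by s;
-- COUNT(A) is the number of rows in each s-group; flatten(A) restores the original rows
C = foreach B generate flatten(A), COUNT(A) as cnt;
D = foreach C generate A::s, cnt;
{code}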
- Windowing functions in Pig
http://pig.apache.org/docs/r0.12.0/api/org/apache/pig/piggybank/evaluation/Over.html#Over()
PIG:
DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
F = foreach (group E4 by domain) {
    GENERATE FLATTEN(Stitch(E4, Over(E4.visits, 'avg(long)', -1, -1)));
};
DUMP F;
same as SQL:
create table F as select startq,url_hash,dom,R,F, avg(visits) over (partition by domain) as avg_by_domain from DD;
PIG:
DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
E = group DD by (startq, dom);
F = foreach E {
    F1 = order DD by pageviews desc;
    GENERATE FLATTEN(Stitch(F1, Over(F1, 'row_number')))
        as (startq, url_hash, dom, pageviews, visits, rn);
};
DUMP F;
same as SQL:
create table F as select startq,url_hash,dom,pageviews,visits, row_number() over (partition by startq,dom order by pageviews desc) as rn from DD;
PIG:
DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
E = group DD by (startq, dom);
F = foreach E {
    F1 = order DD by pageviews desc;
    GENERATE FLATTEN(Stitch(F1, Over(F1, 'ntile', -1, -1, 5)))
        as (startq, url_hash, dom, pageviews, visits, nt);
};
DUMP F;
same as SQL:
create table F as select startq,url_hash,dom,pageviews,visits, ntile(5) over (partition by startq,dom order by pageviews desc) as nt from DD;
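The same Stitch/Over pattern extends to other aggregates; a hedged sketch of a per-domain sum of visits ('sum(long)' is assumed to be supported by Over in the same way as 'avg(long)' above):
PIG:
DEFINE Over org.apache.pig.piggybank.evaluation.Over();
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
E = group DD by dom;
F = foreach E GENERATE FLATTEN(Stitch(DD, Over(DD.visits, 'sum(long)', -1, -1)))
    as (startq, url_hash, dom, pageviews, visits, visits_by_dom);
DUMP F;
same as SQL:
create table F as select startq,url_hash,dom,pageviews,visits, sum(visits) over (partition by dom) as visits_by_dom from DD;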
- Date time conversions
-- http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html
REGISTER /home/hadoop/pig/pig-0.12.0.jar;
REGISTER /home/hadoop/pig/lib/piggybank.jar;
A = LOAD 's3://hearstlogfiles/google/NetworkBackfillImpressions_271283/2014/09/24/NetworkBackfillImpressions_271283_20140924_00.gz' USING PigStorage(',');
B = LIMIT A 20;
-- skip the header row
C = STREAM B THROUGH `tail -n +2`;
D = FOREACH C GENERATE
    CONCAT(CONCAT(SUBSTRING($0, 0, 10), ' '), SUBSTRING($0, 11, 19)) as dt_string:chararray,
    ToDate(CONCAT(CONCAT(SUBSTRING($0, 0, 10), ' '), SUBSTRING($0, 11, 19)), 'yyyy-MM-dd HH:mm:ss') AS (dt:datetime);
E = FOREACH D GENERATE dt_string, dt, SubtractDuration(dt, (chararray)'PT4H') AS (dt1:datetime);
DUMP E;
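A short follow-on using the E relation above to go back the other way; ToString and ToUnixTime are standard Pig datetime builtins, and the alias F is new here:
F = FOREACH E GENERATE
    dt_string,
    ToString(dt1, 'yyyy-MM-dd HH:mm:ss') as dt1_string,
    -- ToUnixTime returns seconds since the epoch
    ToUnixTime(dt) as epoch_secs;
DUMP F;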
- Pig versus SQL
SQL Query
insert into ValuableClicksPerDMA
select dma, count(*)
from geoinfo join (
    select name, ipaddr
    from users join clicks on (users.name = clicks.user)
    where value > 0
) using ipaddr
group by dma;
The Pig Latin for this will look like:
Users = load 'users' as (name, age, ipaddr);
Clicks = load 'clicks' as (user, url, value);
ValuableClicks = filter Clicks by value > 0;
UserClicks = join Users by name, ValuableClicks by user;
Geoinfo = load 'geoinfo' as (ipaddr, dma);
UserGeo = join UserClicks by ipaddr, Geoinfo by ipaddr;
ByDMA = group UserGeo by dma;
ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);
store ValuableClicksPerDMA into 'ValuableClicksPerDMA';
- Read in a few lines and filter with a regex match
rmf /omnioutput
A = LOAD 's3://hearsticrossing/Clicks/logdate=2014-03-10/hour=10/minutes=45/part-000000.gz' USING org.apache.pig.piggybank.storage.CSVExcelStorage('\t', 'YES_MULTILINE');
B = FILTER A BY ($24 matches '.*N.*');
X = LIMIT B 100;
STORE X INTO '/omnioutput' USING PigStorage();
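A hedged variant of the same filter that matches case-insensitively and keeps the non-matching rows instead (the column position $24 is carried over from the snippet above):
B = FILTER A BY NOT ($24 matches '(?i).*n.*');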
- Ingest a few columns of Omniture data, group them, and store the result in HDFS (list with hadoop fs -ls /omnioutput; view with hadoop fs -cat /omnioutput/part-r-00000)
--pig
REGISTER /home/hadoop/lib/pig/pig.jar;
REGISTER /home/hadoop/lib/pig/piggybank.jar;
rmf /omnioutput
A = LOAD 's3://hearstlogfiles/npweb/01-hearstsfgate_2014-02-19.tsv.gz' USING PigStorage('\t');
T = FILTER A BY ($475 IS NULL) AND ($373 IS NOT NULL) AND ($375 IS NOT NULL);
B = FOREACH T GENERATE
    (chararray)$371 as pubdt:chararray,
    (chararray)$372 as title:chararray,
    (chararray)$373 as article_id:chararray,
    (chararray)$375 as author:chararray,
    (chararray)CONCAT((chararray)$121, (chararray)$122) as hid:chararray,
    (chararray)CONCAT((chararray)$460, (chararray)$461) as vid:chararray;
C = GROUP B BY (pubdt, title, article_id, author) PARALLEL 18;
--DESCRIBE C;
D = FOREACH C GENERATE FLATTEN(group), COUNT(B.hid) as pageviews, COUNT(B.vid) as visits PARALLEL 18;
STORE D INTO '/omnioutput' USING PigStorage();
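To reuse the stored output in a later script, a minimal sketch of loading it back with the schema that D produces above (COUNT yields longs); D2 is a new alias:
D2 = LOAD '/omnioutput' USING PigStorage()
     AS (pubdt:chararray, title:chararray, article_id:chararray, author:chararray, pageviews:long, visits:long);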
- Read line-by-line with REGEX_EXTRACT_ALL
data
122.161.182.200 - Joe [21/Jul/2009:13:14:17 -0700] "GET /rss.pl HTTP/1.1" 200 35942 "-" "IE/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/4.0; SLCC1; .NET CLR 2.0.50727; .NET CLR 3.5.21022; InfoPath.2; .NET CLR 3.5.30729; .NET CLR 3.0.30618; OfficeLiveConnector.1.3; OfficeLivePatch.1.3; MSOffice 12)"
pig script
-- raw_logs is assumed to hold one log line per record; the path below is a placeholder
raw_logs = LOAD 'access_log' AS (line:chararray);
logs_base = FOREACH raw_logs GENERATE FLATTEN(
    REGEX_EXTRACT_ALL(line, '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')
  ) AS (remoteAddr:chararray, remoteLogname:chararray, user:chararray, time:chararray,
        request:chararray, status:int, bytes_string:chararray, referrer:chararray, browser:chararray);
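As in the sendmail example further down this page, lines that don't match the pattern come back with null fields; a short hedged follow-up to drop them:
logs = FILTER logs_base BY remoteAddr IS NOT NULL;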
- Ingest iCrossing click data, clean it, and write it back to S3
--pig
REGISTER /home/hadoop/lib/pig/pig.jar;
REGISTER /home/hadoop/lib/pig/piggybank.jar;
A = LOAD 's3://hearsticrossing/Clicks/logdate=2014-03-10/hour=10/minutes=45/part-000000.gz'
    USING org.apache.pig.piggybank.storage.CSVExcelStorage('\t', 'YES_MULTILINE');
B = FOREACH A GENERATE
    REGEX_EXTRACT($0, '([0-9]+)', 1) as client_id:int,
    REGEX_EXTRACT($1, '([0-9]+)', 1) as domain_id:int,
    -- rebuild the log timestamp as 'yyyy-MM-dd HH:mm:ss' from pieces of $2
    CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT(
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($2),0,4), '([0-9]+)', 1),'-'),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($2),5,7), '([0-9]+)', 1),'-')),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($2),8,10), '([0-9]+)', 1),' ')),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($2),11,13), '([0-9]+)', 1),':')),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($2),14,16), '([0-9]+)', 1),':')),
        REGEX_EXTRACT(SUBSTRING(TRIM($2),17,19), '([0-9]+)', 1)) as log_time_gmt:chararray,
    (chararray)$3 as fpc_id:chararray,
    (chararray)$4 as permanent_id:chararray,
    (chararray)$5 as referrer_domain:chararray,
    (chararray)$6 as referrer:chararray,
    (chararray)$7 as document_domain:chararray,
    (chararray)$8 as document_url:chararray,
    (chararray)$9 as traffic_source:chararray,
    (chararray)$10 as country_code:chararray,
    (chararray)$11 as city:chararray,
    (chararray)$12 as region:chararray,
    (chararray)$13 as ua_device_type:chararray,
    (chararray)$14 as article_author:chararray,
    (chararray)$15 as article_id:chararray,
    (chararray)$16 as article_title:chararray,
    (chararray)$17 as channel:chararray,
    REGEX_EXTRACT($18, '([0-9]+)', 1) as days_since_last_published:int,
    REGEX_EXTRACT($19, '([0-9]+)', 1) as ga_id:int,
    (chararray)$20 as search_term:chararray,
    (chararray)$21 as keywords:chararray,
    (chararray)$22 as page_name:chararray,
    (chararray)$23 as page_type:chararray,
    -- rebuild the publish timestamp the same way from pieces of $24
    CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT((chararray)CONCAT(
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($24),0,4), '([0-9]+)', 1),'-'),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($24),5,7), '([0-9]+)', 1),'-')),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($24),8,10), '([0-9]+)', 1),' ')),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($24),11,13), '([0-9]+)', 1),':')),
        (chararray)CONCAT(REGEX_EXTRACT(SUBSTRING(TRIM($24),14,16), '([0-9]+)', 1),':')),
        REGEX_EXTRACT(SUBSTRING(TRIM($24),17,19), '([0-9]+)', 1)) as publish_date_gmt:chararray,
    (chararray)$25 as site_heir:chararray,
    (chararray)$26 as site_name:chararray,
    (chararray)$27 as site_catalyst_cookie:chararray,
    (chararray)$28 as site_catalyst_fpc:chararray;
STORE B INTO 's3://hearsticrossing/Clicks_clean_pig/' USING PigStorage();
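Before kicking off the full STORE to S3, it can help to spot-check a handful of cleaned rows first (same LIMIT/DUMP pattern used elsewhere on this page; B_sample is a new alias):
B_sample = LIMIT B 10;
DUMP B_sample;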
- Read in a sendmail logfile and compute delivery delays
REGISTER file:/home/hadoop/lib/pig/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME();
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT();
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT();
%default YEAR `date +%Y`;
RAW_LOGS = LOAD '$INPUT' as (line:chararray);
-- sender records (from=<...>)
SRC = FOREACH RAW_LOGS GENERATE FLATTEN(
    EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+from=<([^>]+)>,\\s+size=(\\d+),\\s+class=(\\d+),\\s+nrcpts=(\\d+),\\s+msgid=<([^>]+)>.*relay=(\\S+)')
  ) AS (
    month: chararray, day: chararray, time: chararray, mailserver: chararray, pid: chararray,
    sendmailid: chararray, src: chararray, size: chararray, classnumber: chararray,
    nrcpts: chararray, msgid: chararray, relay: chararray
  );
T1 = FOREACH SRC GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp;
FILTER_T1 = FILTER T1 BY NOT sendmailid IS NULL;
DUMP FILTER_T1;
R1 = FOREACH FILTER_T1 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt;
DUMP R1;
-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch1 = FOREACH R1 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 as epoch:long;
DUMP toEpoch1;
-- recipient records (to=<...>)
DEST = FOREACH RAW_LOGS GENERATE FLATTEN(
    EXTRACT(line, '(\\S+)\\s+(\\d+)\\s+(\\S+)\\s+(\\S+)\\s+sendmail\\[(\\d+)\\]:\\s+(\\w+):\\s+to=<([^>]+)>,\\s+delay=([^,]+),\\s+xdelay=([^,]+),.*relay=(\\S+)\\s+\\[\\S+\\],\\s+dsn=\\S+,\\s+stat=(.*)')
  ) AS (
    month: chararray, day: chararray, time: chararray, mailserver: chararray, pid: chararray,
    sendmailid: chararray, dest: chararray, delay: chararray, xdelay: chararray,
    relay: chararray, stat: chararray
  );
T2 = FOREACH DEST GENERATE sendmailid, FORMAT('%s-%s-%s %s', $YEAR, month, day, time) as timestamp, dest, stat;
FILTER_T2 = FILTER T2 BY NOT sendmailid IS NULL;
R2 = FOREACH FILTER_T2 GENERATE sendmailid, DATE_TIME(timestamp, 'yyyy-MMM-d HH:mm:ss') as dt, dest, stat;
-- ISOToUnix returns milliseconds, so we divide by 1000 to get seconds
toEpoch2 = FOREACH R2 GENERATE sendmailid, dt, ISOToUnix(dt) / 1000 AS epoch:long, dest, stat;
R3 = JOIN toEpoch1 BY sendmailid, toEpoch2 BY sendmailid;
-- sendmailid, delay in seconds (destination epoch minus source epoch), dest, stat
R4 = FOREACH R3 GENERATE $0, $5 - $2, $6, $7;
R5 = ORDER R4 BY $1 DESC;
STORE R5 INTO '$OUTPUT';
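The script above expects $INPUT and $OUTPUT as Pig parameters; one way to pass them on the command line (the script name and paths are placeholders):
pig -param INPUT=/logs/maillog -param OUTPUT=/mail_delays sendmail_delays.pig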
Back to Rick's Library