User:Statsrick/PYTHON code
15-minute and 5-minute round-up folders |
---|
from datetime import datetime, timedelta
date_format = '%Y/%m/%d/%H/%M'
def round_up(t, minutes):
    t -= timedelta(minutes=t.minute % minutes, seconds=t.second, microseconds=t.microsecond)
    t += timedelta(minutes=minutes)
    return t
now = datetime.utcnow()
now_15_folder = round_up(now, 15).strftime(date_format)
print now_15_folder
now_5_folder_list = round_up(now, 5).strftime(date_format).split('/')
now_5_folder_list.insert(-1,now_15_folder.split('/')[-1])
now_5_folder = '/'.join(now_5_folder_list)
print now_5_folder
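# Quick sanity check with a fixed, made-up timestamp (uses the round_up and
# date_format definitions above):
t = datetime(2016, 2, 12, 10, 7, 30)
print round_up(t, 15).strftime(date_format)  # -> 2016/02/12/10/15
print round_up(t, 5).strftime(date_format)   # -> 2016/02/12/10/10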
|
Python meta-code...use a Python script to write another Python script |
---|
# Using a TSV file as input, writes a long if/elif/else Python UDF for Redshift
import os
import pandas as pd
def if_then(infile, key, err, header, spaces):
    # NOTE: writes to the module-level output file handle f opened in __main__
    if header == 'T':
        df = pd.read_csv(infile, sep='\t')             # first row is the header
    else:
        df = pd.read_csv(infile, sep='\t', header=None)
    ncols = df.shape[1]
    nrows = df.shape[0]
    for index, row in df.iterrows():
        keyvalue = str(row[0])
        strvalue = '~'.join(str(elem) for elem in row[1:ncols])
        if index == nrows-1:
            #lastvalue='~'.join('X' * min(len(str(elem)),4) for elem in row[1:ncols])
            lastvalue = err
        if index == 0:
            ifvalue = 'if'
        else:
            ifvalue = 'elif'
        if keyvalue.isdigit():
            f.write(spaces+ifvalue+' '+key+'=='+str(row[0])+' : a=\''+strvalue+'\'\n')
        else:
            f.write(spaces+ifvalue+' '+key+'==\''+row[0]+'\' : a=\''+strvalue+'\'\n')
    f.write(spaces+'else : a=\''+lastvalue+'\'\n')
if __name__ == "__main__":
    WORKDIR = '/tmp/'
    if not os.path.exists(WORKDIR):
        os.makedirs(WORKDIR)
    IN_FILE_NAME = WORKDIR+'classy.tsv'
    OUT_FILE_NAME = WORKDIR+'cidmap.sql'
    f = open(OUT_FILE_NAME, 'w')
    f.write("GRANT USAGE ON LANGUAGE plpythonu TO public;\n")
    f.write("/* returns: 0=country,1=lotameid,2=bu,3=domain,4=prettyName,5=mos_id */\n")
    f.write("create or replace function cidmap(c int,fld int)\n")
    f.write("RETURNS varchar\n")
    f.write("IMMUTABLE AS $$\n")
    f.write("try:\n")
    if_then(infile=IN_FILE_NAME, key='c', err='XX~9999~XXX~unknown~unknown~0', header='T', spaces=' ')
    f.write("except : a='XX~9999~XXX~unknown~unknown~0'\n")
    f.write("return a.split('~')[fld]\n")
    f.write("$$ LANGUAGE plpythonu;\n")
    f.close()
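# For a hypothetical two-row classy.tsv (values made up), the generated
# /tmp/cidmap.sql would look roughly like this:
#   GRANT USAGE ON LANGUAGE plpythonu TO public;
#   /* returns: 0=country,1=lotameid,2=bu,3=domain,4=prettyName,5=mos_id */
#   create or replace function cidmap(c int,fld int)
#   RETURNS varchar
#   IMMUTABLE AS $$
#   try:
#    if c==62 : a='US~1234~HMG~cosmopolitan.com~Cosmopolitan~1'
#    elif c==63 : a='GB~5678~HUK~digitalspy.com~Digital Spy~2'
#    else : a='XX~9999~XXX~unknown~unknown~0'
#   except : a='XX~9999~XXX~unknown~unknown~0'
#   return a.split('~')[fld]
#   $$ LANGUAGE plpythonu;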
|
An AWS Lambda function that calls a Python script stored in S3...handy for Lambdas with large zipped-up Python packages. |
---|
# Be sure to include all modules used by the stored python script in the zip file as well!
# Handy for Lambdas with large zipped-up python packages (too big for the console code editor to display).
# Using this form, you can simply change the script stored in the S3 directory to change the lambda.
# TIP: name the script in s3 the same name as the Lambda function!
import urllib
import boto3
import os
s3=boto3.client('s3',region_name='us-west-2')
# Function that is invoked by AWS Lambda
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])
        s3.download_file('hearstdataservices', 'lambda/lambda_function_redshift_udfs.py', '/tmp/file.py')
        os.system("/usr/bin/python2.7 /tmp/file.py")
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist.'.format(key, bucket))
        raise e
# Debug the code on local environment
if __name__ == "__main__":
    event = {
        "Records": [
            {
                "eventVersion": "2.0",
                "eventTime": "1970-01-01T00:00:00.000Z",
                "requestParameters": {
                    "sourceIPAddress": "127.0.0.1"
                },
                "s3": {
                    "configurationId": "testConfigRule",
                    "object": {
                        "eTag": "0123456789abcdef0123456789abcdef",
                        "sequencer": "0A1B2C3D4E5F678901",
                        "key": "trigger_file.txt",
                        "size": 1024
                    },
                    "bucket": {
                        "arn": "arn:aws:s3:::hearstkinesisdata",
                        "name": "hearstdataservices",
                        "ownerIdentity": {
                            "principalId": "EXAMPLE"
                        }
                    },
                    "s3SchemaVersion": "1.0"
                },
                "responseElements": {
                    "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH",
                    "x-amz-request-id": "EXAMPLE123456789"
                },
                "awsRegion": "us-east-1",
                "eventName": "ObjectCreated:Put",
                "userIdentity": {
                    "principalId": "EXAMPLE"
                },
                "eventSource": "aws:s3"
            }
        ]
    }
    lambda_handler(event=event, context=None)
|
Putting it all together...an AWS Lambda Function! |
---|
from __future__ import print_function
import boto3
import botocore.exceptions
import urllib
import gzip
from datetime import datetime, timedelta
import sys
import os
import tarfile
S3_BUCKET = 'adobelogfiles'
S3_INPUT_PREFIX = 'adobe/'
S3_OUTPUT_PREFIX = 'adobe/processeddaily/mgweb/'
S3_MASTER_DATA_PREFIX = 'adobe/master-data/'
LOCAL_DIR = '/tmp'
s3 = boto3.client('s3',region_name='us-east-1')
date_format = '%Y/%m/%d'
today = datetime.utcnow().date()
dt = today.strftime(date_format)
dtm1 = (today - timedelta(days=1)).strftime(date_format)
def get_master_data_list(entity_name):
    s3_key = S3_MASTER_DATA_PREFIX + entity_name + '.txt'
    s3_file_content = get_s3_file_content(s3_key)
    if s3_file_content is None:
        print('exiting...get_master_data_list')
        sys.exit()
    return s3_file_content.split('\n')
def download_s3_key(s3_key, output_file):
    output_path = LOCAL_DIR+'/'+output_file
    msg = 'downloading s3://{}/{} to {}'.format(S3_BUCKET, s3_key, output_path)
    print(msg + '...')
    s3.download_file(S3_BUCKET, s3_key, output_path)
    print('-> done ' + msg)
    return output_path
def upload_s3_file(local_file, s3_key):
    s3_location = 's3://{}/{}'.format(S3_BUCKET, s3_key)
    msg = 'uploading {} to {}'.format(local_file, s3_location)
    print(msg + '...')
    s3.upload_file(local_file, S3_BUCKET, s3_key)
    waiter = s3.get_waiter('object_exists')
    waiter.wait(Bucket=S3_BUCKET, Key=s3_key)
    print('-> done ' + msg)
def get_s3_file_content(s3_key):
    try:
        response = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
        return response['Body'].read()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            print('s3 object does not exist (s3://{}/{})'.format(S3_BUCKET, s3_key))
        return None
def get_column_headers(brand, date):
    dtdash = date.replace('/','-')
    #get tar
    s3_key = S3_INPUT_PREFIX + date + '/' + brand + '_' + dtdash + '-lookup_data.tar.gz'
    s3.download_file(S3_BUCKET, s3_key, '/tmp/lookup_data.tar.gz')
    tar = tarfile.open('/tmp/lookup_data.tar.gz', 'r:gz')
    column_headers = tar.extractfile( dict(zip(tar.getnames(), tar.getmembers()))['column_headers.tsv'] ).read()
    os.remove('/tmp/lookup_data.tar.gz')
    #the first line of column_headers.tsv holds the tab-separated column names
    for line in column_headers.split('\n'):
        return line.split('\t')
def get_column_mapping(column_headers):
    #vars is the global list of column names loaded from the master-data file
    mapping = map(lambda x: column_headers.index(x), vars)
    return mapping
def create_normalized_data_local(local_file, column_mapping):
    outfilename = LOCAL_DIR + '/' + 'daily_processed.tsv.gz'
    msg = 'creating local normalized data file ({})'.format(outfilename)
    print(msg + '...')
    with gzip.open(outfilename, 'wb') as outfile:
        with gzip.open(local_file, 'rb') as infile:
            for num, line in enumerate(infile):
                columns = line.split('\t')
                formatted_list = map(lambda x: columns[x], column_mapping)
                if num == 0: #first line
                    line_break = ''
                else:
                    line_break = '\n'
                outfile.write(line_break+'\t'.join(formatted_list))
    print('-> done ' + msg)
    return outfilename
def process_s3_key(key):
    key_list = key.split('/')
    date = '/'.join(key_list[-4:-1])
    brand = key_list[-1].split('_')[0].split('-')[1]
    prefix = key_list[-1].split('_')[0]
    print("==========================> running for:{} on date: {}".format(brand, date))
    column_headers = get_column_headers(brand, date)
    #print(column_headers)
    column_mapping = get_column_mapping(column_headers)
    #print('columns mapping: {}'.format(column_mapping))
    infilename = download_s3_key(key, 'daily.tsv.gz')
    outfilename = create_normalized_data_local(infilename, column_mapping)
    output_s3_hit_data_file = '{S3_PREFIX}{DATE}/{BRAND}/{PREFIX}.tsv.gz'.format(DATE=date, BRAND=brand, PREFIX=prefix, S3_PREFIX=S3_OUTPUT_PREFIX)
    upload_s3_file(outfilename, output_s3_hit_data_file)
    os.remove(infilename)
    os.remove(outfilename)
# Function that is invoked by AWS Lambda
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    print("key: " + key)
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])
        # return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
    #User code
    global S3_BUCKET
    global vars
    S3_BUCKET = bucket
    vars = get_master_data_list('vars')
    print('vars: {}'.format(vars))
    process_s3_key(key)
# Debug the code on local environment
if __name__ == "__main__":
    print('dt: ' + dt)
    brands = get_master_data_list('brands')
    print('brands: {}'.format(brands))
    nbrands = len(brands)
    vars = get_master_data_list('vars')
    print('vars: {}'.format(vars))
    # hypothetical sample key for local testing; must match the <prefix>-<brand>_<name> layout parsed in process_s3_key
    some_key = 'adobe/omni-mgweb/2016/02/12/omni-mgweb_file2.tsv.gz'
    process_s3_key(some_key)
|
Using a python shell to run an AWS Redshift query |
---|
#!/usr/bin/python
import boto3
import os
import pgpasslib
import pandas as pd
import psycopg2
from datetime import datetime,timedelta
s3_client = boto3.client('s3',region_name='us-east-1')
AWSUSER='[default]' #AWS Credential user
RSHOST='hostname' #Redshift cluster host name
RSUSER='username' #Redshift cluster user name
RSPORT=5439 #Redshift cluster port
RSDBNAME='db' #Redshift cluster database name
RSPWD = pgpasslib.getpass(RSHOST, RSPORT, RSDBNAME, RSUSER) #Redshift cluster password stored in ~/.pgpass
HOME=os.path.expanduser('~')
#Retrieve your AWS credentials stored in ~/.aws/credentials
with open(HOME+'/.aws/credentials') as f:
    lines = f.read().splitlines()
#assumes aws_access_key_id and aws_secret_access_key follow the profile header, in that order
for i in range(len(lines)):
    if lines[i]==AWSUSER:
        AWSKEY=lines[i+1].split('=')[1].strip()
        AWSSECRET=lines[i+2].split('=')[1].strip()
#This is setting up the connection to your Redshift cluster
conn = psycopg2.connect(
host=RSHOST,
user=RSUSER,
port=RSPORT,
password=RSPWD,
dbname=RSDBNAME)
#This is creating some date variables
now = datetime.now()
day_format = '%Y/%m/%d'
current_day_str = now.strftime(day_format)
yesterday_str = (now - timedelta(days=1)).strftime(day_format)
#print yesterday_str
#Write query inline and put results into a data frame
SQL_QUERY="select url,business_unit from v_content limit 10;"
df = pd.read_sql_query(SQL_QUERY, con=conn)
#Read in an external sql file into a python variable
SQL_FILE=HOME+'/query.sql'
f = open(SQL_FILE, "r")
ff=f.read()
f.close()
#Replace macro variables in sql file with secret values (this is needed with COPY and UNLOAD statements in Redshift)
CODE=ff.replace('${AWSKEY}', AWSKEY)
CODE=CODE.replace('${AWSSECRET}', AWSSECRET)
#Run query on Redshift and send results to a python data frame
df = pd.read_sql_query(CODE, con=conn)
#Output data frame to a local file in a csv/tsv/json format
LOCAL_OUTPUT_FILE=HOME+'/testdata.tsv'  #(use a .json extension if printing to json below)
df.to_csv(LOCAL_OUTPUT_FILE, sep='\t', encoding='utf-8', index=False) #Print df to csv with column names
#df.to_csv(LOCAL_OUTPUT_FILE, sep='\t', encoding='utf-8', index=False, header=False) #Print df to csv without column names
#df.reset_index().to_json(LOCAL_OUTPUT_FILE,orient='records') #Print to json
#Upload local file to s3 location using boto3
UPLOAD_KEY='reports/testdata.tsv'
S3_BUCKET='bucket_name'
s3_client.upload_file(LOCAL_OUTPUT_FILE, S3_BUCKET, UPLOAD_KEY)
#Execute just a query
cur = conn.cursor()
cur.execute(SQL_QUERY)
conn.commit()
conn.close()
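# If query.sql holds a COPY or UNLOAD (as noted above), it returns no rows, so
# run it with cur.execute(CODE) instead of pd.read_sql_query. A hypothetical
# ~/query.sql using the ${AWSKEY}/${AWSSECRET} macros (bucket/table made up):
#   unload ('select url,business_unit from v_content')
#   to 's3://bucket_name/unload/v_content_'
#   credentials 'aws_access_key_id=${AWSKEY};aws_secret_access_key=${AWSSECRET}'
#   delimiter '\t' gzip allowoverwrite;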
|
Calculate previous and next quarter hour |
---|
import time
import datetime
from datetime import timedelta
tm='10/3/2014 23:55:56'
#tm='10/3/2014 23:5544:56'
fmt='%m/%d/%Y %H:%M:%S'
try:
    dt = datetime.datetime.strptime(tm, fmt)
    cleandt = dt.strftime("%Y-%m-%d %H:%M:%S")
    #how many secs have passed this hour
    nsecs = dt.minute*60 + dt.second + dt.microsecond*1e-6
    #seconds until one second before the next quarter-hour mark
    ndelta = (nsecs//900)*900 + 899 - nsecs
    #seconds back to the previous quarter-hour mark (zero or negative)
    pdelta = (nsecs//900)*900 - nsecs
    pq = dt + datetime.timedelta(seconds=pdelta)
    nq = dt + datetime.timedelta(seconds=ndelta)
    startq = pq.strftime("%Y-%m-%d %H:%M:%S")
    stopq = nq.strftime("%Y-%m-%d %H:%M:%S")
    print startq, cleandt, stopq
except ValueError:
    dt = ''
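# For the sample tm above, this is expected to print:
#   2014-10-03 23:45:00 2014-10-03 23:55:56 2014-10-03 23:59:59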
|
Convert datetime to unix time |
---|
dttm="10/3/2014 10:15:00"
from datetime import datetime
import time
d = datetime.strptime(dttm, "%m/%d/%Y %H:%M:%S")
print time.mktime(d.timetuple())
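# Note: time.mktime() interprets d as local time. A minimal sketch of the
# reverse conversion (unix timestamp back to a local datetime):
ts = time.mktime(d.timetuple())
print datetime.fromtimestamp(ts)   # -> 2014-10-03 10:15:00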
|
How to read a file one line at a time (fast) |
---|
# source: http://axialcorps.com/2013/09/27/dont-slurp-how-to-read-files-in-python/
# a simple filter that prepends line numbers
# import sys   (unused)
for fname in ('file.txt', 'file2.txt'):
    with open(fname, 'r+') as f:
        lineno = 0
        # this reads in one line at a time from the file
        for line in f:
            lineno += 1
            print '{:>6} {}'.format(lineno, line[:-1])
|
How to read a file into memory |
---|
with open('/path/to/file', 'r+') as f:
    contents = f.read()
    # do more stuff
|
How to read a compressed GZIP file |
---|
import gzip
f = gzip.open('file.txt.gz', 'rb')
file_content = f.read()
f.close()
|
How to create a compressed GZIP file |
---|
import gzip
content = "Lots of content here"
f = gzip.open('file.txt.gz', 'wb')
f.write(content)
f.close()
|
How to GZIP compress an existing file |
---|
import gzip
f_in = open('file.txt', 'rb')
f_out = gzip.open('file.txt.gz', 'wb')
f_out.writelines(f_in)
f_out.close()
f_in.close()
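# An equivalent sketch using shutil.copyfileobj, which streams the file in
# fixed-size chunks instead of line by line:
import shutil
with open('file.txt', 'rb') as f_in, gzip.open('file.txt.gz', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)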
|
LOOPS! |
---|
#Example of loops [https://wiki.python.org/moin/ForLoop Source]
# For loop from 0 to 2, therefore running 3 times.
for x in range(0, 3):
    print "We're on time %d" % (x)
# While loop from 1 to infinity
x = 1
while True:
    print "To infinity and beyond! We're getting close, on %d now!" % (x)
    x += 1
|
How to use substrings |
---|
input="artid=16659853;cat=entertainment;page=things-not-to-say-to-latinas-video;sect=_mobile;site=cosmopolitan;sub=celebrity;subcat=celebrity;subsub=news;tool=blog_entry_lite"
try:
    f3 = input.find("subsub=")
    if f3 > 0:
        tsubsub = input[input.find("subsub=")+7:]
        f33 = tsubsub.find(";")
        if f33 > 0:
            subsub = tsubsub[:tsubsub.find(";")]
        else:
            subsub = tsubsub
    else:
        subsub = ""
    if subsub != "":
        tmp = subsub.strip()
    else:
        tmp = ""
except:
    tmp = ""
print tmp
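# If the string is always semicolon-delimited key=value pairs (as above), a
# simpler alternative sketch is a dict lookup:
params = dict(pair.split('=', 1) for pair in input.split(';') if '=' in pair)
print params.get('subsub', '')   # -> news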
|
How to parse a string using Regular Expressions |
---|
import re
input="artid=16659853;cat=entertainment;page=things-not-to-say-to-latinas-video;sect=_mobile;site=cosmopolitan;sub=celebrity;subcat=celebrity;subsub=news;tool=blog_entry_lite"
site = re.search(r"site=(.*?);", input[input.find("site="):])
sub = re.search(r"sub=(.*?);", input[input.find("sub="):])
subsub = re.search(r"subsub=(.*?);", input[input.find("subsub="):])
if not subsub:
    subsub = re.search(r"subsub=(.*)", input[input.find("subsub="):])
page = re.search(r"page=(.*?);", input[input.find("page="):])
try:
    tmp = "http://www." + site.group(1) + ".com/" + sub.group(1) + "/" + subsub.group(1) + "/" + page.group(1)
except:
    tmp = ""
print tmp
#another example: tmp = re.sub("[^A-Za-z0-9\\s]+", "", input).lower()
|
Example of URL encode and decode |
---|
import urllib
urllib.unquote(input).decode('utf8')   # URL-decode
urllib.quote(input)                    # URL-encode
|
Pandas code snippets |
---|
import pandas as pd
import numpy as np
#List unique values in a DataFrame column
pd.unique(df.column_name.ravel())
#Convert Series datatype to numeric, getting rid of any non-numeric values
df['col'] = df['col'].astype(str).convert_objects(convert_numeric=True)
#Grab DataFrame rows where column has certain values
value_list = ['value1', 'value2', 'value3']
df = df[df.column.isin(value_list)]
#Grab DataFrame rows where column doesn't have certain values
value_list = ['value1', 'value2', 'value3']
df = df[~df.column.isin(value_list)]
#Delete column from DataFrame
del df['column']
#Select from DataFrame using criteria from multiple columns
newdf = df[(df['column_one']>2004) & (df['column_two']==9)]
#Rename several DataFrame columns
df = df.rename(columns = {
'col1 old name':'col1 new name',
'col2 old name':'col2 new name',
'col3 old name':'col3 new name',
})
#lower-case all DataFrame column names
df.columns = map(str.lower, df.columns)
#even more fancy DataFrame column re-naming
#lower-case all DataFrame column names (for example)
df.rename(columns=lambda x: x.split('.')[-1], inplace=True)
#Loop through rows in a DataFrame
for index, row in df.iterrows():
    print index, row['some column']
#Next few examples show how to work with text data in Pandas.
#Full list of .str functions: http://pandas.pydata.org/pandas-docs/stable/text.html
#Slice values in a DataFrame column (aka Series)
df.column.str[0:2]
#Lower-case everything in a DataFrame column
df.column_name = df.column_name.str.lower()
#Get length of data in a DataFrame column
df.column_name.str.len()
#Sort dataframe by multiple columns
df = df.sort(['col1','col2','col3'],ascending=[1,1,0])
#get top n for each group of columns in a sorted dataframe
#(make sure dataframe is sorted first)
top5 = df.groupby(['groupingcol1', 'groupingcol2']).head(5)
#Grab DataFrame rows where specific column is null/notnull
newdf = df[df['column'].isnull()]
#select from DataFrame using multiple keys of a hierarchical index
df.xs(('index level 1 value','index level 2 value'), level=('level 1','level 2'))
#Change all NaNs to None (useful before loading to a db)
df = df.where((pd.notnull(df)), None)
#Get quick count of rows in a DataFrame
len(df.index)
#Pivot data (with flexibility about what
#becomes a column and what stays a row).
#Syntax works on Pandas >= .14
pd.pivot_table(
df,values='cell_value',
index=['col1', 'col2', 'col3'], #these stay as columns
columns=['col4']) #data values in this column become their own column
#change data type of DataFrame column
df.column_name = df.column_name.astype(np.int64)
# Get rid of non-numeric values throughout a DataFrame:
for col in refunds.columns.values:
    refunds[col] = refunds[col].replace('[^0-9]+.-', '', regex=True)
#Set DataFrame column values based on other column values
df['column_to_change'][(df['column1'] == some_value) & (df['column2'] == some_other_value)] = new_value
#Clean up missing values in multiple DataFrame columns
df = df.fillna({
'col1': 'missing',
'col2': '99.999',
'col3': '999',
'col4': 'missing',
'col5': 'missing',
'col6': '99'
})
#Concatenate two DataFrame columns into a new, single column
#(useful when dealing with composite keys, for example)
df['newcol'] = df['col1'].map(str) + df['col2'].map(str)
#Doing calculations with DataFrame columns that have missing values
#In example below, swap in 0 for df['col1'] cells that contain null
df['new_col'] = np.where(pd.isnull(df['col1']),0,df['col1']) + df['col2']
# Split delimited values in a DataFrame column into two new columns
df['new_col1'], df['new_col2'] = zip(*df['original_col'].apply(lambda x: x.split(': ', 1)))
# Collapse hierarchical column indexes
df.columns = df.columns.get_level_values(0)
#Create a DataFrame from a Python dictionary
df = pd.DataFrame(list(a_dictionary.items()), columns = ['column1', 'column2'])
|