Pages

Sunday, June 10, 2012

Python: read rows from SQL file and insert into SQLLite DB

# Using the sqllite in memory as saving sessions is not required in my case'''

sqlliteconnection = sqlite3.connect(':memory:')
sqllitecursor = sqlliteconnection.cursor()



def PutCSVDetailsInSQLLite(filename):
'Reads the SQL File and Inserts the records in SQLLite database'
# Open the CSV file in read only mode
csvfile = open(filename, 'rt')
# Create a reader
reader = csv.reader(csvfile)
for row in reader:
#Read each row and insert it in the testautomation table
sqllitecursor.execute('INSERT INTO  testautomation VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', row)
# Close CSV file as it is no longer required
csvfile.close()
# Do not forget to commit
sqlliteconnection.commit()

def CreateSQLLiteTable():
'''Creates SQL Lite database if it doesnt exist.
For this case it will be recreated as its in memory database'''
sqllitecursor.execute('DROP TABLE IF EXISTS testautomation')
# 23 Columns are being assumed to be present in the CSV file
sqllitecursor.execute('''CREATE TABLE testautomation (TC,UT,ST,RT,PT,
DataSetup,PreExecutionStep,InputFiles,InputDatabase,ExecutionStep,
OutputFiles,OutputDatabase,OutputExitStatus,Cleanup,TestCaseName,
TestCaseDescription,TestCaseNature,TestStepDescription,DataAttributes,
ExpectedResults,InputVal,ActualResults,PassFailDefect)''')
# Save the table by executing the commit statement
sqlliteconnection.commit()


Saturday, June 9, 2012

Python: Get command line parameters


def Print_Command_Parms():
'Prints the command line parameters passed to the job'
for i, cmd in enumerate(sys.argv):
logger.debug("argv[%d]='%s'" % (i, cmd))

Python: Request user input; use default if none provided


def Request_User_Input(test_mode):
'Gets user input depending on the mode (Batch/Manual)'
if test_mode == 'M':
confirmation = input('\n.............Provide Your Input (P/F): ')
if confirmation.upper() == 'P':
logger.debug('User Provided - PASS')
return 1
else:
logger.debug('User Provided - FAIL')
return 2
if test_mode == 'B':
logger.debug('User Provided - PASS')
return 1

Python: File operations - File Exists, Delete, Copy, Rename


def File_Rename(source_file, target_file):
'Rename the source file name to that of the specified target name'
if File_Exists(source_file):
if File_Exists(target_file):
logger.debug('FILE NOT RENAMED - Target file: ' + target_file + 'EXISTS')
return False
else:
os.rename(source_file, target_file)
logger.debug('FILE RENAMED - Source_File: ' + source_file + '; Target_File: ' + target_file)
return True
else:
logger.debug('FILE NOT RENAMED - Source file: ' + source_file + ' DOES NOT EXIST')
return False

def File_Copy(source_file, target_file):
'Copy the source file to the target file'
if File_Exists(source_file):
if File_Exists(target_file):
logger.debug('FILE NOT COPIED - Target file: ' + target_file + 'EXISTS')
return 0
else:
shutil.copyfile(source_file , target_file)
logger.debug('FILE COPIED - Source_File: ' + source_file + '; Target_File: ' + target_file)
return 1
else:
logger.debug('FILE NOT COPIED - Source file: ' + source_file + 'DOES NOT EXIST')
return 0

def File_Delete(source_file):
'Delete the given file'
if File_Exists(source_file):
os.remove(source_file)
logger.debug('FILE DELETED - ' + source_file)
return 1
else:
logger.debug('FILE NOT DELETED - ' + source_file + ' - DOES NOT EXIST')
return 0

def File_Exists(file_name):
'Check whether the given filename exists and is actually a file'
try:
if os.path.exists(str(file_name)) and os.path.isfile(str(file_name)):
return True
else:
return False
except:
logger.debug("Error returned while checking if file exists")
return False

Python: Get current time in required format


def Get_Current_Time():
'Returns current date time in format "%Y_%m_%d_%H_%M_%S"'
now = datetime.datetime.now()
return str(now.strftime("%Y_%m_%d_%H_%M_%S"))

Python: Find row in CSV file with a specified value in given column


def FindRowInCSV(file_name, column_position, the_value):
'''Returns the first row# where the value is present in the specified
column of the provided CSV file'''
f = open(file_name, 'rt')
try:
reader = csv.reader(f)
i = 1
for row in reader:
if str(row[column_position]) == str(the_value):
return i
i += 1
except:
logger.debug ('Error Reading CSV file: ' + str(sys.exc_info()[1]))
finally:
f.close()
return -1

Python: Read a .SQL file and execute queries


# All the queries in the file need to be executed on the same database
# The code below can also be used to execute queries directly
''' I am not removing the imported modules not being used in the code below as its being used in my other part of code'''

import fnmatch
from stat import *
import time
import csv
import sys
import subprocess
import os
import cx_Oracle # For Connecting to Oracle Database
import pymysql   # For connecting to MySQL Database
import shutil
import datetime
from sys import argv, exit
import sqlite3
import logging


def Process_SQL_File(databasetype, servername, databasename, username, password, filename):
'Reads the queries from the SQL file and executes them'
try:
f = open(filename, "r")
sql_file = f.read()
queries = (sql_file.split(';'))
for query in queries:
if query.strip():
Execute_Query(databasetype, servername, databasename, username, password, query)
f.close()
except:
logger.debug ('ERROR OCCURED: ' + str(sys.exc_info()[1]))
finally:
f.close()

def Execute_Query(databasetype, servername, databasename, username, password, query):
'Calls the appropriate function to make the db call based on the databasetype'
if query.strip():
# if Database is Oracle call Execute_Query_Oracle function
if databasetype.strip().upper() == "ORACLE":
Execute_Query_Oracle(servername, databasename, username, password, query)
elif databasetype.strip().upper() == "MYSQL":
Execute_Query_MySQL(servername, databasename, username, password, query)
else:
logger.debug(databasetype + " database is not yet supported")


def Execute_Query_Oracle(servername, databasename, username, password, query):
'Executes the query on Oracle database'
connection = cx_Oracle.connect((str(username) + '/' + str(password) + '@' + str(servername) + '/' + str(databasename)))
cursor = connection.cursor()
try:
logger.debug("Query: " + query.strip())
cursor.execute(query)
# if the query is insert or delete; perform commiit
if 'insert into'.strip().upper() in query.upper() or '''
delete from'''.strip().upper() in query.upper():
connection.commit()
# if the query is count(*) or count(1); get 1 record only as expected
elif "(*)" in query.upper() or "(1)" in query.upper():
count, = cursor.fetchone()
logger.debug(count)
# if several columns selection or select * is being done;
# return multiple columns and multiple rows'''
else:
result = cursor.fetchall()
for r in result:
logger.debug(Format_DB_Output(str(r)))
except:
logger.debug('ERROR - QUERY COULD NOT BE EXECUTED ' + str(sys.exc_info()[1]))
finally:
cursor.close()
connection.close()
def Execute_Query_MySQL(hostname, dbname, username, password, query):
'Connects to MySQL Database'
connection = pymysql.connect(host=hostname, user=username, passwd=password, db=dbname)
cursor = connection.cursor()
try:
logger.debug("Query: " + query.strip())
cursor.execute(query)
# if the query is insert or delete; perform commiit
if 'insert into'.strip().upper() in query.upper() or '''
delete from'''.strip().upper() in query.upper():
connection.commit()
# if the query is count(*) or count(1); get 1 record only as expected
elif "(*)" in query.upper() or "(1)" in query.upper():
count, = cursor.fetchone()
logger.debug(count)
# if several columns selection or select * is being done;
# return multiple columns and multiple rows'''
else:
result = cursor.fetchall()
for r in result:
logger.debug(Format_DB_Output(str(r)))
except:
logger.debug('ERROR - QUERY COULD NOT BE EXECUTED ' + str(sys.exc_info()[1]))
finally:
cursor.close()
connection.close()

Python: Simple Pattern matching in Python & Finding latest file in a directory


# The following helps you find the files in a folder with a given pattern and find the latest modified file by checking the modified timestamp of the files

''' I store the modified time in a list so that i can do a max on the value
 I store the timestamp and filename combination in a dictionary object so that i can lookup the filename of the file with the latest modified timestamp
Also, i am using the time in float format as the comparison is easy and accurate'''

def FindLatestFile(directory, lookforpattern):
timelist = []
filename_time = {}
try:
for file in os.listdir(directory):
if fnmatch.fnmatch(file, lookforpattern):
file_mod_time1 = os.stat(os.path.join(directory,file)).st_mtime
timelist.append(file_mod_time1)
filename_time[file_mod_time1] = file
print('LATEST FILE : ' + filename_time[max(timelist)])
except:
print('No file found with given pattern')

#USAGE:
FindLatestFile(r'C:\FOLDER1\outlog', 'file_name*.log')

Python: How i implemented logging in my tool

#How I implemented logging in my tool:
#Requirement is to create a log file which has the name of BATCH JOB being executed, The environment being tested and The Date and Time information as the tool can be executed multiple times back to back
# INPUT_FILE_NAME is passed as second argument to the batch job
#  TEST_ENVIRONMENT is passed as third argument to the batch job

import time
import sys
import os
import datetime
import logging

# Split the input file name and take the first part of the name i.e. except .extension
log_file_name = 'logfile_' + os.path.splitext(str(sys.argv[1]))[0] + '_' + str(sys.argv[2]) + '_' + str(datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")) + '.log'

logger = logging.getLogger('Testing_Automation_Advanced')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler(log_file_name)
# LOG everything to the log file i.e. at DEBUG level
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
# Show only INFO+ level on the standard output i.e. screen
ch.setLevel(logging.INFO)
# create formatter and add it to the handlers
fhformatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
chformatter = logging.Formatter('%(asctime)s - %(message)s')
ch.setFormatter(chformatter)
fh.setFormatter(fhformatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

#USE
logger.debug ('Logging at DEBUG level')
logger.info('Logging at INFO level')

# You can also encapsulate the logging directly and implement some STYLEs e.g.

def Print_Log_Style(log_text, style=None):
'''Puts the provided text in the list for the log file and also prints
it on the screen'''

if style.strip().upper() == 'TITLE':
multiplier = int(30-(len(log_text)/2))
logger.debug(60*'*')
logger.debug(' '*multiplier + log_text)
logger.debug(60*'*')

if style.strip().upper() == 'HEADER':
multiplier = int(15-(len(log_text)/2))
logger.debug(30*'-')
logger.debug(' '*multiplier + log_text)
logger.debug(30*'-')
if style.strip().upper() == 'SECTION':
multiplier = int(15-(len(log_text)/2))
logger.debug(30*'=')
logger.debug(' '*multiplier + log_text)
logger.debug(30*'=')
if style.strip().upper() is None:
logger.debug(log_text)

Sunday, April 24, 2011

Ubuntu 11.04

Ubuntu 11.04 releasing on 28 April. I'd upgrade to this version but also install Fedora 15 to be able to use Gnome 3.0. Unity is decent but it should be an addon instead of the entire shell.