@ -1,8 +1,9 @@
import re
import hashlib
import time
import ssl
import traceback
from pathlib import Path
from datetime import datetime
import imaplib
from smtplib import SMTP_SSL
@ -17,9 +18,7 @@ from email.mime.application import MIMEApplication
import pdfkit
from jinja2 import Environment , FileSystemLoader
ENABLED_FILETYPES = [ ' txt ' , ' csv ' , ' pdf ' , ' doc ' , ' docx ' , ' xls ' , ' xlsx ' , ' ppt ' , ' pptx ' , ' xml ' , ' mxliff ' , ' tmx ' ]
REGEX_EMAIL = re . compile ( ' ^[a-z0-9]+[ \ ._]?[a-z0-9]+[@] \ w+[.] \ w { 2,3}$ ' )
from . model import db , UploadUnauthenticated
class ContractCreator :
@ -49,202 +48,161 @@ class ContractCreator:
pdfkit . from_string ( html_str , out_f , options = self . pdfkit_options )
contract_creator = ContractCreator ( )
def get_upload_metadata ( corpus_name , request ) :
upload_metadata = dict ( )
file_hashes = create_file_hashes ( request . files )
file_names = file_hashes . keys ( )
form_data = request . form . copy ( )
upload_timestamp = int ( time . time ( ) )
upload_id = create_upload_id ( corpus_name , form_data , upload_timestamp , file_hashes )
upload_metadata [ ' corpus_name ' ] = corpus_name
upload_metadata [ ' form_data ' ] = form_data
upload_metadata [ ' upload_id ' ] = upload_id
upload_metadata [ ' timestamp ' ] = upload_timestamp
upload_metadata [ ' file_hashes ' ] = file_hashes
upload_metadata [ ' file_names ' ] = file_names
return upload_metadata
def check_suffixes ( files ) :
for key , f in files . items ( ) :
if key . startswith ( ' file ' ) :
suffix = f . filename . split ( ' . ' ) [ - 1 ]
if suffix not in ENABLED_FILETYPES :
return ' Datoteka " {} " ni pravilnega formata. ' . format ( f . filename )
return None
def get_subdir ( uploads_path , dir_name ) :
subdir = uploads_path / dir_name
if not subdir . exists ( ) :
subdir . mkdir ( parents = True )
return subdir
def create_upload_id ( corpus_name , form_data , upload_timestamp , file_hashes ) :
ime = form_data . get ( ' ime ' )
podjetje = form_data . get ( ' podjetje ' )
naslov = form_data . get ( ' naslov ' )
posta = form_data . get ( ' posta ' )
email = form_data . get ( ' email ' )
telefon = form_data . get ( ' telefon ' )
# This hash serves as an unique identifier for the whole upload.
metahash = hashlib . md5 ( ( corpus_name + ime + podjetje + naslov + posta + email + telefon ) . encode ( ) )
# Include file hashes to avoid metafile name collisions if they have the same form values,
# but different data files. Sort hashes first so upload order doesn't matter.
sorted_f_hashes = list ( file_hashes . values ( ) )
sorted_f_hashes . sort ( )
metahash . update ( ' ' . join ( sorted_f_hashes ) . encode ( ) )
metahash = metahash . hexdigest ( )
return metahash
def check_form ( form ) :
ime = form . get ( ' ime ' )
podjetje = form . get ( ' podjetje ' )
naslov = form . get ( ' naslov ' )
posta = form . get ( ' posta ' )
email = form . get ( ' email ' )
telefon = form . get ( ' telefon ' )
if len ( ime ) > 100 :
return ' Predolgo ime. '
if len ( podjetje ) > 100 :
return ' Predolgo ime institucije. '
if len ( email ) > 100 :
return ' Predolgi email naslov '
elif not re . search ( REGEX_EMAIL , email ) :
return ' Email napačnega formata. '
if len ( telefon ) > 100 :
return ' Predolga telefonska št. '
if len ( naslov ) > 100 :
return ' Predolg naslov. '
if len ( posta ) > 100 :
return ' Predolga pošta '
return None
def create_file_hashes ( files ) :
res = dict ( )
for key , f in files . items ( ) :
if key . startswith ( ' file ' ) :
h = hashlib . md5 ( f . filename . encode ( ) )
h . update ( f . stream . read ( ) )
res [ f . filename ] = h . hexdigest ( )
f . seek ( 0 )
return res
def store_metadata ( uploads_path , upload_metadata ) :
base = get_subdir ( uploads_path , ' meta ' )
timestamp = upload_metadata [ ' timestamp ' ]
upload_id = upload_metadata [ ' upload_id ' ]
form_data = upload_metadata [ ' form_data ' ]
email = form_data [ ' email ' ]
file_hashes = upload_metadata [ ' file_hashes ' ]
contract = upload_metadata [ ' contract ' ]
filename = str ( timestamp ) + ' - ' + email + ' - ' + upload_id + ' .meta '
sorted_f_hashes = list ( file_hashes . values ( ) )
sorted_f_hashes . sort ( )
path = base / filename
with path . open ( ' w ' ) as f :
f . write ( ' korpus= ' + upload_metadata [ ' corpus_name ' ] )
f . write ( ' \n ime= ' + form_data [ ' ime ' ] )
f . write ( ' \n podjetje= ' + form_data [ ' podjetje ' ] )
f . write ( ' \n naslov= ' + form_data [ ' naslov ' ] )
f . write ( ' \n posta= ' + form_data [ ' posta ' ] )
f . write ( ' \n email= ' + form_data [ ' email ' ] )
f . write ( ' \n datoteke= ' + str ( sorted_f_hashes ) )
f . write ( ' \n pogodba= ' + contract )
def store_datafiles ( uploads_path , files , upload_metadata ) :
base = get_subdir ( uploads_path , ' files ' )
file_hashes = upload_metadata [ ' file_hashes ' ]
for key , f in files . items ( ) :
if key . startswith ( ' file ' ) :
path = base / file_hashes [ f . filename ]
if not path . exists ( ) :
path . mkdir ( )
f . save ( path / f . filename )
def generate_contract_pdf ( uploads_path , upload_metadata , contract_client_contact ) :
base = get_subdir ( uploads_path , ' contracts ' )
contract_file_name = upload_metadata [ ' upload_id ' ] + ' .pdf '
form_data = upload_metadata [ ' form_data ' ]
files_table_str = [ ]
for file_name in upload_metadata [ ' file_names ' ] :
files_table_str . append ( ' <tr><td style= " text-align: center; " > ' )
files_table_str . append ( file_name )
files_table_str . append ( ' </td></tr> ' )
files_table_str = ' ' . join ( files_table_str )
data = {
' ime_priimek ' : form_data [ ' ime ' ] ,
' naslov ' : form_data [ ' naslov ' ] ,
' posta ' : form_data [ ' posta ' ] ,
' kontakt_narocnik ' : contract_client_contact ,
' kontakt_imetnikpravic ' : form_data [ ' ime ' ] ,
' files_table_str ' : files_table_str
}
contract_creator . create_pdf ( base / contract_file_name , data )
return contract_file_name
def send_confirm_mail ( subject , body , uploads_path , upload_metadata , mail_host , mail_login , mail_pass , imap_port = 993 , smtp_port = 465 ) :
upload_id = upload_metadata [ ' upload_id ' ]
message = MIMEMultipart ( )
message [ ' From ' ] = mail_login
message [ ' To ' ] = upload_metadata [ ' form_data ' ] [ ' email ' ]
message [ ' Subject ' ] = subject . format ( upload_id = upload_id )
body = body . format ( upload_id = upload_id )
message . attach ( MIMEText ( body , " plain " ) )
contracts_dir = get_subdir ( uploads_path , ' contracts ' )
base_name = upload_metadata [ ' contract ' ]
contract_file = contracts_dir / base_name
with open ( contract_file , " rb " ) as f :
part = MIMEApplication (
f . read ( ) ,
Name = base_name
)
part [ ' Content-Disposition ' ] = ' attachment; filename= " %s " ' % base_name
message . attach ( part )
text = message . as_string ( )
# Create a secure SSL context
context = ssl . create_default_context ( )
with SMTP_SSL ( mail_host , smtp_port , context = context ) as server :
server . login ( mail_login , mail_pass )
server . sendmail ( message [ ' From ' ] , message [ ' To ' ] , text )
# Save copy of sent mail in Sent mailbox
imap = imaplib . IMAP4_SSL ( mail_host , imap_port )
imap . login ( mail_login , mail_pass )
imap . append ( ' Sent ' , ' \\ Seen ' , imaplib . Time2Internaldate ( time . time ( ) ) , text . encode ( ' utf8 ' ) )
imap . logout ( )
class UploadHandler :
def __init__ ( self , * * kwargs ) :
self . config = kwargs
self . contract_creator = ContractCreator ( )
def extract_upload_metadata ( self , corpus_name , request ) :
upload_metadata = dict ( )
file_hashes = self . create_file_hashes ( request . files )
file_names = file_hashes . keys ( )
form_data = request . form . copy ( )
upload_timestamp = int ( time . time ( ) )
upload_id = self . create_upload_id ( corpus_name , form_data , upload_timestamp , file_hashes )
upload_metadata [ ' corpus_name ' ] = corpus_name
upload_metadata [ ' form_data ' ] = form_data
upload_metadata [ ' upload_id ' ] = upload_id
upload_metadata [ ' timestamp ' ] = upload_timestamp
upload_metadata [ ' file_hashes_dict ' ] = file_hashes
upload_metadata [ ' file_names ' ] = file_names
upload_metadata [ ' contract_file ' ] = upload_id + ' .pdf '
return upload_metadata
def get_uploads_subdir ( self , dir_name ) :
subdir = self . config [ ' UPLOADS_DIR ' ] / dir_name
if not subdir . exists ( ) :
subdir . mkdir ( parents = True )
return subdir
def create_upload_id ( self , corpus_name , form_data , upload_timestamp , file_hashes ) :
# Order is important while hashing, hence the sorting.
val_buff = [ str ( upload_timestamp ) ]
for key in sorted ( form_data ) :
val_buff . append ( form_data [ key ] )
# This hash serves as an unique identifier for the whole upload.
metahash = hashlib . md5 ( ( ' ' . join ( val_buff ) ) . encode ( ) )
# Include file hashes to avoid metafile name collisions if they have the same form values,
# but different data files. Sort hashes first so upload order doesn't matter.
sorted_f_hashes = list ( file_hashes . values ( ) )
sorted_f_hashes . sort ( )
metahash . update ( ' ' . join ( sorted_f_hashes ) . encode ( ) )
metahash = metahash . hexdigest ( )
return metahash
def create_file_hashes ( self , files ) :
res = dict ( )
for key , f in files . items ( ) :
if key . startswith ( ' file ' ) :
h = hashlib . md5 ( f . filename . encode ( ) )
h . update ( f . stream . read ( ) )
res [ f . filename ] = h . hexdigest ( )
f . seek ( 0 )
return res
def store_metadata_unauthenticated ( self , upload_metadata ) :
timestamp = datetime . fromtimestamp ( upload_metadata [ ' timestamp ' ] )
form_data = upload_metadata [ ' form_data ' ]
file_hashes = upload_metadata [ ' file_hashes_dict ' ]
sorted_f_hashes = list ( file_hashes . values ( ) )
sorted_f_hashes . sort ( )
try :
upload_unauthenticated = UploadUnauthenticated (
upload_hash = upload_metadata [ ' upload_id ' ] ,
timestamp = timestamp ,
form_name = form_data [ ' ime ' ] ,
form_org = form_data [ ' podjetje ' ] ,
form_address = form_data [ ' naslov ' ] ,
form_zipcode = form_data [ ' posta ' ] ,
form_email = form_data [ ' email ' ] ,
file_contract = upload_metadata [ ' contract_file ' ] ,
upload_file_hashes = sorted_f_hashes
)
db . session . add ( upload_unauthenticated )
db . session . commit ( )
except Exception :
traceback . print_exc ( )
def store_metadata_authenticated ( self , upload_metadata ) :
pass
def store_datafiles ( self , files , upload_metadata ) :
base = self . get_uploads_subdir ( ' files ' )
file_hashes = upload_metadata [ ' file_hashes_dict ' ]
for key , f in files . items ( ) :
if key . startswith ( ' file ' ) :
path = base / file_hashes [ f . filename ]
if not path . exists ( ) :
path . mkdir ( )
f . save ( path / f . filename )
def generate_upload_contract_pdf ( self , upload_metadata ) :
base = self . get_uploads_subdir ( ' contracts ' )
form_data = upload_metadata [ ' form_data ' ]
files_table_str = [ ]
for file_name in upload_metadata [ ' file_names ' ] :
files_table_str . append ( ' <tr><td style= " text-align: center; " > ' )
files_table_str . append ( file_name )
files_table_str . append ( ' </td></tr> ' )
files_table_str = ' ' . join ( files_table_str )
data = {
' ime_priimek ' : form_data [ ' ime ' ] ,
' naslov ' : form_data [ ' naslov ' ] ,
' posta ' : form_data [ ' posta ' ] ,
' kontakt_narocnik ' : self . config [ ' CONTRACT_CLIENT_CONTACT ' ] ,
' kontakt_imetnikpravic ' : form_data [ ' ime ' ] ,
' files_table_str ' : files_table_str
}
self . contract_creator . create_pdf ( base / upload_metadata [ ' contract_file ' ] , data )
def send_confirm_mail ( self , upload_metadata ) :
upload_id = upload_metadata [ ' upload_id ' ]
message = MIMEMultipart ( )
message [ ' From ' ] = self . config [ ' MAIL_LOGIN ' ]
message [ ' To ' ] = upload_metadata [ ' form_data ' ] [ ' email ' ]
message [ ' Subject ' ] = self . config [ ' MAIL_SUBJECT ' ] . format ( upload_id = upload_id )
body = self . config [ ' MAIL_BODY ' ] . format ( upload_id = upload_id )
message . attach ( MIMEText ( body , " plain " ) )
contracts_dir = self . get_uploads_subdir ( ' contracts ' )
base_name = upload_metadata [ ' contract_file ' ]
contract_file = contracts_dir / base_name
with open ( contract_file , " rb " ) as f :
part = MIMEApplication (
f . read ( ) ,
Name = base_name
)
part [ ' Content-Disposition ' ] = ' attachment; filename= " %s " ' % base_name
message . attach ( part )
text = message . as_string ( )
# Create a secure SSL context
context = ssl . create_default_context ( )
with SMTP_SSL ( self . config [ ' MAIL_HOST ' ] , self . config [ ' SMTP_PORT ' ] , context = context ) as server :
server . login ( self . config [ ' MAIL_LOGIN ' ] , self . config [ ' MAIL_PASS ' ] )
server . sendmail ( message [ ' From ' ] , message [ ' To ' ] , text )
# Save copy of sent mail in Sent mailbox
imap = imaplib . IMAP4_SSL ( self . config [ ' MAIL_HOST ' ] , self . config [ ' IMAP_PORT ' ] )
imap . login ( self . config [ ' MAIL_LOGIN ' ] , self . config [ ' MAIL_PASS ' ] )
imap . append ( ' Sent ' , ' \\ Seen ' , imaplib . Time2Internaldate ( time . time ( ) ) , text . encode ( ' utf8 ' ) )
imap . logout ( )