You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
portal-oddajanje-solar/portal/base.py

312 lines
11 KiB

import hashlib
import time
import ssl
import traceback
from pathlib import Path
from datetime import datetime
import imaplib
from smtplib import SMTP_SSL
import email
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import pdfkit
from jinja2 import Environment, FileSystemLoader
from . model import db, UploadUnauthenticated, UploadSolar
ENABLED_FILETYPES = ['txt', 'csv', 'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'xml', 'mxliff', 'tmx']
REGEX_EMAIL = re.compile('^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
class ContractCreator:
def __init__(self):
template_loader = FileSystemLoader(searchpath="./")
template_env = Environment(loader=template_loader)
self.template = template_env.get_template('contract/template.html')
self.pdfkit_options = {
'page-size': 'A4',
'margin-top': '0.75in',
'margin-right': '0.75in',
'margin-bottom': '0.75in',
'margin-left': '0.75in',
'encoding': "UTF-8",
'custom-header' : [
('Accept-Encoding', 'gzip')
]
}
def fill_template(self, **kwargs):
return self.template.render(**kwargs)
def create_pdf(self, out_f, fields_dict):
html_str = self.fill_template(**fields_dict)
pdfkit.from_string(html_str, out_f, options=self.pdfkit_options)
class UploadHandler:
def __init__(self, **kwargs):
self.config = kwargs
self.contract_creator = ContractCreator()
def extract_upload_metadata(self, corpus_name, request):
upload_metadata = dict()
file_hashes = self.create_file_hashes(request.files)
file_names = file_hashes.keys()
form_data = request.form.copy()
upload_timestamp = int(time.time())
upload_id = self.create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes)
upload_metadata['corpus_name'] = corpus_name
upload_metadata['form_data'] = form_data
upload_metadata['upload_id'] = upload_id
upload_metadata['timestamp'] = upload_timestamp
upload_metadata['file_hashes_dict'] = file_hashes
upload_metadata['file_names'] = file_names
upload_metadata['contract_file'] = upload_id + '.pdf'
return upload_metadata
def get_uploads_subdir(self, dir_name):
subdir = Path(self.config['UPLOADS_DIR']) / dir_name
if not subdir.exists():
subdir.mkdir(parents=True)
return subdir
def create_upload_id(self, corpus_name, form_data, upload_timestamp, file_hashes):
# Order is important while hashing, hence the sorting.
val_buff = [str(upload_timestamp)]
for key in sorted(form_data):
val_buff.append(form_data[key])
# This hash serves as an unique identifier for the whole upload.
metahash = hashlib.md5((''.join(val_buff)).encode())
# Include file hashes to avoid metafile name collisions if they have the same form values,
# but different data files. Sort hashes first so upload order doesn't matter.
sorted_f_hashes = list(file_hashes.values())
sorted_f_hashes.sort()
metahash.update(''.join(sorted_f_hashes).encode())
metahash = metahash.hexdigest()
return metahash
def create_file_hashes(self, files):
res = dict()
for key, f in files.items():
if key.startswith('file'):
h = hashlib.md5(f.filename.encode())
h.update(f.stream.read())
res[f.filename] = h.hexdigest()
f.seek(0)
return res
def store_metadata_unauthenticated(self, upload_metadata):
timestamp = datetime.fromtimestamp(upload_metadata['timestamp'])
form_data = upload_metadata['form_data']
file_hashes = upload_metadata['file_hashes_dict']
sorted_f_hashes = list(file_hashes.values())
sorted_f_hashes.sort()
try:
upload_unauthenticated = UploadUnauthenticated(
upload_hash=upload_metadata['upload_id'],
timestamp=timestamp,
form_name=form_data['ime'],
form_org=form_data['podjetje'],
form_address=form_data['naslov'],
form_zipcode=form_data['posta'],
form_email=form_data['email'],
file_contract=upload_metadata['contract_file'],
upload_file_hashes=sorted_f_hashes,
corpus_name=todo
)
db.session.add(upload_unauthenticated)
db.session.commit()
except Exception:
traceback.print_exc()
def store_metadata_solar(self, upload_metadata):
timestamp = datetime.fromtimestamp(upload_metadata['timestamp'])
form_data = upload_metadata['form_data']
file_hashes = upload_metadata['file_hashes_dict']
sorted_f_hashes = list(file_hashes.values())
sorted_f_hashes.sort()
try:
upload_solar = UploadSolar(
upload_user = todo,
institution = todo,
upload_hash=upload_metadata['upload_id'],
timestamp=timestamp,
form_program=form_data['program'],
form_subject=form_data['subject'],
form_grade=form_data['grade'],
form_text_type=form_data['text_type'],
form_school_year=form_data['school_year'],
form_grammar_corrections=form_data['grammar_corrections'],
upload_file_hashes=sorted_f_hashes
)
db.session.add(upload_unauthenticated)
db.session.commit()
except Exception:
traceback.print_exc()
def store_datafiles(self, files, upload_metadata):
base = self.get_uploads_subdir('files')
file_hashes = upload_metadata['file_hashes_dict']
for key, f in files.items():
if key.startswith('file'):
path = base / file_hashes[f.filename]
if not path.exists():
path.mkdir()
f.save(path / f.filename)
def generate_upload_contract_pdf(self, upload_metadata):
base = self.get_uploads_subdir('contracts')
form_data = upload_metadata['form_data']
files_table_str = []
for file_name in upload_metadata['file_names']:
files_table_str.append('<tr><td style="text-align: center;">')
files_table_str.append(file_name)
files_table_str.append('</td></tr>')
files_table_str = ''.join(files_table_str)
data = {
'ime_priimek': form_data['ime'],
'naslov': form_data['naslov'],
'posta': form_data['posta'],
'kontakt_narocnik': self.config['CONTRACT_CLIENT_CONTACT'],
'kontakt_imetnikpravic': form_data['ime'],
'files_table_str': files_table_str
}
self.contract_creator.create_pdf(base / upload_metadata['contract_file'], data)
def send_confirm_mail(self, upload_metadata):
upload_id = upload_metadata['upload_id']
message = MIMEMultipart()
message['From'] = self.config['MAIL_LOGIN']
message['To'] = upload_metadata['form_data']['email']
message['Subject'] = self.config['MAIL_SUBJECT'].format(upload_id=upload_id)
body = self.config['MAIL_BODY'].format(upload_id=upload_id)
message.attach(MIMEText(body, "plain"))
contracts_dir = self.get_uploads_subdir('contracts')
base_name = upload_metadata['contract_file']
contract_file = contracts_dir / base_name
with open(contract_file, "rb") as f:
part = MIMEApplication(
f.read(),
Name = base_name
)
part['Content-Disposition'] = 'attachment; filename="%s"' % base_name
message.attach(part)
text = message.as_string()
# Create a secure SSL context
context = ssl.create_default_context()
with SMTP_SSL(self.config['MAIL_HOST'], self.config['SMTP_PORT'], context=context) as server:
server.login(self.config['MAIL_LOGIN'], self.config['MAIL_PASS'])
server.sendmail(message['From'], message['To'], text)
# Save copy of sent mail in Sent mailbox
imap = imaplib.IMAP4_SSL(self.config['MAIL_HOST'], self.config['IMAP_PORT'])
imap.login(self.config['MAIL_LOGIN'], self.config['MAIL_PASS'])
imap.append('Sent', '\\Seen', imaplib.Time2Internaldate(time.time()), text.encode('utf8'))
imap.logout()
def handle_upload_unauthenticated(request, corpus_name, upload_handler):
files = request.files
if len(files) > upload_handler.MAX_FILES_PER_UPLOAD:
return 'Naložite lahko do {} datotek hkrati.'.format(upload_handler.MAX_FILES_PER_UPLOAD), 400
elif len(files) < 1:
return 'Priložena ni bila nobena datoteka.', 400
err = check_suffixes(files)
if err:
return err, 400
err = check_form(request.form)
if err:
return err, 400
# Parse request.
upload_metadata = upload_handler.extract_upload_metadata(corpus_name, request)
logging.info('Upload with id "{}" supplied form data: {}'.format(upload_metadata['upload_id'],
str(upload_metadata['form_data'])))
# Generate contract PDF file based on the uploads metadata.
upload_handler.generate_upload_contract_pdf(upload_metadata)
# Store uploaded files to disk.
upload_handler.store_datafiles(files, upload_metadata)
# Store metadata to database.
upload_handler.store_metadata_unauthenticated(upload_metadata)
# Send confirmation mail along with the contract to the submitted email address.
upload_handler.send_confirm_mail(upload_metadata)
return 'Uspešno ste oddali datotek(e). Št. datotek: {}'.format(len(files))
def check_suffixes(files):
for key, f in files.items():
if key.startswith('file'):
suffix = f.filename.split('.')[-1]
if suffix not in ENABLED_FILETYPES:
return 'Datoteka "{}" ni pravilnega formata.'.format(f.filename)
return None
def check_form(form):
ime = form.get('ime')
podjetje = form.get('podjetje')
naslov = form.get('naslov')
posta = form.get('posta')
email = form.get('email')
telefon = form.get('telefon')
if len(ime) > 100:
return 'Predolgo ime.'
if len(podjetje) > 100:
return 'Predolgo ime institucije.'
if len(email) > 100:
return 'Predolgi email naslov'
elif not re.search(REGEX_EMAIL, email):
return 'Email napačnega formata.'
if len(telefon) > 100:
return 'Predolga telefonska št.'
if len(naslov) > 100:
return 'Predolg naslov.'
if len(posta) > 100:
return 'Predolga pošta'
return None