338 lines
10 KiB
Python
338 lines
10 KiB
Python
import os
|
|
import re
|
|
import hashlib
|
|
import time
|
|
import ssl
|
|
import configparser
|
|
from pathlib import Path
|
|
|
|
from flask import Flask, render_template, request
|
|
from flask_dropzone import Dropzone
|
|
|
|
import imaplib
|
|
from smtplib import SMTP_SSL
|
|
|
|
import email
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.application import MIMEApplication
|
|
|
|
import pdfkit
|
|
from jinja2 import Environment, FileSystemLoader
|
|
|
|
|
|
class ContractCreator:
    """Render the contract HTML template and convert the result to a PDF.

    The template ``contract/template.html`` is looked up relative to the
    current working directory and is loaded once, when the object is built.
    """

    def __init__(self):
        template_loader = FileSystemLoader(searchpath="./")
        # The template is filled with values typed into the upload form, so
        # autoescaping is switched on: user-supplied text must never be
        # interpreted as markup by the HTML-to-PDF step. Trusted markup has
        # to be marked explicitly (``|safe`` in the template).
        template_env = Environment(loader=template_loader, autoescape=True)
        self.template = template_env.get_template('contract/template.html')

        # Options handed to pdfkit on every conversion.
        self.pdfkit_options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'custom-header': [
                ('Accept-Encoding', 'gzip'),
            ],
        }

    def fill_template(self, **kwargs):
        """Return the contract template rendered with ``kwargs`` as variables."""
        return self.template.render(**kwargs)

    def create_pdf(self, out_f, fields_dict):
        """Render the template with ``fields_dict`` and write a PDF to ``out_f``.

        ``out_f`` is passed to ``pdfkit.from_string`` as the output target.
        """
        html_str = self.fill_template(**fields_dict)
        pdfkit.from_string(html_str, out_f, options=self.pdfkit_options)
|
|
|
|
|
|
# File extensions (without the leading dot) that are accepted for upload.
ENABLED_FILETYPES = ['txt', 'csv', 'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx']

# Pragmatic shape check for an e-mail address: a local part, "@", one or more
# dot-separated domain labels and a final label of at least two characters.
# The address is later used in a file name and in a mail header, so the
# pattern admits no path separators or whitespace, and it is anchored with
# ``\Z`` (not ``$``) so a trailing newline cannot slip through.
REGEX_EMAIL = re.compile(r'^[A-Za-z0-9._%+-]+@[\w-]+(?:\.[\w-]+)*\.\w{2,}\Z')
|
|
|
|
######################
# Load configuration #
######################
# Settings come from the DEFAULT section of ``config.ini`` (path relative to
# the current working directory). An environment variable named
# ``PORTALDS4DS1_<KEY>`` takes precedence over the file where supported.
config = configparser.ConfigParser()
config.read('config.ini')
config = config['DEFAULT']

_ENV_PREFIX = 'PORTALDS4DS1_'
_REQUIRED = object()  # sentinel: the setting has no fallback value


def _setting(key, default=_REQUIRED, cast=str, allow_env=True):
    """Return setting ``key``, looked up in the environment, then config.ini.

    ``cast`` converts the raw string. If the key is found nowhere, ``default``
    is returned as-is; a setting without a default raises ``KeyError``.
    """
    env_key = _ENV_PREFIX + key
    if allow_env and env_key in os.environ:
        return cast(os.environ[env_key])
    if key in config:
        return cast(config[key])
    if default is _REQUIRED:
        raise KeyError(key)
    return default


MAIL_HOST = _setting('MAIL_HOST')
MAIL_LOGIN = _setting('MAIL_LOGIN')
MAIL_PASS = _setting('MAIL_PASS')
SMTP_PORT = _setting('SMTP_PORT', cast=int)
IMAP_PORT = _setting('IMAP_PORT', cast=int)
MAX_UPLOAD_SIZE = _setting('MAX_UPLOAD_SIZE', cast=int)  # Bytes
CONTRACT_CLIENT_CONTACT = _setting('CONTRACT_CLIENT_CONTACT')
# BASE_DIR can only be set in config.ini; it defaults to this file's directory.
BASE_DIR = _setting(
    'BASE_DIR',
    default=Path(__file__).resolve().parent,
    cast=Path,
    allow_env=False,
)
URL_ROOT = _setting('URL_ROOT', default='/')

UPLOAD_DIR = BASE_DIR / 'uploads'
# Create the upload root if needed. The previous ``if not UPLOAD_DIR.exists:``
# tested the bound method itself (always truthy), so it never ran.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

######################
|
|
|
|
# Flask application object; the routes below are registered on it.
app = Flask(__name__)

# UPLOADED_PATH is the root under which get_subdir() creates its directories;
# MAX_CONTENT_LENGTH takes the configured upload size limit (bytes).
app.config.update({
    'UPLOADED_PATH': UPLOAD_DIR,
    'MAX_CONTENT_LENGTH': MAX_UPLOAD_SIZE,
    'TEMPLATES_AUTO_RELOAD': True,
})

# Dropzone extension bound to the application.
dropzone = Dropzone(app)

# Single shared contract renderer; its template is loaded here, at import time.
contract_creator = ContractCreator()
|
|
|
|
|
|
@app.route(URL_ROOT)
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
|
|
|
# Strip a trailing slash from URL_ROOT before appending the endpoint, so the
# default root '/' yields '/upload' rather than '//upload'.
@app.route(URL_ROOT.rstrip('/') + '/upload', methods=['POST'])
def handle_upload():
    """Validate an upload, store it, generate the contract and mail it.

    Returns a plain-text message. Validation failures are returned together
    with HTTP status 400; on success the message reports the number of
    submitted files.
    """
    files = request.files
    if len(files) > 20:
        return 'Naložite lahko do 20 datotek hkrati.', 400
    if len(files) < 1:
        return 'Priložena ni bila nobena datoteka.', 400

    # Both validators return an error message, or None when the input is fine.
    err = check_suffixes(files)
    if err:
        return err, 400

    err = check_form(request.form)
    if err:
        return err, 400

    upload_metadata = get_upload_metadata(request)
    contract_file_name = generate_contract_pdf(upload_metadata)
    # Add contract_file_name to metadata TODO: move somewhere else
    upload_metadata['contract'] = contract_file_name
    store_datafiles(files, upload_metadata)
    store_metadata(upload_metadata)
    send_confirm_mail(upload_metadata)

    return 'Uspešno ste oddali datotek(e). Št. datotek: {}'.format(len(files))
|
|
|
|
|
|
def get_upload_metadata(request):
    """Collect the metadata describing one upload request.

    Returns a dict with the keys ``form_data`` (a copy of the submitted
    form), ``upload_id``, ``timestamp`` (whole seconds since the epoch) and
    ``file_hashes`` (as produced by ``create_file_hashes``).
    """
    hashes_by_field = create_file_hashes(request.files)
    submitted = request.form.copy()
    received_at = int(time.time())

    return {
        'form_data': submitted,
        'upload_id': create_upload_id(submitted, received_at, hashes_by_field),
        'timestamp': received_at,
        'file_hashes': hashes_by_field,
    }
|
|
|
|
def check_suffixes(files, allowed=None):
    """Check that every uploaded file has an accepted extension.

    Only entries whose key starts with ``'file'`` are inspected. The
    extension is the text after the last dot, compared case-insensitively;
    a name without any dot has no extension and is rejected.

    ``allowed`` is an optional collection of lower-case extensions; it
    defaults to ``ENABLED_FILETYPES``.

    Returns an error message for the first offending file, or None.
    """
    if allowed is None:
        allowed = ENABLED_FILETYPES

    for key, f in files.items():
        if not key.startswith('file'):
            continue
        filename = f.filename or ''
        # rpartition leaves ``dot`` empty when the name contains no '.',
        # which distinguishes "report.pdf" from a file literally named "pdf".
        _, dot, suffix = filename.rpartition('.')
        if not dot or suffix.lower() not in allowed:
            return 'Datoteka "{}" ni pravilnega formata.'.format(f.filename)
    return None
|
|
|
|
|
|
def get_subdir(dir_name):
    """Return the directory ``dir_name`` under the upload root, creating it.

    The upload root is ``app.config['UPLOADED_PATH']``. Missing parent
    directories are created as well, and an already existing directory is
    not an error, so concurrent requests cannot trip over each other between
    an existence check and the creation.
    """
    subdir = app.config['UPLOADED_PATH'] / dir_name
    subdir.mkdir(parents=True, exist_ok=True)
    return subdir
|
|
|
|
|
|
def create_upload_id(form_data, upload_timestamp, file_hashes):
    """Return the hex MD5 digest that identifies an upload.

    The digest covers the seven form fields (concatenated in a fixed order)
    followed by the file hashes. ``upload_timestamp`` is accepted but does
    not take part in the digest.
    """
    field_order = ('tip', 'ime', 'podjetje', 'naslov', 'posta', 'email', 'telefon')
    form_part = ''.join([form_data.get(name) for name in field_order])

    # The file hashes are mixed in so that identical form values with
    # different data files do not collide; sorting them makes the result
    # independent of the upload order.
    files_part = ''.join(sorted(file_hashes.values()))

    digest = hashlib.md5(form_part.encode())
    digest.update(files_part.encode())
    return digest.hexdigest()
|
|
|
|
|
|
def check_form(form):
    """Validate the submitted form fields.

    Returns an error message for the first problem found, or None when the
    form is acceptable. A field that is absent from the form is reported as
    an error instead of raising ``TypeError`` on ``len(None)``.
    """
    if form.get('tip') not in ('enojez', 'prevodi'):
        return 'Napačen tip besedila.'

    # (field name, message when the value is longer than 100 characters),
    # in the order in which the fields are checked.
    length_checks = (
        ('ime', 'Predolgo ime.'),
        ('podjetje', 'Predolgo ime institucije.'),
        ('email', 'Predolgi email naslov'),
        ('telefon', 'Predolga telefonska št.'),
        ('naslov', 'Predolg naslov.'),
        ('posta', 'Predolga pošta'),
    )

    values = {}
    for name, _ in length_checks:
        value = form.get(name)
        if value is None:
            return 'Manjka polje "{}".'.format(name)
        values[name] = value

    for name, too_long in length_checks:
        value = values[name]
        if len(value) > 100:
            return too_long
        # The format check applies to the e-mail only and comes right after
        # its length check.
        if name == 'email' and not re.search(REGEX_EMAIL, value):
            return 'Email napačnega formata.'

    return None
|
|
|
|
|
|
def create_file_hashes(files):
    """Return ``{form key: hex MD5 digest}`` for the uploaded data files.

    Only entries whose key starts with ``'file'`` are hashed. Each digest
    covers the file name followed by the file content. The content is read
    in fixed-size chunks so a large upload is never held in memory as a
    whole, and the file is rewound afterwards so it can be read again.
    """
    chunk_size = 64 * 1024
    res = {}
    for key, f in files.items():
        if not key.startswith('file'):
            continue
        h = hashlib.md5(f.filename.encode())
        while True:
            chunk = f.stream.read(chunk_size)
            if not chunk:
                break
            h.update(chunk)
        res[key] = h.hexdigest()
        f.seek(0)
    return res
|
|
|
|
|
|
def store_metadata(upload_metadata):
    """Write the upload's metadata to a ``.meta`` file in the ``meta`` subdir.

    The file is named ``<timestamp>-<email>-<upload_id>.meta`` and contains
    one ``key=value`` line per form field, followed by the upload time, the
    sorted file hashes and the contract file name. It is written as UTF-8 so
    non-ASCII form values do not depend on the platform's default encoding.
    """
    base = get_subdir('meta')

    timestamp = upload_metadata['timestamp']
    upload_id = upload_metadata['upload_id']
    form_data = upload_metadata['form_data']
    file_hashes = upload_metadata['file_hashes']
    contract = upload_metadata['contract']
    filename = str(timestamp) + '-' + form_data['email'] + '-' + upload_id + '.meta'

    # NOTE(review): form values are written verbatim; a value containing a
    # newline would add extra lines to the file -- confirm this is acceptable
    # for whatever reads these files.
    field_order = ('tip', 'ime', 'podjetje', 'naslov', 'posta', 'email', 'telefon')
    lines = [name + '=' + form_data[name] for name in field_order]
    lines.append('upload_time=' + str(timestamp))
    lines.append('upload_files=' + str(sorted(file_hashes.values())))
    lines.append('contract_file=' + contract)

    path = base / filename
    with path.open('w', encoding='utf-8') as f:
        f.write('\n'.join(lines))
|
|
|
|
|
|
def store_datafiles(files, upload_metadata):
    """Save each uploaded data file under ``files/<file hash>/<file name>``.

    Only entries whose key starts with ``'file'`` are stored. The file name
    is supplied by the client, so it is reduced to its last path component
    before it is joined to the target directory; otherwise a name such as
    ``../../x.txt`` or an absolute path would be written outside the upload
    area.

    Raises ``ValueError`` if no usable file name remains after that.
    """
    base = get_subdir('files')
    file_hashes = upload_metadata['file_hashes']

    for key, f in files.items():
        if not key.startswith('file'):
            continue

        # Treat both '/' and '\\' as separators, whatever the client's OS.
        safe_name = (f.filename or '').replace('\\', '/').rsplit('/', 1)[-1]
        if safe_name in ('', '.', '..'):
            raise ValueError('Unsafe upload file name: {!r}'.format(f.filename))

        target_dir = base / file_hashes[key]
        target_dir.mkdir(exist_ok=True)
        f.save(target_dir / safe_name)
|
|
|
|
def generate_contract_pdf(upload_metadata):
    """Create the contract PDF for an upload and return its file name.

    The PDF is written to the ``contracts`` subdirectory as
    ``<upload_id>.pdf``; the template variables are taken from the submitted
    form and from ``CONTRACT_CLIENT_CONTACT``.
    """
    out_dir = get_subdir('contracts')
    pdf_name = upload_metadata['upload_id'] + '.pdf'
    submitted = upload_metadata['form_data']

    template_fields = dict(
        ime_priimek=submitted['ime'],
        naslov=submitted['naslov'],
        posta=submitted['posta'],
        kontakt_narocnik=CONTRACT_CLIENT_CONTACT,
        kontakt_imetnikpravic=submitted['ime'],
    )

    contract_creator.create_pdf(out_dir / pdf_name, template_fields)
    return pdf_name
|
|
|
|
def send_confirm_mail(upload_metadata):
    """Mail the generated contract to the uploader and file a copy in 'Sent'.

    The message goes to the e-mail address from the submitted form, with the
    contract PDF (``upload_metadata['contract']``, read from the
    ``contracts`` subdirectory) attached. It is sent over SMTP-over-SSL and
    then appended to the ``Sent`` mailbox over IMAP.
    """
    body = 'Uspešno ste oddali besedila. V prilogi vam pošiljamo pogodbo.'

    message = MIMEMultipart()
    message['From'] = MAIL_LOGIN
    message['To'] = upload_metadata['form_data']['email']
    message['Subject'] = 'Pogodba za oddana besedila ' + upload_metadata['upload_id']
    message.attach(MIMEText(body, "plain"))

    contracts_dir = get_subdir('contracts')
    base_name = upload_metadata['contract']
    contract_file = contracts_dir / base_name
    with open(contract_file, "rb") as f:
        part = MIMEApplication(
            f.read(),
            Name=base_name
        )
    part['Content-Disposition'] = 'attachment; filename="%s"' % base_name
    message.attach(part)

    text = message.as_string()

    # Create a secure SSL context; it is used for both connections below.
    context = ssl.create_default_context()

    with SMTP_SSL(MAIL_HOST, SMTP_PORT, context=context) as server:
        server.login(MAIL_LOGIN, MAIL_PASS)
        server.sendmail(message['From'], message['To'], text)

    # Save copy of sent mail in Sent mailbox. The ``with`` block logs out
    # even when login or append raises, so the connection is not leaked.
    with imaplib.IMAP4_SSL(MAIL_HOST, IMAP_PORT, ssl_context=context) as imap:
        imap.login(MAIL_LOGIN, MAIL_PASS)
        imap.append('Sent', '\\Seen', imaplib.Time2Internaldate(time.time()), text.encode('utf8'))
|
|
|
|
|
|
if __name__ == '__main__':
    # Debug mode turns on the interactive debugger, which lets anyone who can
    # reach the server execute code. It is therefore opt-in: set
    # PORTALDS4DS1_DEBUG=1 for local development.
    debug = os.environ.get('PORTALDS4DS1_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug)
|