portal-oddajanje-solar/portal/base.py

412 lines
14 KiB
Python
Raw Normal View History

import os
import hashlib
import time
import ssl
import traceback
import re
2021-06-08 06:00:18 +00:00
import logging
from pathlib import Path
from datetime import datetime
import imaplib
from smtplib import SMTP_SSL
import email
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import pdfkit
from jinja2 import Environment, FileSystemLoader
import jwt
2021-06-08 06:00:18 +00:00
from werkzeug.security import generate_password_hash
from . model import db, UploadRegular, UploadSolar, RegisteredUser, UserInstitutionMapping, Institution, InstitutionContract, CorpusAccess
2021-05-05 12:26:26 +00:00
2021-05-27 06:43:19 +00:00
# Earlier, simpler e-mail pattern, left commented out.
#REGEX_EMAIL = re.compile('^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
# Compiled pattern used for e-mail address validation. It is anchored with
# ^...$ and its character classes list lower-case ASCII letters only; no
# re.IGNORECASE flag is passed.
# NOTE(review): this is a regular (non-raw) string literal, so Python itself
# processes escapes such as `\x5b`, `\x5d` and `\\` before `re` parses the
# pattern -- confirm the compiled pattern still matches what was intended.
REGEX_EMAIL = re.compile('^(?:[a-z0-9!#$%&\'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*|\"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])$')
# Maximum allowed length (in characters) of an uploaded file's name; enforced
# by UploadHandler.check_fname_lengths().
MAX_FNAME_LEN = 100
class ContractCreator:
    """Render a contract from a Jinja2 template and write it out as a PDF.

    PDFs are stored beneath ``base_path`` in a two-level layout: the first
    two characters of the file name select a sub-directory and the remaining
    characters are the name of the file inside it.
    """

    def __init__(self, base_path, template_path):
        """Load the template and set up the pdfkit options.

        Args:
            base_path: Directory beneath which the PDFs are written.
            template_path: Template location; the loader's search path is
                "./", so it is resolved relative to the working directory.
        """
        self.base = base_path
        template_loader = FileSystemLoader(searchpath="./")
        template_env = Environment(loader=template_loader)
        self.template = template_env.get_template(template_path)
        self.pdfkit_options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'custom-header': [
                ('Accept-Encoding', 'gzip')
            ]
        }

    def fill_template(self, **kwargs):
        """Render the loaded template with ``kwargs`` and return the result."""
        return self.template.render(**kwargs)

    def create_pdf(self, f_name, fields_dict):
        """Render the template with ``fields_dict`` and save it as a PDF.

        Args:
            f_name: Output file name; ``f_name[:2]`` names the sub-directory
                and ``f_name[2:]`` the file inside it.
            fields_dict: Mapping passed to the template as keyword arguments.
        """
        sub_dir = Path(self.base) / f_name[:2]
        # Create missing parents too and tolerate a concurrent creator,
        # instead of the racy "exists() then mkdir()" sequence.
        sub_dir.mkdir(parents=True, exist_ok=True)
        out_f = sub_dir / f_name[2:]
        html_str = self.fill_template(**fields_dict)
        # pdfkit documents the output path as a string.
        pdfkit.from_string(html_str, str(out_f), options=self.pdfkit_options)
class UploadHandler:
    """Validate, fingerprint, store and confirm user file uploads.

    Configuration is passed as keyword arguments and kept in ``self.config``.
    Keys read by this class: ``UPLOADS_DIR``, ``MAX_FILES_PER_UPLOAD``,
    ``MAIL_LOGIN``, ``MAIL_PASS``, ``MAIL_HOST``, ``SMTP_PORT``,
    ``MAIL_SUBJECT`` and ``MAIL_BODY``.

    Only entries of ``request.files`` whose key starts with ``'file'`` are
    treated as data files.
    """

    # Extensions accepted by check_suffixes(); a falsy value disables the check.
    ENABLED_FILETYPES = ['txt', 'csv', 'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'xml', 'mxliff', 'tmx']

    def __init__(self, **kwargs):
        """Store the keyword arguments as the handler configuration."""
        self.config = kwargs

    def set_contract_creator(self, contract_creator):
        """Attach the ContractCreator whose base directory holds the contracts."""
        assert isinstance(contract_creator, ContractCreator)
        self._contract_creator = contract_creator

    def get_uploads_subdir(self, dir_name):
        """Return ``UPLOADS_DIR/dir_name`` as a Path, creating it if needed."""
        subdir = Path(self.config['UPLOADS_DIR']) / dir_name
        subdir.mkdir(parents=True, exist_ok=True)
        return subdir

    @staticmethod
    def extract_upload_metadata(corpus_name, request):
        """Collect the metadata describing one upload request.

        Args:
            corpus_name: Name of the corpus the upload belongs to.
            request: Object exposing ``files`` (mapping of uploaded files)
                and ``form`` (copyable mapping of form fields).

        Returns:
            dict with the keys ``corpus_name``, ``form_data``, ``upload_id``,
            ``timestamp``, ``file_hashes_dict``, ``file_names`` and
            ``contract_file``.
        """
        file_hashes = UploadHandler.create_file_hashes(request.files)
        file_names = file_hashes.keys()
        form_data = request.form.copy()
        upload_timestamp = int(time.time())
        # The id is derived from the form values as submitted, i.e. before
        # they are stripped below.
        upload_id = UploadHandler.create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes)

        # Strip form fields. Iterate over a snapshot so that assigning into
        # the mapping cannot disturb the iteration.
        for key, val in list(form_data.items()):
            form_data[key] = val.strip()

        return {
            'corpus_name': corpus_name,
            'form_data': form_data,
            'upload_id': upload_id,
            'timestamp': upload_timestamp,
            'file_hashes_dict': file_hashes,
            'file_names': file_names,
            'contract_file': upload_id + '.pdf',
        }

    @staticmethod
    def create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes):
        """Return an MD5 hex digest identifying the whole upload.

        The digest covers the timestamp, the form values (in sorted key
        order) and the sorted file hashes, so neither field order nor file
        order influences the id. ``corpus_name`` is not part of the digest.
        """
        # Order is important while hashing, hence the sorting.
        val_buff = [str(upload_timestamp)]
        val_buff.extend(form_data[key] for key in sorted(form_data))

        # This hash serves as a unique identifier for the whole upload.
        metahash = hashlib.md5(''.join(val_buff).encode())
        # Include file hashes to avoid metafile name collisions if uploads
        # have the same form values but different data files. Sort hashes
        # first so upload order doesn't matter.
        metahash.update(''.join(sorted(file_hashes.values())).encode())
        return metahash.hexdigest()

    @staticmethod
    def create_file_hashes(files):
        """Return ``{filename: md5 hex digest}`` for every data file.

        Each digest covers the file name followed by the file content. The
        file is rewound afterwards so it can still be saved.
        """
        res = dict()
        for key, f in files.items():
            if key.startswith('file'):
                h = hashlib.md5(f.filename.encode())
                h.update(f.stream.read())
                res[f.filename] = h.hexdigest()
                f.seek(0)
        return res

    @staticmethod
    def store_model(model_obj):
        """Add ``model_obj`` to the database session and commit it.

        Failures are printed rather than raised (best effort); the session
        is rolled back so that it remains usable afterwards.
        """
        try:
            db.session.add(model_obj)
            db.session.commit()
        except Exception:
            traceback.print_exc()
            db.session.rollback()

    def store_datafiles(self, files, upload_metadata):
        """Save every data file under ``UPLOADS_DIR/files/<hh>/<rest>/<name>``.

        ``<hh>`` is the first two characters of the file's hash and
        ``<rest>`` the remaining ones, taken from
        ``upload_metadata['file_hashes_dict']``.
        """
        base = self.get_uploads_subdir('files')
        file_hashes = upload_metadata['file_hashes_dict']

        for key, f in files.items():
            if not key.startswith('file'):
                continue
            f_hash = file_hashes[f.filename]

            # First byte used for indexing, similar to what git does.
            path = base / f_hash[:2] / f_hash[2:]
            path.mkdir(parents=True, exist_ok=True)

            # The file name is client-supplied: keep only its final path
            # component so it cannot escape the target directory.
            f.save(path / Path(f.filename).name)

    def send_confirm_mail(self, upload_metadata, attach_contract_file=False):
        """E-mail an upload confirmation to the address given in the form.

        Args:
            upload_metadata: dict as built by extract_upload_metadata().
            attach_contract_file: When true, attach the contract PDF named
                by ``upload_metadata['contract_file']``.

        SMTP errors are printed rather than raised.
        """
        upload_id = upload_metadata['upload_id']

        message = MIMEMultipart()
        message['From'] = self.config['MAIL_LOGIN']
        message['To'] = upload_metadata['form_data']['email']
        message['Subject'] = self.config['MAIL_SUBJECT'].format(upload_id=upload_id)
        body = self.config['MAIL_BODY'].format(upload_id=upload_id)
        message.attach(MIMEText(body, "plain"))

        if attach_contract_file:
            # set_contract_creator() stores the creator as _contract_creator;
            # fall back to the public name for objects that set it directly.
            creator = getattr(self, '_contract_creator', None) or self.contract_creator
            f_name = upload_metadata['contract_file']
            contract_file = Path(creator.base) / f_name[:2] / f_name[2:]
            with open(contract_file, "rb") as f:
                part = MIMEApplication(
                    f.read(),
                    Name=f_name
                )
            part['Content-Disposition'] = 'attachment; filename="%s"' % f_name
            message.attach(part)

        text = message.as_string()

        # Create a secure SSL context
        context = ssl.create_default_context()

        # TODO: Implement timeout.
        try:
            with SMTP_SSL(self.config['MAIL_HOST'], self.config['SMTP_PORT'], context=context) as server:
                server.login(self.config['MAIL_LOGIN'], self.config['MAIL_PASS'])
                server.sendmail(message['From'], message['To'], text)
        except Exception:
            traceback.print_exc()

    def check_suffixes(self, files):
        """Return an error message for the first data file with a disallowed
        extension, or None when all are acceptable.

        The comparison is case-insensitive and a name without a dot has no
        extension. When ENABLED_FILETYPES is falsy every file is accepted.
        """
        if not self.ENABLED_FILETYPES:
            return None
        allowed = {ftype.lower() for ftype in self.ENABLED_FILETYPES}

        for key, f in files.items():
            if not key.startswith('file'):
                continue
            _, dot, suffix = f.filename.rpartition('.')
            if not dot or suffix.lower() not in allowed:
                return 'Datoteka "{}" ni pravilnega formata.'.format(f.filename)
        return None

    @staticmethod
    def check_fname_lengths(files):
        """Return an error message if a data file's name exceeds
        MAX_FNAME_LEN characters, else None."""
        for key, f in files.items():
            if key.startswith('file') and len(f.filename) > MAX_FNAME_LEN:
                return 'Ime datoteke presega dolžino {} znakov.'.format(MAX_FNAME_LEN)
        return None

    def check_upload_request(self, request):
        """Validate the files of an upload request.

        Returns:
            ``(error_message, 400)`` for the first failed check (file count,
            extensions, name lengths), or None when the request is valid.
        """
        files = request.files
        max_files = self.config['MAX_FILES_PER_UPLOAD']
        if len(files) > max_files:
            return 'Naložite lahko do {} datotek hkrati.'.format(max_files), 400
        elif len(files) < 1:
            return 'Priložena ni bila nobena datoteka.', 400

        err = self.check_suffixes(files)
        if err:
            return err, 400

        err = UploadHandler.check_fname_lengths(files)
        if err:
            return err, 400

        return None
2021-06-08 06:00:18 +00:00
def get_user_institution(user_id):
    """Return the Institution of the user's first mapping row, or None.

    None is returned when the user has no UserInstitutionMapping row.
    """
    mapping = UserInstitutionMapping.query.filter_by(user=user_id).first()
    if not mapping:
        return None
    institutions = Institution.query.filter_by(id=mapping.institution)
    return institutions.first()
2021-05-05 12:26:26 +00:00
2021-05-24 08:15:54 +00:00
def has_user_corpus_access(user_id, corpus_name):
    """Tell whether the user may access the given corpus.

    Admins always have access. Other users have access when the institution
    they belong to has a CorpusAccess row for ``corpus_name``.

    Returns:
        True when access is granted; False otherwise, including when the
        user does not exist or belongs to no institution.
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        return False

    # TODO: check if user even is active?

    # Admins always have access to everything.
    if user.role == 'admin':
        return True

    # Check if user belongs to an institution that has access to this corpus.
    institution = get_user_institution(user_id)
    if institution is None:
        return False

    row = CorpusAccess.query.filter_by(institution=institution.id, corpus=corpus_name).first()
    return bool(row)
def is_admin(user_id):
    """Return True when the user exists and has the role 'admin'.

    An unknown ``user_id`` yields False instead of raising AttributeError.
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        return False
    return user.role == 'admin'
2021-05-24 08:15:54 +00:00
def get_user_obj(user_id):
    """Return the RegisteredUser with the given id, or None if none matches."""
    matching_users = RegisteredUser.query.filter_by(id=user_id)
    return matching_users.first()
2021-06-08 06:00:18 +00:00
2021-05-24 08:15:54 +00:00
def get_institution_obj(institution_id):
    """Return the Institution with the given id, or None if none matches."""
    matching_institutions = Institution.query.filter_by(id=institution_id)
    return matching_institutions.first()
2021-06-08 06:00:18 +00:00
def register_new_user(name, email, password, active=True, admin=False):
    """Create a RegisteredUser row and return its id.

    Args:
        name: Value stored in the ``name`` field.
        email: Value stored in the ``email`` field.
        password: Plain-text password; only its hash is stored.
        active: Value stored in the ``active`` field.
        admin: When true the role is 'admin', otherwise 'user'.
    """
    if admin:
        role = 'admin'
    else:
        role = 'user'

    new_user = RegisteredUser(
        name=name,
        email=email,
        role=role,
        pass_hash=generate_password_hash(password),
        active=active,
        registered=datetime.now(),
    )
    session = db.session
    session.add(new_user)
    session.commit()
    return new_user.id
def add_institution(name, region):
    """Create an Institution row with the given name and region; return its id."""
    institution = Institution(name=name, region=region)
    session = db.session
    session.add(institution)
    session.commit()
    return institution.id
def grant_institution_corpus_access(institution_id, corpus_name):
    """Create a CorpusAccess row linking the institution to the corpus.

    Returns:
        The id of the new CorpusAccess row.
    """
    access = CorpusAccess(institution=institution_id, corpus=corpus_name)
    session = db.session
    session.add(access)
    session.commit()
    return access.id
def add_user_to_institution(user_id, institution_id, role):
    """Create a UserInstitutionMapping row for the user with the given role.

    Returns:
        The id of the new mapping row.
    """
    membership = UserInstitutionMapping(user=user_id, institution=institution_id, role=role)
    session = db.session
    session.add(membership)
    session.commit()
    return membership.id
def activate_user(user_id):
    """Set ``active`` to True for the user with the given id.

    Returns:
        The value returned by the query's ``update()`` call.
    """
    target = db.session.query(RegisteredUser).filter_by(id=user_id)
    updated = target.update({'active': True})
    db.session.commit()
    return updated
def update_user_password(user_id, new_password):
    """Store the hash of ``new_password`` for the user with the given id.

    Returns:
        The value returned by the query's ``update()`` call.
    """
    # The hash was previously bound to `phash` while `pass_hash` was
    # referenced, which raised NameError on every call.
    pass_hash = generate_password_hash(new_password)
    rowcount = db.session.query(RegisteredUser).filter_by(id=user_id).update({'pass_hash': pass_hash})
    db.session.commit()
    return rowcount
def del_user_from_institution(user_id, institution_id):
    """Delete the mapping rows linking the user to the institution."""
    mappings = db.session.query(UserInstitutionMapping)
    mappings = mappings.filter(UserInstitutionMapping.institution == institution_id)
    mappings = mappings.filter(UserInstitutionMapping.user == user_id)
    mappings.delete()
    db.session.commit()
2021-06-08 06:00:18 +00:00
def get_all_active_users():
    """Return all users with ``active=True``, ordered by id."""
    active_users = RegisteredUser.query.filter_by(active=True)
    return active_users.order_by(RegisteredUser.id).all()
def get_all_inactive_users():
    """Return all users with ``active=False``, ordered by id."""
    inactive_users = RegisteredUser.query.filter_by(active=False)
    return inactive_users.order_by(RegisteredUser.id).all()
def get_all_active_users_join_institutions():
    """Return (RegisteredUser, UserInstitutionMapping) rows ordered by user id.

    Users are outer-joined to their institution mappings on the user id.

    NOTE(review): despite the name, no ``active`` filter is applied here --
    confirm whether inactive users are meant to be included.
    """
    on_user_id = RegisteredUser.id == UserInstitutionMapping.user
    pairs = db.session.query(RegisteredUser, UserInstitutionMapping)
    pairs = pairs.outerjoin(UserInstitutionMapping, on_user_id)
    return pairs.order_by(RegisteredUser.id).all()
def get_all_active_institution_users(institution_id):
    """Return the active users that are mapped to the given institution."""
    on_user_id = RegisteredUser.id == UserInstitutionMapping.user
    active_users = RegisteredUser.query.filter_by(active=True)
    members = active_users.join(UserInstitutionMapping, on_user_id)
    return members.filter(UserInstitutionMapping.institution == institution_id).all()
2021-06-08 06:00:18 +00:00
def is_institution_moderator(user_id, institution_id):
    """Return True when the user's mapping to the institution has the role
    'moderator'; False when there is no mapping or the role differs."""
    by_user = UserInstitutionMapping.query.filter_by(user=user_id)
    mapping = by_user.filter_by(institution=institution_id).first()
    if not mapping or mapping.role != 'moderator':
        return False
    return True
def is_institution_member(user_id, institution_id):
    """Return True when a mapping row links the user to the institution."""
    by_user = UserInstitutionMapping.query.filter_by(user=user_id)
    mapping = by_user.filter_by(institution=institution_id).first()
    return True if mapping else False
def get_password_reset_token(email, expires=500):
    """Create a signed JWT for resetting the password of ``email``.

    Args:
        email: Address stored in the token's ``reset_password`` claim.
        expires: Token lifetime in seconds, counted from now.

    Returns:
        The HS256-signed token, keyed with the APP_SECRET_KEY environment
        variable.
    """
    # `time` is the module here (imported with `import time`), so the clock
    # has to be read with time.time(); calling time() raised TypeError.
    payload = {
        'reset_password': email,
        'exp': time.time() + expires,
    }
    return jwt.encode(payload, key=os.getenv('APP_SECRET_KEY'), algorithm='HS256')
def verify_reset_token(token):
    """Return the RegisteredUser named in a password-reset token.

    Returns:
        The first user whose e-mail equals the token's ``reset_password``
        claim, or None when decoding the token fails (the error is logged)
        or no such user exists.
    """
    try:
        payload = jwt.decode(token, key=os.getenv('APP_SECRET_KEY'), algorithms=["HS256"])
        email = payload['reset_password']
    except Exception as err:
        logging.error(err)
        return None

    matching_users = RegisteredUser.query.filter_by(email=email)
    return matching_users.first()
def send_resetpass_mail(email, config):
    """E-mail a password-reset link to ``email``.

    Args:
        email: Recipient address; also embedded in the reset token.
        config: Mapping providing MAIL_HOST, SMTP_PORT, MAIL_LOGIN and
            MAIL_PASS.

    SMTP errors are printed rather than raised.
    """
    jwt_token = get_password_reset_token(email)
    # Some PyJWT versions return bytes; formatting those directly would put
    # "b'...'" into the link.
    if isinstance(jwt_token, bytes):
        jwt_token = jwt_token.decode('ascii')

    # The token is formatted into the link once; previously the complete URL
    # was formatted into itself, producing a doubled, broken link.
    text = (
        'Zahtevali ste ponastavitev gesla vašega uporabniškega računa.\n'
        'Geslo lahko ponastavite na naslednji povezavi: '
        'https://zbiranje.slovenscina.eu/solar/resetpass/{}'
    ).format(jwt_token)

    # smtplib encodes a str message as ASCII, which fails on the non-ASCII
    # characters above; wrap the text in a UTF-8 MIME message with headers.
    message = MIMEText(text, 'plain', 'utf-8')
    message['From'] = config['MAIL_LOGIN']
    message['To'] = email
    message['Subject'] = 'Ponastavitev gesla'

    # Create a secure SSL context
    context = ssl.create_default_context()

    try:
        with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=context) as server:
            server.login(config['MAIL_LOGIN'], config['MAIL_PASS'])
            server.sendmail(config['MAIL_LOGIN'], email, message.as_string())
    except Exception:
        traceback.print_exc()