597 lines
21 KiB
Python
597 lines
21 KiB
Python
import os
|
|
import hashlib
|
|
import time
|
|
import ssl
|
|
import traceback
|
|
import re
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
import imaplib
|
|
from smtplib import SMTP_SSL
|
|
|
|
import email
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.mime.application import MIMEApplication
|
|
|
|
import pdfkit
|
|
from jinja2 import Environment, FileSystemLoader
|
|
|
|
import jwt
|
|
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
from . model import *
|
|
|
|
|
|
# Earlier, simpler pattern kept for reference:
#REGEX_EMAIL = re.compile(r'^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')

# E-mail address pattern: an unquoted or quoted local part, '@', then a domain
# name or a bracketed IP literal. Only lower-case letters are accepted because
# no IGNORECASE flag is set.
#
# The literal has to be a raw string. As a normal string Python itself expands
# `\x5b`/`\x5d` into literal '[' / ']' and `\\` into a single backslash before
# `re` sees them, which silently corrupts the character classes (and `\.`
# triggers invalid-escape warnings on recent Python versions).
REGEX_EMAIL = re.compile(r'^(?:[a-z0-9!#$%&\'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*|\"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])$')

# Maximum accepted length (in characters) of an uploaded file's name.
MAX_FNAME_LEN = 100
|
|
|
|
|
|
class ContractCreator:
    """Render contract PDFs from a Jinja2 HTML template using pdfkit."""

    def __init__(self, base_path, template_path):
        """Load the template and prepare the pdfkit options.

        Args:
            base_path: Directory under which generated PDFs are stored.
                Accepts a ``str`` or a ``Path``.
            template_path: Template location, resolved by a
                ``FileSystemLoader`` rooted at the current working directory.
        """
        # Coerce to Path so the `/` joins below also work for plain strings.
        self.base = Path(base_path)
        template_loader = FileSystemLoader(searchpath="./")
        template_env = Environment(loader=template_loader)
        self.template = template_env.get_template(template_path)

        self.pdfkit_options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'custom-header': [
                ('Accept-Encoding', 'gzip')
            ]
        }

    def fill_template(self, **kwargs):
        """Render the template with ``kwargs`` and return the result."""
        return self.template.render(**kwargs)

    def create_pdf(self, f_name, fields_dict):
        """Render ``fields_dict`` into the template and write it as a PDF.

        The first two characters of ``f_name`` name a sub-directory of the
        base directory; the remaining characters are the file name inside it.
        """
        sub_dir = Path(self.base) / f_name[:2]
        # exist_ok avoids the exists()/mkdir() race between concurrent
        # requests; parents=True also covers a missing base directory.
        sub_dir.mkdir(parents=True, exist_ok=True)
        out_f = sub_dir / f_name[2:]
        html_str = self.fill_template(**fields_dict)
        pdfkit.from_string(html_str, out_f, options=self.pdfkit_options)
|
|
|
|
|
|
class UploadHandler:
    """Validate, fingerprint, store and acknowledge uploaded files.

    The keyword arguments given to the constructor are kept as ``self.config``.
    Keys read by this class: ``UPLOADS_DIR``, ``MAX_FILES_PER_UPLOAD``,
    ``MAIL_LOGIN``, ``MAIL_PASS``, ``MAIL_HOST``, ``SMTP_PORT``,
    ``MAIL_SUBJECT`` and ``MAIL_BODY``.

    Only entries of a ``files`` mapping whose key starts with ``'file'`` are
    treated as data files.
    """

    # Accepted (lower-case) file extensions. An empty list disables the check.
    ENABLED_FILETYPES = ['txt', 'csv', 'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'xml', 'mxliff', 'tmx', 'jpg', 'jpeg', 'png']

    def __init__(self, **kwargs):
        """Store the configuration passed as keyword arguments."""
        self.config = kwargs

    def set_contract_creator(self, contract_creator):
        """Attach the ContractCreator used for contract files."""
        assert isinstance(contract_creator, ContractCreator)
        self._contract_creator = contract_creator

    def get_uploads_subdir(self, dir_name):
        """Return ``UPLOADS_DIR/dir_name``, creating it when missing."""
        subdir = Path(self.config['UPLOADS_DIR']) / dir_name
        # exist_ok avoids the exists()/mkdir() race between requests.
        subdir.mkdir(parents=True, exist_ok=True)
        return subdir

    @staticmethod
    def extract_upload_metadata(corpus_name, request):
        """Collect the metadata describing one upload request.

        ``request`` must expose ``.files`` and ``.form`` (presumably a
        Flask/Werkzeug request; confirm against the caller).

        Returns a dict with the keys ``corpus_name``, ``form_data``,
        ``upload_id``, ``timestamp``, ``file_hashes_dict``, ``file_names``
        and ``contract_file``.
        """
        upload_metadata = dict()

        file_hashes = UploadHandler.create_file_hashes(request.files)
        file_names = file_hashes.keys()
        form_data = request.form.copy()
        upload_timestamp = int(time.time())
        # NOTE: the id is derived from the form values as submitted, i.e.
        # before they are stripped below.
        upload_id = UploadHandler.create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes)

        # Strip form fields.
        for key, val in form_data.items():
            form_data[key] = val.strip()

        upload_metadata['corpus_name'] = corpus_name
        upload_metadata['form_data'] = form_data
        upload_metadata['upload_id'] = upload_id
        upload_metadata['timestamp'] = upload_timestamp
        upload_metadata['file_hashes_dict'] = file_hashes
        upload_metadata['file_names'] = file_names
        upload_metadata['contract_file'] = upload_id + '.pdf'

        return upload_metadata

    @staticmethod
    def create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes):
        """Return an MD5 hex digest identifying the whole upload.

        The digest covers the timestamp, the form values (ordered by key) and
        the sorted file hashes. ``corpus_name`` is accepted but not hashed.
        """
        # Order is important while hashing, hence the sorting.
        val_buff = [str(upload_timestamp)]
        for key in sorted(form_data):
            val_buff.append(form_data[key])

        # This hash serves as an unique identifier for the whole upload.
        metahash = hashlib.md5((''.join(val_buff)).encode())
        # Include file hashes to avoid metafile name collisions if they have the same form values,
        # but different data files. Sort hashes first so upload order doesn't matter.
        sorted_f_hashes = sorted(file_hashes.values())
        metahash.update(''.join(sorted_f_hashes).encode())

        return metahash.hexdigest()

    @staticmethod
    def create_file_hashes(files):
        """Map each data file's name to the MD5 of its name plus content."""
        res = dict()
        for key, f in files.items():
            if key.startswith('file'):
                h = hashlib.md5(f.filename.encode())
                h.update(f.stream.read())
                res[f.filename] = h.hexdigest()
                # Rewind so the file can still be saved afterwards.
                f.seek(0)
        return res

    @staticmethod
    def store_model(model_obj):
        """Add ``model_obj`` to the DB session and commit (best effort).

        Failures are printed, not raised. The session is rolled back so that
        it stays usable for later operations.
        """
        try:
            db.session.add(model_obj)
            db.session.commit()
        except Exception:
            traceback.print_exc()
            db.session.rollback()

    def store_datafiles(self, files, upload_metadata):
        """Save every data file under ``UPLOADS_DIR/files/<hh>/<rest>/<name>``.

        ``<hh>`` is the first two characters of the file's hash and ``<rest>``
        the remainder, taken from ``upload_metadata['file_hashes_dict']``.

        Raises:
            ValueError: if a file name has no usable base name.
        """
        base = self.get_uploads_subdir('files')
        file_hashes = upload_metadata['file_hashes_dict']

        for key, f in files.items():
            if not key.startswith('file'):
                continue
            f_hash = file_hashes[f.filename]

            # The file name is client-supplied: keep only its last path
            # component so it cannot escape the target directory.
            safe_name = Path(f.filename.replace('\\', '/')).name
            if safe_name in ('', '.', '..'):
                raise ValueError('Invalid upload file name: {!r}'.format(f.filename))

            # First byte used for indexing, similarly like git does for example.
            target_dir = base / f_hash[:2] / f_hash[2:]
            target_dir.mkdir(parents=True, exist_ok=True)
            f.save(target_dir / safe_name)

    def send_confirm_mail(self, upload_metadata, attach_contract_file=False):
        """E-mail an upload confirmation to the address given in the form.

        When ``attach_contract_file`` is true the contract PDF named by
        ``upload_metadata['contract_file']`` is attached. Sending is best
        effort: failures are printed, not raised.
        """
        upload_id = upload_metadata['upload_id']

        message = MIMEMultipart()
        message['From'] = self.config['MAIL_LOGIN']
        message['To'] = upload_metadata['form_data']['email']
        message['Subject'] = self.config['MAIL_SUBJECT'].format(upload_id=upload_id)
        body = self.config['MAIL_BODY'].format(upload_id=upload_id)
        message.attach(MIMEText(body, "plain"))

        if attach_contract_file:
            # set_contract_creator() stores the creator as `_contract_creator`;
            # the former `self.contract_creator` lookup raised AttributeError.
            contracts_dir = Path(self._contract_creator.base)
            f_name = upload_metadata['contract_file']
            contract_file = contracts_dir / f_name[:2] / f_name[2:]
            with open(contract_file, "rb") as f:
                part = MIMEApplication(f.read(), Name=f_name)
            part['Content-Disposition'] = 'attachment; filename="%s"' % f_name
            message.attach(part)

        text = message.as_string()

        # Create a secure SSL context
        context = ssl.create_default_context()

        # TODO: Implement timeout.
        try:
            with SMTP_SSL(self.config['MAIL_HOST'], self.config['SMTP_PORT'], context=context) as server:
                server.login(self.config['MAIL_LOGIN'], self.config['MAIL_PASS'])
                server.sendmail(message['From'], message['To'], text)
        except Exception:
            traceback.print_exc()

    def check_suffixes(self, files):
        """Return an error message for the first disallowed file type, else None."""
        for key, f in files.items():
            if key.startswith('file'):
                # A name without a dot has no extension; previously the whole
                # name was used, so a file called "pdf" was accepted.
                _, dot, suffix = f.filename.rpartition('.')
                if not dot:
                    suffix = ''
                if self.ENABLED_FILETYPES and suffix.lower() not in self.ENABLED_FILETYPES:
                    return 'Datoteka "{}" ni pravilnega formata.'.format(f.filename)
        return None

    @staticmethod
    def check_fname_lengths(files):
        """Return an error message if any file name exceeds MAX_FNAME_LEN, else None."""
        for key, f in files.items():
            if key.startswith('file'):
                if len(f.filename) > MAX_FNAME_LEN:
                    return 'Ime datoteke presega dolžino {} znakov.'.format(MAX_FNAME_LEN)
        return None

    def check_upload_request(self, request):
        """Validate file count, types and name lengths of an upload request.

        Returns ``None`` when the request is acceptable, otherwise a
        ``(message, 400)`` tuple.
        """
        files = request.files
        max_files = self.config['MAX_FILES_PER_UPLOAD']
        if len(files) > max_files:
            return 'Naložite lahko do {} datotek hkrati.'.format(max_files), 400
        elif len(files) < 1:
            return 'Priložena ni bila nobena datoteka.', 400

        err = self.check_suffixes(files)
        if err:
            return err, 400

        err = UploadHandler.check_fname_lengths(files)
        if err:
            return err, 400

        return None
|
|
|
|
|
|
def get_user_institution(user_id):
    """Return the Institution the user is mapped to, or None.

    Uses the first UserInstitutionMapping row found for ``user_id``.
    """
    mapping = UserInstitutionMapping.query.filter_by(user=user_id).first()
    if not mapping:
        return None
    institution_query = Institution.query.filter_by(id=mapping.institution)
    return institution_query.first()
|
|
|
|
|
|
def get_institution_contract(institution_id):
    """Return the first InstitutionContract of the institution, or None."""
    contract_query = InstitutionContract.query.filter_by(institution=institution_id)
    return contract_query.first()
|
|
|
|
|
|
def get_institution_cooperation_history(institution_id):
    """Group an institution's cooperation history by user and role.

    Returns a dict mapping each user to
    ``{'coordinator': [...], 'mentor': [...], 'other': [...]}``; every list
    holds ``(school_year, badge_text)`` tuples, newest school year first.
    """
    history_rows = (
        UserCooperationHistory.query
        .filter_by(institution=institution_id)
        .order_by(UserCooperationHistory.school_year.desc())
        .all()
    )

    history = dict()
    for entry in history_rows:
        # Create the per-role buckets the first time a user is seen.
        per_role = history.setdefault(
            entry.user, {'coordinator': [], 'mentor': [], 'other': []})
        per_role[entry.role].append((entry.school_year, entry.badge_text))

    return history
|
|
|
|
|
|
def get_cooperation_history():
    """Return every UserCooperationHistory row."""
    all_rows = UserCooperationHistory.query.all()
    return all_rows
|
|
|
|
|
|
def add_cooperation_history_item(user_id, institution_id, role, school_year, badge_text):
    """Insert a cooperation-history row and return its id."""
    entry = UserCooperationHistory(
        user=user_id,
        institution=institution_id,
        role=role,
        school_year=school_year,
        badge_text=badge_text,
    )
    session = db.session
    session.add(entry)
    session.commit()
    return entry.id
|
|
|
|
def del_cooperation_history_item(entry_id):
    """Delete the cooperation-history row with the given id and commit."""
    matching = db.session.query(UserCooperationHistory).filter_by(id=entry_id)
    matching.delete()
    db.session.commit()
|
|
|
|
def has_user_corpus_access(user_id, corpus_name):
    """Return True if the user may access ``corpus_name``.

    Admins always have access. Other users have access when the institution
    they belong to has a CorpusAccess row for the corpus. Unknown users and
    users without an institution have no access (previously both cases
    raised AttributeError).
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        return False

    # TODO: check if user even is active?

    # Admins always have access to everything.
    if user.role == 'admin':
        return True

    # Check if user belongs to an institution, that has access to this corpus.
    institution = get_user_institution(user_id)
    if institution is None:
        return False

    row = CorpusAccess.query.filter_by(institution=institution.id, corpus=corpus_name).first()
    return row is not None
|
|
|
|
|
|
def is_admin(user_id):
    """Return True if the user exists and has the 'admin' role.

    An unknown ``user_id`` yields False (previously it raised AttributeError).
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    return user is not None and user.role == 'admin'
|
|
|
|
|
|
def get_user_obj(user_id):
    """Return the RegisteredUser with the given id, or None."""
    user_query = RegisteredUser.query.filter_by(id=user_id)
    return user_query.first()
|
|
|
|
def get_user_obj_by_email(email):
    """Return the first RegisteredUser with the given e-mail, or None."""
    user_query = RegisteredUser.query.filter_by(email=email)
    return user_query.first()
|
|
|
|
def get_institution_obj(institution_id):
    """Return the Institution with the given id, or None."""
    institution_query = Institution.query.filter_by(id=institution_id)
    return institution_query.first()
|
|
|
|
|
|
def get_institution_obj_by_name(institution_name):
    """Return the first Institution with the given name, or None."""
    institution_query = Institution.query.filter_by(name=institution_name)
    return institution_query.first()
|
|
|
|
|
|
def register_new_user(name, email, password, active=True, admin=False):
    """Create a RegisteredUser and return its id.

    The password is stored only as the hash produced by
    ``generate_password_hash``; the role is 'admin' or 'user' depending on
    ``admin``; ``registered`` is set to ``datetime.now()``.
    """
    role = 'admin' if admin else 'user'
    new_user = RegisteredUser(
        name=name,
        email=email,
        role=role,
        pass_hash=generate_password_hash(password),
        active=active,
        registered=datetime.now(),
    )
    session = db.session
    session.add(new_user)
    session.commit()
    return new_user.id
|
|
|
|
|
|
def add_institution(name, region):
    """Insert a new Institution and return its id."""
    institution = Institution(name=name, region=region)
    session = db.session
    session.add(institution)
    session.commit()
    return institution.id
|
|
|
|
|
|
def grant_institution_corpus_access(institution_id, corpus_name):
    """Insert a CorpusAccess row for the institution and return its id."""
    access = CorpusAccess(institution=institution_id, corpus=corpus_name)
    session = db.session
    session.add(access)
    session.commit()
    return access.id
|
|
|
|
|
|
def add_user_to_institution(user_id, institution_id, role):
    """Insert a UserInstitutionMapping row and return its id."""
    mapping = UserInstitutionMapping(user=user_id, institution=institution_id, role=role)
    session = db.session
    session.add(mapping)
    session.commit()
    return mapping.id
|
|
|
|
|
|
def activate_user(user_id):
    """Set ``active`` to True for the user; return the update's row count."""
    user_query = db.session.query(RegisteredUser).filter_by(id=user_id)
    affected = user_query.update({'active': True})
    db.session.commit()
    return affected
|
|
|
|
|
|
def update_user_password(user_id, new_password):
    """Store the hash of ``new_password`` for the user; return the row count."""
    changes = {'pass_hash': generate_password_hash(new_password)}
    user_query = db.session.query(RegisteredUser).filter_by(id=user_id)
    affected = user_query.update(changes)
    db.session.commit()
    return affected
|
|
|
|
|
|
def update_user_role(user_id, role):
    """Set the user's role; return the update's row count."""
    user_query = db.session.query(RegisteredUser).filter_by(id=user_id)
    affected = user_query.update({'role': role})
    db.session.commit()
    return affected
|
|
|
|
|
|
def update_user_email(user_id, new_email):
    """Set the user's e-mail address; return the update's row count."""
    user_query = db.session.query(RegisteredUser).filter_by(id=user_id)
    affected = user_query.update({'email': new_email})
    db.session.commit()
    return affected
|
|
|
|
|
|
def update_user_name(user_id, new_name):
    """Set the user's name; return the update's row count."""
    user_query = db.session.query(RegisteredUser).filter_by(id=user_id)
    affected = user_query.update({'name': new_name})
    db.session.commit()
    return affected
|
|
|
|
|
|
def remove_user(user_id):
    """Soft-delete a user by setting ``is_removed`` to True.

    The row and its related mappings/history are kept; nothing is deleted.
    """
    user_query = db.session.query(RegisteredUser).filter(RegisteredUser.id == user_id)
    user_query.update({'is_removed': True})
    db.session.commit()
|
|
|
|
def undo_remove_user(user_id):
    """Revert a soft delete by setting the user's ``is_removed`` to False."""
    user_query = db.session.query(RegisteredUser).filter(RegisteredUser.id == user_id)
    user_query.update({'is_removed': False})
    db.session.commit()
|
|
|
|
def remove_institution(institution_id):
    """Soft-delete an institution by setting ``is_removed`` to True."""
    institution_query = db.session.query(Institution).filter(Institution.id == institution_id)
    institution_query.update({'is_removed': True})
    db.session.commit()
|
|
|
|
|
|
def del_user_from_institution(user_id, institution_id):
    """Delete the mapping rows linking the user to the institution."""
    mappings = db.session.query(UserInstitutionMapping)
    mappings = mappings.filter(UserInstitutionMapping.institution == institution_id)
    mappings = mappings.filter(UserInstitutionMapping.user == user_id)
    mappings.delete()
    db.session.commit()
|
|
|
|
|
|
def get_all_active_users():
    """Return all active, non-removed users ordered by id."""
    users = RegisteredUser.query.filter_by(is_removed=False)
    users = users.filter_by(active=True)
    return users.order_by(RegisteredUser.id).all()
|
|
|
|
|
|
def get_all_inactive_users():
    """Return all inactive, non-removed users ordered by id."""
    users = RegisteredUser.query.filter_by(is_removed=False)
    users = users.filter_by(active=False)
    return users.order_by(RegisteredUser.id).all()
|
|
|
|
|
|
def get_all_users_join_institutions(active=True):
    """Return (RegisteredUser, UserInstitutionMapping) rows ordered by user id.

    Only non-removed users whose ``active`` flag equals ``active`` are
    returned. The mapping is outer-joined, so users without an institution
    mapping are still included.
    """
    joined = db.session.query(RegisteredUser, UserInstitutionMapping)
    # `== False` is required here: it builds the SQL expression.
    joined = joined.filter(RegisteredUser.is_removed == False)  # noqa: E712
    joined = joined.outerjoin(
        UserInstitutionMapping,
        RegisteredUser.id == UserInstitutionMapping.user,
    )
    joined = joined.filter(RegisteredUser.active == active)
    return joined.order_by(RegisteredUser.id).all()
|
|
|
|
|
|
|
|
def get_all_active_institution_users(institution_id):
    """Return the active, non-removed users mapped to the institution."""
    members = RegisteredUser.query.filter_by(is_removed=False)
    members = members.filter_by(active=True)
    members = members.join(
        UserInstitutionMapping,
        RegisteredUser.id == UserInstitutionMapping.user,
    )
    members = members.filter(UserInstitutionMapping.institution == institution_id)
    return members.all()
|
|
|
|
|
|
def is_institution_coordinator(user_id, institution_id):
    """Return True if the user is mapped to the institution as 'coordinator'."""
    mapping_query = UserInstitutionMapping.query.filter_by(user=user_id)
    mapping = mapping_query.filter_by(institution=institution_id).first()
    if mapping and mapping.role == 'coordinator':
        return True
    return False
|
|
|
|
|
|
def is_institution_member(user_id, institution_id):
    """Return True if a mapping between the user and the institution exists."""
    mapping_query = UserInstitutionMapping.query.filter_by(user=user_id)
    mapping = mapping_query.filter_by(institution=institution_id).first()
    return bool(mapping)
|
|
|
|
|
|
def get_actual_institution_contract_filename(f_hash):
    """Return the original file name of the institution contract stored as ``f_hash``.

    Returns None when no InstitutionContract has that ``file_contract`` value.
    """
    contract = InstitutionContract.query.filter_by(file_contract=f_hash).first()
    return contract.original_filename if contract else None
|
|
|
|
|
|
def get_actual_studentparent_contract_filename(f_hash):
    """Return the original file name of the ContractsSolar row stored as ``f_hash``.

    Returns None when no ContractsSolar row has that ``file_contract`` value.
    """
    contract = ContractsSolar.query.filter_by(file_contract=f_hash).first()
    return contract.original_filename if contract else None
|
|
|
|
|
|
def get_password_reset_token(email, key, expires=600):
    """Return an HS256-signed JWT authorising a password reset for ``email``.

    The token carries the e-mail under ``reset_password`` and an ``exp``
    claim ``expires`` seconds after the current time.
    """
    payload = {
        'reset_password': email,
        'exp': int(time.time()) + expires,
    }
    return jwt.encode(payload, key=key, algorithm='HS256')
|
|
|
|
|
|
def transfer_users_institution(institution_id_from, institution_id_to):
    """Re-point all user mappings from one institution to another.

    Returns the row count reported by the update.
    """
    mappings = db.session.query(UserInstitutionMapping).filter_by(institution=institution_id_from)
    moved = mappings.update({'institution': institution_id_to})
    db.session.commit()
    return moved
|
|
|
|
|
|
def transfer_uploads_institution(institution_id_from, institution_id_to):
    """Re-point all UploadSolar rows from one institution to another.

    Returns the row count reported by the update.
    """
    uploads = db.session.query(UploadSolar).filter_by(institution=institution_id_from)
    moved = uploads.update({'institution': institution_id_to})
    db.session.commit()
    return moved
|
|
|
|
|
|
def transfer_contracts_institution(institution_id_from, institution_id_to):
    """Re-point all ContractsSolar rows from one institution to another.

    Returns the row count reported by the update.
    """
    contracts = db.session.query(ContractsSolar).filter_by(institution=institution_id_from)
    moved = contracts.update({'institution': institution_id_to})
    db.session.commit()
    return moved
|
|
|
|
|
|
def verify_reset_token(token, key):
    """Return the user a password-reset token was issued for, or None.

    None is returned when the token cannot be decoded (the error is logged),
    when its ``exp`` time has passed, or when no user has the e-mail address
    stored in the token.
    """
    try:
        claims = jwt.decode(token, key=key, algorithms=["HS256"])
        address = claims['reset_password']
        timed_out = int(time.time()) >= claims['exp']
    except Exception as e:
        logging.error(e)
        return None

    if timed_out:
        # Token timed out
        return None
    return RegisteredUser.query.filter_by(email=address).first()
|
|
|
|
|
|
def send_resetpass_mail(email, config):
    """E-mail a password-reset link containing a fresh reset token.

    The token is signed with ``config['APP_SECRET_KEY']``. Sending is best
    effort: failures are printed, not raised.
    """
    reset_token = get_password_reset_token(email, config['APP_SECRET_KEY'])

    body = '''
Zahtevali ste ponastavitev gesla vašega uporabniškega računa.

Geslo lahko ponastavite na naslednji povezavi: https://zbiranje.slovenscina.eu/solar/resetpass/{}'''.format(reset_token)

    sender = config['MAIL_LOGIN']
    message = MIMEMultipart()
    message['From'] = sender
    message['To'] = email
    message['Subject'] = 'Ponastavitev gesla'
    message.attach(MIMEText(body, "plain"))
    raw_message = message.as_string()

    # Create a secure SSL context
    tls_context = ssl.create_default_context()

    try:
        with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=tls_context) as server:
            server.login(sender, config['MAIL_PASS'])
            server.sendmail(sender, email, raw_message)
    except Exception:
        traceback.print_exc()
|
|
|
|
|
|
def send_admins_new_user_notification_mail(user_id, config):
    """Notify every admin that a newly registered user awaits approval.

    One message is sent per user with the role 'admin'. Sending is best
    effort: a failure for one admin is printed and the remaining admins are
    still tried.
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        logging.error('Cannot notify admins: no user with id %s.', user_id)
        return

    body = '''
Nov uporabnik "{}" je ustvaril uporabniški račun na portalu za oddajanje besedil Šolar in čaka na odobritev.
'''.format(user.name)

    admins = RegisteredUser.query.filter_by(role="admin").all()

    # Create a secure SSL context
    context = ssl.create_default_context()

    for admin in admins:
        # Build the message per recipient. The previous code assigned the
        # `email` *module* to the To header and reused the password-reset
        # subject line.
        message = MIMEMultipart()
        message['From'] = config['MAIL_LOGIN']
        message['To'] = admin.email
        message['Subject'] = 'Nov uporabnik čaka na odobritev'
        message.attach(MIMEText(body, "plain"))
        text = message.as_string()

        try:
            with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=context) as server:
                server.login(config['MAIL_LOGIN'], config['MAIL_PASS'])
                server.sendmail(config['MAIL_LOGIN'], admin.email, text)
        except Exception:
            traceback.print_exc()
|
|
|
|
|
|
def send_user_activation_mail(user_id, config):
    """Tell a user by e-mail that their account has been approved.

    Sending is best effort: failures are printed, not raised. Nothing is
    sent when no user with ``user_id`` exists.
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        logging.error('Cannot send activation mail: no user with id %s.', user_id)
        return

    body = '''Vaš uporabniški račun "{}" na portalu Šolar je bil odobren.'''.format(user.name)

    message = MIMEMultipart()
    message['From'] = config['MAIL_LOGIN']
    # The previous code assigned the `email` *module* here and reused the
    # password-reset subject line.
    message['To'] = user.email
    message['Subject'] = 'Uporabniški račun odobren'
    message.attach(MIMEText(body, "plain"))
    text = message.as_string()

    # Create a secure SSL context
    context = ssl.create_default_context()

    try:
        with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=context) as server:
            server.login(config['MAIL_LOGIN'], config['MAIL_PASS'])
            server.sendmail(config['MAIL_LOGIN'], user.email, text)
    except Exception:
        traceback.print_exc()
|
|
|