portal-oddajanje-solar/portal/solar.py

953 lines
34 KiB
Python
Raw Normal View History

import logging
import re
2021-05-24 08:15:54 +00:00
import hashlib
2021-10-12 09:11:37 +00:00
import time
import traceback
import ssl
from datetime import datetime
from sqlalchemy import desc
2023-01-14 12:45:33 +00:00
from sqlalchemy import func
2021-10-12 09:11:37 +00:00
from pathlib import Path
from smtplib import SMTP_SSL
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import pdfkit
from jinja2 import Environment, FileSystemLoader
import jwt
from werkzeug.security import generate_password_hash
from . model import *
2023-01-14 12:45:33 +00:00
# Closed vocabularies accepted by the upload form.
VALID_PROGRAMS = {'OS', 'SSG', 'MGP', 'ZG', 'NPI', 'SPI', 'SSI', 'PTI'}
VALID_SUBJECTS = {'SLO', 'DJP', 'DDP', 'DNP', 'DSP', 'DIP'}
VALID_TEXT_TYPES = {'E', 'PB', 'T', 'R'}
VALID_GRAMMAR_CORRECTIONS = {'DD', 'N', 'DN'}
VALID_REGIONS = {'CE', 'GO', 'KK', 'KP', 'KR', 'LJ', 'MB', 'MS', 'NM', 'PO', 'SG'}

# E-mail address pattern (lower-case input only).
# This MUST be a raw string. As a normal string literal Python expands the
# "\x5d" escape to a literal "]" (closing the character class early) and
# collapses "\\[" to an escaped bracket, which silently breaks the
# quoted-local-part and domain-literal alternatives.
REGEX_EMAIL = re.compile(r'^(?:[a-z0-9!#$%&\'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*|\"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])$')

# Upper bounds enforced on uploaded file names and on each form value.
MAX_FNAME_LEN = 100
MAXLEN_FORM = 150
2021-10-12 09:11:37 +00:00
class ContractCreator:
    """Fill an HTML template and write the result to disk as a PDF.

    The template is looked up relative to the current working directory
    (the loader's search path is "./"). PDFs are written below ``base_path``
    in a sub-directory named after the first two characters of the file name.
    """

    def __init__(self, base_path, template_path):
        self.base = base_path
        environment = Environment(loader=FileSystemLoader(searchpath="./"))
        self.template = environment.get_template(template_path)
        # Options handed to pdfkit unchanged on every conversion.
        self.pdfkit_options = {
            'page-size': 'A4',
            'margin-top': '0.75in',
            'margin-right': '0.75in',
            'margin-bottom': '0.75in',
            'margin-left': '0.75in',
            'encoding': "UTF-8",
            'custom-header' : [
                ('Accept-Encoding', 'gzip')
            ]
        }

    def fill_template(self, **kwargs):
        """Render the loaded template with ``kwargs`` as template variables."""
        return self.template.render(**kwargs)

    def create_pdf(self, f_name, fields_dict):
        """Render the template with ``fields_dict`` and store it as a PDF.

        The output path is ``base / f_name[:2] / f_name[2:]``; the two-character
        sub-directory is created when it does not exist yet.
        """
        prefix, remainder = f_name[:2], f_name[2:]
        target_dir = self.base / Path(prefix)
        if not target_dir.exists():
            target_dir.mkdir()
        destination = target_dir / Path(remainder)
        rendered_html = self.fill_template(**fields_dict)
        pdfkit.from_string(rendered_html, destination, options=self.pdfkit_options)
class UploadHandlerSolar():
    """Validate, store and record text and contract uploads for the "solar" corpus.

    Configuration is supplied as keyword arguments and kept in ``self.config``.
    The methods below read the keys ``UPLOADS_DIR``, ``MAX_FILES_PER_UPLOAD``,
    ``MAIL_LOGIN``, ``MAIL_PASS``, ``MAIL_HOST``, ``SMTP_PORT``,
    ``MAIL_SUBJECT`` and ``MAIL_BODY`` from it.
    """

    # Lower-case file extensions accepted for text uploads.
    ENABLED_FILETYPES = ['txt', 'csv', 'pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'xml', 'mxliff', 'tmx', 'jpg', 'jpeg', 'png']

    def __init__(self, **kwargs):
        self.config = kwargs

    def set_contract_creator(self, contract_creator):
        """Attach the ContractCreator used when mailing generated contracts."""
        assert isinstance(contract_creator, ContractCreator)
        self._contract_creator = contract_creator

    def get_uploads_subdir(self, dir_name):
        """Return ``UPLOADS_DIR / dir_name``, creating it when necessary."""
        subdir = Path(self.config['UPLOADS_DIR']) / dir_name
        # exist_ok avoids a crash when two requests create the directory at once.
        subdir.mkdir(parents=True, exist_ok=True)
        return subdir

    @staticmethod
    def create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes):
        """Return an MD5 hex digest identifying one upload.

        The digest covers the timestamp, the form values (in sorted key order)
        and the sorted per-file hashes. ``corpus_name`` is accepted but unused.
        """
        # Order is important while hashing, hence the sorting.
        val_buff = [str(upload_timestamp)]
        val_buff.extend(form_data[key] for key in sorted(form_data))

        # This hash serves as an unique identifier for the whole upload.
        metahash = hashlib.md5((''.join(val_buff)).encode())

        # Include file hashes to avoid metafile name collisions if they have the same form values,
        # but different data files. Sort hashes first so upload order doesn't matter.
        metahash.update(''.join(sorted(file_hashes.values())).encode())
        return metahash.hexdigest()

    @staticmethod
    def extract_upload_metadata(corpus_name, request):
        """Collect everything known about an upload request into one dict.

        Keys of the result: ``corpus_name``, ``form_data`` (values stripped),
        ``upload_id``, ``timestamp``, ``file_hashes_dict``, ``file_names`` and
        ``contract_file``.
        """
        file_hashes = UploadHandlerSolar.create_file_hashes(request.files)
        form_data = request.form.copy()
        upload_timestamp = int(time.time())
        # The id is derived from the form values as submitted, i.e. before
        # they are stripped below.
        upload_id = UploadHandlerSolar.create_upload_id(corpus_name, form_data, upload_timestamp, file_hashes)

        # Strip form fields.
        for key, val in form_data.items():
            form_data[key] = val.strip()

        return {
            'corpus_name': corpus_name,
            'form_data': form_data,
            'upload_id': upload_id,
            'timestamp': upload_timestamp,
            'file_hashes_dict': file_hashes,
            'file_names': file_hashes.keys(),
            'contract_file': upload_id + '.pdf',
        }

    @staticmethod
    def store_model(model_obj):
        """Add ``model_obj`` to the session and commit; failures are logged, not raised."""
        try:
            db.session.add(model_obj)
            db.session.commit()
        except Exception:
            traceback.print_exc()
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, which would break every later query on it.
            db.session.rollback()

    @staticmethod
    def create_file_hashes(files):
        """Map each uploaded file name to an MD5 digest of its name and content.

        Only entries whose form key starts with ``'file'`` are considered.
        Each file is rewound afterwards so it can still be saved.
        """
        res = dict()
        for key, f in files.items():
            if key.startswith('file'):
                h = hashlib.md5(f.filename.encode())
                h.update(f.stream.read())
                res[f.filename] = h.hexdigest()
                f.seek(0)
        return res

    @staticmethod
    def store_metadata(upload_metadata, user_id):
        """Persist one ``UploadSolar`` row describing the upload."""
        timestamp = datetime.fromtimestamp(upload_metadata['timestamp'])
        form_data = upload_metadata['form_data']
        file_hashes = upload_metadata['file_hashes_dict']
        # Sort by hash so names, hashes and codes share one stable order.
        sorted_file_items = sorted(file_hashes.items(), key=lambda item: item[1])
        institution_id = get_user_institution(user_id).id

        # These used to carry trailing commas, which turned every value into a
        # 1-tuple that was then handed to the model. They are plain values now.
        region = form_data['regija']
        program = form_data['program']
        subject = form_data['predmet']
        subject_custom = form_data['predmet-custom']
        grade = form_data['letnik']
        text_type = form_data['vrsta']
        text_type_custom = form_data['vrsta-custom']
        school_year = form_data['solsko-leto']
        grammar_corrections = form_data['jezikovni-popravki']

        # One code per file: the form values plus the file's index.
        upload_file_codes = [
            '{}_{}_{}_{}_{}_{}_{}_{}'.format(
                region,
                program,
                subject,
                grade,
                text_type,
                school_year,
                grammar_corrections,
                i)
            for i in range(len(sorted_file_items))
        ]

        model_obj = UploadSolar(
            upload_user=user_id,
            institution=institution_id,
            upload_hash=upload_metadata['upload_id'],
            timestamp=timestamp,
            region=region,
            program=program,
            subject=subject,
            subject_custom=subject_custom,
            grade=grade,
            text_type=text_type,
            text_type_custom=text_type_custom,
            school_year=school_year,
            grammar_corrections=grammar_corrections,
            upload_file_hashes=[x[1] for x in sorted_file_items],
            upload_file_names=[x[0] for x in sorted_file_items],
            upload_file_codes=upload_file_codes,
        )
        UploadHandlerSolar.store_model(model_obj)

    def store_datafiles(self, files, upload_metadata):
        """Save every uploaded data file to ``files/<hash[:2]>/<hash[2:]>/<name>``."""
        base = self.get_uploads_subdir('files')
        file_hashes = upload_metadata['file_hashes_dict']

        for key, f in files.items():
            if key.startswith('file'):
                f_hash = file_hashes[f.filename]

                # First byte used for indexing, similarly like git does for example.
                target_dir = base / f_hash[:2] / f_hash[2:]
                target_dir.mkdir(parents=True, exist_ok=True)

                # The file name comes from the client: keep only its last path
                # component so it cannot point outside target_dir.
                f.save(target_dir / Path(f.filename).name)

    def handle_upload(self, request, user_id):
        """Validate and store a text upload.

        Returns ``(message, 400)`` when validation fails and a plain success
        message otherwise.
        """
        err = self.check_upload_request(request)
        if err:
            # check_upload_request already returns (message, 400); wrapping it
            # again produced ((message, 400), 400).
            return err

        err = self.check_form(request.form)
        if err:
            return err, 400

        # Parse request.
        upload_metadata = self.extract_upload_metadata('solar', request)

        logging.info('Upload from user "{}" with upload id "{}" supplied form data: {}'.format(
            user_id,
            upload_metadata['upload_id'],
            str(upload_metadata['form_data']
        )))

        # Store uploaded files to disk.
        self.store_datafiles(request.files, upload_metadata)

        # Store to database.
        self.store_metadata(upload_metadata, user_id)

        return 'Število uspešno oddanih datotek: {}. Podatki o oddaji so na voljo v zavihku Zgodovina sodelovanja.'.format(len(request.files))

    def handle_contract_upload(self, request, user_id):
        """Store uploaded contract PDFs and record them in the database.

        ``request.form['tip-pogodbe']`` selects the contract type: ``'sola'``
        (institution contract, coordinators only) or ``'ucenci-starsi'``.
        Returns a user-facing status message.
        """
        contract_type = request.form['tip-pogodbe']

        if contract_type not in ['sola', 'ucenci-starsi']:
            return 'Neveljaven tip pogodbe.'

        # Validate the whole request before anything is written, so a rejected
        # file or a missing permission cannot leave a half-stored upload.
        files = [f for f in request.files.getlist("file[]") if f]
        if not files:
            return 'Niste naložili nobene datoteke.'

        for f in files:
            if f.content_type != 'application/pdf':
                return 'Datoteka "{}" ni formata PDF.'.format(f.filename)

        user_institution_mapping = UserInstitutionMapping.query.filter_by(user=user_id).first()
        if user_institution_mapping is None:
            return 'Vaš uporabnik ni dodeljen nobeni inštituciji.'

        institution_id = user_institution_mapping.institution
        if contract_type == 'sola' and user_institution_mapping.role != 'coordinator':
            return 'Vaš uporabnik nima pravic za nalaganje pogodbe s šolo.'

        base = self.get_uploads_subdir('contracts')
        for f in files:
            f_hash = hashlib.md5(f.read()).hexdigest()
            f.seek(0, 0)

            # First byte used for indexing, similarly like git does for example.
            sub_dir = base / f_hash[:2]
            sub_dir.mkdir(exist_ok=True)
            f.save(sub_dir / (f_hash[2:] + '.pdf'))

            timestamp = datetime.now()
            if contract_type == 'sola':
                model_obj = InstitutionContract(
                    institution=institution_id,
                    corpus='solar',
                    timestamp=timestamp,
                    file_contract=f_hash,
                    original_filename=f.filename
                )
            else:
                model_obj = ContractsSolar(
                    institution=institution_id,
                    upload_user=user_id,
                    timestamp=timestamp,
                    file_contract=f_hash,
                    contract_type=contract_type,
                    original_filename=f.filename
                )
            self.store_model(model_obj)

        return 'Nalaganje pogodbe je bilo uspešno.'

    @staticmethod
    def check_form(form):
        """Validate the upload form; return an error message or ``None``."""
        region = form['regija']
        program = form['program']
        predmet = form['predmet']
        vrsta = form['vrsta']
        solsko_leto = form['solsko-leto']
        jezikovni_popravki = form['jezikovni-popravki']

        # A non-numeric grade is a validation error, not a server error.
        try:
            letnik = int(form['letnik'])
        except (TypeError, ValueError):
            return 'Invalid grade: {}'.format(form['letnik'])

        if region not in VALID_REGIONS:
            return 'Invalid region "{}"'.format(region)
        if program not in VALID_PROGRAMS:
            return 'Invalid program "{}"'.format(program)
        if predmet not in VALID_SUBJECTS:
            return 'Invalid subject "{}"'.format(predmet)
        if letnik < 1 or letnik > 9:
            return 'Invalid grade: {}'.format(letnik)
        if vrsta not in VALID_TEXT_TYPES:
            return 'Invalid text type "{}"'.format(vrsta)
        if not re.match(r'^\d{0,2}-\d{0,2}$', solsko_leto):
            return 'Invalid school year "{}"'.format(solsko_leto)
        if jezikovni_popravki not in VALID_GRAMMAR_CORRECTIONS:
            # This message used to say "text type" (copy-paste slip).
            return 'Invalid grammar corrections "{}"'.format(jezikovni_popravki)

        for key, val in form.items():
            if len(val) > MAXLEN_FORM:
                return 'Value in form field "{}" exceeds max len of {}'.format(key, MAXLEN_FORM)
        return None

    def send_confirm_mail(self, upload_metadata, attach_contract_file=False):
        """Mail an upload confirmation, optionally attaching the contract PDF.

        Errors while sending are printed and otherwise ignored.
        """
        upload_id = upload_metadata['upload_id']

        message = MIMEMultipart()
        message['From'] = self.config['MAIL_LOGIN']
        message['To'] = upload_metadata['form_data']['email']
        message['Subject'] = self.config['MAIL_SUBJECT'].format(upload_id=upload_id)
        body = self.config['MAIL_BODY'].format(upload_id=upload_id)
        message.attach(MIMEText(body, "plain"))

        if attach_contract_file:
            # set_contract_creator() stores the creator as _contract_creator;
            # the former self.contract_creator lookup raised AttributeError.
            contracts_dir = self._contract_creator.base
            f_name = upload_metadata['contract_file']
            sub_dir = contracts_dir / Path(f_name[:2])
            contract_file = sub_dir / Path(f_name[2:])
            with open(contract_file, "rb") as f:
                part = MIMEApplication(
                    f.read(),
                    Name = f_name
                )
            part['Content-Disposition'] = 'attachment; filename="%s"' % f_name
            message.attach(part)

        text = message.as_string()

        # Create a secure SSL context
        context = ssl.create_default_context()

        # TODO: Implement timeout.
        try:
            with SMTP_SSL(self.config['MAIL_HOST'], self.config['SMTP_PORT'], context=context) as server:
                server.login(self.config['MAIL_LOGIN'], self.config['MAIL_PASS'])
                server.sendmail(message['From'], message['To'], text)

            # Save copy of sent mail in Sent mailbox
            #imap = imaplib.IMAP4_SSL(self.config['MAIL_HOST'], self.config['IMAP_PORT'])
            #imap.login(self.config['MAIL_LOGIN'], self.config['MAIL_PASS'])
            #imap.append('Sent', '\\Seen', imaplib.Time2Internaldate(time.time()), text.encode('utf8'))
            #imap.logout()
        except Exception:
            traceback.print_exc()

    def check_suffixes(self, files):
        """Return an error message for the first file with a disallowed extension."""
        for key, f in files.items():
            if key.startswith('file'):
                suffix = f.filename.split('.')[-1]
                if self.ENABLED_FILETYPES and suffix.lower() not in self.ENABLED_FILETYPES:
                    return 'Datoteka "{}" ni pravilnega formata.'.format(f.filename)
        return None

    @staticmethod
    def check_fname_lengths(files):
        """Return an error message when a file name exceeds ``MAX_FNAME_LEN``."""
        for key, f in files.items():
            if key.startswith('file'):
                if len(f.filename) > MAX_FNAME_LEN:
                    return 'Ime datoteke presega dolžino {} znakov.'.format(MAX_FNAME_LEN)
        return None

    def check_upload_request(self, request):
        """Check file count, extensions and name lengths of an upload request.

        Returns ``(message, 400)`` for the first problem found, else ``None``.
        """
        files = request.files
        max_files = self.config['MAX_FILES_PER_UPLOAD']
        if len(files) > max_files:
            return 'Naložite lahko do {} datotek hkrati.'.format(max_files), 400
        elif len(files) < 1:
            return 'Priložena ni bila nobena datoteka.', 400

        err = self.check_suffixes(files)
        if err:
            return err, 400

        err = UploadHandlerSolar.check_fname_lengths(files)
        if err:
            return err, 400

        return None
2021-05-24 08:15:54 +00:00
def get_upload_history(user_id, n=20):
    """Return the uploads of ``user_id``, newest first.

    At most ``n`` rows are returned; ``n == -1`` removes the limit.
    """
    newest_first = UploadSolar.query.filter_by(upload_user=user_id).order_by(desc(UploadSolar.timestamp))
    if n != -1:
        newest_first = newest_first.limit(n)
    return newest_first.all()
2021-09-16 15:25:20 +00:00
def get_institution_upload_history(institution_id, n=20):
    """Return up to ``n`` uploads of the institution, newest first."""
    by_institution = UploadSolar.query.filter_by(institution=institution_id)
    newest_first = by_institution.order_by(desc(UploadSolar.timestamp))
    return newest_first.limit(n).all()
2021-12-14 14:36:36 +00:00
def get_upload_object(upload_id):
    """Return the ``UploadSolar`` row whose ``id`` is ``upload_id``, or ``None``."""
    matching = UploadSolar.query.filter_by(id=upload_id)
    return matching.first()
def get_all_upload_history(n=20):
    """Return uploads of all users, newest first.

    At most ``n`` rows are returned; ``n == -1`` removes the limit.
    """
    newest_first = UploadSolar.query.order_by(desc(UploadSolar.timestamp))
    if n != -1:
        newest_first = newest_first.limit(n)
    return newest_first.all()
2021-09-16 15:25:20 +00:00
2021-06-08 06:00:18 +00:00
def get_all_institutions():
    """Return non-removed institutions that have access to the 'solar' corpus."""
    # TODO: do filtering purely within an SQL query
    candidates = Institution.query.filter_by(is_removed=False).all()
    return [
        candidate
        for candidate in candidates
        if CorpusAccess.query.filter_by(institution=candidate.id, corpus='solar').first()
    ]
def get_institution_student_contracts(institution_id, user_id=None):
    """Return the institution's 'ucenci-starsi' contracts.

    When ``user_id`` is truthy, only contracts uploaded by that user are
    returned.
    """
    criteria = {'institution': institution_id, 'contract_type': 'ucenci-starsi'}
    if user_id:
        criteria['upload_user'] = user_id
    return ContractsSolar.query.filter_by(**criteria).all()
2021-05-24 08:15:54 +00:00
def get_institution_contract(institution_id):
    """Return the newest 'solar' contract of the institution, or ``None``.

    NOTE: this module defines ``get_institution_contract`` a second time
    further down; that later definition replaces this one at import time.
    """
    solar_contracts = InstitutionContract.query.filter_by(institution=institution_id, corpus='solar')
    newest_first = solar_contracts.order_by(desc(InstitutionContract.timestamp))
    return newest_first.first()
2021-05-24 08:15:54 +00:00
def get_top_uploading_institutions():
    """Return up to five institutions with the most uploaded files.

    The result maps institution name to file count, highest first.
    Institutions without any upload do not appear.
    """
    totals = dict()
    for institution in get_all_institutions():
        for upload in UploadSolar.query.filter_by(institution=institution.id).all():
            totals[institution.name] = totals.get(institution.name, 0) + len(upload.upload_file_hashes)

    ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    # Slicing is harmless when fewer than five entries exist.
    return dict(ranked[:5])
def get_top_uploading_users(institution_id):
    """Return up to five active users of the institution with the most uploaded files.

    The result maps user name to file count, highest first. Users without
    any upload do not appear.
    """
    totals = dict()
    for member in get_all_active_institution_users(institution_id):
        for upload in UploadSolar.query.filter_by(upload_user=member.id).all():
            totals[member.name] = totals.get(member.name, 0) + len(upload.upload_file_hashes)

    ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    # Slicing is harmless when fewer than five entries exist.
    return dict(ranked[:5])
def get_institution_upload_stats(institution_id):
    """Count the institution's uploads per region and per program.

    Returns ``{'region': [...], 'program': [...]}`` where each list holds
    ``(value, count)`` pairs sorted by count, highest first.
    """
    def ranked_counts(column, values):
        # One COUNT query per allowed value of the given column.
        pairs = [
            (value, UploadSolar.query.filter_by(institution=institution_id, **{column: value}).count())
            for value in values
        ]
        return sorted(pairs, key=lambda pair: pair[1], reverse=True)

    return {
        'region': ranked_counts('region', VALID_REGIONS),
        'program': ranked_counts('program', VALID_PROGRAMS),
    }
2023-01-14 12:45:33 +00:00
def get_region_stats():
    """Count uploads per region, split into "OS" and non-"OS" programs.

    Returns a dict mapping every region code in ``VALID_REGIONS`` to a
    two-element list ``[os_count, other_count]``. Rows whose region is not a
    valid code are ignored.
    """
    # Sorted so the key order matches the alphabetical table used before.
    ret = {region: [0, 0] for region in sorted(VALID_REGIONS)}

    per_region = db.session.query(UploadSolar.region, func.count(UploadSolar.id))
    os_rows = per_region.filter_by(program="OS").group_by(UploadSolar.region).all()
    # "~" negates the SQL expression. It replaces sqlalchemy.not_(), which
    # needed a module name that the visible imports of this file do not bind.
    other_rows = per_region.filter(~UploadSolar.program.contains("OS")).group_by(UploadSolar.region).all()

    for slot, rows in enumerate((os_rows, other_rows)):
        for region, count in rows:
            if region in ret:
                ret[region][slot] = count

    # Diagnostic output only; this is not an error condition.
    logging.debug(ret)
    return ret
2021-06-08 06:00:18 +00:00
def get_all_active_users():
    """Return active users that have access to the 'solar' corpus.

    NOTE: this module defines ``get_all_active_users`` a second time further
    down; that later definition replaces this one at import time.
    """
    # TODO: do filtering purely within an SQL query
    candidates = RegisteredUser.query.filter_by(active=True).all()
    return [candidate for candidate in candidates if has_user_corpus_access(candidate.id, 'solar')]
2021-05-24 08:15:54 +00:00
2021-12-20 10:15:01 +00:00
def update_upload_item(item_id, region, program, subject, subject_custom, grade, text_type, text_type_custom, school_year, grammar_corrections):
    """Overwrite the metadata columns of upload ``item_id`` and commit.

    Returns the value reported by ``Query.update`` (the matched row count).
    """
    new_values = {
        'region': region,
        'program': program,
        'subject': subject,
        'subject_custom': subject_custom,
        'grade': grade,
        'text_type': text_type,
        'text_type_custom': text_type_custom,
        'school_year': school_year,
        'grammar_corrections': grammar_corrections
    }
    target = db.session.query(UploadSolar).filter_by(id=item_id)
    rowcount = target.update(new_values)
    db.session.commit()
    return rowcount
2021-10-12 09:11:37 +00:00
def get_user_institution(user_id):
    """Return the ``Institution`` the user is mapped to, or ``None``."""
    mapping = UserInstitutionMapping.query.filter_by(user=user_id).first()
    if not mapping:
        return None
    return Institution.query.filter_by(id=mapping.institution).first()
def get_institution_contract(institution_id):
    """Return the newest 'solar' contract of the institution, or ``None``.

    This is the definition that is in effect at runtime, because it rebinds
    the name of an earlier function in this module. It previously returned an
    arbitrary contract of any corpus; it now applies the same corpus filter
    and newest-first ordering as that earlier definition.
    """
    solar_contracts = InstitutionContract.query.filter_by(institution=institution_id, corpus='solar')
    return solar_contracts.order_by(desc(InstitutionContract.timestamp)).first()
def get_institution_cooperation_history(institution_id):
    """Group the institution's cooperation history by school year.

    Returns a list of ``(school_year, rows)`` tuples, newest first, where
    ``school_year`` looks like ``'2021/22'`` (a school year starts in
    September). Within one school year each user appears once, represented by
    that user's most recent entry.
    """
    def school_year_of(moment):
        # September to December belong to the school year starting that
        # calendar year; January to August to the one that started the year before.
        start = moment.year if moment.month >= 9 else moment.year - 1
        return '{}/{}'.format(start, str(start + 1)[-2:])

    rows = db.session.query(
        UserCooperationHistory.role,
        UserCooperationHistory.timestamp,
        RegisteredUser.id,
        RegisteredUser.name
    ).select_from(
        UserCooperationHistory,
    ).join(
        RegisteredUser,
        UserCooperationHistory.user == RegisteredUser.id,
    ).filter(
        UserCooperationHistory.institution == institution_id,
    ).order_by(UserCooperationHistory.timestamp.desc()).all()

    grouped = []
    seen_user_ids = set()
    for row in rows:
        label = school_year_of(row.timestamp)
        # Rows arrive newest first, so a changed label starts a new group.
        if not grouped or grouped[-1][0] != label:
            grouped.append((label, []))
            seen_user_ids = set()
        if row.id not in seen_user_ids:
            seen_user_ids.add(row.id)
            grouped[-1][1].append(row)
    return grouped
2021-10-12 09:11:37 +00:00
def get_cooperation_history():
    """Return every ``UserCooperationHistory`` row."""
    every_entry = UserCooperationHistory.query
    return every_entry.all()
def add_cooperation_history_item(user_id, institution_id, role):
    """Record that ``user_id`` holds ``role`` at the institution as of now.

    Commits the new ``UserCooperationHistory`` row and returns its ``id``.
    """
    entry = UserCooperationHistory(
        user=user_id,
        institution=institution_id,
        role=role,
        timestamp=datetime.now()
    )
    session = db.session
    session.add(entry)
    session.commit()
    return entry.id
#def del_cooperation_history_item(entry_id):
# db.session.query(UserCooperationHistory).filter_by(id=entry_id).delete()
# db.session.commit()
2021-10-12 09:11:37 +00:00
def has_user_corpus_access(user_id, corpus_name):
    """Return ``True`` when the user may access ``corpus_name``.

    Admins always have access. Other users have access when the institution
    they belong to has a ``CorpusAccess`` row for the corpus. Unknown users
    and users without an institution have no access (both cases previously
    raised ``AttributeError``).
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        return False

    # TODO: check if user even is active?

    # Admins always have access to everything.
    if user.role == 'admin':
        return True

    # Check if user belongs to an institution, that has access to this corpus.
    institution = get_user_institution(user_id)
    if institution is None:
        return False

    row = CorpusAccess.query.filter_by(institution=institution.id, corpus=corpus_name).first()
    return bool(row)
def is_admin(user_id):
    """Return ``True`` when the user exists and has the 'admin' role.

    An unknown ``user_id`` yields ``False`` (it previously raised
    ``AttributeError``).
    """
    user = RegisteredUser.query.filter_by(id=user_id).first()
    if user is None:
        return False
    return user.role == 'admin'
def get_user_obj(user_id):
    """Return the ``RegisteredUser`` with the given id, or ``None``."""
    matching = RegisteredUser.query.filter_by(id=user_id)
    return matching.first()
def get_user_obj_by_email(email):
    """Return the ``RegisteredUser`` with the given e-mail address, or ``None``."""
    matching = RegisteredUser.query.filter_by(email=email)
    return matching.first()
def get_institution_obj(institution_id):
    """Return the ``Institution`` with the given id, or ``None``."""
    matching = Institution.query.filter_by(id=institution_id)
    return matching.first()
def get_institution_obj_by_name(institution_name):
    """Return the first ``Institution`` with the given name, or ``None``."""
    matching = Institution.query.filter_by(name=institution_name)
    return matching.first()
def register_new_user(name, email, password, active=True, admin=False):
    """Create a ``RegisteredUser`` and return its ``id``.

    Only the hash of ``password`` is stored. The role is 'admin' when
    ``admin`` is truthy and 'user' otherwise.
    """
    role = 'admin' if admin else 'user'
    account = RegisteredUser(
        name=name,
        email=email,
        role=role,
        pass_hash=generate_password_hash(password),
        active=active,
        registered=datetime.now()
    )
    session = db.session
    session.add(account)
    session.commit()
    return account.id
def add_institution(name, region):
    """Create an ``Institution`` and return its ``id``."""
    institution = Institution(name=name, region=region)
    session = db.session
    session.add(institution)
    session.commit()
    return institution.id
def grant_institution_corpus_access(institution_id, corpus_name):
    """Create a ``CorpusAccess`` row for the institution and return its ``id``."""
    grant = CorpusAccess(institution=institution_id, corpus=corpus_name)
    session = db.session
    session.add(grant)
    session.commit()
    return grant.id
def add_user_to_institution(user_id, institution_id, role):
    """Map the user to the institution with ``role``; return the mapping's ``id``."""
    membership = UserInstitutionMapping(user=user_id, institution=institution_id, role=role)
    session = db.session
    session.add(membership)
    session.commit()
    return membership.id
def activate_user(user_id):
    """Set ``active`` to ``True`` for the user; return the matched row count."""
    target = db.session.query(RegisteredUser).filter_by(id=user_id)
    rowcount = target.update({'active': True})
    db.session.commit()
    return rowcount
def update_user_password(user_id, new_password):
    """Store the hash of ``new_password`` for the user; return the matched row count."""
    changes = {'pass_hash': generate_password_hash(new_password)}
    target = db.session.query(RegisteredUser).filter_by(id=user_id)
    rowcount = target.update(changes)
    db.session.commit()
    return rowcount
def update_user_role(user_id, role):
    """Set the user's global role; return the matched row count."""
    target = db.session.query(RegisteredUser).filter_by(id=user_id)
    rowcount = target.update({'role': role})
    db.session.commit()
    return rowcount
def update_user_institution_role(user_id, institution_id, role):
    """Set the user's role within the institution; return the matched row count."""
    target = db.session.query(UserInstitutionMapping).filter_by(user=user_id, institution=institution_id)
    rowcount = target.update({'role': role})
    db.session.commit()
    return rowcount
2021-10-12 09:11:37 +00:00
def update_user_email(user_id, new_email):
    """Set the user's e-mail address; return the matched row count."""
    target = db.session.query(RegisteredUser).filter_by(id=user_id)
    rowcount = target.update({'email': new_email})
    db.session.commit()
    return rowcount
def update_user_name(user_id, new_name):
    """Set the user's display name; return the matched row count."""
    target = db.session.query(RegisteredUser).filter_by(id=user_id)
    rowcount = target.update({'name': new_name})
    db.session.commit()
    return rowcount
def update_institution_data(institution_id, new_name, new_region):
    """Set the institution's name and region; return the matched row count."""
    changes = {'name': new_name, 'region': new_region}
    target = db.session.query(Institution).filter_by(id=institution_id)
    rowcount = target.update(changes)
    db.session.commit()
    return rowcount
def remove_user(user_id):
    """Delete the user together with their cooperation history and institution mappings."""
    session = db.session
    # Dependent rows are deleted before the user row itself.
    session.query(UserCooperationHistory).filter(UserCooperationHistory.user == user_id).delete()
    session.query(UserInstitutionMapping).filter(UserInstitutionMapping.user == user_id).delete()
    session.query(RegisteredUser).filter(RegisteredUser.id == user_id).delete()
    session.commit()
#db.session.query(RegisteredUser).filter(RegisteredUser.id == user_id).update({'is_removed': True})
#db.session.commit()
#def undo_remove_user(user_id):
# db.session.query(RegisteredUser).filter(RegisteredUser.id == user_id).update({'is_removed': False})
# db.session.commit()
def remove_institution(institution_id):
    """Delete the institution and its ``CorpusAccess`` rows."""
    session = db.session
    session.query(CorpusAccess).filter(CorpusAccess.institution == institution_id).delete()
    session.query(Institution).filter(Institution.id == institution_id).delete()
    session.commit()
#db.session.query(Institution).filter(Institution.id == institution_id).update({'is_removed': True})
#db.session.commit()
def del_user_from_institution(user_id, institution_id):
    """Delete the mapping between the user and the institution."""
    memberships = db.session.query(UserInstitutionMapping)
    memberships = memberships.filter(UserInstitutionMapping.institution == institution_id)
    memberships = memberships.filter(UserInstitutionMapping.user == user_id)
    memberships.delete()
    db.session.commit()
def get_user_institution_role_str(user_id, institution_id):
    """Return the display label of the user's role at the institution.

    Returns ``''`` when the user is not mapped to the institution. A role
    other than 'coordinator', 'mentor' or 'other' raises ``KeyError``.
    """
    labels = {
        'coordinator': 'Koordinator/-ka',
        'mentor': 'Mentor/-ica',
        'other': 'Druga vloga'
    }
    membership = UserInstitutionMapping.query.filter_by(
        user=user_id
    ).filter_by(
        institution=institution_id
    ).first()
    if membership:
        return labels[membership.role]
    return ''
2021-10-12 09:11:37 +00:00
def get_all_active_users():
    """Return all active users ordered by id.

    NOTE: this rebinds the name of an earlier ``get_all_active_users`` in this
    module, so this is the definition that is in effect.
    """
    active_accounts = RegisteredUser.query.filter_by(active=True)
    return active_accounts.order_by(RegisteredUser.id).all()
def get_all_inactive_users():
    """Return all inactive users ordered by id."""
    inactive_accounts = RegisteredUser.query.filter_by(active=False)
    return inactive_accounts.order_by(RegisteredUser.id).all()
def get_all_users_join_institutions(active=True):
    """Return ``(RegisteredUser, UserInstitutionMapping)`` pairs ordered by user id.

    Only users whose ``active`` flag equals ``active`` are included. The join
    is an outer join, so users without an institution mapping are kept.
    """
    pairs = db.session.query(RegisteredUser, UserInstitutionMapping)
    pairs = pairs.outerjoin(UserInstitutionMapping, RegisteredUser.id == UserInstitutionMapping.user)
    pairs = pairs.filter(RegisteredUser.active == active)
    return pairs.order_by(RegisteredUser.id).all()
def get_all_active_institution_users(institution_id):
    """Return the active users mapped to the institution."""
    members = RegisteredUser.query.filter_by(active=True)
    members = members.join(UserInstitutionMapping, RegisteredUser.id == UserInstitutionMapping.user)
    members = members.filter(UserInstitutionMapping.institution == institution_id)
    return members.all()
def is_institution_coordinator(user_id, institution_id):
    """Return ``True`` when the user is mapped to the institution as 'coordinator'."""
    membership = UserInstitutionMapping.query.filter_by(user=user_id).filter_by(institution=institution_id).first()
    if membership and membership.role == 'coordinator':
        return True
    return False
def is_institution_member(user_id, institution_id):
    """Return ``True`` when the user is mapped to the institution in any role."""
    membership = UserInstitutionMapping.query.filter_by(user=user_id).filter_by(institution=institution_id).first()
    return bool(membership)
def get_actual_institution_contract_filename(f_hash):
    """Return the original file name of the institution contract stored under ``f_hash``.

    Returns ``None`` when no such contract exists.
    """
    record = InstitutionContract.query.filter_by(file_contract=f_hash).first()
    if not record:
        return None
    return record.original_filename
def get_actual_studentparent_contract_filename(f_hash):
    """Return the original file name of the ``ContractsSolar`` contract stored under ``f_hash``.

    Returns ``None`` when no such contract exists.
    """
    record = ContractsSolar.query.filter_by(file_contract=f_hash).first()
    if not record:
        return None
    return record.original_filename
def get_password_reset_token(email, key, expires=600):
    """Return a signed (HS256) JWT that authorises a password reset for ``email``.

    The token expires ``expires`` seconds from now.
    """
    # The token is a credential: anyone who has it can reset the password.
    # It is therefore deliberately NOT written to the log (it used to be
    # logged at ERROR level).
    payload = {
        'reset_password': email,
        'exp': int(time.time()) + expires,
    }
    return jwt.encode(payload, key=key, algorithm='HS256')
2021-10-12 09:11:37 +00:00
def transfer_users_institution(institution_id_from, institution_id_to):
    """Move every user mapping from one institution to another; return the matched row count."""
    affected = db.session.query(UserInstitutionMapping).filter_by(institution=institution_id_from)
    rowcount = affected.update({'institution': institution_id_to})
    db.session.commit()
    return rowcount
def transfer_uploads_institution(institution_id_from, institution_id_to):
    """Move every upload from one institution to another; return the matched row count."""
    affected = db.session.query(UploadSolar).filter_by(institution=institution_id_from)
    rowcount = affected.update({'institution': institution_id_to})
    db.session.commit()
    return rowcount
def transfer_contracts_institution(institution_id_from, institution_id_to):
    """Move every ``ContractsSolar`` row from one institution to another; return the matched row count."""
    affected = db.session.query(ContractsSolar).filter_by(institution=institution_id_from)
    rowcount = affected.update({'institution': institution_id_to})
    db.session.commit()
    return rowcount
def verify_reset_token(token, key):
    """Return the user a password-reset token was issued for, or ``None``.

    ``None`` is returned when the token cannot be decoded, lacks the expected
    claims, has expired, or names an e-mail address with no matching user.
    Decoding errors are logged.
    """
    try:
        claims = jwt.decode(token, key=key, algorithms=["HS256"])
        email = claims['reset_password']
        expires_at = claims['exp']
        if expires_at <= int(time.time()):
            # Token timed out
            return None
    except Exception as err:
        logging.error(err)
        return None

    return RegisteredUser.query.filter_by(email=email).first()
def send_resetpass_mail(email, config):
    """Mail a password-reset link to ``email``.

    Reads ``APP_SECRET_KEY``, ``SERVER_NAME``, ``MAIL_LOGIN``, ``MAIL_PASS``,
    ``MAIL_HOST`` and ``SMTP_PORT`` from ``config``. Errors while sending are
    printed and otherwise ignored.
    """
    reset_token = get_password_reset_token(email, config['APP_SECRET_KEY'])

    body = '''
Zahtevali ste ponastavitev gesla vašega uporabniškega računa.
Geslo lahko ponastavite na naslednji povezavi: https://{}/resetpass/{}'''.format(config['SERVER_NAME'], reset_token)

    sender = config['MAIL_LOGIN']
    message = MIMEMultipart()
    for header, value in (('From', sender), ('To', email), ('Subject', 'Portal Šolar: Ponastavitev gesla')):
        message[header] = value
    message.attach(MIMEText(body, "plain"))
    raw_message = message.as_string()

    # Create a secure SSL context
    tls_context = ssl.create_default_context()

    try:
        with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=tls_context) as smtp:
            smtp.login(sender, config['MAIL_PASS'])
            smtp.sendmail(sender, email, raw_message)
    except Exception:
        traceback.print_exc()
def send_admins_new_user_notification_mail(user_id, config):
    """Tell every admin by mail that a new user is waiting for approval.

    One message is sent per admin, each over its own SMTP connection. An error
    while sending to one admin is printed and does not stop the others.
    """
    new_user = RegisteredUser.query.filter_by(id=user_id).first()

    body = '''
Nov uporabnik "{}" je ustvaril uporabniški račun na portalu za oddajanje besedil Šolar in čaka na odobritev.
'''.format(new_user.name)

    recipients = RegisteredUser.query.filter_by(role="admin").all()

    # Create a secure SSL context
    tls_context = ssl.create_default_context()

    sender = config['MAIL_LOGIN']
    for recipient in recipients:
        message = MIMEMultipart()
        for header, value in (('From', sender), ('To', recipient.email), ('Subject', 'Nova registracija')):
            message[header] = value
        message.attach(MIMEText(body, "plain"))
        raw_message = message.as_string()

        try:
            with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=tls_context) as smtp:
                smtp.login(sender, config['MAIL_PASS'])
                smtp.sendmail(sender, recipient.email, raw_message)
        except Exception:
            traceback.print_exc()
def send_user_activation_mail(user_id, config):
    """Tell the user by mail that their account has been approved.

    Errors while sending are printed and otherwise ignored.
    """
    account = RegisteredUser.query.filter_by(id=user_id).first()

    body = '''Vaš uporabniški račun "{}" na portalu Šolar je bil odobren.'''.format(account.name)

    sender = config['MAIL_LOGIN']
    message = MIMEMultipart()
    for header, value in (('From', sender), ('To', account.email), ('Subject', 'Portal Šolar: Vaš uporabniški račun je odobren')):
        message[header] = value
    message.attach(MIMEText(body, "plain"))
    raw_message = message.as_string()

    # Create a secure SSL context
    tls_context = ssl.create_default_context()

    try:
        with SMTP_SSL(config['MAIL_HOST'], config['SMTP_PORT'], context=tls_context) as smtp:
            smtp.login(sender, config['MAIL_PASS'])
            smtp.sendmail(sender, account.email, raw_message)
    except Exception:
        traceback.print_exc()
2021-12-20 10:15:01 +00:00