solar update

This commit is contained in:
msinkec
2021-06-08 08:00:18 +02:00
parent 31ce97cb44
commit 6697c60c7f
11 changed files with 554 additions and 84 deletions

View File

@@ -3,6 +3,7 @@ import time
import ssl
import traceback
import re
import logging
from pathlib import Path
from datetime import datetime
@@ -19,7 +20,9 @@ from email.mime.application import MIMEApplication
import pdfkit
from jinja2 import Environment, FileSystemLoader
from . model import db, UploadRegular, UploadSolar, RegisteredUser, CorpusAccess, Institution
from werkzeug.security import generate_password_hash
from . model import db, UploadRegular, UploadSolar, RegisteredUser, UserInstitutionMapping, Institution, InstitutionContract, CorpusAccess
#REGEX_EMAIL = re.compile('^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$')
@@ -131,7 +134,7 @@ class UploadHandler:
return res
@staticmethod
def store_model(self, model_obj):
def store_model(model_obj):
try:
db.session.add(model_obj)
db.session.commit()
@@ -234,20 +237,100 @@ class UploadHandler:
return None
@staticmethod
def get_user_institution(user_id):
match = db.session.query(RegisteredUser).filter(RegisteredUser.id == user_id).one()
return match.institution
def get_user_institutions(user_id):
return UserInstitutionMapping.query.filter_by(user=user_id).all()
def has_user_corpus_access(user_id, corpus_name):
user = RegisteredUser.query.filter_by(id=user_id).first()
# TODO: check if user even is active?
# Admins always have access to everything.
if user.role == 'admin':
return True
return CorpusAccess.query.filter_by(user_id=user.id, corpus=corpus_name).first() is not None
# Check if user belongs to an institution, that has access to this corpus.
institutions = get_user_institutions(user_id)
has_access = False
for institution in institutions:
row = CorpusAccess.query.filter_by(institution=institution.id, corpus=corpus_name).first()
if row:
has_access = True
break
return has_access
def is_admin(user_id):
user = RegisteredUser.query.filter_by(id=user_id).first()
if user.role == 'admin':
return True
return False
def get_user_obj(user_id):
return RegisteredUser.query.filter_by(id=user_id).first()
def get_institution_obj(institution_id):
return Institution.query.filter_by(id=institution_id).first()
def register_new_user(name, email, password, active=True, admin=False):
model_obj = RegisteredUser(
name=name,
email=email,
role='admin' if admin else 'user',
pass_hash=generate_password_hash(password),
active=active,
registered=datetime.now()
)
db.session.add(model_obj)
db.session.commit()
return model_obj.id
def add_institution(name, region):
model_obj = Institution(
name=name,
region=region
)
db.session.add(model_obj)
db.session.commit()
return model_obj.id
def grant_institution_corpus_access(institution_id, corpus_name):
model_obj = CorpusAccess(
institution=institution_id,
corpus=corpus_name
)
db.session.add(model_obj)
db.session.commit()
return model_obj.id
def add_user_to_institution(user_id, institution_id, role):
model_obj = UserInstitutionMapping(
user=user_id,
institution=institution_id,
role=role
)
db.session.add(model_obj)
db.session.commit()
return model_obj.id
def get_all_active_users():
return RegisteredUser.query.filter_by(active=True).all()
def is_institution_moderator(user_id, institution_id):
user_inst_mapping = UserInstitutionMapping.query.filter_by(user=user_id).first()
if not user_inst_mapping:
return False
if user_inst_mapping.role != 'moderator':
return False
return True

View File

@@ -83,31 +83,45 @@ class ContractsSolar(db.Model):
contract_type = db.Column(db.String, nullable=False)
class CorpusAccess(db.Model):
__tablename__ = 'corpus_access'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, sqlalchemy.ForeignKey('registered_user.id'), nullable=False)
corpus = db.Column(db.String, nullable=False)
class RegisteredUser(UserMixin, db.Model):
__tablename__ = 'registered_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=False)
email = db.Column(db.String, nullable=False)
email = db.Column(db.String, nullable=False, unique=True)
role = db.Column(db.String, nullable=False)
pass_hash = db.Column(db.String, nullable=False)
active = db.Column(db.Boolean, nullable=True)
last_login = db.Column(db.DateTime, nullable=True)
registered = db.Column(db.DateTime, nullable=True)
institution = db.Column(db.Integer, sqlalchemy.ForeignKey('institution.id'), nullable=True)
institution_moderator = db.Column(db.Boolean, default=False)
class UserInstitutionMapping(db.Model):
__tablename__ = 'user_institution_mapping'
id = db.Column(db.Integer, primary_key=True)
user = db.Column(db.Integer, sqlalchemy.ForeignKey('registered_user.id'), nullable=False)
institution = db.Column(db.Integer, sqlalchemy.ForeignKey('institution.id'), nullable=False)
role =db.Column(db.String, nullable=False)
class Institution(db.Model):
__tablename__ = 'institution'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=False)
name = db.Column(db.String, nullable=False, unique=True)
region = db.Column(db.String, nullable=False)
class CorpusAccess(db.Model):
__tablename__ = 'corpus_access'
id = db.Column(db.Integer, primary_key=True)
institution = db.Column(db.Integer, sqlalchemy.ForeignKey('institution.id'), nullable=False)
corpus = db.Column(db.String, nullable=False)
class InstitutionContract(db.Model):
__tablename__ = 'institution_contract'
id = db.Column(db.Integer, primary_key=True)
institution = db.Column(db.Integer, sqlalchemy.ForeignKey('institution.id'), nullable=False)
corpus = db.Column(db.String, nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
file_contract = db.Column(db.String, nullable=True)

View File

@@ -3,10 +3,10 @@ import re
import traceback
import hashlib
from datetime import datetime
from sqlalchemy import desc
from sqlalchemy import desc, exists
from portal.base import UploadHandler
from portal.model import db, UploadSolar, ContractsSolar, RegisteredUser, Institution
from portal.base import UploadHandler, get_user_institutions, has_user_corpus_access
from portal.model import db, UploadSolar, ContractsSolar, RegisteredUser, Institution, InstitutionContract, UserInstitutionMapping, CorpusAccess
VALID_PROGRAMS = {'OS', 'SSG', 'MGP', 'ZG', 'NPI', 'SPI', 'SSI', 'PTI'}
@@ -27,7 +27,8 @@ class UploadHandlerSolar(UploadHandler):
sorted_f_hashes = list(file_hashes.values())
sorted_f_hashes.sort()
institution_id = UploadHandler.get_user_institution(user_id)
# If user is mapped to multiple institutions, let him chose in name of which one he makes the upload.
institution_id = get_user_institutions(user_id)[0].id
model_obj = UploadSolar(
upload_user = user_id,
@@ -44,7 +45,7 @@ class UploadHandlerSolar(UploadHandler):
grammar_corrections=form_data['jezikovni-popravki'],
upload_file_hashes=sorted_f_hashes
)
self.store_model(model_obj)
UploadHandler.store_model(model_obj)
def handle_upload(self, request, user_id):
err = self.check_upload_request(request)
@@ -73,9 +74,9 @@ class UploadHandlerSolar(UploadHandler):
return 'Uspešno ste oddali datotek(e). Št. datotek: {}'.format(len(request.files))
def handle_contract_upload(self, request, user_id):
contracts_type = request.form['tip-pogodbe']
contract_type = request.form['tip-pogodbe']
if contracts_type not in ['sola', 'ucenci-starsi']:
if contract_type not in ['sola', 'ucenci-starsi']:
return 'Neveljaven tip pogodbe.'
f_obj = None
@@ -92,35 +93,46 @@ class UploadHandlerSolar(UploadHandler):
return 'Niste naložili nobene datoteke.'
base = self.get_uploads_subdir('contracts')
f_hash = hashlib.md5(f_obj.read())
f_hash = hashlib.md5(f_obj.read()).hexdigest()
f_obj.seek(0, 0)
# First byte used for indexing, similarly like git does for example.
sub_dir = base / f_hash[:2]
if not sub_dir.exists():
sub_dir.mkdir()
path = sub_dir / f_hash[2:] + '.pdf'
path = sub_dir / (f_hash[2:] + '.pdf')
f_obj.save(path)
timestamp = datetime.now()
user_obj = RegisteredUser.query.filter_by(id=user_id).one()
institution_id = user_obj.institution
is_institution_moderator = user_obj.institution_moderator
if institution_id is None:
user_institution_mapping = UserInstitutionMapping.query.filter_by(user=user_id).first()
if user_institution_mapping is None:
return 'Vaš uporabnik ni dodeljen nobeni inštituciji.'
institution_id = user_institution_mapping.institution
is_institution_moderator = True if user_institution_mapping.role == 'moderator' else False
if contracts_type == 'sola':
if contract_type == 'sola':
if not is_institution_moderator:
return 'Vaš uporabnik nima pravic za nalaganje pogodbe s šolo.'
Institution.update().values(file_contract=f_hash).where(id=institution_id)
# TODO: insert institution contract
model_obj = InstitutionContract(
institution=institution_id,
corpus='solar',
timestamp=timestamp,
file_contract=f_hash
)
self.store_model(model_obj)
else:
model_obj = ContractsSolar(
institution=institution_id,
upload_user=user_id,
timestamp=timestamp,
file_contract=f_hash,
contract_type=contract_type,
)
model_obj = ContractsSolar(
institution=institution_id,
upload_user=user_id,
file_contract=f_hash,
contracts_type=contracts_type,
)
self.store_model(model_obj)
self.store_model(model_obj)
return 'Nalaganje pogodbe je bilo uspešno.'
@staticmethod
@@ -154,12 +166,30 @@ def get_upload_history(user_id, n=20):
return UploadSolar.query.filter_by(upload_user=user_id).order_by(desc(UploadSolar.timestamp)).limit(n).all()
def get_all_institutions():
# TODO: do filtering purely within an SQL query
res = []
for institution in Institution.query.all():
row = CorpusAccess.query.filter_by(institution=institution.id, corpus='solar').first()
if row:
res.append(institution)
return res
def get_institution_student_contracts(institution_id):
return ContractsSolar.query.filter_by(id=institution_id, contract_type='ucenci-starsi').all()
return ContractsSolar.query.filter_by(institution=institution_id, contract_type='ucenci-starsi').all()
def get_institution_contract(institution_id):
return ContractsSolar.query.filter_by(id=institution_id, contract_type='sola').first()
return InstitutionContract.query.filter_by(institution=institution_id, corpus='solar').order_by(desc(InstitutionContract.timestamp)).first()
def get_all_active_users():
# TODO: do filtering purely within an SQL query
res = []
active_users = RegisteredUser.query.filter_by(active=True).all()
for user in active_users:
if has_user_corpus_access(user.id, 'solar'):
res.append(user)
return res