434 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			434 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import os
 | |
| import re
 | |
| import configparser
 | |
| from pathlib import Path
 | |
| from werkzeug.security import check_password_hash
 | |
| 
 | |
| from flask import Flask, render_template, request, redirect, flash, safe_join, send_file
 | |
| from flask_dropzone import Dropzone
 | |
| from flask_migrate import Migrate, MigrateCommand
 | |
| from flask_script import Manager
 | |
| from flask_login import LoginManager, login_required, login_user, current_user, logout_user
 | |
| from portal.model import db, RegisteredUser
 | |
| 
 | |
| import portal.base
 | |
| import portal.solar
 | |
| import portal.regular
 | |
| import portal.predavanja
 | |
| 
 | |
| 
 | |
| # TODO: Implement user registration.
 | |
| # TODO: Integrate Shibboleth login.
 | |
| 
 | |
| 
 | |
| 
 | |
# TODO: make logging level configurable
# Configure the root logger: DEBUG verbosity, every record tagged as coming
# from the application.
_LOG_FORMAT = '[APP LOGGER] %(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)
 | |
| 
 | |
| ######################
 | |
| # Load configuration #
 | |
| ######################
 | |
# Settings are read from the [DEFAULT] section of config.ini (looked up
# relative to the current working directory).  Any setting can be overridden
# with an environment variable named PORTALDS4DS1_<SETTING>.
_ENV_PREFIX = 'PORTALDS4DS1_'

config = configparser.ConfigParser()
config.read('config.ini')
config = config['DEFAULT']


def _setting(name, convert=str):
    """Return setting *name*, preferring the environment over config.ini.

    The environment variable ``PORTALDS4DS1_<name>`` wins when it is set;
    otherwise the value is taken from config.ini (KeyError if it is missing
    there as well).  *convert* is applied to the raw string value.
    """
    env_name = _ENV_PREFIX + name
    if env_name in os.environ:
        return convert(os.environ[env_name])
    return convert(config[name])


MAIL_HOST = _setting('MAIL_HOST')
MAIL_LOGIN = _setting('MAIL_LOGIN')
MAIL_PASS = _setting('MAIL_PASS')
APP_SECRET_KEY = _setting('APP_SECRET_KEY', bytes.fromhex)
SMTP_PORT = _setting('SMTP_PORT', int)
IMAP_PORT = _setting('IMAP_PORT', int)
MAX_UPLOAD_SIZE = _setting('MAX_UPLOAD_SIZE', int)  # Bytes
MAX_FILES_PER_UPLOAD = _setting('MAX_FILES_PER_UPLOAD', int)
CONTRACT_CLIENT_CONTACT = _setting('CONTRACT_CLIENT_CONTACT')
MAIL_SUBJECT = _setting('MAIL_SUBJECT')
MAIL_BODY = _setting('MAIL_BODY')
MAIL_SUBJECT_PREDAVANJA = _setting('MAIL_SUBJECT_PREDAVANJA')
MAIL_BODY_PREDAVANJA = _setting('MAIL_BODY_PREDAVANJA')
SQL_CONN_STR = _setting('SQL_CONN_STR')
DESC_PREVODI = _setting('DESC_PREVODI')
DESC_GIGAFIDA = _setting('DESC_GIGAFIDA')
# Previously the only setting without an environment override.
DESC_PREDAVANJA = _setting('DESC_PREDAVANJA')

# The uploads directory is optional in both places and falls back to an
# "uploads" folder next to this file.  It is always kept as a Path, also when
# it comes from the environment.
_uploads_env = _ENV_PREFIX + 'UPLOADS_DIR'
if _uploads_env in os.environ:
    UPLOADS_DIR = Path(os.environ[_uploads_env])
elif 'UPLOADS_DIR' in config:
    UPLOADS_DIR = Path(config['UPLOADS_DIR'])
else:
    UPLOADS_DIR = Path(__file__).resolve().parent / 'uploads'

# Create the directory only after the final location is known.  The previous
# check tested the bound method `UPLOADS_DIR.exists` (always truthy) instead
# of calling it, so the directory was never created.
UPLOADS_DIR.mkdir(parents=True, exist_ok=True)

ENABLED_CORPUSES = ['prevodi', 'gigafida', 'solar', 'predavanja']
CORPUSES_LOGIN_REQUIRED = ['solar']
 | |
| 
 | |
| 
 | |
| ######################
 | |
| 
 | |
# The Flask application object; settings below are taken from the values
# loaded in the configuration section above.
app = Flask(__name__)

app.config['SECRET_KEY'] = APP_SECRET_KEY
app.config['UPLOADED_PATH'] = UPLOADS_DIR
app.config['MAX_CONTENT_LENGTH'] = MAX_UPLOAD_SIZE
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = SQL_CONN_STR
app.config['SQLALCHEMY_ECHO'] = True

# Disable strict trailing-slash matching on the URL map.
app.url_map.strict_slashes = False
 | |
| 
 | |
# Bind the database to the application and enable migrations for it.
db.init_app(app)
migrate = Migrate(app, db)

# Run "python app.py db -?" to see more info about DB migrations.
manager = Manager(app)
manager.add_command('db', MigrateCommand)

# Set up dropzone.js to serve all the stuff for "file dropping" on the web interface.
dropzone = Dropzone(app)
 | |
| 
 | |
# Keyword arguments common to every upload handler; only the mail subject and
# mail body differ between them.
_shared_handler_options = dict(
    UPLOADS_DIR=UPLOADS_DIR,
    MAIL_HOST=MAIL_HOST,
    MAIL_LOGIN=MAIL_LOGIN,
    MAIL_PASS=MAIL_PASS,
    SMTP_PORT=SMTP_PORT,
    IMAP_PORT=IMAP_PORT,
    CONTRACT_CLIENT_CONTACT=CONTRACT_CLIENT_CONTACT,
    MAX_FILES_PER_UPLOAD=MAX_FILES_PER_UPLOAD,
)

# Handler used by handle_upload for corpora other than "solar" and "predavanja".
upload_handler_regular = portal.regular.UploadHandlerRegular(
    MAIL_SUBJECT=MAIL_SUBJECT,
    MAIL_BODY=MAIL_BODY,
    **_shared_handler_options
)

# Handler for the "solar" corpus (uploads and contracts).
upload_handler_solar = portal.solar.UploadHandlerSolar(
    MAIL_SUBJECT=MAIL_SUBJECT,
    MAIL_BODY=MAIL_BODY,
    **_shared_handler_options
)

# Handler for the "predavanja" corpus; it uses its own mail subject and body.
upload_handler_predavanja = portal.predavanja.UploadHandlerPredavanja(
    MAIL_SUBJECT=MAIL_SUBJECT_PREDAVANJA,
    MAIL_BODY=MAIL_BODY_PREDAVANJA,
    **_shared_handler_options
)
 | |
| 
 | |
| 
 | |
# Use flask-login to manage user sessions where they are required.
# Passing the app to the constructor already initialises the extension for it,
# so no additional init_app() call is made (a second call would register the
# extension's hooks on the app twice).
login_manager = LoginManager(app)
 | |
| 
 | |
| 
 | |
@app.route('/')
def index():
    """Serve the landing page of the portal."""
    landing_page = render_template('index.html')
    return landing_page
 | |
| 
 | |
| 
 | |
@app.route('/<corpus_name>')
def index_corpus(corpus_name):
    """Serve the entry page of a single corpus.

    Returns a 404 response for corpora that are not enabled.  "predavanja"
    has its own template, "solar" redirects to the upload page (or to the
    login page for anonymous visitors), and the remaining corpora share the
    basic template with their own description.
    """
    # Function-scope import keeps this change self-contained.
    from html import escape

    if corpus_name not in ENABLED_CORPUSES:
        # corpus_name comes straight from the URL and ends up in an HTML
        # response, so it has to be escaped before being echoed back.
        return 'Korpus "{}" ne obstaja.'.format(escape(corpus_name)), 404

    if corpus_name == 'predavanja':
        return render_template('basic-predavanja.html', description=DESC_PREDAVANJA,
                               max_files=MAX_FILES_PER_UPLOAD)

    if corpus_name == 'solar':
        if current_user.is_authenticated:
            return redirect('/solar/oddaja')
        return redirect('/solar/login')

    # Corpora rendered with the shared template.  A corpus that is enabled but
    # not listed here is a configuration error and fails loudly.
    descriptions = {
        'prevodi': DESC_PREVODI,
        'gigafida': DESC_GIGAFIDA,
    }
    description = descriptions[corpus_name]

    return render_template('basic.html', corpus_name=corpus_name,
                           description=description, max_files=MAX_FILES_PER_UPLOAD)
 | |
| 
 | |
| 
 | |
@login_manager.user_loader
def load_user(user_id):
    """Return the RegisteredUser for the id stored in the session.

    Returns None when the id cannot be interpreted as an integer, because the
    flask-login user loader is expected to return None rather than raise for
    an invalid id.
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return RegisteredUser.query.get(numeric_id)
 | |
| 
 | |
| 
 | |
@app.route('/solar/login')
def login_get():
    """Show the login form for the "solar" corpus."""
    template_args = {'corpus_name': 'solar', 'title': 'ŠOLAR'}
    return render_template('login.html', **template_args)
 | |
| 
 | |
| 
 | |
@app.route('/<corpus_name>/login', methods=['POST'])
def login_post(corpus_name):
    """Handle a submitted login form for a corpus that requires login.

    Responds with 404 when the corpus is not enabled or does not require a
    login.  On any failed check a message is flashed and the client is
    redirected back to the login page; on success the user is logged in and
    redirected to the corpus upload page.
    """
    if corpus_name not in ENABLED_CORPUSES or corpus_name not in CORPUSES_LOGIN_REQUIRED:
        return '', 404

    login_url = '/{}/login'.format(corpus_name)

    email = request.form.get('email')
    password = request.form.get('password')
    remember = bool(request.form.get('remember'))

    # A request without e-mail or password can never match a user.  Skipping
    # the lookup also keeps a missing password (None) away from
    # check_password_hash, which expects a string.
    user = None
    if email and password:
        user = RegisteredUser.query.filter_by(email=email).first()

    if not user or not check_password_hash(user.pass_hash, password):
        flash('Napačni podatki za prijavo. Poskusite ponovno.')
        return redirect(login_url)

    if not user.active:
        flash('Vaš uporabniški račun še ni bil aktiviran.')
        return redirect(login_url)

    # Check if user is authorized to log into this corpus. Admins are an exception.
    if not portal.base.has_user_corpus_access(user.id, corpus_name):
        flash('Nimate dostopa do tega korpusa.')
        return redirect(login_url)

    #portal.base.add_user_session(user.id)
    login_user(user, remember=remember)

    if corpus_name == 'solar':
        return redirect('/solar/oddaja')
    return '', 404
 | |
| 
 | |
| 
 | |
| # TODO: Move solar stuff to separate file using Flask blueprints.
 | |
| # TODO: Better routing logic.
 | |
| 
 | |
@app.route('/solar/logout')
@login_required
def logout():
    """End the current user's session and send them back to the login page."""
    logout_user()
    login_page = '/solar/login'
    return redirect(login_page)
 | |
| 
 | |
| 
 | |
@app.route('/solar/<path:text>')
@login_required
def solar(text):
    """Serve the pages of the "solar" corpus for a logged-in user.

    *text* is the part of the URL after "/solar/".  Supported sections are
    "oddaja", "zgodovina", "pogodbe" (including contract downloads of the form
    "pogodbe/<name>.pdf") and "admin".  Everything else, and every request by
    a user without access to the corpus, gets an empty 404 response.
    """
    if not portal.base.has_user_corpus_access(current_user.id, 'solar'):
        return '', 404

    if text == 'oddaja' or text.startswith('oddaja/'):
        return render_template('solar-oddaja.html')

    if text == 'zgodovina' or text.startswith('zgodovina/'):
        upload_items = portal.solar.get_upload_history(current_user.id)
        # The uploader name shown for every item is the current user's name,
        # so it is looked up once instead of once per item.
        uploader_name = portal.base.get_user_obj(current_user.id).name
        uploader_names = []
        institution_names = []
        for item in upload_items:
            uploader_names.append(uploader_name)
            institution = portal.base.get_institution_obj(item.institution)
            institution_names.append(institution.name if institution else None)
        return render_template('solar-zgodovina.html', upload_history=upload_items,
                               uploader_names=uploader_names,
                               institution_names=institution_names)

    if text == 'pogodbe' or text.startswith('pogodbe/'):
        # Check for download contract request.
        match = re.match(r'^pogodbe/([a-z0-9_]+\.pdf)$', text)
        if match:
            filename = match.group(1)
            if len(filename) < 10:
                return '', 404
            # The file is looked up under <contracts>/<first two characters>/<rest>.
            prefix = filename[:2]
            suffix = filename[2:]

            safe_path = safe_join(str(upload_handler_solar.get_uploads_subdir('contracts')),
                                  prefix, suffix)
            try:
                return send_file(safe_path, as_attachment=True)
            except FileNotFoundError:
                return '', 404

        user_obj = portal.base.get_user_obj(current_user.get_id())
        institutions = portal.base.get_user_institutions(user_obj.id)
        contracts_students = []
        contract_school = []
        enable_upload_school_contract = False
        show_upload_form = False
        if len(institutions) > 0:
            show_upload_form = True
            # Only the first institution of the user is taken into account.
            institution = institutions[0]
            contracts_students = portal.solar.get_institution_student_contracts(institution.id)
            contract_school = portal.solar.get_institution_contract(institution.id)
            enable_upload_school_contract = bool(
                portal.base.is_institution_moderator(user_obj.id, institution.id))

        return render_template('solar-pogodbe.html', contracts_students=contracts_students,
                               contract_school=contract_school,
                               enable_upload_school_contract=enable_upload_school_contract,
                               show_upload_form=show_upload_form)

    if text == 'admin' or text.startswith('admin/'):
        # Check the role first so the queries below only run for admins.
        if current_user.role != 'admin':
            return '', 404
        solar_users = portal.base.get_all_active_users()
        solar_institutions = portal.solar.get_all_institutions()
        return render_template('solar-admin.html', users=solar_users,
                               institutions=solar_institutions)

    return '', 404
 | |
| 
 | |
@app.route('/solar/pogodbe', methods=['POST'])
@login_required
def solar_upload_contract():
    """Accept a contract upload for the "solar" corpus.

    Users without access to the corpus get an empty 404 response; otherwise
    the request is handed over to the solar upload handler.
    """
    may_access = portal.base.has_user_corpus_access(current_user.id, 'solar')
    if may_access:
        return upload_handler_solar.handle_contract_upload(request, current_user.get_id())
    return '', 404
 | |
| 
 | |
| 
 | |
@app.route('/<corpus_name>/adduser', methods=['POST'])
@login_required
def solar_add_user(corpus_name):
    """Register a new user from the submitted form (admins only).

    Non-admins and unknown corpora get an empty 404 response.  The form
    fields "name", "email" and "password" are validated in that order; the
    first failed check is reported back as a plain text message.
    """
    if not portal.base.is_admin(current_user.id) or corpus_name not in ENABLED_CORPUSES:
        return '', 404

    form = request.form
    name = form['name']
    email = form['email']
    password = form['password']

    # Name: required, at most 100 characters.
    if not name:
        return 'Prazno polje za ime.'
    if len(name) > 100:
        return 'Predolgo ime.'

    # E-mail: required, at most 100 characters, must match the e-mail pattern.
    if not email:
        return 'Prazno polje za elektronsko pošto.'
    if len(email) > 100:
        return 'Predolgi email naslov'
    if not re.search(portal.base.REGEX_EMAIL, email):
        return 'Email napačnega formata.'

    # Password: required, at most 100 characters.
    if not password:
        return 'Prazno polje za geslo.'
    if len(password) > 100:
        return 'Predolgo geslo.'

    portal.base.register_new_user(name, email, password)
    return 'Uporabnik je bil dodan.'
 | |
| 
 | |
| 
 | |
@app.route('/solar/deluser', methods=['POST'])
@login_required
def solar_del_user():
    """Placeholder for user deletion; currently always answers with an empty 404."""
    # TODO: check if user is institution moderator for the added users institution or is an admin
    # TODO: delete from "user", "user_institution_mapping", update "institution_contract" set user to NULL
    not_implemented = ('', 404)
    return not_implemented
 | |
| 
 | |
@app.route('/<corpus_name>/addinstitution', methods=['POST'])
@login_required
def add_institution(corpus_name):
    """Create an institution and grant it access to the corpus (admins only).

    Non-admins and unknown corpora get an empty 404 response.  The form
    fields "name" and "region" must be non-empty and at most 100 characters
    long; the first failed check is reported back as a plain text message.
    """
    if not portal.base.is_admin(current_user.id) or corpus_name not in ENABLED_CORPUSES:
        return '', 404

    form = request.form
    name = form['name']
    region = form['region']

    if not name:
        return 'Prazno polje za ime.'
    if len(name) > 100:
        return 'Predolgo ime.'

    if not region:
        return 'Prazno polje za regijo.'
    if len(region) > 100:
        return 'Predolgi niz za regijo.'

    new_institution_id = portal.base.add_institution(name, region)
    portal.base.grant_institution_corpus_access(new_institution_id, corpus_name)
    return 'Institucija je bila dodana.'
 | |
| 
 | |
@app.route('/<corpus_name>/addusertoinstitution', methods=['POST'])
@login_required
def add_user_institution_mapping(corpus_name):
    """Assign a user to an institution with a given role (admins only).

    Non-admins, unknown corpora and roles other than "moderator" or "user"
    get an empty 404 response.  A user who already belongs to an institution
    is rejected with a plain text message.
    """
    if not portal.base.is_admin(current_user.id) or corpus_name not in ENABLED_CORPUSES:
        return '', 404

    form = request.form
    user_id = form['user_id']
    institution_id = form['institution_id']
    role = form['role']
    if role != 'moderator' and role != 'user':
        return '', 404

    # TODO: remove this restriction
    already_assigned = len(portal.base.get_user_institutions(user_id)) > 0
    if already_assigned:
        return ('Uporabnik je že dodeljen instituciji. Dodeljevanje večim institucijam '
                'zaenkrat ni implementirano.')

    portal.base.add_user_to_institution(user_id, institution_id, role)
    return 'Uporabnik je bil dodeljen instituciji.'
 | |
| 
 | |
@app.route('/<corpus_name>/delinstitution', methods=['POST'])
@login_required
def del_institution(corpus_name):
    """Placeholder for institution deletion; currently always answers with an empty 404."""
    # TODO: check if valid corpus_name
    # TODO: check if user is admin
    # TODO: delete cascade - institution, user_institution_mapping, corpus_access, institution_contract
    not_implemented = ('', 404)
    return not_implemented
 | |
| 
 | |
| 
 | |
@app.route('/<corpus_name>/upload', methods=['POST'])
def handle_upload(corpus_name):
    """Dispatch an upload request to the handler responsible for the corpus.

    Unknown corpora get an empty 404 response.  Uploads to "solar" also
    require an authenticated user with access to that corpus.
    """
    if corpus_name not in ENABLED_CORPUSES:
        return '', 404

    if corpus_name == 'predavanja':
        return upload_handler_predavanja.handle_upload(request)

    if corpus_name != 'solar':
        return upload_handler_regular.handle_upload(request, corpus_name)

    # "solar": the access check is only made for authenticated users.
    if not current_user.is_authenticated:
        return '', 404
    if not portal.base.has_user_corpus_access(current_user.id, corpus_name):
        return '', 404
    return upload_handler_solar.handle_upload(request, current_user.get_id())
 | |
| 
 | |
| 
 | |
if __name__ == '__main__':
    # Debug mode stays on by default (as before) but can be switched off with
    # PORTALDS4DS1_DEBUG=0, following the environment override convention used
    # for the other settings.
    # NOTE(review): this starts the development server via app.run(); the
    # flask_script `manager` created above is never run from here, so the
    # "python app.py db" command mentioned there is not reachable through this
    # entry point - confirm which entry point is intended.
    _debug_flag = os.environ.get('PORTALDS4DS1_DEBUG', '1').strip().lower()
    app.run(debug=_debug_flag not in ('0', 'false', 'no', 'off'))
 |