import logging import re import hashlib from datetime import datetime from sqlalchemy import desc from collections import Counter from portal.base import UploadHandler, get_user_institution, has_user_corpus_access from . model import * VALID_PROGRAMS = {'OS', 'SSG', 'MGP', 'ZG', 'NPI', 'SPI', 'SSI', 'PTI'} VALID_SUBJECTS = {'slo', 'drug-jez', 'drug-druz', 'drug-narav', 'drug-strok', 'drug-izb'} VALID_TEXT_TYPES = {'esej-spis', 'prakticno', 'solski-test', 'delo-v-razredu'} VALID_GRAMMAR_CORRECTIONS = {'popr-ne', 'brez-popr', 'popr-da'} VALID_REGIONS = {'CE', 'GO', 'KK', 'KP', 'KR', 'LJ', 'MB', 'MS', 'NM', 'PO', 'SG'} MAXLEN_FORM = 150 class UploadHandlerSolar(UploadHandler): @staticmethod def store_metadata(upload_metadata, user_id): timestamp = datetime.fromtimestamp(upload_metadata['timestamp']) form_data = upload_metadata['form_data'] file_hashes = upload_metadata['file_hashes_dict'] sorted_f_hashes = list(file_hashes.values()) sorted_f_hashes.sort() institution_id = get_user_institution(user_id).id model_obj = UploadSolar( upload_user = user_id, institution = institution_id, upload_hash=upload_metadata['upload_id'], timestamp=timestamp, program=form_data['program'], subject=form_data['predmet'], subject_custom=form_data['predmet-custom'], grade=form_data['letnik'], text_type=form_data['vrsta'], text_type_custom=form_data['vrsta-custom'], school_year=form_data['solsko-leto'], grammar_corrections=form_data['jezikovni-popravki'], upload_file_hashes=sorted_f_hashes ) UploadHandler.store_model(model_obj) def handle_upload(self, request, user_id): err = self.check_upload_request(request) if err: return err, 400 err = self.check_form(request.form) if err: return err, 400 # Parse request. upload_metadata = self.extract_upload_metadata('solar', request) logging.info('Upload from user "{}" with upload id "{}" supplied form data: {}'.format( user_id, upload_metadata['upload_id'], str(upload_metadata['form_data'] ))) # Store uploaded files to disk. self.store_datafiles(request.files, upload_metadata) # Store to database. self.store_metadata(upload_metadata, user_id) return 'Uspešno ste oddali datotek(e). Št. datotek: {}'.format(len(request.files)) def handle_contract_upload(self, request, user_id): contract_type = request.form['tip-pogodbe'] if contract_type not in ['sola', 'ucenci-starsi']: return 'Neveljaven tip pogodbe.' #for key, f in request.files.items(): for f in request.files.getlist("file[]"): mimetype = f.content_type if mimetype != 'application/pdf': return 'Datoteka "{}" ni formata PDF.'.format(f.filename) if not f: return 'Niste naložili nobene datoteke.' base = self.get_uploads_subdir('contracts') f_hash = hashlib.md5(f.read()).hexdigest() f.seek(0, 0) # First byte used for indexing, similarly like git does for example. sub_dir = base / f_hash[:2] if not sub_dir.exists(): sub_dir.mkdir() path = sub_dir / (f_hash[2:] + '.pdf') f.save(path) timestamp = datetime.now() user_institution_mapping = UserInstitutionMapping.query.filter_by(user=user_id).first() if user_institution_mapping is None: return 'Vaš uporabnik ni dodeljen nobeni inštituciji.' institution_id = user_institution_mapping.institution is_institution_coordinator = True if user_institution_mapping.role == 'coordinator' else False if contract_type == 'sola': if not is_institution_coordinator: return 'Vaš uporabnik nima pravic za nalaganje pogodbe s šolo.' # TODO: insert institution contract model_obj = InstitutionContract( institution=institution_id, corpus='solar', timestamp=timestamp, file_contract=f_hash, original_filename=f.filename ) self.store_model(model_obj) else: model_obj = ContractsSolar( institution=institution_id, upload_user=user_id, timestamp=timestamp, file_contract=f_hash, contract_type=contract_type, original_filename=f.filename ) self.store_model(model_obj) return 'Nalaganje pogodbe je bilo uspešno.' @staticmethod def check_form(form): program = form['program'] predmet = form['predmet'] letnik = int(form['letnik']) vrsta = form['vrsta'] solsko_leto = form['solsko-leto'] jezikovni_popravki = form['jezikovni-popravki'] if program not in VALID_PROGRAMS: return 'Invalid program "{}"'.format(program) if predmet not in VALID_SUBJECTS: return 'Invalid subject "{}"'.format(predmet) if letnik < 1 or letnik > 9: return 'Invalid grade: {}'.format(letnik) if vrsta not in VALID_TEXT_TYPES: return 'Invalid text type "{}"'.format(vrsta) if not re.match('^\d{0,2}-\d{0,2}$', solsko_leto): return 'Invalid school year "{}"'.format(solsko_leto) if jezikovni_popravki not in VALID_GRAMMAR_CORRECTIONS: return 'Invalid text type "{}"'.format(jezikovni_popravki) for key, val in form.items(): if len(val) > MAXLEN_FORM: return 'Value in form field "{}" exceeds max len of {}'.format(key, MAXLEN_FORM) def get_upload_history(user_id, n=20): if n == -1: return UploadSolar.query.filter_by(upload_user=user_id).order_by(desc(UploadSolar.timestamp)).all() return UploadSolar.query.filter_by(upload_user=user_id).order_by(desc(UploadSolar.timestamp)).limit(n).all() def get_institution_upload_history(institution_id, n=20): return UploadSolar.query.filter_by(institution=institution_id).order_by(desc(UploadSolar.timestamp)).limit(n).all() def get_all_upload_history(n=20): if n == -1: return UploadSolar.query.order_by(desc(UploadSolar.timestamp)).all() return UploadSolar.query.order_by(desc(UploadSolar.timestamp)).limit(n).all() def get_all_institutions(): # TODO: do filtering purely within an SQL query res = [] for institution in Institution.query.filter_by(is_removed=False).all(): row = CorpusAccess.query.filter_by(institution=institution.id, corpus='solar').first() if row: res.append(institution) return res def get_institution_student_contracts(institution_id, user_id=None): if not user_id: return ContractsSolar.query.filter_by(institution=institution_id, contract_type='ucenci-starsi').all() return ContractsSolar.query.filter_by(institution=institution_id, contract_type='ucenci-starsi', upload_user=user_id).all() def get_institution_contract(institution_id): return InstitutionContract.query.filter_by(institution=institution_id, corpus='solar').order_by(desc(InstitutionContract.timestamp)).first() def get_top_uploading_institutions(): res = dict() institutions = get_all_institutions() for institution in institutions: uploads = UploadSolar.query.filter_by(institution=institution.id).all() for upload in uploads: if institution.name not in res: res[institution.name] = 0 res[institution.name] += len(upload.upload_file_hashes) if len(res) >= 5: return dict(sorted(res.items(), key=lambda x:x[1], reverse=True)[:5]) return dict(sorted(res.items(), key=lambda x:x[1], reverse=True)) def get_all_active_users(): # TODO: do filtering purely within an SQL query res = [] active_users = RegisteredUser.query.filter_by(active=True).all() for user in active_users: if has_user_corpus_access(user.id, 'solar'): res.append(user) return res def update_upload_item(item_id, program, subject, subject_custom, grade, text_type, text_type_custom, school_year, grammar_corrections): rowcount = db.session.query(UploadSolar).filter_by(id=item_id).update({ 'program': program, 'subject': subject, 'subject_custom': subject_custom, 'grade': grade, 'text_type': text_type, 'text_type_custom': text_type_custom, 'school_year': school_year, 'grammar_corrections': grammar_corrections }) db.session.commit() return rowcount