"""Flask front end for STARK: upload a treebank, run the extraction, browse results."""
import ast
import configparser
import csv
import hashlib
import json
import math
import os
import random
import re
import secrets
import shutil
import string
import time
from pathlib import Path

import requests
from flask import Flask, abort, redirect, render_template, request, send_file, url_for
from flask_babel import Babel, gettext
from werkzeug.utils import secure_filename

from stark import run

UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'conllu'}
DAYS_BEFORE_DELETION = 7
DEFAULT_LANGUAGE = 'en'
LANGUAGES = ['en', 'sl']
_translations = {
    'en': {},
    'sl': {},
}

# Directory (relative to the working directory) holding uploads and results.
MEDIA_DIR = 'media'
RESULT_FILE = 'result.tsv'
SECONDS_PER_DAY = 86400
# Upper bound for connecting to / reading from a user-supplied URL.
DOWNLOAD_TIMEOUT_SECONDS = 30
RESULT_ID_LENGTH = 60
_RESULT_ID_ALPHABET = string.ascii_uppercase + string.digits
# Cells with a mandatory fractional part; plain integers are caught by
# ``str.isnumeric`` in ``_sort_rows``.
_DECIMAL_RE = re.compile(r'^-?\d+(?:\.\d+)$')
# Columns whose values are rendered through ``sci_notation``.
_SCI_NOTATION_COLUMNS = ('%DIFF', 'OR')


def get_locale():
    """Return the UI language from the ``lang`` query argument, or the default."""
    lang = request.args.get('lang')
    if lang in LANGUAGES:
        return lang
    return DEFAULT_LANGUAGE


def ilog(n, base):
    """
    Find the integer log of n with respect to the base.

    Raises ValueError for ``base < 2`` (the division loop would never end).

    >>> import math
    >>> for base in range(2, 16 + 1):
    ...     for n in range(1, 1000):
    ...         assert ilog(n, base) == int(math.log(n, base) + 1e-10), '%s %s' % (n, base)
    """
    if base < 2:
        raise ValueError('base must be at least 2')
    count = 0
    while n >= base:
        count += 1
        n //= base
    return count


def sci_notation(n, prec=2):
    """
    Represent n in scientific notation, with the specified precision.

    Values strictly between -100000 and 100000 are returned via ``str(n)``;
    non-finite floats (inf, nan) are returned via ``str(n)`` as well.

    >>> sci_notation(1234 * 10**1000, prec=3)
    '1.234e+1003'
    >>> sci_notation(10**1000 // 2, prec=1)
    '5.0e+999'
    >>> sci_notation(42)
    '42'
    """
    if -100000 < n < 100000:
        return str(n)
    # Only floats can be non-finite; math.isfinite would overflow on huge ints.
    if isinstance(n, float) and not math.isfinite(n):
        return str(n)
    if n < 0:
        return "-" + sci_notation(-n, prec=prec)
    base = 10
    exponent = ilog(n, base)
    mantissa = n / base**exponent
    return '{0:.{1}f}e{2:+d}'.format(mantissa, prec, exponent)


def _format_number(text):
    """Render a numeric cell through ``sci_notation``; return other text unchanged."""
    try:
        number = int(text)
    except ValueError:
        try:
            number = float(text)
        except ValueError:
            return text
    return sci_notation(number)


def _media_file(*parts):
    """Return the path of an existing file below ``MEDIA_DIR`` or abort with 404.

    ``parts`` come from the request URL, so the resolved path is checked to
    stay inside the media directory before it is used.
    """
    candidate = os.path.join(MEDIA_DIR, *parts)
    root = os.path.realpath(MEDIA_DIR)
    resolved = os.path.realpath(candidate)
    if not resolved.startswith(root + os.sep) or not os.path.isfile(resolved):
        abort(404)
    return candidate


def _read_tsv(path):
    """Read a tab-separated file into a list of rows (lists of strings)."""
    with open(path, 'r') as rf:
        return list(csv.reader(rf, delimiter='\t'))


def _columns(head, rows):
    """Transpose ``rows`` into a ``{column name: [values]}`` dict keyed by ``head``."""
    columns = {name: [] for name in head}
    for row in rows:
        for name, value in zip(head, row):
            columns[name].append(value)
    return columns


def _column_labels(head):
    """Map result-file column names to their translated display labels.

    Must be called inside a request context because it uses ``gettext``.
    """
    labels = {
        'Tree': gettext('Tree'),
        'Absolute frequency': gettext('Frequency'),
        'Absolute frequency in second treebank': gettext('Frequency in B'),
        'Order': gettext('Order'),
        'Number of nodes': gettext('Number of nodes'),
        'Head node': gettext('Head node'),
        'Grew-match URL': gettext('Grew-match URL'),
        'Ratio': gettext('Ratio'),
        '%DIFF': gettext('%DIFF'),
        'OR': gettext('OR'),
        'BIC': gettext('BIC'),
        'MI': gettext('MI'),
        'logDice': gettext('logDice'),
        't-score': gettext('t-score'),
    }
    if 'Absolute frequency in second treebank' in head:
        labels['Absolute frequency'] = gettext('Frequency in A')
    else:
        del labels['Absolute frequency in second treebank']
    return labels


def _display_columns(head_pairs, columns):
    """Select the displayed columns, formatting the scientific-notation ones."""
    displayed = {}
    for column, _label in head_pairs:
        values = columns[column]
        if column in _SCI_NOTATION_COLUMNS:
            values = [_format_number(value) for value in values]
        displayed[column] = values
    return displayed


def _sort_rows(rows, column_index, column_name, order_type):
    """Sort ``rows`` by one column.

    Numeric cells sort by value, other cells by text. Keys are tuples so a
    column mixing both kinds no longer raises ``TypeError``.

    NOTE(review): as in the previous implementation, numeric cells honour
    ``order_type`` while text cells sort in the opposite direction, and
    decimal cells of the 'Ratio' column are compared as text. Kept as is;
    confirm whether that is intended.
    """
    def sort_key(row):
        cell = row[column_index] if column_index < len(row) else ''
        looks_numeric = cell.isnumeric() or (
            _DECIMAL_RE.match(cell) is not None and column_name != 'Ratio')
        if looks_numeric:
            try:
                return (0, -float(cell), '')
            except ValueError:
                # e.g. superscript digits: isnumeric() is True but float() fails
                pass
        return (1, 0.0, cell)

    return sorted(rows, key=sort_key, reverse=(order_type == 'asc'))


def _purge_expired_media(max_age_days=DAYS_BEFORE_DELETION):
    """Delete entries of ``MEDIA_DIR`` older than ``max_age_days`` days.

    A directory is aged by its ``result.tsv``; when that file is missing the
    directory's own modification time is used instead of failing.
    """
    try:
        entries = os.listdir(MEDIA_DIR)
    except OSError:
        return
    cutoff = time.time() - max_age_days * SECONDS_PER_DAY
    for entry in entries:
        path = os.path.join(MEDIA_DIR, entry)
        is_dir = os.path.isdir(path)
        stamp_path = os.path.join(path, RESULT_FILE) if is_dir else path
        try:
            modified = os.path.getmtime(stamp_path)
        except OSError:
            try:
                modified = os.path.getmtime(path)
            except OSError:
                continue
        if modified >= cutoff:
            continue
        if is_dir:
            shutil.rmtree(path, ignore_errors=True)
        else:
            try:
                os.remove(path)
            except OSError:
                pass


def _store_upload(upload):
    """Save an uploaded file into ``MEDIA_DIR`` and return its path.

    The original file name is kept (sanitised) rather than replaced.
    NOTE(review): two uploads with the same name overwrite each other.
    """
    name = secure_filename(upload.filename or '') or 'upload.conllu'
    target = os.path.join(MEDIA_DIR, name)
    upload.save(target)
    return target


def _download_to_media(url):
    """Download ``url`` into ``MEDIA_DIR`` and return the local path.

    Raises ``requests.RequestException`` on network/HTTP errors, ``OSError``
    when the file cannot be written and ``ValueError`` when the URL has no
    usable file name.

    NOTE(review): the URL is user supplied and fetched server-side without
    any host restriction (SSRF risk) -- consider an allow-list.
    """
    name = secure_filename(url.split('/')[-1])
    if not name:
        raise ValueError('URL does not end in a file name')
    target = os.path.join(MEDIA_DIR, name)
    response = requests.get(url, timeout=DOWNLOAD_TIMEOUT_SECONDS)
    response.raise_for_status()
    with open(target, 'wb') as wf:
        wf.write(response.content)
    return target


def create_app():
    """Build the Flask application and register all ``/stark`` views."""
    app = Flask(__name__, static_url_path='/stark/static')
    babel = Babel(app, locale_selector=get_locale, default_translation_directories='translations')
    babel.list_translations = ['en', 'sl']
    app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

    def create_default_configs():
        """Return the default STARK configuration (currently not used by any view)."""
        return {
            # mandatory parameters
            'input_path': 'data/sl_ssj-ud_v2.4.conllu',
            'output': 'results/out_official.tsv',
            'tree_size': '2-4',
            'node_type': 'upos',
            # mandatory parameters with default value
            'internal_saves': './internal_saves',
            'cpu_cores': 1,
            'complete_tree_type': True,
            'dependency_type': True,
            'node_order': True,
            'association_measures': False,
            'label_whitelist': [],
            'root_whitelist': [],
            'query': None,
            'compare': None,
            'frequency_threshold': 0,
            'lines_threshold': None,
            'continuation_processing': False,
            'nodes_number': True,
            'print_root': True,
        }

    def allowed_file(filename):
        """Tell whether ``filename`` has an extension from ``ALLOWED_EXTENSIONS``."""
        return '.' in filename and \
            filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

    @app.route('/stark/about', methods=['GET'])
    def about():
        """Render the static "about" page."""
        return render_template('about.html')

    @app.route('/stark/visualization/<result_id>/<sentence_id>/<subtree_ids>', methods=['GET'])
    def visualization(result_id, sentence_id, subtree_ids):
        """Return the stored annodoc text of a sentence with the given nodes highlighted.

        ``subtree_ids`` is a ``+``-separated list of node ids.
        """
        style_lines = ''.join(
            f'# visual-style {subtree_id} bgColor:lightgreen\n'
            for subtree_id in subtree_ids.split('+'))
        with open(_media_file(result_id, 'annodoc', sentence_id), 'r') as rf:
            annodoc = style_lines + rf.read() + '\n\n'
        return {'annodoc': annodoc}

    @app.route('/stark/result/<result_id>/<subtree_hash>', methods=['GET'])
    def examples(result_id, subtree_hash):
        """Render the result rows and example sentences belonging to one subtree."""
        # find example details
        table = _read_tsv(_media_file(result_id, RESULT_FILE))
        if not table:
            abort(404)
        head, rows = table[0], table[1:]
        labels = _column_labels(head)

        annodoc_index = head.index('Annodoc')
        selected_rows = [row for row in rows
                         if json.loads(row[annodoc_index])['subtree_hash'] == subtree_hash]
        columns = _columns(head, selected_rows)

        head_pairs = [(column, label) for column, label in labels.items() if column in head]
        displayed_content_dict = _display_columns(head_pairs, columns)

        # add visualization parts to dict
        visualization_dict = {'example_id': [], 'example_positions': []}
        with open(_media_file(result_id, 'annodoc_detailed', subtree_hash), 'r') as rf:
            for vis in csv.reader(rf, delimiter='\t'):
                visualization_dict['example_id'].append(vis[0])
                # vis[1] holds a Python list literal of positions; parse it
                # without executing arbitrary code.
                positions = ast.literal_eval(vis[1])
                visualization_dict['example_positions'].append('+'.join(map(str, positions)))
        return render_template('examples.html', head=head_pairs, content=displayed_content_dict,
                               visualization=visualization_dict)

    @app.route('/stark/result/<result_id>', methods=['GET', 'POST'])
    def result(result_id):
        """Show a result table (GET) or send it as a download (POST).

        The ``order_by`` query argument is a display label followed by one
        extra character, which is stripped before the lookup.
        """
        if request.method == 'POST':
            _purge_expired_media()
            return send_file(_media_file(result_id, RESULT_FILE), as_attachment=True,
                             download_name='results.tsv')

        table = _read_tsv(_media_file(result_id, RESULT_FILE))
        if not table:
            abort(404)
        head, rows = table[0], table[1:]
        labels = _column_labels(head)
        label_to_column = {label: column for column, label in labels.items()}

        order_by_display = request.args.get('order_by')
        order_by = None
        if order_by_display is not None:
            # unknown labels fall back to the file order instead of a KeyError
            order_by = label_to_column.get(order_by_display[:-1])
        order_type = request.args.get('order_type')
        if order_by is not None and order_by in head:
            rows = _sort_rows(rows, head.index(order_by), order_by, order_type)

        columns = _columns(head, rows)
        head_pairs = [(column, label) for column, label in labels.items() if column in head]
        displayed_content_dict = _display_columns(head_pairs, columns)

        # add visualization parts to dict
        annodoc_data = [json.loads(el) for el in columns['Annodoc']]
        displayed_content_dict['example_id'] = [el['id'] for el in annodoc_data]
        displayed_content_dict['example_positions'] = [
            '+'.join(str(p) for p in el['positions']) for el in annodoc_data]
        displayed_content_dict['subtree_hash'] = [el['subtree_hash'] for el in annodoc_data]
        return render_template('result.html', head=head_pairs, content=displayed_content_dict)

    @app.route('/stark/', methods=['GET', 'POST'])
    # @headers({'Cache-Control': 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0'})
    def index():
        """Render the settings form (GET) or validate it and run STARK (POST).

        On success the client is redirected to the ``result`` view of the new
        run; otherwise the form is rendered again with ``validation`` messages.
        """
        translations = _translations[get_locale()]
        if request.method != 'POST':
            return render_template('index.html', translations=translations)

        form = request.form
        configs = {
            'greedy_counter': 'yes',
            'input_path': '',
            'tree_size': '1-1000000',
            'display_size': '1-1000000',
            'complete_tree_type': True,
            'ignored_labels': []
        }
        validation = {}

        # mandatory parameters
        # handling input
        upload = request.files.get('file')
        input_url = form.get('input_url')
        if upload:
            # store file
            configs['input_path'] = _store_upload(upload)
            if input_url:
                both_given = gettext('Please insert either input url or file, not both of them.')
                validation['file'] = both_given
                validation['input_url'] = both_given
            # TODO OPTIONALLY ADD conllu FILE CHECK
        elif input_url:
            try:
                configs['input_path'] = _download_to_media(input_url)
            except (requests.RequestException, OSError, ValueError):
                validation['input_url'] = gettext('Incorrect URL!')
        else:
            nothing_given = gettext('Please insert either input url or provide a file.')
            validation['file'] = nothing_given
            validation['input_url'] = nothing_given

        if form.get('display_size'):
            configs['display_size'] = form['display_size']

        def validate_node_type(node_type):
            """Record a validation error and return False for an unusable selection."""
            node_type_options = {'upos', 'form', 'lemma', 'xpos', 'feats', 'deprel'}
            if len(node_type) == 0:
                validation['node_type'] = gettext('Please select at least one node type.')
                return False
            for el in node_type:
                if el not in node_type_options:
                    validation['node_type'] = gettext('Node option') + f' {el} ' + gettext('is not supported. Please enter valid options.')
                    return False
            return True

        node_type = [option for option in ('upos', 'form', 'lemma')
                     if f'node_type_{option}' in form]
        if 'node_type_none' in form:
            configs['node_type'] = None
        elif validate_node_type(node_type):
            configs['node_type'] = '+'.join(node_type)

        # mandatory parameters with default value
        configs['internal_saves'] = None
        # TODO depends on computer
        configs['cpu_cores'] = 12
        configs['dependency_type'] = form.get('labeled_trees') == 'on'
        configs['node_order'] = form.get('fixed_order') == 'on'
        configs['association_measures'] = form.get('association_measures') == 'on'
        configs['label_whitelist'] = []
        configs['root_whitelist'] = []
        if form.get('root_restriction'):
            configs['root_whitelist'] = form['root_restriction'].split('|')
        if form.get('ignored_labels'):
            configs['ignored_labels'] = form['ignored_labels'].split('|')

        if form.get('query'):
            configs['query'] = form['query']
            configs['tree_size'] = '0'
            configs['display_size'] = '0'
        else:
            configs['query'] = None

        # handling the optional second (comparison) treebank
        configs['compare'] = None
        compare_upload = request.files.get('compare_file')
        compare_url = form.get('compare_url')
        if compare_upload:
            # store file
            configs['compare'] = _store_upload(compare_upload)
            if compare_url:
                both_given = gettext('Please insert either compare url or file, not both of them.')
                validation['compare_file'] = both_given
                validation['compare_url'] = both_given
        elif compare_url:
            try:
                configs['compare'] = _download_to_media(compare_url)
            except (requests.RequestException, OSError, ValueError):
                validation['compare_url'] = gettext('Incorrect URL!')

        configs['sentence_count_file'] = None
        configs['frequency_threshold'] = 0
        if form.get('frequency_threshold'):
            try:
                configs['frequency_threshold'] = int(form['frequency_threshold'])
            except ValueError:
                validation['frequency_threshold'] = gettext('Please insert an Integer.')
        configs['lines_threshold'] = None
        configs['node_info'] = None
        configs['continuation_processing'] = False
        configs['label_subtypes'] = True
        configs['nodes_number'] = True
        configs['print_root'] = True
        if configs['compare'] is not None:
            configs['other_input_path'] = configs['compare']
        configs['grew_match'] = True
        configs['depsearch'] = False
        configs['example'] = True

        # Reject the form before any output directory exists, so invalid
        # submissions do not leave empty directories behind.
        if validation:
            return render_template('index.html', validation=validation, translations=translations)

        # The id is the only thing protecting a result from other users, so it
        # is drawn from a cryptographically secure source.
        name = ''.join(secrets.choice(_RESULT_ID_ALPHABET) for _ in range(RESULT_ID_LENGTH))
        output_path = Path(os.path.join(MEDIA_DIR, name))
        if output_path.exists():
            shutil.rmtree(output_path, ignore_errors=True)
        output_path.mkdir()
        result_file = os.path.join(output_path, RESULT_FILE)
        configs['output'] = result_file
        configs['annodoc_example_dir'] = os.path.join(output_path, 'annodoc')
        configs['annodoc_detailed_dir'] = os.path.join(output_path, 'annodoc_detailed')
        configs['detailed_results_file'] = os.path.join(output_path, 'detailed_results_file.tsv')

        try:
            run(configs)
            content = _read_tsv(result_file)
        except Exception:
            # Request boundary: log the cause and show a generic message.
            app.logger.exception('STARK processing failed')
            shutil.rmtree(output_path, ignore_errors=True)
            validation['general'] = gettext('Processing failed! Please recheck your settings, e.g. input format or head node description.')
            return render_template('index.html', validation=validation, translations=translations)

        # check if there are no results (header only, or nothing at all)
        if len(content) <= 1:
            validation['results'] = False
            return render_template('index.html', validation=validation, translations=translations)

        # The trailing space is stripped again by the ``result`` view.
        order_by = gettext('Frequency ') if not configs['compare'] else gettext('Frequency in A ')
        return redirect(url_for('result', result_id=name, order_by=order_by, order_type='desc',
                                lang=gettext('code')))

    return app


if __name__ == '__main__':
    app = create_app()
    # NOTE(review): debug=True enables the interactive Werkzeug debugger; use
    # this entry point for local development only.
    app.run(debug=True)