svala-scripts/solar2svala.py
2023-08-17 08:44:59 +02:00

286 lines
13 KiB
Python
Executable File

import argparse
import copy
import logging
import os
import shutil
import time
from xml.etree import ElementTree
import json
logging.basicConfig(level=logging.INFO)
def add_token(ind, el, source, target, edges):
source_id = "s" + ind
source.append({"id": source_id, "text": el.text + " "})
target_id = "t" + ind
target.append({"id": target_id, "text": el.text + " "})
edge_id = "e-" + source_id + "-" + target_id
edges[edge_id] = {"id": edge_id, "ids": [source_id, target_id], "labels": [], "manual": False}
def add_errors(i, error, source, target, edges):
source_edge_ids = []
target_edge_ids = []
podtip = error.attrib['podtip'] if 'podtip' in error.attrib else ''
label = error.attrib['tip'] + '/' + podtip + '/' + error.attrib['kat']
labels = [label]
word_combination_L1 = ''
word_combination_L2 = None
word_combination_L3 = None
word_combination_L4 = None
word_combination_L5 = None
label_L2 = ''
label_L3 = ''
label_L4 = ''
label_L5 = ''
has_error = False
# solar5.7
for el in error:
if el.tag.startswith('w') or el.tag.startswith('pc'):
ind = str(i)
source_id = "s" + ind
source.append({"id": source_id, "text": el.text + " "})
source_edge_ids.append(source_id)
i += 1
elif el.tag.startswith('p'):
for p_el in el:
if p_el.tag.startswith('w') or p_el.tag.startswith('pc'):
ind = str(i)
target_id = "t" + ind
target.append({"id": target_id, "text": p_el.text + " "})
target_edge_ids.append(target_id)
word_combination_L1 += p_el.text + " "
i += 1
elif el.tag.startswith('u2'):
word_combination_L2 = ''
podtip = el.attrib['podtip'] if 'podtip' in el.attrib else ''
label_L2 = el.attrib['tip'] + '/' + podtip + '/' + el.attrib['kat']
for el_l2 in el:
if el_l2.tag.startswith('w') or el_l2.tag.startswith('pc'):
ind = str(i)
source_id = "s" + ind
source.append({"id": source_id, "text": el_l2.text + " "})
source_edge_ids.append(source_id)
i += 1
elif el_l2.tag.startswith('p'):
for p_el_l2 in el_l2:
if p_el_l2.tag.startswith('w') or p_el_l2.tag.startswith('pc'):
word_combination_L2 += p_el_l2.text + " "
elif el_l2.tag.startswith('u3'):
word_combination_L3 = ''
podtip = el_l2.attrib['podtip'] if 'podtip' in el_l2.attrib else ''
label_L3 = el_l2.attrib['tip'] + '/' + podtip + '/' + el_l2.attrib['kat']
for el_l3 in el_l2:
if el_l3.tag.startswith('w') or el_l3.tag.startswith('pc'):
ind = str(i)
source_id = "s" + ind
source.append({"id": source_id, "text": el_l3.text + " "})
source_edge_ids.append(source_id)
i += 1
elif el_l3.tag.startswith('p'):
for p_el_l3 in el_l3:
if p_el_l3.tag.startswith('w') or p_el_l3.tag.startswith('pc'):
word_combination_L3 += p_el_l3.text + " "
elif el_l3.tag.startswith('u4'):
word_combination_L4 = ''
podtip = el_l3.attrib['podtip'] if 'podtip' in el_l3.attrib else ''
label_L4 = el_l3.attrib['tip'] + '/' + podtip + '/' + el_l3.attrib['kat']
for el_l4 in el_l3:
if el_l4.tag.startswith('w') or el_l4.tag.startswith('pc'):
ind = str(i)
source_id = "s" + ind
source.append({"id": source_id, "text": el_l4.text + " "})
source_edge_ids.append(source_id)
i += 1
elif el_l4.tag.startswith('p'):
for p_el_l4 in el_l4:
if p_el_l4.tag.startswith('w') or p_el_l4.tag.startswith('pc'):
word_combination_L4 += p_el_l4.text + " "
elif el_l4.tag.startswith('u5'):
word_combination_L5 = ''
podtip = el_l4.attrib['podtip'] if 'podtip' in el_l4.attrib else ''
label_L5 = el_l4.attrib['tip'] + '/' + podtip + '/' + el_l4.attrib['kat']
for el_l5 in el_l4:
if el_l5.tag.startswith('w') or el_l5.tag.startswith('pc'):
ind = str(i)
source_id = "s" + ind
source.append({"id": source_id, "text": el_l5.text + " "})
source_edge_ids.append(source_id)
i += 1
elif el_l5.tag.startswith('p'):
for p_el_l5 in el_l5:
if p_el_l5.tag.startswith('w') or p_el_l5.tag.startswith('pc'):
word_combination_L5 += p_el_l5.text + " "
# TODO NOT SURE IF THIS SHOULD BE COMMENTED! IF IT IS NOT THERE ARE ERRORS ON 2ND lvl of errors, where some words are duplicated
# for p_el in el:
# if p_el.tag.startswith('w') or p_el.tag.startswith('pc'):
# ind = str(i)
#
# target_id = "t" + ind
# target.append({"id": target_id, "text": p_el.text + " "})
# target_edge_ids.append(target_id)
# i += 1
if word_combination_L1 == word_combination_L2 and word_combination_L2 is not None:
if label_L2 not in labels:
labels.append(label_L2)
else:
print(f"REPEATING LABEL - {label_L2} in {error.attrib['{http://www.w3.org/XML/1998/namespace}id']} - ID {i}")
if word_combination_L1 == word_combination_L3 and word_combination_L3 is not None:
if label_L3 not in labels:
labels.append(label_L3)
else:
print(f"REPEATING LABEL - {label_L3} in {error.attrib['{http://www.w3.org/XML/1998/namespace}id']} - ID {i}")
if word_combination_L1 == word_combination_L4 and word_combination_L4 is not None:
if label_L4 not in labels:
labels.append(label_L4)
else:
print(f"REPEATING LABEL - {label_L4} in {error.attrib['{http://www.w3.org/XML/1998/namespace}id']} - ID {i}")
if word_combination_L1 == word_combination_L5 and word_combination_L5 is not None:
if label_L5 not in labels:
labels.append(label_L5)
else:
print(f"REPEATING LABEL - {label_L5} in {error.attrib['{http://www.w3.org/XML/1998/namespace}id']} - ID {i}")
elif word_combination_L5 is not None:
has_error = True
elif word_combination_L4 is not None:
has_error = True
elif word_combination_L3 is not None:
has_error = True
elif word_combination_L2 is not None:
has_error = True
edge_ids = sorted(source_edge_ids) + sorted(target_edge_ids)
edge_id = "e-" + "-".join(edge_ids)
edges[edge_id] = {"id": edge_id, "ids": edge_ids, "labels": labels, "manual": True}
return i, has_error
def save_file(paragraph_error, output_folder_loc, error_folder_loc, paragraph, dictionary, essay_problematic, dictionary_i):
if not paragraph_error:
if not os.path.exists(output_folder_loc):
os.mkdir(output_folder_loc)
if not os.path.exists(error_folder_loc):
os.mkdir(error_folder_loc)
file_name = paragraph.attrib['{http://www.w3.org/XML/1998/namespace}id'] + '.json' if dictionary_i == 1 else paragraph.attrib['{http://www.w3.org/XML/1998/namespace}id'] + '_P' + str(dictionary_i) + '.json'
with open(os.path.join(output_folder_loc, file_name), 'w') as wf:
json.dump(dictionary, wf, ensure_ascii=False, indent="")
with open(os.path.join(error_folder_loc, file_name), 'w') as wf:
json.dump(dictionary, wf, ensure_ascii=False, indent="")
else:
essay_problematic = True
if not os.path.exists(error_folder_loc):
os.mkdir(error_folder_loc)
file_name = paragraph.attrib['{http://www.w3.org/XML/1998/namespace}id'] + '_problem.json' if dictionary_i == 1 else paragraph.attrib['{http://www.w3.org/XML/1998/namespace}id'] + '_P' + str(dictionary_i) + '_problem.json'
with open(os.path.join(error_folder_loc, file_name),
'w') as wf:
json.dump(dictionary, wf, ensure_ascii=False, indent="")
return essay_problematic
def process_file(et, args):
if os.path.exists(args.output_folder):
shutil.rmtree(args.output_folder)
if os.path.exists(args.error_folder):
shutil.rmtree(args.error_folder)
os.mkdir(args.output_folder)
os.mkdir(args.error_folder)
# folders_count = 5484
for i, div in enumerate(et.iter('div')):
bibl = div.find('bibl')
file_name = bibl.get('n')
file_name = file_name.replace('/', '_')
# print(f'{i * 100 / folders_count} % : {file_name}')
# if file_name == 'KUS-G-slo-1-LJ-E-2009_2010-10540':
# print('asd')
# else:
# continue
output_folder_loc = os.path.join(args.output_folder, file_name)
error_folder_loc = os.path.join(args.error_folder, file_name)
essay_problematic = False
paragraphs = div.findall('p')
for paragraph in paragraphs:
sentences = paragraph.findall('s')
i = 1
dictionary_i = 1
source = []
target = []
edges = {}
paragraph_error = False
for sentence in sentences:
for el in sentence:
if el.tag.startswith('w'):
add_token(str(i), el, source, target, edges)
i += 1
elif el.tag.startswith('pc'):
add_token(str(i), el, source, target, edges)
i += 1
elif el.tag.startswith('u'):
i, has_error = add_errors(i, el, source, target, edges)
if has_error:
paragraph_error = True
# add part of dictionary
if i > dictionary_i * 10000000000000:
essay_problematic = save_file(paragraph_error, output_folder_loc, error_folder_loc, paragraph, {"source": source, "target": target, "edges": edges}, essay_problematic, dictionary_i)
dictionary_i += 1
source = []
target = []
edges = {}
paragraph_error = False
essay_problematic = save_file(paragraph_error, output_folder_loc, error_folder_loc, paragraph, {"source": source, "target": target, "edges": edges}, essay_problematic, dictionary_i)
if not essay_problematic:
shutil.rmtree(error_folder_loc)
def main(args):
with open(args.input_file, 'r') as fp:
logging.info(args.input_file)
et = ElementTree.XML(fp.read())
process_file(et, args)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Read already processed xmls, erase entries without examples and limit gigafida examples to 1 per entry.')
parser.add_argument('--input_file', default='data/Solar2.0/solar2.xml',
help='input file in (gz or xml currently). If none, then just database is loaded')
parser.add_argument('--output_folder', default='data/solar.svala',
help='input file in (gz or xml currently). If none, then just database is loaded')
parser.add_argument('--error_folder', default='data/solar.svala.error',
help='input file in (gz or xml currently). If none, then just database is loaded')
args = parser.parse_args()
start = time.time()
main(args)
logging.info("TIME: {}".format(time.time() - start))