import copy
import pickle
import re
import sys
import xml.etree.ElementTree as ET
from pathlib import Path

from lxml import etree

from parser.msd.msdmap import Msdmap
from fillpred_model.step1 import build_model_row


class Parser:
    # Reads a TEI xml file and returns a dictionary:
    # { <sentence_id>: {
    #     "sid": <str>,    # serves as index in MongoDB
    #     "text": <str>,
    #     "tokens": <list>,
    #     "links": <dict>,
    # }}

    def __init__(self):
        self.msdmap = Msdmap()
        self.W_TAGS = ['w']
        self.C_TAGS = ['c']
        self.S_TAGS = ['S', 'pc']
        try:
            fp = Path("./fillpred_model/model.pickle").open("rb")
            self.fillpred_model = pickle.load(fp)
        except IOError:
            print("Generate the model first: $ make tools/fillpred_model/model.pickle")
            sys.exit(1)

    def parse_tei(self, filepath):

        def parse_links(s_el):
            sent_id = '#' + s_el.get('id')
            lgrps = s_el.findall(".//linkGrp")
            if len(lgrps) < 1:
                raise IOError("Can't find links.")
            res_links = {}
            for lgrp in lgrps:
                if lgrp.get("type") == "JOS-SYN":
                    for link in lgrp:
                        jos_type = link.get("ana").split(":")[-1]
                        link_data = link.get("target").split(" ")
                        link_from = int(link_data[1].split('.')[-1][1:])
                        # a link whose head is the sentence itself marks the root; encode it as 0
                        link_to = int(link_data[0].split('.')[-1][1:]) if sent_id != link_data[0] else 0
                        res_links[link_from] = (
                            jos_type,
                            link_from,
                            link_to,
                        )
            return res_links

        guess_corpus = None  # SSJ | KRES | GIGA
        res_dict = {}
        with filepath.open("rb") as fp:
            # remove namespaces
            bstr = fp.read()
            utf8str = bstr.decode("utf-8")
            utf8str = re.sub('\\sxmlns="[^"]+"', '', utf8str, count=1)
            utf8str = re.sub(' xml:', ' ', utf8str)
            root = etree.XML(utf8str.encode("utf-8"))

            divs = []  # in ssj, there are divs; in Kres, there are separate files
            if "id" in root.keys():
                # Kres files start with a root element that carries an id
                if root.get("id")[0:2] == 'GF':
                    guess_corpus = "GIGA"
                else:
                    guess_corpus = "KRES"
                divs = [root]
            else:
                guess_corpus = "SSJ"
                divs = root.findall(".//div")

            # parse divs
            for div in divs:
                f_id = div.get("id")[:-6]
                if guess_corpus == "GIGA":
                    div = div.findall(".//body")[0]

                # parse paragraphs
                for p in div.findall(".//p"):
                    p_id = p.get("id").split(".")[-1]

                    # parse sentences
                    for s in p.findall(".//s"):
                        # keep only sentences that carry both JOS-SYN and UD-SYN
                        # annotations and are not yet annotated with SRL
                        sent_annot_type_list = [links.get('type') for links in s.findall(".//linkGrp")]
                        if ('JOS-SYN' not in sent_annot_type_list
                                or 'UD-SYN' not in sent_annot_type_list
                                or 'SRL' in sent_annot_type_list):
                            continue
                        s_id = s.get("id").split(".")[-1]
                        sentence_text = ""
                        sentence_list = []
                        sentence_tokens = []

                        # parse tokens
                        for el in s.iter():
                            if el.tag in self.W_TAGS:
                                el_id = el.get("id").split(".")[-1]
                                if el_id[0] == 't':
                                    el_id = el_id[1:]  # ssj W_TAG ids start with t
                                sentence_text += el.text
                                uPosTag = None
                                uPosFeats = []
                                for msd_el in el.get("msd").split('|'):
                                    key, val = msd_el.split('=')
                                    if key == 'UPosTag':
                                        uPosTag = val
                                    else:
                                        uPosFeats.append(msd_el)
                                uPosFeats = '|'.join(uPosFeats)
                                sentence_tokens += [(
                                    "w",
                                    int(el_id),
                                    el.text,
                                    el.get("lemma"),
                                    (el.get("msd") if guess_corpus in ("KRES", "GIGA")
                                     else el.get("ana").split(":")[-1]),
                                    uPosTag,
                                    uPosFeats,
                                )]
                            elif el.tag in self.C_TAGS:
                                # only Kres' C_TAGS have ids
                                if guess_corpus != "GIGA":
                                    el_id = el.get("id") or "none"
                                    el_id = el_id.split(".")[-1]
                                    sentence_text += el.text
                                    sentence_tokens += [("c", el_id, el.text,)]
                            elif el.tag in self.S_TAGS:
                                el_id = el.get("id").split(".")[-1]
                                if el_id[0] == 't':
                                    el_id = el_id[1:]  # ssj W_TAG ids start with t
                                sentence_text += el.text
                                uPosTag = None
                                uPosFeats = []
                                for msd_el in el.get("msd").split('|'):
                                    key, val = msd_el.split('=')
                                    if key == 'UPosTag':
                                        uPosTag = val
                                    else:
                                        uPosFeats.append(msd_el)
                                uPosFeats = '|'.join(uPosFeats)
                                sentence_tokens += [(
                                    "pc",
                                    int(el_id),
                                    el.text,
                                    el.text,
                                    (el.get("msd") if guess_corpus in ("KRES", "GIGA")
                                     else el.get("ana").split(":")[-1]),
                                    uPosTag,
                                    uPosFeats,
                                )]
                            else:
                                # skip links and linkGroups
                                pass

                        sentence_id = s.get("id")
                        if sentence_id in res_dict:
                            raise KeyError("duplicated id: {}".format(sentence_id))
                        res_dict[sentence_id] = {
                            "sid": sentence_id,
                            "text": sentence_text,
                            "tokens": sentence_tokens,
                            "links": parse_links(s),
                        }
        return res_dict

    def minimize_tei(self, filepath, jsondata):

        def set_xml_attr(node, attribute, value):
            node.attrib['{http://www.w3.org/XML/1998/namespace}' + attribute] = value

        def parse_links(s_el):
            sent_id = '#' + s_el.get('id')
            lgrps = s_el.findall(".//linkGrp")
            if len(lgrps) < 1:
                raise IOError("Can't find links.")
            res_links = {}
            for lgrp in lgrps:
                if lgrp.get("type") == "JOS-SYN":
                    for link in lgrp:
                        jos_type = link.get("ana").split(":")[-1]
                        link_data = link.get("target").split(" ")
                        link_from = int(link_data[1].split('.')[-1][1:])
                        link_to = int(link_data[0].split('.')[-1][1:]) if sent_id != link_data[0] else 0
                        res_links[link_from] = (
                            jos_type,
                            link_from,
                            link_to,
                        )
            return res_links

        guess_corpus = None  # SSJ | KRES
        res_dict = {}
        # with filepath.open("rb") as fp, open("../data/ssj500k2.3/final_tei/res.xml", 'w') as sf:
        with filepath.open("rb") as fp:
            used_ssj_documents = set(k.split('.')[0] for k in jsondata)
            used_ssj_paragraphs = set('.'.join(k.split('.')[:-1]) for k in jsondata)
            used_ssj_sentences = set(jsondata.keys())

            ET.register_namespace("", "http://www.tei-c.org/ns/1.0")
            tree = ET.parse(fp)
            root_res = tree.getroot()
            # root_res = copy.deepcopy(root)
            ns = '{http://www.w3.org/XML/1998/namespace}'
            ns2 = '{http://www.tei-c.org/ns/1.0}'

            # drop every document / paragraph / sentence that is not referenced
            # in jsondata, and attach the SRL links to the sentences we keep
            for doc in list(root_res):
                doc_id = doc.get(ns + 'id')
                if doc_id not in used_ssj_documents:
                    root_res.remove(doc)
                    continue

                for par in list(doc):
                    par_id = par.get(ns + 'id')
                    if par_id not in used_ssj_paragraphs:
                        if par.tag != ns2 + 'bibl':
                            doc.remove(par)
                        continue

                    for sen in list(par):
                        sen_id = sen.get(ns + 'id')
                        if sen_id not in used_ssj_sentences:
                            par.remove(sen)
                            continue

                        linkGrp = ET.Element(f'{ns2}linkGrp')
                        linkGrp.attrib['targFunc'] = 'head argument'
                        linkGrp.attrib['type'] = 'SRL'
                        for srl_el in jsondata[sen_id]:
                            link = ET.Element(f'{ns2}link')
                            link.attrib['ana'] = f'srl:{srl_el["arg"]}'
                            link.attrib['target'] = f'#{sen_id}.t{srl_el["from"]} #{sen_id}.t{srl_el["dep"]}'
                            linkGrp.append(link)
                        sen.append(linkGrp)

            # sf.write(etree.tostring(tree, pretty_print=True, encoding='utf-8').decode())
            tree.write("../data/ssj500k2.3/final_tei/res.xml", encoding='utf-8')
        return

        # NOTE: everything below is unreachable legacy code (the early return
        # above short-circuits it); kept essentially as found in the original.
        divs = []  # in ssj, there are divs; in Kres, there are separate files
        if "id" in root.keys():
            # Kres files start with a root element that carries an id
            if root.get("id")[0:2] == 'GF':
                guess_corpus = "GIGA"
            else:
                guess_corpus = "KRES"
            divs = [root]
        else:
            guess_corpus = "SSJ"
            divs = root.findall(".//div")

        # parse divs
        for div in divs:
            f_id = div.get("id")
            if guess_corpus == "GIGA":
                div = div.findall(".//body")[0]

            # parse paragraphs
            for p in div.findall(".//p"):
                p_id = p.get("id").split(".")[-1]

                # parse sentences
                for s in p.findall(".//s"):
                    # keep only sentences with both JOS-SYN and UD-SYN and no SRL
                    sent_annot_type_list = [links.get('type') for links in s.findall(".//linkGrp")]
                    if ('JOS-SYN' not in sent_annot_type_list
                            or 'UD-SYN' not in sent_annot_type_list
                            or 'SRL' in sent_annot_type_list):
                        del s
                        continue
                    s_id = s.get("id").split(".")[-1]
                    sentence_text = ""
                    sentence_list = []
                    sentence_tokens = []

                    # parse tokens
                    for el in s.iter():
                        if el.tag in self.W_TAGS:
                            el_id = el.get("id").split(".")[-1]
                            if el_id[0] == 't':
                                el_id = el_id[1:]  # ssj W_TAG ids start with t
                            sentence_text += el.text
                            uPosTag = None
                            uPosFeats = []
                            for msd_el in el.get("msd").split('|'):
                                key, val = msd_el.split('=')
                                if key == 'UPosTag':
                                    uPosTag = val
                                else:
                                    uPosFeats.append(msd_el)
                            uPosFeats = '|'.join(uPosFeats)
                            sentence_tokens += [(
                                "w",
                                int(el_id),
                                el.text,
                                el.get("lemma"),
                                (el.get("msd") if guess_corpus in ("KRES", "GIGA")
                                 else el.get("ana").split(":")[-1]),
                                uPosTag,
                                uPosFeats,
                            )]
                        elif el.tag in self.C_TAGS:
                            # only Kres' C_TAGS have ids
                            if guess_corpus != "GIGA":
                                el_id = el.get("id") or "none"
                                el_id = el_id.split(".")[-1]
                                sentence_text += el.text
                                sentence_tokens += [("c", el_id, el.text,)]
                        elif el.tag in self.S_TAGS:
                            el_id = el.get("id").split(".")[-1]
                            if el_id[0] == 't':
                                el_id = el_id[1:]  # ssj W_TAG ids start with t
                            sentence_text += el.text
                            uPosTag = None
                            uPosFeats = []
                            for msd_el in el.get("msd").split('|'):
                                key, val = msd_el.split('=')
                                if key == 'UPosTag':
                                    uPosTag = val
                                else:
                                    uPosFeats.append(msd_el)
                            uPosFeats = '|'.join(uPosFeats)
                            sentence_tokens += [(
                                "pc",
                                int(el_id),
                                el.text,
                                el.text,
                                (el.get("msd") if guess_corpus in ("KRES", "GIGA")
                                 else el.get("ana").split(":")[-1]),
                                uPosTag,
                                uPosFeats,
                            )]
                        else:
                            # skip links and linkGroups
                            pass

                    sentence_id = s.get("id")
                    if sentence_id in res_dict:
                        raise KeyError("duplicated id: {}".format(sentence_id))
                    res_dict[sentence_id] = {
                        "sid": sentence_id,
                        "text": sentence_text,
                        "tokens": sentence_tokens,
                        "links": parse_links(s),
                    }

        et = etree.ElementTree(root)
        et.write("../data/ssj500k2.3/final_tei/res.xml", pretty_print=True, encoding='unicode')
        fp.close()
        return res_dict

    def to_conll_2009_SRL(self, sentence_entry):

        def fillpred(tsv_row):
            mrow = build_model_row(tsv_row)
            x = mrow[:-1]
            y = self.fillpred_model.predict([x])
            return y[0]  # bool

        # works with kres, with parsed links
        out_str = ""
        for token in sentence_entry["tokens"]:
            t_id = token[1]
            form = token[2]

            # handle stop signs
            if token[0] != "w":
                out_list = [t_id] + [form for x in range(7)] + ["0", "0", "modra", "modra", "_", "_"] + ["\n"]
                out_str += '\t'.join(map(str, out_list))
                continue

            pos = self.msdmap.slo_msd_to_eng_pos(token[4])
            feat = "|".join(self.msdmap.slo_msd_to_eng_long(token[4]).split(" "))

            row_list = [
                t_id,
                form,
                token[3],  # lemma
                token[3],  # plemma
                pos,       # pos
                pos,       # ppos
                feat,      # feat
                feat,      # pfeat
                sentence_entry["links"][t_id][2],  # head
                sentence_entry["links"][t_id][2],  # phead
                sentence_entry["links"][t_id][0],  # deprel
                sentence_entry["links"][t_id][0],  # pdeprel
                "_",       # fillpred
                "_",       # pred
                "\n",
            ]
            fprd = fillpred(row_list)
            row_list[12] = "Y" if fprd else "_"
            row_list[13] = token[3] if fprd else "_"
            # format: 14 columns + apreds
            out_str += '\t'.join(map(str, row_list))
        out_str += "\n"  # newline at the end of sentence
        return out_str

    def to_conll_2009_full(self, sentence_entry):
        out_str = ""
        for token in sentence_entry["tokens"]:
            t_id = token[1]
            out_str += "{}\t{}\n".format(
                t_id,      # id
                token[2],  # form
            )
        out_str += "\n"
        return out_str
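

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): drives the
# parser over a single TEI file and prints each kept sentence as CoNLL-2009
# rows. The default input path is a placeholder assumption; pass a real
# KRES/SSJ/GIGA TEI file on the command line. Note that constructing Parser()
# requires ./fillpred_model/model.pickle to exist (see the hint in __init__).
if __name__ == "__main__":
    tei_file = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("example.tei.xml")  # placeholder
    parser = Parser()
    parsed = parser.parse_tei(tei_file)
    for sid, sentence in parsed.items():
        # each entry carries "sid" / "text" / "tokens" / "links", as built in parse_tei()
        print("# sent_id =", sid)
        print(parser.to_conll_2009_SRL(sentence))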