from pathlib import Path
import re
import json
from lxml import etree
import logging
import time

logging.basicConfig(level=logging.INFO)


# Read input corpus (.xml, .json; Kres or ssj500k).
# Create an iterator that outputs the resulting sentences (as Python dicts).
class Parser:
    def __init__(self, corpus, kres_folder=None, kres_srl_folder=None,
                 ssj_file=None, logger=None):
        self.corpus = corpus
        if self.corpus == "kres":
            self.kres_folder = Path(kres_folder)
            self.kres_srl_folder = Path(kres_srl_folder)
        else:
            self.ssj_file = Path(ssj_file)
        self.W_TAGS = ['w']
        self.C_TAGS = ['c']
        self.S_TAGS = ['S', 'pc']
        self.logger = logger or logging.getLogger(__name__)
        self.stats = {
            "parsed_count": 0,
            "missing_srl": [],
        }
        # for logging output
        self.n_kres_files = -1
        self.nth_kres_file = -1

    def parse_jos_links(self, sent_el):
        if self.corpus == "kres":
            return self.parse_jos_links_kres(sent_el)
        else:
            # 'syntax' is the linkGrp we're looking for
            return self.parse_any_links_ssj(sent_el, "syntax")

    def parse_jos_links_kres(self, sent_el):
        lgrps = sent_el.findall(".//links")
        if len(lgrps) < 1:
            raise IOError("Can't find links.")
        res_links = []
        for link in lgrps[0]:
            res_links += [{
                "from": int(link.get("from").split(".")[-1]),
                "afun": link.get("afun"),
                "to": int(link.get("dep").split(".")[-1]),
            }]
        return res_links

    def parse_ssj_target_arg(self, text):
        # A target attribute holds two space-separated ids, e.g.
        #   "#ssj1.1.3 #ssj1.1.3.t6"    -> from: 0, to: 6
        #   "#ssj1.1.3.t6 #ssj1.1.3.t7" -> from: 6, to: 7
        # Token ids carry a "t" prefix; a sentence-level id maps to 0.
        lst = [x.split(".")[-1] for x in text.split(" ")]
        return [int(x[1:]) if x[0] == "t" else 0 for x in lst]

    def parse_any_links_ssj(self, sent_el, links_type):
        lgrps = sent_el.findall(".//linkGrp")
        links = [x for x in lgrps if x.get("type") == links_type][0]
        res_links = []
        for link in links:
            tar = self.parse_ssj_target_arg(link.get("target"))
            res_links += [{
                "from": tar[0],
                "afun": link.get("ana").split(":")[1],
                "to": tar[1],
            }]
        return res_links

    def parse_srl_links(self, sent_el, sent_srl_links=None):
        if self.corpus == "kres":
            return self.parse_srl_links_kres(sent_el, sent_srl_links)
        else:
            return self.parse_any_links_ssj(sent_el, "SRL")

    def parse_srl_links_kres(self, sent_el, sent_srl_links):
        # sent_srl_links come from the corresponding .srl.json file,
        # read in xml_file_to_generator().
        res_links = []
        for link in sent_srl_links:
            res_links += [{
                "from": int(link["from"]),
                "afun": link["arg"],
                "to": int(link["dep"]),
            }]
        return res_links

    def sentence_generator(self):
        # Using generators so we don't copy a whole corpus around in memory.
        # Use parse_xml_file() instead for per-file processing (parallelism?).
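        # Kres is split across many per-document XML files, which we stream
        # one at a time; ssj500k ships as a single XML file.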
        if self.corpus == "kres":
            # some logging output
            if self.n_kres_files == -1:
                self.n_kres_files = len(list(self.kres_folder.glob('*')))
            for xml_file in self.kres_folder.iterdir():
                self.nth_kres_file += 1
                self.logger.info("{} ({}/{})".format(
                    xml_file, self.nth_kres_file, self.n_kres_files))
                yield from self.xml_file_to_generator(xml_file)
        else:
            yield from self.xml_file_to_generator(self.ssj_file)

    def parse_xml_file(self, xml_file):
        # tstart = time.time()
        file_data = []
        for tpl in self.xml_file_to_generator(xml_file):
            file_data += [tpl[1]]
        # tend = time.time()
        # self.logger.info("Parsed {} in {:.4f} s".format(xml_file, tend - tstart))
        return file_data

    def xml_file_to_generator(self, xml_file):
        # For separate SRL links, guess the SRL file based on
        # self.kres_srl_folder.
        srl_from_json = {}
        if self.corpus == "kres":
            # in case of Kres, read the SRL links from a separate json file
            file_id = xml_file.name.split(".")[0]
            json_file = self.kres_srl_folder / Path(file_id).with_suffix(".srl.json")
            with json_file.open("r") as fp:
                srl_from_json = json.loads(fp.read())

        with xml_file.open("rb") as fp:
            bstr = fp.read()
        # remove namespaces
        utf8str = bstr.decode("utf-8")
        utf8str = re.sub('\\sxmlns="[^"]+"', '', utf8str, count=1)
        utf8str = re.sub(' xml:', ' ', utf8str)
        root = etree.XML(utf8str.encode("utf-8"))

        # in ssj there are divs; in Kres, each file is its own div
        if self.corpus == "kres":
            divs = [root]
        else:
            divs = root.findall(".//div")

        # parse divs
        for div in divs:
            f_id = div.get("id")
            # parse paragraphs
            for p in div.findall(".//p"):
                p_id = p.get("id").split(".")[-1]
                # parse sentences
                for s in p.findall(".//s"):
                    s_id = s.get("id").split(".")[-1]
                    sentence_text = ""
                    sentence_tokens = []
                    # parse tokens
                    for el in s.iter():
                        if el.tag in self.W_TAGS:
                            el_id = el.get("id").split(".")[-1]
                            if el_id[0] == 't':
                                el_id = el_id[1:]  # ssj W_TAG ids start with t
                            sentence_text += el.text
                            sentence_tokens += [{
                                "word": True,
                                "tid": int(el_id),
                                "text": el.text,
                                "lemma": el.get("lemma"),
                                "msd": (el.get("msd") if self.corpus == "kres"
                                        else el.get("ana").split(":")[-1]),
                            }]
                        elif el.tag in self.C_TAGS:
                            # only Kres' C_TAGS have ids
                            el_id = el.get("id") or "none"
                            el_id = el_id.split(".")[-1]
                            sentence_text += el.text
                            sentence_tokens += [{
                                "word": False,
                                "tid": (int(el_id) if self.corpus == "kres" else -1),
                                "text": el.text,
                            }]
                        elif el.tag in self.S_TAGS:
                            # Kres' <S/> elements don't contain .text
                            sentence_text += " "
                        else:
                            # skip links and linkGrps
                            pass
                    sentence_id = "{}.{}.{}".format(f_id, p_id, s_id)
                    jos_links = self.parse_jos_links(s)
                    if self.corpus == "kres":
                        srl_links_raw = srl_from_json.get(sentence_id)
                        if srl_links_raw is None:
                            srl_links_parsed = None
                            self.stats["missing_srl"] += [(sentence_id, sentence_text)]
                        else:
                            srl_links_parsed = self.parse_srl_links(s, srl_links_raw)
                    else:
                        srl_links_parsed = self.parse_srl_links(s)
                        if len(srl_links_parsed) == 0:
                            self.stats["missing_srl"] += [(sentence_id, sentence_text)]
                    sentence_entry = {
                        "sid": sentence_id,
                        "text": sentence_text,
                        "tokens": sentence_tokens,
                        "jos_links": jos_links,
                        "srl_links": srl_links_parsed,
                    }
                    self.stats["parsed_count"] += 1
                    yield (xml_file, sentence_entry)
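

# A minimal usage sketch (an illustration, not part of the original module):
# instantiate the Parser for one corpus and stream sentence dicts from it.
# The folder paths below are hypothetical placeholders.
if __name__ == "__main__":
    kres_parser = Parser(
        "kres",
        kres_folder=Path("/path/to/kres/xml"),           # hypothetical path
        kres_srl_folder=Path("/path/to/kres/srl/json"),  # hypothetical path
    )
    for xml_file, sentence in kres_parser.sentence_generator():
        print(sentence["sid"], sentence["text"])
        break  # just show the first sentence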