from pathlib import Path
import re
import json
from lxml import etree
import logging

logging.basicConfig(level=logging.INFO)


# Read input files (.xml, .json; kres or ssj500k).
# Create an iterator that yields the resulting sentences (Python dict format).
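#
# An illustrative (not verbatim) example of one yielded item, a
# (file, sentence_entry) pair:
#   (PosixPath("F0000001.xml"),
#    {"sid": "F0000001.1.1", "text": "...", "tokens": [...],
#     "jos_links": [...], "srl_links": [...]})
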
class Parser():

    def __init__(self, corpus, infiles, logger=None):

        if corpus == "kres":
            self.kres_folder = Path(infiles[0])
            self.kres_srl_folder = Path(infiles[1])
        elif corpus == "ssj":
            self.ssj_file = Path(infiles[0])
        else:
            raise ValueError("Argument corpus should be 'ssj' or 'kres'.")

        self.corpus = corpus
        self.W_TAGS = ['w']
        self.C_TAGS = ['c']
        self.S_TAGS = ['S', 'pc']
        self.logger = logger or logging.getLogger(__name__)
        self.stats = {
            "parsed_count": 0,
            "missing_srl": []
        }

    def parse_jos_links(self, sent_el):
        if self.corpus == "kres":
            return self.parse_jos_links_kres(sent_el)
        else:
            # 'syntax' is the linkGrp we're looking for
            return self.parse_any_links_ssj(sent_el, "syntax")

    def parse_jos_links_kres(self, sent_el):
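        # a minimal sketch of the kres links group this method reads
        # (attribute names from the lookups below; values illustrative):
        # <links>
        #   <link afun="dol" from="F0000001.5.1" dep="F0000001.5.2"/>
        # </links>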
        lgrps = sent_el.findall(".//links")
        if len(lgrps) < 1:
            raise IOError("Can't find links.")
        res_links = []
        for link in lgrps[0]:
            res_links += [{
                "from": int(link.get("from").split(".")[-1]),
                "afun": link.get("afun"),
                "to": int(link.get("dep").split(".")[-1]),
            }]
        return res_links

    def parse_ssj_target_arg(self, text):
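        # e.g. (per the examples below):
        #   parse_ssj_target_arg("#ssj1.1.3 #ssj1.1.3.t6") == [0, 6]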
        # from: 0, to: 6
        # <link ana="syn:modra" target="#ssj1.1.3 #ssj1.1.3.t6"/>
        # from: 6, to: 7
        # <link ana="syn:dol" target="#ssj1.1.3.t6 #ssj1.1.3.t7"/>
        lst = [x.split(".")[-1] for x in text.split(" ")]
        return [int(x[1:] if x[0] == "t" else 0) for x in lst]

    def parse_any_links_ssj(self, sent_el, links_type):
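        # a minimal sketch of the linkGrp this method consumes (structure as
        # in the parse_ssj_target_arg examples; links_type is "syntax" or "SRL"):
        # <linkGrp type="syntax">
        #   <link ana="syn:dol" target="#ssj1.1.3.t6 #ssj1.1.3.t7"/>
        # </linkGrp>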
        lgrps = sent_el.findall(".//linkGrp")
        links = [x for x in lgrps if x.get("type") == links_type][0]
        res_links = []
        for link in links:
            tar = self.parse_ssj_target_arg(link.get("target"))
            res_links += [{
                "from": tar[0],
                "afun": link.get("ana").split(":")[1],
                "to": tar[1],
            }]
        return res_links

    def parse_srl_links(self, sent_el, sent_srl_links=None):
        if self.corpus == "kres":
            return self.parse_srl_links_kres(sent_el, sent_srl_links)
        else:
            return self.parse_any_links_ssj(sent_el, "SRL")

    def parse_srl_links_kres(self, sent_el, sent_srl_links):
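        # expected shape of one entry in sent_srl_links (keys from the lookups
        # below; values illustrative): {"from": 1, "arg": "ACT", "dep": 2}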
        # (the corresponding .srl.json file is read in parse_xml_file)
        res_links = []
        for link in sent_srl_links:
            res_links += [{
                "from": int(link["from"]),
                "afun": link["arg"],
                "to": int(link["dep"]),
            }]
        return res_links

    def sentence_generator(self):
        # Using generators so we don't copy a whole corpus around in memory.
        if self.corpus == "kres":
            for xml_file in self.kres_folder.iterdir():
                yield from self.parse_xml_file(xml_file)
        else:
            yield from self.parse_xml_file(self.ssj_file)

    def parse_xml_file(self, xml_file):
        srl_from_json = {}
        if self.corpus == "kres":
            # in case of kres, read the SRL links from a separate json file
            file_id = xml_file.name.split(".")[0]
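            # e.g. "F0000001.xml" -> "F0000001.srl.json" (name illustrative,
            # derived from the suffix replacement below)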
            json_file = self.kres_srl_folder / Path(file_id).with_suffix(".srl.json")
            with json_file.open("r") as fp:
                srl_from_json = json.loads(fp.read())

        with xml_file.open("rb") as fp:
            bstr = fp.read()

        # remove namespaces so the findall() lookups below can use
        # plain tag and attribute names
        utf8str = bstr.decode("utf-8")
        utf8str = re.sub('\\sxmlns="[^"]+"', '', utf8str, count=1)
        utf8str = re.sub(' xml:', ' ', utf8str)

        root = etree.XML(utf8str.encode("utf-8"))

        # in ssj there are divs; in kres, there are separate files
        divs = []
        if self.corpus == "kres":
            divs = [root]
        else:
            divs = root.findall(".//div")

        # parse divs
        for div in divs:
            f_id = div.get("id")

            # parse paragraphs
            for p in div.findall(".//p"):
                p_id = p.get("id").split(".")[-1]

                # parse sentences
                for s in p.findall(".//s"):
                    s_id = s.get("id").split(".")[-1]
                    sentence_text = ""
                    sentence_tokens = []

                    # parse tokens
                    for el in s.iter():
                        if el.tag in self.W_TAGS:
                            el_id = el.get("id").split(".")[-1]
                            if el_id[0] == 't':
                                el_id = el_id[1:]  # ssj W_TAG ids start with t
                            sentence_text += el.text
                            sentence_tokens += [{
                                "word": True,
                                "tid": int(el_id),
                                "text": el.text,
                                "lemma": el.get("lemma"),
                                "msd": (el.get("msd") if self.corpus == "kres"
                                        else el.get("ana").split(":")[-1]),
                            }]
                        elif el.tag in self.C_TAGS:
                            # only kres' C_TAGS have ids
                            el_id = el.get("id") or "none"
                            el_id = el_id.split(".")[-1]
                            sentence_text += el.text
                            sentence_tokens += [{
                                "word": False,
                                "tid": (int(el_id) if self.corpus == "kres" else -1),
                                "text": el.text,
                            }]
                        elif el.tag in self.S_TAGS:
                            # kres' <S /> doesn't contain .text
                            sentence_text += " "
                        else:
                            # skip links and linkGrp elements
                            pass
                    sentence_id = "{}.{}.{}".format(f_id, p_id, s_id)

                    jos_links = self.parse_jos_links(s)

                    if self.corpus == "kres":
                        srl_links_raw = srl_from_json.get(sentence_id)
                        if srl_links_raw is None:
                            srl_links_parsed = None
                            self.stats["missing_srl"] += [(sentence_id, sentence_text)]
                        else:
                            srl_links_parsed = self.parse_srl_links(s, srl_links_raw)
                    else:
                        srl_links_parsed = self.parse_srl_links(s)
                        if len(srl_links_parsed) == 0:
                            self.stats["missing_srl"] += [(sentence_id, sentence_text)]

                    sentence_entry = {
                        "sid": sentence_id,
                        "text": sentence_text,
                        "tokens": sentence_tokens,
                        "jos_links": jos_links,
                        "srl_links": srl_links_parsed
                    }
                    self.stats["parsed_count"] += 1
                    yield (xml_file, sentence_entry)
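

# A minimal usage sketch ("ssj500k.xml" is a placeholder path,
# not a real corpus location):
if __name__ == "__main__":
    parser = Parser("ssj", ["ssj500k.xml"])
    for xml_file, sentence in parser.sentence_generator():
        logging.info("%s: %s", sentence["sid"], sentence["text"])
    logging.info("parsed: %d, missing srl: %d",
                 parser.stats["parsed_count"],
                 len(parser.stats["missing_srl"]))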