# cjvt-srl-tagging/tools/parser/parser.py (446 lines, 19 KiB, Python)

import copy
from lxml import etree
import re
from parser.msd.msdmap import Msdmap
import pickle
from pathlib import Path
from fillpred_model.step1 import build_model_row
import sys
import xml.etree.ElementTree as ET
class Parser:
    """Read TEI XML corpora and convert parsed sentences to CoNLL-2009 rows.

    ``parse_tei`` reads a TEI xml file and returns a dictionary::

        { <sentence_id>: {
            "sid": <sentence_id>,  # serves as index in MongoDB
            "text": <concatenated token text>,
            "tokens": [<token tuples>],
            "links": {<dependent idx>: (<label>, <dependent idx>, <head idx>)},
        }}
    """

    def __init__(self):
        """Create the MSD mapper, set the tag groups and load the fillpred model.

        Prints a hint and exits the process with status 1 when the pickled
        model cannot be read.
        """
        self.msdmap = Msdmap()
        self.W_TAGS = ['w']
        self.C_TAGS = ['c']
        self.S_TAGS = ['S', 'pc']
        try:
            # SECURITY: pickle.load executes code embedded in the file; the
            # model file must come from a trusted source.
            with Path("./fillpred_model/model.pickle").open("rb") as fp:
                self.fillpred_model = pickle.load(fp)
        except IOError:
            # NOTE(review): "fillpred_mode" looks like a typo for
            # "fillpred_model" - confirm against the Makefile before changing.
            print("Generate the model first: $ make tools/fillpred_mode/model.pickle")
            sys.exit(1)

    @staticmethod
    def _parse_links(s_el):
        """Collect the JOS-SYN dependency links of one sentence element.

        Returns ``{dependent: (label, dependent, head)}``; ``head`` is 0 when
        the first target of the link is the sentence itself.
        Raises IOError when the sentence contains no ``linkGrp`` at all.
        """
        sent_id = '#' + s_el.get('id')
        lgrps = s_el.findall(".//linkGrp")
        if not lgrps:
            raise IOError("Can't find links.")
        res_links = {}
        for lgrp in lgrps:
            if lgrp.get("type") != "JOS-SYN":
                continue
            for link in lgrp:
                jos_type = link.get("ana").split(":")[-1]
                link_data = link.get("target").split(" ")
                # targets look like "#<sid>.t<N>"; strip the leading "t"
                link_from = int(link_data[1].split('.')[-1][1:])
                if link_data[0] == sent_id:
                    link_to = 0
                else:
                    link_to = int(link_data[0].split('.')[-1][1:])
                res_links[link_from] = (jos_type, link_from, link_to)
        return res_links

    @staticmethod
    def _guess_corpus(root):
        """Return ``(corpus_name, divs)`` for a namespace-stripped TEI root."""
        if "id" in root.keys():
            # Kres files start with <TEI id=...>; ids starting with GF are GIGA
            corpus = "GIGA" if root.get("id")[0:2] == 'GF' else "KRES"
            return corpus, [root]
        # in ssj, there are divs, in Kres, there are separate files
        return "SSJ", root.findall(".//div")

    @staticmethod
    def _local_id(el):
        """Return the last dot-separated part of the element id, without a leading 't'."""
        el_id = el.get("id").split(".")[-1]
        if el_id[0] == 't':
            el_id = el_id[1:]  # ssj W_TAG ids start with t
        return el_id

    @staticmethod
    def _split_msd(msd):
        """Split a ``key=value|key=value`` string into (UPosTag, other feats).

        The remaining features are re-joined with '|'. Every part must contain
        exactly one '=', otherwise ValueError is raised.
        """
        upos_tag = None
        feats = []
        for msd_el in msd.split('|'):
            key, val = msd_el.split('=')
            if key == 'UPosTag':
                upos_tag = val
            else:
                feats.append(msd_el)
        return upos_tag, '|'.join(feats)

    @staticmethod
    def _annotated_token(kind, el_id, el, lemma, guess_corpus):
        """Build the 7-tuple stored for word ("w") and "pc" tokens."""
        upos_tag, upos_feats = Parser._split_msd(el.get("msd"))
        if guess_corpus == "KRES" or guess_corpus == "GIGA":
            msd = el.get("msd")
        else:
            msd = el.get("ana").split(":")[-1]
        return (kind, int(el_id), el.text, lemma, msd, upos_tag, upos_feats)

    def parse_tei(self, filepath):
        """Parse a TEI file into a dict of sentence entries keyed by sentence id.

        Only sentences that have both a JOS-SYN and a UD-SYN ``linkGrp`` and no
        SRL ``linkGrp`` are included.

        Token tuples:
          * W_TAGS -> ("w", int id, form, lemma, msd, UPosTag, other feats)
          * C_TAGS -> ("c", id or "none", text)
          * S_TAGS -> ("pc", int id, form, form, msd, UPosTag, other feats)

        :param filepath: object with ``open("rb")`` (e.g. ``pathlib.Path``).
        :raises KeyError: when a sentence id occurs twice in the file.
        :raises IOError: when an accepted sentence has no ``linkGrp``.
        """
        with filepath.open("rb") as fp:
            utf8str = fp.read().decode("utf-8")
        # remove namespaces so that plain tag / attribute names can be used
        utf8str = re.sub('\\sxmlns="[^"]+"', '', utf8str, count=1)
        utf8str = re.sub(' xml:', ' ', utf8str)
        root = etree.XML(utf8str.encode("utf-8"))

        guess_corpus, divs = self._guess_corpus(root)  # SSJ | KRES | GIGA
        res_dict = {}
        # GIGA <c> elements carry no id, so they reuse the id of the previously
        # seen token; seeding it avoids a NameError when a <c> comes first.
        # NOTE(review): confirm the carry-over is what downstream expects.
        el_id = "none"

        for div in divs:
            if guess_corpus == "GIGA":
                div = div.findall(".//body")[0]
            for p in div.findall(".//p"):
                for s in p.findall(".//s"):
                    # test if sentence has jos-syn annotations and doesn't have SRL
                    annot_types = [lg.get('type') for lg in s.findall(".//linkGrp")]
                    if ('JOS-SYN' not in annot_types
                            or 'UD-SYN' not in annot_types
                            or 'SRL' in annot_types):
                        continue

                    text_parts = []
                    sentence_tokens = []
                    for el in s.iter():
                        if el.tag in self.W_TAGS:
                            el_id = self._local_id(el)
                            text_parts.append(el.text)
                            sentence_tokens.append(self._annotated_token(
                                "w", el_id, el, el.get("lemma"), guess_corpus))
                        elif el.tag in self.C_TAGS:
                            # only Kres' C_TAGS have ids
                            if guess_corpus != "GIGA":
                                el_id = (el.get("id") or "none").split(".")[-1]
                            text_parts.append(el.text)
                            sentence_tokens.append(("c", el_id, el.text,))
                        elif el.tag in self.S_TAGS:
                            el_id = self._local_id(el)
                            text_parts.append(el.text)
                            # the form doubles as the lemma for these tokens
                            sentence_tokens.append(self._annotated_token(
                                "pc", el_id, el, el.text, guess_corpus))
                        # everything else (links, linkGroups, ...) is skipped

                    sentence_id = s.get("id")
                    if sentence_id in res_dict:
                        raise KeyError("duplicated id: {}".format(sentence_id))
                    res_dict[sentence_id] = {
                        "sid": sentence_id,
                        "text": "".join(text_parts),
                        "tokens": sentence_tokens,
                        "links": self._parse_links(s),
                    }
        return res_dict

    def minimize_tei(self, filepath, jsondata,
                     out_path="../data/ssj500k2.3/final_tei/res.xml"):
        """Write a reduced copy of a TEI file with SRL links added.

        Children of the root, their children and the children of those are
        kept only when their ``xml:id`` is (a prefix of) a key of ``jsondata``;
        second-level ``bibl`` elements are always kept untouched. Every kept
        third-level element receives a new ``linkGrp`` of type "SRL", e.g.::

            <linkGrp targFunc="head argument" type="SRL">
              <link ana="srl:TIME" target="#ssj1.1.1.t6 #ssj1.1.1.t3"/>
            </linkGrp>

        :param filepath: object with ``open("rb")`` (e.g. ``pathlib.Path``).
        :param jsondata: ``{sentence_id: [{"arg": ..., "from": ..., "dep": ...}]}``.
        :param out_path: destination file; defaults to the historical path.
        :returns: None
        """
        used_ssj_documents = {k.split('.')[0] for k in jsondata}
        used_ssj_paragraphs = {k.rpartition('.')[0] for k in jsondata}
        used_ssj_sentences = set(jsondata)

        # keep TEI as the default namespace in the serialised output
        ET.register_namespace("", "http://www.tei-c.org/ns/1.0")
        with filepath.open("rb") as fp:
            tree = ET.parse(fp)
        root_res = tree.getroot()

        xml_id = '{http://www.w3.org/XML/1998/namespace}id'
        tei = '{http://www.tei-c.org/ns/1.0}'

        # iterate over copies of the child lists because children are removed
        for doc in list(root_res):
            if doc.get(xml_id) not in used_ssj_documents:
                root_res.remove(doc)
                continue
            for par in list(doc):
                if par.get(xml_id) not in used_ssj_paragraphs:
                    if par.tag != tei + 'bibl':
                        doc.remove(par)
                    continue
                for sen in list(par):
                    sen_id = sen.get(xml_id)
                    if sen_id not in used_ssj_sentences:
                        par.remove(sen)
                        continue
                    link_grp = ET.Element(f'{tei}linkGrp')
                    link_grp.attrib['targFunc'] = 'head argument'
                    link_grp.attrib['type'] = 'SRL'
                    for srl_el in jsondata[sen_id]:
                        link = ET.Element(f'{tei}link')
                        link.attrib['ana'] = f'srl:{srl_el["arg"]}'
                        link.attrib['target'] = (
                            f'#{sen_id}.t{srl_el["from"]} #{sen_id}.t{srl_el["dep"]}'
                        )
                        link_grp.append(link)
                    sen.append(link_grp)

        tree.write(out_path, encoding='utf-8')
        return

    def to_conll_2009_SRL(self, sentence_entry):
        """Render one parsed sentence as tab-separated CoNLL-2009 rows.

        Works with kres, with parsed links. Tokens whose kind is not "w" get a
        filler row; word rows have FILLPRED/PRED decided by the fillpred model.
        Every row ends with a tab followed by a newline (the newline is the
        last joined field), and the sentence is terminated by an empty line.

        :param sentence_entry: one value of the dict returned by ``parse_tei``.
        :returns: the rows as a single string.
        """
        def fillpred(tsv_row):
            mrow = build_model_row(tsv_row)
            x = mrow[:-1]
            y = self.fillpred_model.predict([x])
            return y[0]  # bool

        rows = []
        for token in sentence_entry["tokens"]:
            t_id = token[1]
            form = token[2]

            # handle stop signs
            if token[0] != "w":
                out_list = ([t_id] + [form] * 7
                            + ["0", "0", "modra", "modra", "_", "_"] + ["\n"])
                rows.append('\t'.join(map(str, out_list)))
                continue

            pos = self.msdmap.slo_msd_to_eng_pos(token[4])
            feat = "|".join(self.msdmap.slo_msd_to_eng_long(token[4]).split(" "))
            link = sentence_entry["links"][t_id]

            row_list = [
                t_id,
                form,
                token[3],  # lemma
                token[3],  # plemma
                pos,  # pos
                pos,  # ppos
                feat,  # feat
                feat,  # pfeat
                link[2],  # head
                link[2],  # phead
                link[0],  # deprel
                link[0],  # pdeprel
                "_",  # fillpred
                "_",  # pred
                "\n",
            ]
            fprd = fillpred(row_list)
            row_list[12] = "Y" if fprd else "_"
            row_list[13] = token[3] if fprd else "_"

            # format: 14 + apreds
            rows.append('\t'.join(map(str, row_list)))
        # newline at the end of sentence
        return "".join(rows) + "\n"

    def to_conll_2009_full(self, sentence_entry):
        """Render only the id and form columns, one token per line.

        :param sentence_entry: mapping with a "tokens" list of token tuples.
        :returns: "<id>\\t<form>\\n" per token followed by an empty line.
        """
        lines = [
            "{}\t{}\n".format(token[1], token[2])  # id, form
            for token in sentence_entry["tokens"]
        ]
        return "".join(lines) + "\n"