You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
luscenje_struktur/wani.py

882 lines
25 KiB

from xml.etree import ElementTree
import re
from enum import Enum
from collections import defaultdict
import sys
import logging
from msd_translate import MSD_TRANSLATE
# Number of per-component column groups in the CSV header written by main();
# rows are padded to this many groups.
MAX_NUM_COMPONENTS = 5

# Positional command-line arguments, read at import time: importing this
# module with fewer than three arguments raises IndexError.
# In main(): STAVKI is passed to load_corpus(), STRUKTURE to
# build_structures(), and FILE_OUT is opened for writing the CSV output.
STAVKI = sys.argv[1]
STRUKTURE = sys.argv[2]
FILE_OUT = sys.argv[3]
# Maps spelled-out names to their single-character MSD codes.
# build_morphology_regex() looks up both the capitalised POS name and the
# lower-case attribute values here; check_form() looks up attribute values.
# The same code letter is reused by several names because each letter is only
# meaningful at a given position of the MSD string.
CODES = {
    # Capitalised names: looked up via the 'POS' restriction.
    "Noun": "N", "Verb": "V", "Adjective": "A", "Adverb": "R",
    "Pronoun": "P", "Numeral": "M", "Preposition": "S", "Conjunction": "C",
    "Particle": "Q", "Interjection": "I", "Abbreviation": "Y", "Residual": "X",

    # Lower-case names: attribute values, kept in the original listing order.
    "common": "c", "proper": "p",
    "masculine": "m", "feminine": "f", "neuter": "n",
    "singular": "s", "dual": "d", "plural": "p",
    "nominative": "n", "genitive": "g", "dative": "d",
    "accusative": "a", "locative": "l", "instrumental": "i",
    "no": "n", "yes": "y",
    "main": "m", "auxiliary": "a",
    "perfective": "e", "progressive": "p", "biaspectual": "b",
    "infinitive": "n", "supine": "u", "participle": "p", "present": "r",
    "future": "f", "conditional": "c", "imperative": "m",
    "first": "1", "second": "2", "third": "3",
    "general": "g", "possessive": "s",
    "positive": "p", "comparative": "c", "superlative": "s",
    "personal": "p", "demonstrative": "d", "relative": "r", "reflexive": "x",
    "interrogative": "q", "indefinite": "i", "negative": "z",
    "bound": "b",
    "digit": "d", "roman": "r", "letter": "l",
    "cardinal": "c", "ordinal": "o", "pronominal": "p", "special": "s",
    "coordinating": "c", "subordinating": "s",
    "foreign": "f", "typo": "t", "program": "p",
}
# For every MSD category letter, the attribute names in positional order:
# the attribute at list index i is encoded by character i + 1 of the MSD
# string (character 0 is the category letter itself).
TAGSET = {
    "N": ['type', 'gender', 'number', 'case', 'animate'],
    "V": ['type', 'aspect', 'vform', 'person', 'number', 'gender', 'negative'],
    "A": ['type', 'degree', 'gender', 'number', 'case', 'definiteness'],
    "R": ['type', 'degree'],
    "P": ['type', 'person', 'gender', 'number', 'case', 'owner_number', 'owned_gender', 'clitic'],
    "M": ['form', 'type', 'gender', 'number', 'case', 'definiteness'],
    "S": ['case'],
    "C": ['type'],
    "Q": [],
    "I": [],
    "Y": [],
    "X": ['type'],
}

# Per-category list of "match anything" patterns, one per attribute position.
# Derived from TAGSET so the two tables cannot drift apart: the previous
# hand-written table had only 6 slots for "P" although pronouns have 8
# attributes, which made restrictions on the last two attributes fail with
# an IndexError when their slot was assigned.
CATEGORY_BASES = {
    category: ['.'] * len(attributes)
    for category, attributes in TAGSET.items()
}
class RestrictionType(Enum):
    """Kind of test a Restriction applies to a word."""

    # Values 0, 1, 2 in declaration order.
    Morphology, Lexis, MatchAll = range(3)
class Rendition(Enum):
    """Which textual form of a word a component renders."""

    # Values 0, 1, 2 in declaration order.
    Lemma, WordForm, Unknown = range(3)
class Order(Enum):
    """Required relative position of the two words joined by a dependency."""

    FromTo = 0
    ToFrom = 1
    Any = 2

    @staticmethod
    def new(order):
        """Translate the ``order`` attribute of a dependency into an Order.

        ``None`` means no constraint; any string other than "to-from" or
        "from-to" raises NotImplementedError.
        """
        if order is None:
            return Order.Any
        if order == "to-from":
            return Order.ToFrom
        if order == "from-to":
            return Order.FromTo
        raise NotImplementedError("What kind of ordering is: {}".format(order))

    def match(self, from_w, to_w):
        """Return True when the two words satisfy this ordering.

        The position of a word is the integer that follows the first
        character of the last dot-separated part of its ``id``.
        """
        if self is Order.Any:
            return True

        from_pos, to_pos = (
            int(w.id.split('.')[-1][1:]) for w in (from_w, to_w)
        )

        if self is Order.FromTo:
            return from_pos < to_pos
        if self is Order.ToFrom:
            return to_pos < from_pos
        raise NotImplementedError("Should not be here: Order match")
class ComponentRendition:
    """Holds the Rendition chosen for a component and applies it to words."""

    def __init__(self, rendition=Rendition.Unknown):
        self.rendition = rendition
        self.word_form = {}

    def render(self, word):
        """Return the word's lemma or text, or None for Rendition.Unknown."""
        chosen = self.rendition
        if chosen == Rendition.Lemma:
            return word.lemma
        if chosen == Rendition.WordForm:
            return word.text
        if chosen == Rendition.Unknown:
            return None
        raise RuntimeError("Unknown rendition: {}".format(chosen))

    def __str__(self):
        return str(self.rendition)
# NOTE(review): the original author marked this enum "dont know..."; in this
# file it is only produced by ComponentRepresentation.new() and propagated
# by Component.find_next().
class StructureSelection(Enum):
    """Selection mode requested by a component's representation."""

    # Values 0 and 1 in declaration order.
    All, Frequency = range(2)
class ComponentRepresentation:
    """Namespace for the factory that interprets one representation feature."""

    @staticmethod
    def new(s):
        """Interpret the attribute dict *s* of a representation feature.

        Declared as a static method (like ``Order.new``) so that it works
        whether it is called on the class or on an instance.

        Returns:
            * a ComponentRendition when *s* has a ``rendition`` key,
            * a StructureSelection for ``selection`` "frequency" or "all",
            * ``{s['selection']: s['value']}`` for any other ``selection``,
            * ``None`` when *s* has neither key.

        Raises:
            NotImplementedError: for an unrecognised ``rendition`` value.
            KeyError: for a custom ``selection`` without a ``value`` key.
        """
        if 'rendition' in s:
            if s['rendition'] == "lemma":
                return ComponentRendition(Rendition.Lemma)
            elif s['rendition'] == "word_form":
                return ComponentRendition(Rendition.WordForm)
            else:
                raise NotImplementedError("Rendition: {}".format(s))
        elif 'selection' in s:
            if s['selection'] == "frequency":
                return StructureSelection.Frequency
            elif s['selection'] == "all":
                return StructureSelection.All
            else:
                # Any other selection names an attribute together with the
                # value it has to take.
                return {s['selection']: s['value']}
        else:
            return None
class ComponentStatus(Enum):
    """Whether a component must, may, or must not be present in a match."""

    Optional = 0
    Required = 1
    Forbidden = 2

    def __str__(self):
        """Return the one-character marker: "?", "!" or "X" (forbidden)."""
        markers = {"Optional": "?", "Required": "!"}
        return markers.get(self.name, "X")
def get_level(restriction):
    """Return the ``level`` attribute of the first feature that has one.

    Previously the value was assigned to a local and then dropped, so the
    function could never return a level.

    Args:
        restriction: iterable of features offering ``keys()`` and ``get()``
            (XML elements or plain dicts).

    Raises:
        RuntimeError: when no feature carries a ``level`` attribute.
    """
    for feature in restriction:
        if "level" in feature.keys():
            return feature.get("level")

    raise RuntimeError("Unreachable!")
def build_morphology_regex(restriction):
    """Compile a morphology restriction into a description and a matcher.

    Every feature must carry exactly one attribute, optionally accompanied
    by ``filter="negative"``; one feature must be ``POS``.

    Returns:
        A pair ``(present, matcher)``: ``present`` is the space-joined list
        of per-position patterns, ``matcher(msd)`` tells whether an MSD
        string satisfies the restriction.
    """
    constraints = {}
    for feature in restriction:
        attrs = dict(feature.items())

        positive = "filter" not in attrs
        if not positive:
            assert(attrs['filter'] == "negative")
            del attrs['filter']

        assert(len(attrs) == 1)
        (name, wanted), = attrs.items()
        constraints[name] = (wanted, positive)

    assert('POS' in constraints)
    cat_code = CODES[constraints['POS'][0].capitalize()]
    del constraints['POS']

    # Position 0 is the category letter; the rest start out as wildcards.
    rgx = [cat_code] + CATEGORY_BASES[cat_code]

    # Highest position constrained by a positive filter; matching MSD strings
    # must be strictly longer than this.
    min_msd_length = 1
    for name, (wanted, positive) in constraints.items():
        index = TAGSET[cat_code].index(name.lower())
        assert(index >= 0)

        # Alternatives are written "a|b"; a single value has no separator.
        letters = "".join(CODES[option] for option in wanted.split('|'))
        rgx[index + 1] = "[{}{}]".format("" if positive else "^", letters)

        if positive:
            min_msd_length = max(index + 1, min_msd_length)

    def matcher(text):
        if len(text) <= min_msd_length:
            return False
        # zip() stops at the shorter sequence, so positions missing from a
        # short MSD string are simply not checked.
        return all(re.match(pattern, char) for char, pattern in zip(text, rgx))

    return " ".join(rgx), matcher
def build_lexis_regex(restriction):
    """Compile a lexis restriction into ``(lemmas, matcher)``.

    The attributes of all features are merged (later features win); the
    merged ``lemma`` value is split on ``|`` into the accepted lemmas and
    ``matcher(text)`` tests membership in that list.
    """
    merged = {}
    for feature in restriction:
        for key, value in feature.items():
            merged[key] = value

    assert("lemma" in merged)
    match_list = merged['lemma'].split('|')

    def matcher(text):
        return text in match_list

    return match_list, matcher
class Restriction:
    """A single test (morphology, lexis, or match-all) applied to a word."""

    def __init__(self, restriction_tag):
        """Build from a ``restriction`` element, or match everything for None.

        Raises NotImplementedError when the element's ``type`` is neither
        "morphology" nor "lexis".
        """
        if restriction_tag is None:
            self.type = RestrictionType.MatchAll
            self.matcher = None
            self.present = None
            return

        kind = restriction_tag.get('type')
        if kind == "morphology":
            self.type = RestrictionType.Morphology
            builder = build_morphology_regex
        elif kind == "lexis":
            self.type = RestrictionType.Lexis
            builder = build_lexis_regex
        else:
            raise NotImplementedError()

        self.present, self.matcher = builder(list(restriction_tag))

    def match(self, word):
        """Apply the restriction to *word* (its msd or lemma, per type)."""
        if self.type == RestrictionType.MatchAll:
            return True

        if self.type == RestrictionType.Morphology:
            subject = word.msd
        elif self.type == RestrictionType.Lexis:
            subject = word.lemma
        else:
            raise RuntimeError("Unreachable!")

        return self.matcher(subject)

    def __str__(self):
        type_name = str(self.type).split('.')[1]
        return "({:s} {})".format(type_name, self.present)

    def __repr__(self):
        return self.__str__()
class Component:
    """One node in the dependency tree of a syntactic structure.

    A component carries a restriction (what a word must look like), a status
    (required / optional / forbidden) and the outgoing links to the
    components that must be reachable from the matched word.
    """

    def __init__(self, info):
        """Create a component from its attribute dict.

        Args:
            info: mapping with a mandatory ``cid`` and optional ``name`` and
                ``status`` ("forbidden", "obligatory" or "optional"; a
                missing status means required).

        Raises:
            NotImplementedError: for any other ``status`` value.
        """
        idx = info['cid']
        name = info['name'] if 'name' in info else None

        if 'status' not in info:
            status = ComponentStatus.Required
        elif info['status'] == 'forbidden':
            status = ComponentStatus.Forbidden
        elif info['status'] == 'obligatory':
            status = ComponentStatus.Required
        elif info['status'] == 'optional':
            status = ComponentStatus.Optional
        else:
            raise NotImplementedError("strange status: {}".format(info['status']))

        self.status = status
        self.name = name
        self.idx = idx
        self.restriction = None
        # List of (component, link label, Order) triples.
        self.next_element = []
        self.rendition = ComponentRendition()
        # attribute name -> required value, filled by set_representation().
        self.selection = {}

        self.iter_ctr = 0

    def render_word(self, word):
        """Render *word* with this component's rendition."""
        return self.rendition.render(word)

    def add_next(self, next_component, link_label, order):
        """Register an outgoing link to *next_component*."""
        self.next_element.append((next_component, link_label, Order.new(order)))

    def set_restriction(self, restrictions_tag):
        """Install the restriction described by *restrictions_tag*.

        ``None`` matches everything, a ``restriction`` element yields one
        Restriction, a ``restriction_or`` element yields a list of
        alternatives (any of which may match).
        """
        if restrictions_tag is None:
            self.restriction = Restriction(None)
        elif restrictions_tag.tag == "restriction":
            self.restriction = Restriction(restrictions_tag)
        elif restrictions_tag.tag == "restriction_or":
            self.restriction = [Restriction(el) for el in restrictions_tag]
        else:
            raise RuntimeError("Unreachable")

    def set_representation(self, representation):
        """Apply the features of a representation element to this component.

        Returns the StructureSelection found among the features, or None.
        Features that ComponentRepresentation.new() cannot interpret are
        logged and skipped.
        """
        cr = None
        if representation is not None:
            self.representation = []

            for feature in representation:
                f = ComponentRepresentation.new(dict(feature.attrib))

                # The old test `type(f) is None` was never true, so unknown
                # features ended in the "Unreachable" error below; the
                # warning also passed `file=`, which logging rejects.
                if f is None:
                    logging.warning(
                        "Unknown representation in component {}, skipping...".format(self.idx))
                    continue

                if isinstance(f, StructureSelection):
                    assert(cr is None)
                    cr = f
                elif isinstance(f, ComponentRendition):
                    self.rendition = f
                elif isinstance(f, dict):
                    self.selection.update(f)
                else:
                    raise RuntimeError("Unreachable: {}".format(f))

        return cr

    def find_next(self, deps, comps, restrs, reprs):
        """Recursively build the components reachable from this one.

        Args:
            deps: iterable of ``(from, to, label, order)`` tuples.
            comps, restrs, reprs: per-cid attribute dicts, restriction
                elements and representation elements.

        Returns:
            ``(components, selection)``: all descendants in depth-first
            order, and StructureSelection.Frequency if any of them asked
            for it (StructureSelection.All otherwise).
        """
        representation = StructureSelection.All

        to_ret = []
        for d in deps:
            if d[0] == self.idx:
                _, idx, dep_label, order = d

                next_component = Component(comps[idx])
                next_component.set_restriction(restrs[idx])
                r1 = next_component.set_representation(reprs[idx])
                to_ret.append(next_component)

                self.add_next(next_component, dep_label, order)
                others, r2 = next_component.find_next(deps, comps, restrs, reprs)
                to_ret.extend(others)

                if StructureSelection.Frequency in (r1, r2):
                    representation = StructureSelection.Frequency

        return to_ret, representation

    def name_str(self):
        """Return the component name, or "_" when it has none."""
        return "_" if self.name is None else self.name

    def __str__(self):
        n = self.name_str()
        return "{:s}) {:7s}:{} [{}] :{}".format(
            self.idx, n, self.status, self.restriction, self.rendition)

    def tree(self):
        """Return one text line per link of the subtree rooted here."""
        el = []
        for child, link, order in self.next_element:
            s = "{:3} -- {:5} --> {:3}".format(self.idx, link, child.idx)
            if order != Order.Any:
                # Strip the "Order." prefix of the enum's string form.
                s += " " + str(order)[6:]

            el.append(s)
            el.extend(child.tree())
        return el

    def __repr__(self):
        return str(self)

    def match(self, word):
        """Match the subtree rooted at this component, starting at *word*.

        Returns None when there is no match, otherwise a list of dicts
        mapping component ids to the matched words.
        """
        m1 = self._match_self(word)
        if m1 is None:
            return None

        mn = self._match_next(word)
        if mn is None:
            return None

        to_ret = [m1]
        for cmatch in mn:
            # if good match but nothing to add, just continue
            if len(cmatch) == 0:
                continue

            # if more than one match found for particular component
            elif len(cmatch) > 1:
                logging.debug("MULTIPLE: {}, {}".format(self.idx, cmatch))
                # if more than one match in multiple components, NOPE!
                if len(to_ret) > 1:
                    logging.warning("Strange multiple match: {}".format(
                        str([w.id for w in cmatch[0].values()])))

                    # Only the first alternative is kept in this case.
                    for tr in to_ret:
                        tr.update(cmatch[0])
                    continue

                # more than one match for this link => one result per
                # alternative, each extending the single match so far
                to_ret = [{**dict(to_ret[0]), **m} for m in cmatch]

            else:
                for tr in to_ret:
                    tr.update(cmatch[0])

        logging.debug("MA: {}".format(str(to_ret)))
        return to_ret

    def _match_self(self, word):
        """Return ``{idx: word}`` if *word* passes the restriction, else None."""
        matched = None

        # A list of restrictions means "match either".
        if isinstance(self.restriction, list):
            for restr in self.restriction:
                matched = restr.match(word)
                if matched:
                    break
        else:
            matched = self.restriction.match(word)

        logging.debug("SELF MATCH({}: {} -> {}".format(self.idx, word.text, matched))

        if not matched:
            return None
        else:
            return {self.idx: word}

    def _match_next(self, word):
        """Match every outgoing link of this component from *word*.

        Returns a list with one entry per link (the list of matches found
        through it), or None as soon as a required component has no match
        or a forbidden component has one.
        """
        # matches for every component in links from this component
        to_ret = []

        # need to get all links that match
        for child, link, order in self.next_element:
            next_links = word.get_links(link)
            logging.debug("FIND LINKS FOR: {} -> {}: #{}".format(self.idx, child.idx, len(next_links)))
            to_ret.append([])

            # Optional and forbidden components are fine without any match.
            good = child.status != ComponentStatus.Required
            for next_word in next_links:
                logging.debug("link: {}: {} -> {}".format(link, word.id, next_word.id))
                if not order.match(word, next_word):
                    continue

                match = child.match(next_word)

                if match is not None:
                    # special treatment for forbidden
                    if child.status == ComponentStatus.Forbidden:
                        good = False
                        break

                    else:
                        assert(type(match) is list)
                        to_ret[-1].extend(match)
                        good = True

            # if none matched, nothing found!
            if not good:
                logging.debug("BAD")
                return None

        return to_ret
class SyntacticStructure:
    """A syntactic structure: a component tree plus agreement constraints."""

    def __init__(self):
        self.id = None
        self.lbs = None
        # List of {'n1': cid, 'n2': cid, 'match': [attribute, ...]} dicts.
        self.agreements = []
        self.components = []
        self.selection = StructureSelection.All

    @staticmethod
    def from_xml(xml):
        """Build a structure from a ``syntactic_structure`` element.

        The element must contain exactly one child of type "JOS" holding, in
        this order, the components, dependencies and definitions.
        """
        st = SyntacticStructure()
        st.id = xml.get('id')
        st.lbs = xml.get('LBS')

        assert(len(list(xml)) == 1)
        system = next(iter(xml))

        assert(system.get('type') == 'JOS')
        components, dependencies, definitions = list(system)

        deps = [(dep.get('from'), dep.get('to'), dep.get('label'), dep.get('order'))
                for dep in dependencies]
        comps = {comp.get('cid'): dict(comp.items()) for comp in components}

        restrs, forms = {}, {}

        for comp in definitions:
            n = comp.get('cid')
            restrs[n] = None
            forms[n] = None

            for el in comp:
                if el.tag.startswith("restriction"):
                    assert(restrs[n] is None)
                    restrs[n] = el
                elif el.tag.startswith("representation"):
                    st.add_representation(n, el, forms)
                else:
                    raise NotImplementedError("definition??")

        # Dependencies whose source is '#' hang off this artificial root.
        fake_root_component = Component({'cid': '#', 'type': 'other'})
        st.components, st.selection = fake_root_component.find_next(deps, comps, restrs, forms)
        return st

    def add_representation(self, n, el, forms):
        """Record the representation(s) in *el* for component *n*.

        ``basic="form"`` elements are stored in ``forms[n]``;
        ``basic="agreement"`` elements become agreement constraints;
        anything else is logged and skipped.
        """
        if el.tag == "representation":
            els = [el]
        elif el.tag == "representation_and":
            els = list(el)
        else:
            raise NotImplementedError("Unknown representation tag: {}".format(el.tag))

        for rep in els:
            if rep.get('basic') == 'form':
                assert(forms[n] is None)
                forms[n] = rep
            elif rep.get('basic') == "agreement":
                self.add_agreement(n, rep)
            else:
                logging.warning("Strange representation (basic={}) in structure {}. Skipping"
                                .format(rep.get('basic'), self.id))
                continue

    def add_agreement(self, n, el):
        """Add an agreement between component *n* and the head named in *el*.

        The ``head`` attribute must look like ``cid_<id>``; the attributes
        to agree on come from the ``agreement`` attribute of the first
        child, separated by ``|``.
        """
        assert(el.get('head')[:4] == 'cid_')

        n1 = n
        n2 = el.get('head')[4:]
        agreement_str = next(iter(el)).get('agreement')

        self.agreements.append({
            'n1': n1,
            'n2': n2,
            'match': agreement_str.split('|')})

    def __str__(self):
        comp_str = "\n".join(str(comp) for comp in self.components)

        agrs = "\n".join("({} -[{}]- {}) ".format(
            a['n1'], "|".join(a['match']), a['n2']) for a in self.agreements)

        # A structure without components has no links to print.
        links_str = "\n".join(self.components[0].tree()) if self.components else ""

        return "{} LBS {}\nCOMPONENTS\n{}\nAGREEMENTS\n{}\nLINKS\n{}\n{}".format(
            self.id, self.lbs, comp_str, agrs, links_str, "-" * 40)

    def get_component(self, idx):
        """Return the component whose id is *idx*; RuntimeError if unknown."""
        for c in self.components:
            if c.idx == idx:
                return c
        raise RuntimeError("Unknown component id: {}".format(idx))

    @staticmethod
    def _msd_value(word, attribute):
        """Return the MSD character encoding *attribute* for *word*.

        Returns None when the word's MSD string is too short to specify the
        attribute. Raises ValueError when the word's category has no such
        attribute.
        """
        # +1 because the first MSD character is the category, not in TAGSET.
        position = TAGSET[word.msd[0]].index(attribute) + 1
        if position >= len(word.msd):
            return None
        return word.msd[position]

    def check_agreements(self, match):
        """Return False if any agreement constraint is violated by *match*."""
        for agr in self.agreements:
            w1 = match[agr['n1']]
            w2 = match[agr['n2']]

            for agr_case in agr['match']:
                # An attribute that a word does not specify always agrees.
                m1 = self._msd_value(w1, agr_case)
                if m1 is None:
                    continue

                m2 = self._msd_value(w2, agr_case)
                if m2 is None:
                    continue

                # '-' stands for "unspecified" and agrees with anything.
                if '-' not in [m1, m2] and m1 != m2:
                    return False

        return True

    def check_form(self, match):
        """Return False if a matched word violates its component's selection.

        A word whose MSD string is too short to specify the attribute is
        treated as unspecified (like '-'), mirroring check_agreements();
        previously this case raised IndexError.
        """
        for midx, w in match.items():
            c = self.get_component(midx)
            for key, value in c.selection.items():
                f1 = self._msd_value(w, key.lower())
                if f1 is None:
                    continue
                f2 = CODES[value]

                if '-' not in [f1, f2] and f1 != f2:
                    return False
        return True

    def match(self, word):
        """Match the structure starting at *word*.

        Returns a list of ``(match, verdict)`` pairs where verdict is
        "Agreement", "Form" or "OK"; the list is empty when the component
        tree does not match at all.
        """
        matches = self.components[0].match(word)
        if matches is None:
            return []

        to_ret = []
        for m in matches:
            if not self.check_agreements(m):
                bad = "Agreement"
            elif not self.check_form(m):
                bad = "Form"
            else:
                bad = "OK"

            to_ret.append((m, bad))

        return to_ret
def build_structures(filename):
    """Parse *filename* and return its syntactic structures in document order.

    Every ``syntactic_structure`` element is converted with
    SyntacticStructure.from_xml(); results that are None are dropped.
    """
    with open(filename, 'r') as fp:
        et = ElementTree.XML(fp.read())

    parsed = (
        SyntacticStructure.from_xml(node)
        for node in et.iter('syntactic_structure')
    )
    return [structure for structure in parsed if structure is not None]
def get_msd(comp):
    """Return the MSD tag stored on *comp*.

    The tag is taken from the ``msd`` attribute when present, otherwise from
    the ``ana`` attribute with its first four characters removed.

    Args:
        comp: object offering ``items()`` (an XML element or a dict).

    Raises:
        NotImplementedError: when neither attribute is present. The
            attributes are logged first; the old call passed ``file=`` to
            logging.error(), which raised TypeError instead.
    """
    d = dict(comp.items())
    if 'msd' in d:
        return d['msd']
    elif 'ana' in d:
        return d['ana'][4:]
    else:
        logging.error(d)
        raise NotImplementedError("MSD?")
class Word:
    """A corpus token with its lemma, MSD tag and outgoing dependency links."""

    def __init__(self, xml):
        """Read lemma, msd (via MSD_TRANSLATE), id and text from *xml*."""
        self.lemma = xml.get('lemma')
        self.msd = MSD_TRANSLATE[get_msd(xml)]
        self.id = xml.get('id')
        self.text = xml.text

        # link label -> list of target words
        self.links = defaultdict(list)

        required = (self.id, self.lemma, self.msd)
        assert(None not in required)

    @staticmethod
    def pcWord(pc):
        """Build a Word from a ``pc`` element, using its text as the lemma."""
        pc.set('lemma', pc.text)
        return Word(pc)

    def add_link(self, link, to):
        """Record that this word has a *link*-labelled dependency to *to*."""
        self.links[link].append(to)

    def get_links(self, link):
        """Return the targets for *link*; "a|b" combines several labels.

        The combined list for an "a|b" label is built on first use and then
        stored under that label.
        """
        already_known = link in self.links
        if not already_known and "|" in link:
            for label in link.split('|'):
                self.links[link].extend(self.links[label])

        return self.links[link]
def is_root_id(id_):
    """Return True when *id_* consists of exactly three dot-separated parts."""
    return id_.count('.') == 2
def load_corpus(filename):
    """Load the words of an XML corpus and connect them with their links.

    The first default-namespace declaration and every `` xml:`` attribute
    prefix are stripped before parsing so elements can be found by their
    bare names. ``w`` and ``pc`` elements become Word objects; ``link``
    elements are read either from ``from``/``dep``/``afun`` attributes or
    from ``ana="syn:<label>"`` with a two-id ``target``.

    Returns:
        The list of all words, each with its outgoing links attached.

    Exits the process with status 1 when a link starts at a root-like id
    or points to an unknown id.
    """
    with open(filename, 'r') as fp:
        xmlstring = re.sub(' xmlns="[^"]+"', '', fp.read(), count=1)
        xmlstring = xmlstring.replace(' xml:', ' ')
        et = ElementTree.XML(xmlstring)

    words = {}
    for w in et.iter("w"):
        words[w.get('id')] = Word(w)

    for pc in et.iter("pc"):
        words[pc.get('id')] = Word.pcWord(pc)

    for link in et.iter("link"):
        if 'dep' in link.keys():
            ana = link.get('afun')
            lfrom = link.get('from')
            dest = link.get('dep')
        else:
            ana = link.get('ana')
            # Only syntactic links are of interest; a link without any
            # 'ana' attribute used to crash here and is now skipped too.
            if ana is None or ana[:4] != 'syn:':
                continue
            ana = ana[4:]
            lfrom, dest = link.get('target').replace('#', '').split()

        if lfrom not in words:
            # Links starting outside the known words are silently skipped.
            continue

        if is_root_id(lfrom):
            # The old call had no placeholder for the id, so logging could
            # not format the message.
            logging.error("NOO: %s", lfrom)
            sys.exit(1)

        if dest in words:
            words[lfrom].add_link(ana, words[dest])
        else:
            logging.error("Unknown id: {}".format(dest))
            sys.exit(1)

    return list(words.values())
def main():
    """Match all structures against the corpus and write the result as CSV.

    Structures are read from ``STRUKTURE``, words from ``STAVKI``; one row
    per match is written to ``FILE_OUT``. Rows with the same structure id
    and the same sequence of matched lemmas share a collocation id.

    NOTE(review): fields are joined with ", " without quoting, so a token
    that itself contains a comma shifts the columns of its row.
    """
    import time
    started = time.time()

    structures = build_structures(STRUKTURE)
    for s in structures:
        logging.debug(str(s))

    words = load_corpus(STAVKI)

    logging.info("MATCHES...")
    matches = {s.id: [] for s in structures}

    for idx, s in enumerate(structures):
        logging.info("{}/{}: {:7s}".format(idx, len(structures), s.id))
        for w in words:
            mhere = s.match(w)
            logging.debug(" GOT: {}".format(len(mhere)))
            for match, reason in mhere:
                matches[s.id].append((match, reason))

    # NOTE(review): the original indentation of this call was lost; it is
    # emitted once after matching here - confirm it was not meant per match.
    print("")

    header = ["Structure_ID"]
    for i in range(MAX_NUM_COMPONENTS):
        header.extend("C{}_{}".format(i + 1, thd) for thd in
                      ["Token_ID", "Word_form", "Lemma", "Msd", "Representative_form"])
    header.extend(["Collocation_ID", "Joint_representative_form"])
    rows = [", ".join(header)]

    colocation_ids = {}
    # Structure id plus five columns per component slot.
    row_length = 1 + MAX_NUM_COMPONENTS * 5

    for s in structures:
        for m, _reason in matches[s.id]:
            colocation_id = [s.id]
            to_print = [s.id]

            # Matches are looked up under the keys "1", "2", ... - one per
            # component of the structure.
            for position in range(1, len(s.components) + 1):
                key = str(position)
                if key not in m:
                    to_print.extend([""] * 5)
                else:
                    w = m[key]
                    to_print.extend([w.id, w.text, w.lemma, w.msd, ""])
                    colocation_id.append(w.lemma)

            # Reuse the id of an already seen collocation, else take the
            # next free number (ids start at 1).
            cid = colocation_ids.setdefault(
                tuple(colocation_id), len(colocation_ids) + 1)

            # make them equal size
            to_print.extend([""] * (row_length - len(to_print)))
            to_print.extend([str(cid), ""])

            rows.append(", ".join(to_print))

    # Summary logging belongs after the loop: it used to sit between the
    # row assembly and the append, i.e. it ran for every single row.
    logging.info("TIME: {}".format(time.time() - started))
    logging.debug([(k, len(v)) for k, v in matches.items()])
    logging.debug(sum(len(v) for _, v in matches.items()))

    with open(FILE_OUT, "w") as fp:
        print("\n".join(rows), file=fp)
# groups = defaultdict(int)
# for m, reason in ms:
# if reason != "OK":
# continue
# lemmas = [(n, w.lemma) for n, w in m.items()]
# lemmas = tuple(sorted(lemmas, key=lambda x: x[0]))
# groups[lemmas] += 1
# print(s.id)
# print(groups)
# Script entry point: configure INFO-level logging, then run the extraction.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()

# 6, 7: examples of false hits?