"""Match syntactic structures against MSD-tagged words and export the hits.

The script reads structure definitions from an XML file (``STRUKTURE``),
obtains the corpus words (from the ``words.p`` pickle cache, or from the
``STAVKI`` corpus XML when no cache exists), tries every structure on every
word and writes one row per match to ``FILE_OUT``.

Usage: ``<script> STAVKI STRUKTURE FILE_OUT``
"""
from xml.etree import ElementTree
import re
from enum import Enum
from collections import defaultdict
import sys
import logging

from msd_translate import MSD_TRANSLATE


# Number of component column groups in the output table.
MAX_NUM_COMPONENTS = 5

# Command-line arguments: corpus file, structures file, output file.
STAVKI = sys.argv[1]
STRUKTURE = sys.argv[2]
FILE_OUT = sys.argv[3]

# Maps category names and attribute values to their one-character MSD codes.
CODES = {
    "Noun": "N",
    "Verb": "V",
    "Adjective": "A",
    "Adverb": "R",
    "Pronoun": "P",
    "Numeral": "M",
    "Preposition": "S",
    "Conjunction": "C",
    "Particle": "Q",
    "Interjection": "I",
    "Abbreviation": "Y",
    "Residual": "X",

    "common": "c",
    "proper": "p",
    "masculine": "m",
    "feminine": "f",
    "neuter": "n",
    "singular": "s",
    "dual": "d",
    "plural": "p",
    "nominative": "n",
    "genitive": "g",
    "dative": "d",
    "accusative": "a",
    "locative": "l",
    "instrumental": "i",
    "no": "n",
    "yes": "y",
    "main": "m",
    "auxiliary": "a",
    "perfective": "e",
    "progressive": "p",
    "biaspectual": "b",
    "infinitive": "n",
    "supine": "u",
    "participle": "p",
    "present": "r",
    "future": "f",
    "conditional": "c",
    "imperative": "m",
    "first": "1",
    "second": "2",
    "third": "3",
    "general": "g",
    "possessive": "s",
    "positive": "p",
    "comparative": "c",
    "superlative": "s",
    "personal": "p",
    "demonstrative": "d",
    "relative": "r",
    "reflexive": "x",
    "interrogative": "q",
    "indefinite": "i",
    "negative": "z",
    "bound": "b",
    "digit": "d",
    "roman": "r",
    "letter": "l",
    "cardinal": "c",
    "ordinal": "o",
    "pronominal": "p",
    "special": "s",
    "coordinating": "c",
    "subordinating": "s",
    "foreign": "f",
    "typo": "t",
    "program": "p",
}

# For every category code: attribute names in MSD order.  The attribute at
# list index i is stored at MSD position i + 1 (position 0 is the category).
TAGSET = {
    "N": ['type', 'gender', 'number', 'case', 'animate'],
    "V": ['type', 'aspect', 'vform', 'person', 'number', 'gender', 'negative'],
    "A": ['type', 'degree', 'gender', 'number', 'case', 'definiteness'],
    "R": ['type', 'degree'],
    "P": ['type', 'person', 'gender', 'number', 'case', 'owner_number',
          'owned_gender', 'clitic'],
    "M": ['form', 'type', 'gender', 'number', 'case', 'definiteness'],
    "S": ['case'],
    "C": ['type'],
    "Q": [],
    "I": [],
    "Y": [],
    "X": ['type'],
}

# One "match anything" pattern per attribute position of each category.
# Derived from TAGSET so that every attribute has a slot; a shorter list makes
# build_morphology_regex fail with IndexError for the trailing attributes.
CATEGORY_BASES = {
    category: ['.'] * len(attributes)
    for category, attributes in TAGSET.items()
}


class RestrictionType(Enum):
    """What a Restriction is compared against (see Restriction.match)."""
    Morphology = 0
    Lexis = 1
    MatchAll = 2


class Rendition(Enum):
    """Which surface representation of a word a component renders."""
    Lemma = 0
    WordForm = 1
    Unknown = 2


class ComponentRendition:
    """Chooses how a matched word is rendered (lemma, word form or nothing)."""

    def __init__(self, rendition=Rendition.Unknown):
        self.word_form = {}
        self.rendition = rendition

    def render(self, word):
        """Return ``word.lemma``, ``word.text`` or None, depending on rendition.

        Raises:
            RuntimeError: if ``self.rendition`` is not a known Rendition.
        """
        if self.rendition == Rendition.Lemma:
            return word.lemma
        if self.rendition == Rendition.WordForm:
            return word.text
        if self.rendition == Rendition.Unknown:
            return None
        raise RuntimeError("Unknown rendition: {}".format(self.rendition))

    def __str__(self):
        return str(self.rendition)


# NOTE(review): the original author marked this enum with "dont know...";
# its exact meaning is not established by this file.
class StructureSelection(Enum):
    All = 0
    Frequency = 1


class ComponentRepresentation:
    """Factory namespace for objects described by representation features."""

    @staticmethod
    def new(s):
        """Build the object described by the attribute dict ``s``.

        Returns:
            * a ComponentRendition when ``s`` has a ``rendition`` key,
            * a StructureSelection for ``selection`` "frequency" / "all",
            * ``{s['selection']: s['value']}`` for any other selection.

        Raises:
            NotImplementedError: for an unknown rendition, or when ``s`` has
                neither a ``rendition`` nor a ``selection`` key.
        """
        if 'rendition' in s:
            if s['rendition'] == "lemma":
                return ComponentRendition(Rendition.Lemma)
            elif s['rendition'] == "word_form":
                return ComponentRendition(Rendition.WordForm)
            else:
                raise NotImplementedError("Rendition: {}".format(s))
        elif 'selection' in s:
            if s['selection'] == "frequency":
                return StructureSelection.Frequency
            elif s['selection'] == "all":
                return StructureSelection.All
            else:
                return {s['selection']: s['value']}
        else:
            raise NotImplementedError("Representation: {}".format(s))


class ComponentStatus(Enum):
    """Whether a component may, must or must not be present in a match."""
    Optional = 0
    Required = 1
    Forbidden = 2

    def __str__(self):
        """Return the one-character marker: "?", "!" or "X"."""
        if self == ComponentStatus.Optional:
            return "?"
        elif self == ComponentStatus.Required:
            return "!"
        else:  # Forbidden
            return "X"


def get_level(restriction):
    """Return the ``level`` attribute of the first feature that has one.

    Args:
        restriction: iterable of XML elements (anything offering ``keys()``
            and ``get()``).

    Raises:
        RuntimeError: if no feature carries a ``level`` attribute.
    """
    for feature in restriction:
        if "level" in feature.keys():
            return feature.get("level")

    raise RuntimeError("No 'level' attribute found in restriction")


def build_morphology_regex(restriction):
    """Compile a morphology restriction into a per-position MSD matcher.

    Each feature element must carry exactly one attribute/value pair, plus an
    optional ``filter="negative"`` that inverts the match for that attribute.
    A ``POS`` feature is mandatory; multiple values are separated by ``|``.

    Args:
        restriction: iterable of XML feature elements.

    Returns:
        ``(description, matcher)``: the per-position patterns joined with
        spaces, and a function ``matcher(msd) -> bool``.

    Raises:
        AssertionError: on a malformed feature or a missing ``POS``.
        KeyError: for a category or value that is not in CODES.
        ValueError: for an attribute the category does not have in TAGSET.
    """
    restr_dict = {}
    for feature in restriction:
        feature_dict = dict(feature.items())

        match_type = True
        if "filter" in feature_dict:
            assert(feature_dict['filter'] == "negative")
            match_type = False
            del feature_dict['filter']

        assert(len(feature_dict) == 1)
        key, value = next(iter(feature_dict.items()))
        restr_dict[key] = (value, match_type)

    assert('POS' in restr_dict)
    category = restr_dict['POS'][0].capitalize()
    cat_code = CODES[category]
    # Position 0 matches the category letter, the rest start as wildcards.
    rgx = [cat_code] + CATEGORY_BASES[cat_code]

    del restr_dict['POS']
    # NOTE(review): starting at 1 means an MSD consisting of the category
    # letter alone (length 1) never matches, even without any attribute
    # restriction -- confirm that this is intended.
    min_msd_length = 1
    for attribute, (value, typ) in restr_dict.items():
        # list.index raises ValueError for an unknown attribute.
        index = TAGSET[cat_code].index(attribute.lower())

        if '|' in value:
            match = "".join(CODES[val] for val in value.split('|'))
        else:
            match = CODES[value]

        match = "[{}{}]".format("" if typ else "^", match)
        rgx[index + 1] = match

        # Only positive restrictions require the position to be present.
        if typ:
            min_msd_length = max(index + 1, min_msd_length)

    # Compile once here instead of on every matcher call.
    patterns = [re.compile(r) for r in rgx]

    def matcher(text):
        if len(text) <= min_msd_length:
            return False

        # zip stops at the shorter sequence: positions missing from a short
        # MSD are simply not checked.
        for c, pattern in zip(text, patterns):
            if not pattern.match(c):
                return False

        return True

    return " ".join(rgx), matcher


def build_lexis_regex(restriction):
    """Compile a lexis restriction into a lemma membership test.

    Args:
        restriction: iterable of XML feature elements; their attributes are
            merged and must include ``lemma`` (alternatives split on ``|``).

    Returns:
        ``(lemmas, matcher)``: the list of accepted lemmas and a function
        that tests whether a given lemma is in that list.
    """
    restr_dict = {}
    for feature in restriction:
        restr_dict.update(feature.items())

    assert("lemma" in restr_dict)
    match_list = restr_dict['lemma'].split('|')

    return match_list, lambda text: text in match_list


class Restriction:
    """A single morphology / lexis restriction, or a match-everything one."""

    def __init__(self, restriction_tag):
        """Build the restriction from its XML tag; None means "match all".

        Raises:
            NotImplementedError: for a ``type`` other than "morphology" or
                "lexis".
        """
        if restriction_tag is None:
            self.type = RestrictionType.MatchAll
            self.matcher = None
            self.present = None
            return

        restriction_type = restriction_tag.get('type')
        if restriction_type == "morphology":
            self.type = RestrictionType.Morphology
            self.present, self.matcher = build_morphology_regex(
                list(restriction_tag))
        elif restriction_type == "lexis":
            self.type = RestrictionType.Lexis
            self.present, self.matcher = build_lexis_regex(
                list(restriction_tag))
        else:
            raise NotImplementedError()

    def match(self, word):
        """Return whether ``word`` satisfies this restriction.

        Morphology restrictions test ``word.msd``, lexis restrictions test
        ``word.lemma``; a match-all restriction always returns True.
        """
        if self.type == RestrictionType.Morphology:
            match_to = word.msd
        elif self.type == RestrictionType.Lexis:
            match_to = word.lemma
        elif self.type == RestrictionType.MatchAll:
            return True
        else:
            raise RuntimeError("Unreachable!")

        return self.matcher(match_to)

    def __str__(self):
        return "({:s} {})".format(str(self.type).split('.')[1], self.present)

    def __repr__(self):
        return str(self)


class Component:
    """One node of a syntactic structure, linked to its dependent components."""

    def __init__(self, info):
        """Create a component from its attribute dict.

        ``info`` must contain ``cid``; ``name`` and ``status`` are optional.
        A missing status means Required.

        Raises:
            NotImplementedError: for an unknown ``status`` value.
        """
        idx = info['cid']
        name = info.get('name')

        if 'status' not in info:
            status = ComponentStatus.Required
        elif info['status'] == 'forbidden':
            status = ComponentStatus.Forbidden
        elif info['status'] == 'obligatory':
            status = ComponentStatus.Required
        elif info['status'] == 'optional':
            status = ComponentStatus.Optional
        else:
            raise NotImplementedError(
                "strange status: {}".format(info['status']))

        self.status = status
        self.name = name
        self.idx = idx
        self.restriction = None
        # List of (dependent component, link label) pairs.
        self.next_element = []
        self.rendition = ComponentRendition()
        self.selection = {}

        self.iter_ctr = 0

    def render_word(self, word):
        """Render ``word`` with this component's rendition."""
        return self.rendition.render(word)

    def add_next(self, next_component, link_label):
        """Register ``next_component`` as a dependent reached via ``link_label``."""
        self.next_element.append((next_component, link_label))

    def set_restriction(self, restrictions_tag):
        """Set the restriction from a ``restriction`` / ``restriction_or`` tag.

        None yields a match-all restriction; ``restriction_or`` yields a list
        of alternative restrictions.
        """
        if restrictions_tag is None:
            self.restriction = Restriction(None)
        elif restrictions_tag.tag == "restriction":
            self.restriction = Restriction(restrictions_tag)
        elif restrictions_tag.tag == "restriction_or":
            self.restriction = [Restriction(el) for el in restrictions_tag]
        else:
            raise RuntimeError("Unreachable")

    def set_representation(self, representation):
        """Apply the representation features to this component.

        Renditions replace ``self.rendition``; dict selections are merged into
        ``self.selection``.

        Returns:
            The StructureSelection found among the features, or None.
        """
        cr = None
        if representation is not None:
            self.representation = []

            for feature in representation:
                f = ComponentRepresentation.new(dict(feature.attrib))

                if isinstance(f, StructureSelection):
                    # At most one structure selection per component.
                    assert(cr is None)
                    cr = f
                elif isinstance(f, ComponentRendition):
                    self.rendition = f
                elif isinstance(f, dict):
                    self.selection.update(f)
                else:
                    raise RuntimeError("Unreachable: {}".format(f))

        return cr

    def find_next(self, deps, comps, restrs, reprs):
        """Recursively build all components that depend on this one.

        Args:
            deps: list of ``(from_cid, to_cid, label)`` tuples.
            comps: cid -> attribute dict of the component.
            restrs: cid -> restriction tag (or None).
            reprs: cid -> representation tag (or None).

        Returns:
            ``(components, selection)``: every descendant component in
            depth-first order, and StructureSelection.Frequency if any
            descendant requested it (StructureSelection.All otherwise).
        """
        representation = StructureSelection.All
        to_ret = []
        for d in deps:
            if d[0] == self.idx:
                _, idx, dep_label = d

                next_component = Component(comps[idx])
                next_component.set_restriction(restrs[idx])
                r1 = next_component.set_representation(reprs[idx])
                to_ret.append(next_component)

                self.add_next(next_component, dep_label)
                others, r2 = next_component.find_next(
                    deps, comps, restrs, reprs)
                to_ret.extend(others)

                if StructureSelection.Frequency in (r1, r2):
                    representation = StructureSelection.Frequency

        return to_ret, representation

    def name_str(self):
        """Return the component name, or "_" when it has none."""
        return "_" if self.name is None else self.name

    def __str__(self):
        n = self.name_str()
        return "{:s}) {:7s}:{} [{}] :{}".format(
            self.idx, n, self.status, self.restriction, self.rendition)

    def tree(self):
        """Return one formatted line per link in the subtree below this node."""
        el = []
        for next_comp, link in self.next_element:
            el.append("{:3} -- {:5} --> {:3}".format(
                self.idx, link, next_comp.idx))
            el.extend(next_comp.tree())
        return el

    def __repr__(self):
        return str(self)

    def match(self, word):
        """Match this component and its dependents, starting at ``word``.

        Returns:
            None when the component or one of its dependents fails to match;
            otherwise a list of dicts mapping component id -> matched word.
        """
        m1 = self._match_self(word)
        if m1 is None:
            return None

        mn = self._match_next(word)
        if mn is None:
            return None

        to_ret = [m1]

        for cmatch in mn:
            # Good match but nothing to add: just continue.
            if len(cmatch) == 0:
                continue

            # More than one match found for this particular component.
            elif len(cmatch) > 1:
                logging.debug("MULTIPLE: {}, {}".format(self.idx, cmatch))
                # Multiple matches in more than one component are not
                # expanded; only the first one is kept.
                if len(to_ret) > 1:
                    logging.warning("Strange multiple match: {}".format(
                        str([w.id for w in cmatch[0].values()])))

                    for tr in to_ret:
                        tr.update(cmatch[0])
                    continue

                # One result per alternative match of this component.
                to_ret = [{**dict(to_ret[0]), **m} for m in cmatch]

            else:
                for tr in to_ret:
                    tr.update(cmatch[0])

        logging.debug("MA: {}".format(str(to_ret)))
        return to_ret

    def _match_self(self, word):
        """Return ``{self.idx: word}`` if the restriction accepts ``word``.

        A list of restrictions is satisfied when any of them matches.
        Returns None when nothing matches.
        """
        matched = None

        if isinstance(self.restriction, list):
            for restr in self.restriction:
                matched = restr.match(word)
                if matched:  # any alternative is enough
                    break
        else:
            matched = self.restriction.match(word)

        logging.debug("SELF MATCH({}: {} -> {}".format(
            self.idx, word.text, matched))

        if not matched:
            return None
        return {self.idx: word}

    def _match_next(self, word):
        """Match every dependent component against the words linked to ``word``.

        Returns:
            None if a required dependent has no match or a forbidden one has
            a match; otherwise one list of match dicts per dependent.
        """
        to_ret = []

        for next_comp, link in self.next_element:
            next_links = word.get_links(link)
            logging.debug("FIND LINKS FOR: {} -> {}: #{}".format(
                self.idx, next_comp.idx, len(next_links)))
            to_ret.append([])

            # Optional and forbidden components are fine without any match.
            good = next_comp.status != ComponentStatus.Required
            for next_word in next_links:
                logging.debug("link: {}: {} -> {}".format(
                    link, word.id, next_word.id))
                match = next_comp.match(next_word)

                if match is not None:
                    # A matching forbidden component invalidates everything.
                    if next_comp.status == ComponentStatus.Forbidden:
                        good = False
                        break

                    else:
                        assert(isinstance(match, list))
                        to_ret[-1].extend(match)
                        good = True

            # If none matched, nothing found!
            if not good:
                logging.debug("BAD")
                return None

        return to_ret


def _msd_attribute(msd, attribute):
    """Return the MSD character for ``attribute``, or None if the MSD is too short.

    Raises:
        ValueError: if the category of ``msd`` has no such attribute.
    """
    # +1 because position 0 holds the (uppercase) category letter.
    position = TAGSET[msd[0]].index(attribute) + 1
    if position >= len(msd):
        return None
    return msd[position]


class SyntacticStructure:
    """A syntactic structure: components, their links and agreements."""

    def __init__(self):
        self.id = None
        self.lbs = None
        # Dicts with keys 'n1', 'n2' (component ids) and 'match' (attributes).
        self.agreements = []
        self.components = []
        self.selection = StructureSelection.All

    @staticmethod
    def from_xml(xml):
        """Build a structure from a ``syntactic_structure`` XML element.

        Returns:
            The structure, or None when its id (with "-" read as a decimal
            point) is 17 or higher.

        Raises:
            AssertionError: if the element does not hold exactly one system
                of type "JOS".
            NotImplementedError: for an unknown tag inside a definition.
        """
        st = SyntacticStructure()
        st.id = xml.get('id')
        st.lbs = xml.get('LBS')
        if float(st.id.replace('-', '.')) >= 17:
            return None

        assert(len(list(xml)) == 1)
        system = next(iter(xml))

        assert(system.get('type') == 'JOS')
        components, dependencies, definitions = list(system)

        deps = [(dep.get('from'), dep.get('to'), dep.get('label'))
                for dep in dependencies]
        comps = {comp.get('cid'): dict(comp.items()) for comp in components}

        restrs, forms = {}, {}

        for comp in definitions:
            n = comp.get('cid')
            restrs[n] = None
            forms[n] = None

            for el in comp:
                if el.tag.startswith("restriction"):
                    assert(restrs[n] is None)
                    restrs[n] = el
                elif el.tag.startswith("representation"):
                    st.add_representation(n, el, forms)
                else:
                    raise NotImplementedError("definition??")

        # Dependencies whose source is '#' hang off this artificial root.
        fake_root_component = Component({'cid': '#', 'type': 'other'})
        st.components, st.selection = fake_root_component.find_next(
            deps, comps, restrs, forms)
        return st

    def add_representation(self, n, el, forms):
        """Record the representation(s) of component ``n``.

        "form" representations are stored in ``forms[n]``; "agreement"
        representations are added to ``self.agreements``.

        Raises:
            NotImplementedError: for an unknown tag or ``basic`` value.
        """
        if el.tag == "representation":
            els = [el]
        elif el.tag == "representation_and":
            els = list(el)
        else:
            raise NotImplementedError("repr what?: {}".format(el.tag))

        for el in els:
            if el.get('basic') == 'form':
                assert(forms[n] is None)
                forms[n] = el
            elif el.get('basic') == "agreement":
                self.add_agreement(n, el)
            else:
                raise NotImplementedError(
                    "representation?: {}".format(el.tag))

    def add_agreement(self, n, el):
        """Register an agreement between component ``n`` and the head in ``el``.

        The head is read from the ``head`` attribute ("cid_<id>"), the agreeing
        attributes from the first child's ``agreement`` attribute (split on
        ``|``).
        """
        assert(el.get('head')[:4] == 'cid_')

        n1 = n
        n2 = el.get('head')[4:]
        agreement_str = next(iter(el)).get('agreement')

        self.agreements.append({
            'n1': n1,
            'n2': n2,
            'match': agreement_str.split('|')})

    def __str__(self):
        comp_str = "\n".join(str(comp) for comp in self.components)

        agrs = "\n".join("({} -[{}]- {}) ".format(
            a['n1'], "|".join(a['match']), a['n2']) for a in self.agreements)

        links_str = "\n".join(self.components[0].tree())

        return "{} LBS {}\nCOMPONENTS\n{}\nAGREEMENTS\n{}\nLINKS\n{}\n{}".format(
            self.id, self.lbs, comp_str, agrs, links_str, "-" * 40)

    def get_component(self, idx):
        """Return the component with id ``idx``.

        Raises:
            RuntimeError: if the structure has no such component.
        """
        for c in self.components:
            if c.idx == idx:
                return c
        raise RuntimeError("Unknown component id: {}".format(idx))

    def check_agreements(self, match):
        """Return whether all agreements hold for ``match``.

        ``match`` maps component id -> word.  An attribute that is absent
        from either MSD (too short, e.g. for an infinitive) or is "-" always
        agrees.
        """
        for agr in self.agreements:
            w1 = match[agr['n1']]
            w2 = match[agr['n2']]

            for agr_case in agr['match']:
                m1 = _msd_attribute(w1.msd, agr_case)
                if m1 is None:
                    continue

                m2 = _msd_attribute(w2.msd, agr_case)
                if m2 is None:
                    continue

                if '-' not in [m1, m2] and m1 != m2:
                    return False

        return True

    def check_form(self, match):
        """Return whether every matched word has the form its component selects.

        For each ``attribute: value`` in a component's ``selection`` the MSD
        character of the word is compared with ``CODES[value]``.  As in
        check_agreements, an attribute that is "-" or lies beyond the end of
        the MSD counts as compatible.
        """
        for midx, w in match.items():
            c = self.get_component(midx)
            for key, value in c.selection.items():
                f1 = _msd_attribute(w.msd, key.lower())
                if f1 is None:
                    # A missing trailing attribute is treated as unspecified.
                    f1 = '-'
                f2 = CODES[value]

                if '-' not in [f1, f2] and f1 != f2:
                    return False

        return True

    def match(self, word):
        """Match the structure with ``word`` as its first component.

        Returns:
            A list of ``(match, verdict)`` pairs, where ``match`` maps
            component id -> word and ``verdict`` is "Agreement", "Form" or
            "OK".  The list is empty when the components do not match.
        """
        matches = self.components[0].match(word)
        if matches is None:
            return []

        to_ret = []
        for m in matches:
            if not self.check_agreements(m):
                bad = "Agreement"
            elif not self.check_form(m):
                bad = "Form"
            else:
                bad = "OK"

            to_ret.append((m, bad))

        return to_ret


def build_structures(filename):
    """Parse ``filename`` and return all structures that from_xml accepts."""
    structures = []
    with open(filename, 'r') as fp:
        et = ElementTree.XML(fp.read())
        for structure in et.iter('syntactic_structure'):
            to_append = SyntacticStructure.from_xml(structure)
            if to_append is None:
                continue
            structures.append(to_append)
    return structures


def get_msd(comp):
    """Return the raw MSD of an XML word element.

    Uses the ``msd`` attribute if present, otherwise the ``ana`` attribute
    without its first four characters.

    Raises:
        NotImplementedError: if the element has neither attribute.
    """
    d = dict(comp.items())
    if 'msd' in d:
        return d['msd']
    elif 'ana' in d:
        return d['ana'][4:]
    else:
        # Logging calls take no ``file`` argument; the handler decides where
        # the record goes.
        logging.error("No MSD found in attributes: %s", d)
        raise NotImplementedError("MSD?")


class Word:
    """A corpus token with lemma, translated MSD, id, text and outgoing links."""

    def __init__(self, xml):
        """Build a word from its XML element.

        Raises:
            KeyError: if the raw MSD is not in MSD_TRANSLATE.
            AssertionError: if id, lemma or msd is missing.
        """
        self.lemma = xml.get('lemma')
        self.msd = MSD_TRANSLATE[get_msd(xml)]
        self.id = xml.get('id')
        self.text = xml.text
        # link label -> list of target words
        self.links = defaultdict(list)

        assert(None not in (self.id, self.lemma, self.msd))

    @staticmethod
    def pcWord(pc):
        """Build a Word from a ``pc`` element, using its text as the lemma."""
        pc.set('lemma', pc.text)
        return Word(pc)

    def add_link(self, link, to):
        """Add an outgoing link labelled ``link`` to the word ``to``."""
        self.links[link].append(to)

    def get_links(self, link):
        """Return the words reached via ``link``.

        ``link`` may name several labels separated by ``|``; their targets
        are merged once and cached under the combined label.
        """
        if link not in self.links and "|" in link:
            for label in link.split('|'):
                self.links[link].extend(self.links[label])

        return self.links[link]


def is_root_id(id_):
    """Return whether ``id_`` consists of exactly three dot-separated parts."""
    return len(id_.split('.')) == 3


def load_corpus(filename):
    """Load all words of the corpus XML ``filename`` and connect their links.

    The default namespace declaration and ``xml:`` attribute prefixes are
    stripped before parsing.  Links whose source id is not a known word are
    skipped; a link from a root id or to an unknown id terminates the program
    with exit status 1.

    Returns:
        A list of Word objects.
    """
    with open(filename, 'r') as fp:
        xmlstring = re.sub(' xmlns="[^"]+"', '', fp.read(), count=1)
        xmlstring = xmlstring.replace(' xml:', ' ')
        et = ElementTree.XML(xmlstring)

    words = {}
    for w in et.iter("w"):
        words[w.get('id')] = Word(w)

    for pc in et.iter("pc"):
        words[pc.get('id')] = Word.pcWord(pc)

    for link in et.iter("link"):
        if 'dep' in link.keys():
            ana = link.get('afun')
            lfrom = link.get('from')
            dest = link.get('dep')
        else:
            ana = link.get('ana')
            if ana[:4] != 'syn:':  # not a syntactic link, ignore it
                continue
            ana = ana[4:]
            lfrom, dest = link.get('target').replace('#', '').split()

        if lfrom not in words:
            # Source is not a known word: skip the link.
            continue

        if is_root_id(lfrom):
            logging.error("NOO: %s", lfrom)
            sys.exit(1)

        if dest in words:
            next_word = words[dest]
            words[lfrom].add_link(ana, next_word)
        else:
            logging.error("Unknown id: {}".format(dest))
            sys.exit(1)

    return list(words.values())


def main():
    """Match all structures against all words and write the result table.

    Words come from the ``words.p`` pickle cache in the working directory
    when it exists, otherwise they are loaded from ``STAVKI``.
    """
    import time
    import pickle

    t = time.time()

    structures = build_structures(STRUKTURE)
    for s in structures:
        print(s)

    # SECURITY: pickle.load executes arbitrary code from the file; only use a
    # words.p cache that this script's author produced.
    try:
        with open("words.p", "rb") as fp:
            words = pickle.load(fp)
    except FileNotFoundError:
        # No cache available: parse the corpus given on the command line.
        words = load_corpus(STAVKI)

    print("MATCHES...")
    matches = {s.id: [] for s in structures}

    for idx, s in enumerate(structures):
        print("\r{}/{}: {:7s}".format(idx, len(structures), s.id))
        for w in words:
            mhere = s.match(w)
            logging.debug(" GOT: {}".format(len(mhere)))
            for match, reason in mhere:
                matches[s.id].append((match, reason))

    print("")

    header = ["Structure_ID"]
    for i in range(MAX_NUM_COMPONENTS):
        header.extend("C{}_{}".format(i + 1, thd) for thd in
                      ["Token_ID", "Word_form", "Lemma", "Msd",
                       "Representative_form"])
    header.extend(["Collocation_ID", "Joint_representative_form"])
    # NOTE(review): fields are joined with ", " without quoting, so a token
    # or lemma containing a comma shifts the columns of its row.
    csv = [", ".join(header)]

    # (structure id, lemma, lemma, ...) -> running collocation number
    colocation_ids = {}

    for s in structures:
        ms = matches[s.id]

        for m, reason in ms:
            colocation_id = [s.id]
            to_print = []

            # Components are looked up by their 1-based position as a string.
            for position in range(1, len(s.components) + 1):
                key = str(position)
                if key not in m:
                    to_print.extend(["", "", "", "", ""])
                else:
                    w = m[key]
                    to_print.extend([w.id, w.text, w.lemma, w.msd, ""])
                    colocation_id.append(w.lemma)

            colocation_id = tuple(colocation_id)
            if colocation_id in colocation_ids:
                cid = colocation_ids[colocation_id]
            else:
                cid = len(colocation_ids) + 1
                colocation_ids[colocation_id] = cid

            to_print = [s.id] + to_print
            length = 1 + MAX_NUM_COMPONENTS * 5
            # Pad so that every row has the same number of columns.
            to_print.extend([""] * (length - len(to_print)))
            to_print.extend([str(cid), ""])

            csv.append(", ".join(to_print))

    with open(FILE_OUT, "w") as fp:
        print("\n".join(csv), file=fp)

    print("")
    print("TIME", time.time() - t)
    print([(k, len(v)) for k, v in matches.items()])
    print(sum(len(v) for _, v in matches.items()))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()

# 6, 7: examples of false hits?