from unittest import TestCase, mock
from datetime import datetime

from .factories import (IEDocFactory, SentencedIEDocFactory, GazetteItemFactory,
                        EntityOccurrenceFactory, EntityKindFactory)
from .manager_case import ManagerTestCase
from iepy.preprocess.pipeline import PreProcessSteps
from iepy.preprocess.stanford_preprocess import (
    StanfordPreprocess, GazetteManager, apply_coreferences, CoreferenceError,
    StanfordAnalysis)


class TestableStanfordAnalysis(StanfordAnalysis):
    def __init__(self, hacked_sentences, *args):
        self.hacked_sentences = hacked_sentences
        super().__init__({})

    def get_sentences(self):
        return self.hacked_sentences
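
# TestableStanfordAnalysis lets the tests below inject hand-built sentence
# dicts into StanfordAnalysis instead of real CoreNLP output, so no CoreNLP
# server is needed to exercise the extraction helpers.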


def sentence_factory(markup):
    """Simplistic builder of *parsed* sentences.

    Each line of the markup is interpreted as whitespace-separated values for
        token offset-in-chars ner lemma
    which are returned as a list of dicts.
    """
    sentence = []
    for line in markup.split("\n"):
        line = line.strip()
        if not line:
            continue
        token, offset, ner, lemma = line.split()
        sentence.append({
            "word": token,
            "CharacterOffsetBegin": offset,
            "NER": ner,
            "lemma": lemma,
        })
    return sentence
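
# For example, the markup line "Diego 0 PERSON diego" becomes:
#   {"word": "Diego", "CharacterOffsetBegin": "0", "NER": "PERSON",
#    "lemma": "diego"}
# Offsets are kept as strings here, exactly as they appear in the markup.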


def get_analysis_from_sent_markup(markup):
    sentences = [sentence_factory(markup)]
    return TestableStanfordAnalysis(hacked_sentences=sentences)
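
# A minimal usage sketch (illustrative only, not collected by the test
# runner): build an analysis from markup and ask it for its tokens.
#
#   analysis = get_analysis_from_sent_markup("""
#       Hello 0 O hello
#       world 6 O world
#   """)
#   analysis.get_tokens()  # -> ["Hello", "world"]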


class TestSentenceFunctions(TestCase):

    def test_get_tokens_simple(self):
        analysis = get_analysis_from_sent_markup("""
            friends x x x
            will x x x
            be x x x
            friends x x x
        """)
        X = analysis.get_tokens()
        self.assertEqual(X, "friends will be friends".split())

    def test_get_tokens_empty(self):
        self.assertEqual(TestableStanfordAnalysis([]).get_tokens(), [])

    def test_get_tokens_invalid_data(self):
        with self.assertRaises(KeyError):
            TestableStanfordAnalysis([[{"aaa": "bbb"}]]).get_tokens()

    def test_get_token_offsets_simple(self):
        analysis = get_analysis_from_sent_markup("""
            x 1 x x
            x 4 x x
            x 8 x x
            x 3 x x
        """)
        X = analysis.get_token_offsets()
        self.assertEqual(X, [1, 4, 8, 3])

    def test_get_token_offsets_empty(self):
        self.assertEqual(TestableStanfordAnalysis([]).get_token_offsets(), [])

    def test_get_token_offsets_invalid_data(self):
        with self.assertRaises(KeyError):
            TestableStanfordAnalysis([[{"aaa": "bbb"}]]).get_token_offsets()

    def test_sentence_boundaries_empty(self):
        self.assertEqual(TestableStanfordAnalysis([]).get_sentence_boundaries(), [0])

    def test_sentence_boundaries_simple(self):
        sentences = [
            sentence_factory("x x x x\n" * 3),  # 3 words
            sentence_factory("x x x x\n" * 2),  # 2 words
            sentence_factory("x x x x\n" * 4),  # 4 words
        ]
        #           1st 2nd    3rd        end
        expected = [0,  3,     3 + 2,     3 + 2 + 4]
        analysis = TestableStanfordAnalysis(sentences)
        self.assertEqual(analysis.get_sentence_boundaries(), expected)
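        # Invariant checked here: one entry per sentence start, plus the
        # total token count (3 + 2 + 4) as a closing sentinel.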

    def test_offsets_and_tokens_work_together(self):
        sentences = [
            sentence_factory("a x x x\n" * 3),  # 3 words
            sentence_factory("b x x x\n" * 2),  # 2 words
            sentence_factory("c x x x\n" * 4),  # 4 words
            sentence_factory("d x x x\n" * 5),  # 5 words
        ]
        analysis = TestableStanfordAnalysis(sentences)
        words = analysis.get_tokens()
        offsets = analysis.get_sentence_boundaries()
        self.assertEqual(len(words), offsets[-1])
        self.assertEqual(words[offsets[1]], "b")
        self.assertEqual(words[offsets[1] - 1], "a")
        self.assertEqual(words[offsets[3]], "d")
        self.assertEqual(words[offsets[3] - 1], "c")

    def test_get_entity_occurrences_simple(self):
        a = sentence_factory("""
            a 0 O x
            b 1 B x
            c 2 O x
        """)
        b = sentence_factory("""
            d 3 O x
            e 4 O x
            f 5 O x
        """)
        c = sentence_factory("""
            g 6 D x
            h 7 D x
            i 8 O x
            j 9 O x
            k 10 H x
        """)
        d = sentence_factory("""
            l 11 H x
            m 12 O x
            n 13 O x
            o 14 L x
        """)
        analysis = TestableStanfordAnalysis([a, b, c, d])
        expected = [
            (1, 2, "B"),    # the token "b" in the first sentence
            (6, 8, "D"),    # first two tokens of the third sentence
            (10, 11, "H"),  # last token of the third sentence
            (11, 12, "H"),  # first token of the fourth sentence
            (14, 15, "L"),  # last token of the fourth sentence
        ]
        self.assertEqual(analysis.get_entity_occurrences(), expected)
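        # Occurrences are (start, end) spans in document-wide token indices
        # plus the NER label: adjacent tokens with the same non-"O" label
        # merge into one occurrence ("D" above), while a sentence break
        # splits a run ("H" yields two occurrences even though tokens k and
        # l are adjacent).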

    def test_get_lemmas_empty(self):
        self.assertEqual(TestableStanfordAnalysis([]).get_lemmas(), [])

    def test_get_lemmas_and_tokens_same_length(self):
        sentences = [
            sentence_factory("x x x x\n" * 3),  # 3 words
            sentence_factory("x x x x\n" * 2),  # 2 words
            sentence_factory("x x x x\n" * 4),  # 4 words
            sentence_factory("x x x x\n" * 5),  # 5 words
        ]
        analysis = TestableStanfordAnalysis(sentences)
        tokens = analysis.get_tokens()
        lemmas = analysis.get_lemmas()
        self.assertEqual(len(tokens), len(lemmas))


class TestPreProcessCall(ManagerTestCase):

    def _doc_creator(self, mark_as_done):
        doc = SentencedIEDocFactory()
        for step in mark_as_done:
            setattr(doc, "{}_done_at".format(step.name), datetime.now())
        doc.save()
        return doc

    def setUp(self):
        pps = PreProcessSteps
        self._all_steps = [
            pps.tokenization,
            pps.sentencer,
            pps.tagging,
            pps.ner,
            pps.lemmatization,
            pps.syntactic_parsing,
        ]
        patcher = mock.patch("iepy.preprocess.corenlp.get_analizer")
        self.mock_get_analizer = patcher.start()
        self.mock_analizer = self.mock_get_analizer.return_value
        self.addCleanup(patcher.stop)
        self.stanfordpp = StanfordPreprocess()
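
    # Patching get_analizer (that spelling is iepy's own, in
    # iepy.preprocess.corenlp) keeps these tests from needing a running
    # CoreNLP server.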

    def test_if_all_steps_are_done_then_no_step_is_run(self):
        doc = self._doc_creator(mark_as_done=self._all_steps)
        self.stanfordpp(doc)
        self.assertFalse(self.mock_analizer.analyse.called)

    def test_if_all_steps_are_done_but_in_override_mode_then_all_are_run_again(self):
        doc = self._doc_creator(mark_as_done=self._all_steps[:])
        self.mock_analizer.analyse.return_value = {}
        self.stanfordpp.override = True
        self.stanfordpp(doc)
        self.assertTrue(self.mock_analizer.analyse.called)

    def test_for_new_doc_all_steps_are_done_when_preprocessed(self):
        doc = IEDocFactory()
        self.mock_analizer.analyse.return_value = {}
        self.stanfordpp(doc)
        for step in self._all_steps:
            self.assertTrue(doc.was_preprocess_step_done(step))

    def test_lemmatization_is_run_even_if_all_others_already_did(self):
        # Lemmatization was added in release 0.9.1. This checks that docs
        # preprocessed with older versions can be incrementally upgraded.
        doc_no_lemmas = self._doc_creator(
            [s for s in self._all_steps if s is not PreProcessSteps.lemmatization])
        with mock.patch.object(self.stanfordpp, "lemmatization_only") as mock_lemmatization:
            mock_lemmatization.side_effect = lambda x: None
            self.stanfordpp(doc_no_lemmas)
        self.assertTrue(mock_lemmatization.called)

    def test_syntactic_parsing_is_run_even_if_all_others_already_did(self):
        # Syntactic parsing was added in release 0.9.2. This checks that docs
        # preprocessed with older versions can be incrementally upgraded.
        doc_no_synparse = self._doc_creator(
            [s for s in self._all_steps if s is not PreProcessSteps.syntactic_parsing])
        with mock.patch.object(self.stanfordpp, "syntactic_parsing_only") as mock_synparse:
            mock_synparse.side_effect = lambda x: None
            self.stanfordpp(doc_no_synparse)
        self.assertTrue(mock_synparse.called)

    def test_can_add_ner_on_incremental_mode_over_already_preprocessed_documents(self):
        doc_done = self._doc_creator(mark_as_done=self._all_steps)
        doc_fresh = IEDocFactory()
        self.stanfordpp.increment_ner = True

        def p(name):
            return mock.patch.object(self.stanfordpp, name)

        with p("increment_ner_only") as mock_ner_only:
            with p("run_everything") as mock_run_everything:
                self.stanfordpp(doc_done)
                self.assertEqual(mock_ner_only.call_count, 1)
                self.assertEqual(mock_run_everything.call_count, 0)
                self.stanfordpp(doc_fresh)
                self.assertEqual(mock_ner_only.call_count, 1)
                self.assertEqual(mock_run_everything.call_count, 1)


class TestGazetteer(ManagerTestCase):

    def test_generate_gazettes_file_empty(self):
        self.assertIsNone(GazetteManager().generate_stanford_gazettes_file())

    def _test_single_gazette(self, text=None):
        if text:
            gazette_item = GazetteItemFactory(text=text)
        else:
            gazette_item = GazetteItemFactory()
        gzmanager = GazetteManager()
        filepath = gzmanager.generate_stanford_gazettes_file()
        self.assertIsNotNone(filepath)
        with open(filepath) as f:
            data = f.read().split("\n")
        self.assertEqual(len(data), 2)
        data = data[0].split("\t")
        self.assertEqual(len(data), 3)
        self.assertEqual(data[0], gzmanager.escape_text(gazette_item.text))
        self.assertEqual(data[1], "{}{}".format(gzmanager._PREFIX, gazette_item.kind.name))
        gazette_item.delete()
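
    # Each generated line has three tab-separated fields: the escaped entry
    # text and the prefixed kind name are checked above; the third field
    # (presumably a RegexNER priority) is not inspected by these tests.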

    def test_generate_gazettes_several_lines(self):
        [GazetteItemFactory() for x in range(10)]
        filepath = GazetteManager().generate_stanford_gazettes_file()
        self.assertIsNotNone(filepath)
        with open(filepath) as f:
            data = f.read()
        self.assertEqual(data.count("\n"), 10)
        for line in data.split("\n")[:-1]:
            self.assertEqual(line.count("\t"), 2)

    def test_generate_gazettes_one_line(self):
        self._test_single_gazette()

    def test_gazettes_unicode(self):
        self._test_single_gazette("#½]}→@}#½ĸ@#")
        self._test_single_gazette("ħøłæ")
        self._test_single_gazette("æ}@ł¢µ«»µ«»“~þðøđþ")
        self._test_single_gazette("\\ || \\ ()(()))) \\ |")

    def test_gazettes_same_eo_has_same_entity(self):
        tokens = ("The nominees were Stuart Little and Memento "
                  "but the winner was Stuart Little").split()
        analysis = get_analysis_from_sent_markup(
            " ".join(["{} x x x\n".format(x) for x in tokens]))
        fake_gazetteer = mock.MagicMock()
        fake_gazetteer.was_entry_created_by_gazette.return_value = True
        with mock.patch.object(analysis, "get_entity_occurrences") as mock_eos:
            mock_eos.return_value = [
                (3, 5, "MOVIE"),    # first occurrence of "Stuart Little"
                (6, 7, "MOVIE"),    # occurrence of "Memento"
                (11, 13, "MOVIE"),  # second occurrence of "Stuart Little"
            ]
            found_entities = analysis.get_found_entities('random_string', fake_gazetteer)
        # Occurrences with the same alias that come from the gazette resolve
        # to the same Entity...
        self.assertEqual(found_entities[0].key, found_entities[2].key)
        # ...but occurrences with different aliases do not.
        self.assertNotEqual(found_entities[0].key, found_entities[1].key)
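        # (Entity keys for gazette-created occurrences are apparently derived
        # from the occurrence's alias, which is why equal aliases collapse
        # into a single Entity above.)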

    def test_escaping(self):
        texts = [
            "Maradona",
            "El Diego",
            "El Diego ( el 10 )",
            "|()|",
            "æßðæßð",
            "\\ hello \\ ",
            "*",
        ]
        gm = GazetteManager()
        for text in texts:
            escaped = gm.escape_text(text)
            self.assertEqual(escaped.count("\\Q"), len(text.split()))
            self.assertEqual(escaped.count("\\E"), len(text.split()))


class TestMergeCoreferences(ManagerTestCase):

    def setUp(self):
        self.doc = SentencedIEDocFactory(
            text="Diego did it . He scored on the first half , and now he did it again . "
            #     0     1   2  3 4  5      6  7   8     9    10 11  12  13 14  15 16    17
                 "Diego Maradona , the best player ever , won the game alone .")
            #     18    19       20 21  22   23     24   25 26  27  28   29    30
        # mentions is a list of (start, end, head) token-index triplets, one
        # per mention
        self.mentions = [
            (0, 1, 0),     # Diego
            (4, 5, 4),     # He
            (13, 14, 13),  # he
            (18, 20, 18),  # Diego Maradona
            (21, 25, 23),  # the best player ever
        ]
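        # e.g. (21, 25, 23) spans tokens 21..24 ("the best player ever") and
        # marks token 23 ("player") as the head of the mention.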
        assert self.doc.entity_occurrences.count() == 0
        self.sample_ekind = EntityKindFactory()

    def merge(self, corefs):
        # runs the function under test on our document
        return apply_coreferences(self.doc, corefs)

    def create_eo_with_mention(self, mention):
        return EntityOccurrenceFactory(
            document=self.doc, entity__kind=self.sample_ekind,
            offset=mention[0], offset_end=mention[1])
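
    # Note: only a mention's (start, end) are used when creating an EO
    # directly; the head index plays no role here.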

    def test_if_none_of_the_mentions_is_already_an_EO_nothing_happens(self):
        self.merge(self.mentions[:])
        self.assertEqual(self.doc.entity_occurrences.count(), 0)

    def test_merging_when_there_are_EOs_from_different_kind_fails(self):
        for m in self.mentions[:2]:
            eo = self.create_eo_with_mention(m)
            eo.entity.kind = EntityKindFactory()
            eo.entity.save()
        self.assertRaises(CoreferenceError, self.merge, self.mentions[:])

    def test_if_only_one_EO_existed_then_all_others_are_created_with_same_entity(self):
        original_eo = self.create_eo_with_mention(self.mentions[0])
        self.merge(self.mentions[:])
        self.assertEqual(self.doc.entity_occurrences.count(), len(self.mentions))
        for eo in self.doc.entity_occurrences.all():
            self.assertEqual(eo.entity, original_eo.entity)
            if eo != original_eo:
                self.assertTrue(eo.anaphora)

    def test_if_all_the_existent_EOs_are_from_anaphora_no_new_ones_are_created(self):
        original_eo = self.create_eo_with_mention(self.mentions[0])
        original_eo.anaphora = True
        original_eo.save()
        self.merge(self.mentions[:])
        self.assertEqual(self.doc.entity_occurrences.count(), 1)

    def test_if_coexist_EO_from_gazette_and_EO_from_NER_entity_of_the_latter_is_used(self):
        eo_from_gz = self.create_eo_with_mention(self.mentions[0])
        eo_from_gz.entity.gazette = GazetteItemFactory()
        eo_from_gz.entity.save()
        eo_from_ner = self.create_eo_with_mention(self.mentions[1])
        expected_entity = eo_from_ner.entity
        self.merge(self.mentions[:])
        self.assertEqual(self.doc.entity_occurrences.count(), len(self.mentions))
        for eo in self.doc.entity_occurrences.all():
            # this also reloads eo_from_gz and eo_from_ner from the database
            self.assertEqual(eo.entity, expected_entity)

    def test_if_coexist_several_EOs_from_NER_the_entity_of_first_is_used(self):
        eo_1 = self.create_eo_with_mention(self.mentions[0])
        eo_2 = self.create_eo_with_mention(self.mentions[1])
        assert eo_1.entity != eo_2.entity
        expected_entity = eo_1.entity
        self.merge(self.mentions[:])
        for eo in self.doc.entity_occurrences.all():
            # this also reloads eo_1 and eo_2 from the database
            self.assertEqual(eo.entity, expected_entity)

    def test_cant_merge_several_EOs_from_different_GZ_items(self):
        eo_1 = self.create_eo_with_mention(self.mentions[0])
        eo_1.entity.gazette = GazetteItemFactory()
        eo_1.entity.save()
        eo_2 = self.create_eo_with_mention(self.mentions[1])
        eo_2.entity.gazette = GazetteItemFactory()
        eo_2.entity.save()
        self.assertRaises(CoreferenceError, self.merge, self.mentions[:])