123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- from getpass import getuser
- import csv
- import gzip
- import logging
- import os
- import sys
- import tarfile
- import zipfile
- import wget
- from appdirs import AppDirs
- logger = logging.getLogger(__name__)
- DIRS = AppDirs('iepy', getuser())
- # if not os.path.exists(DIRS.user_data_dir):
- # # making sure that user_data_dir exists
- # os.mkdir(DIRS.user_data_dir)
- def unzip(zipped_list, n):
- """returns n lists with the elems of zipped_list unsplitted.
- The general case could be solved with zip(*zipped_list), but here we
- are also dealing with:
- - un-zipping empy list to n empty lists
- - ensuring that all zipped items in zipped_list have lenght n, raising
- ValueError if not.
- """
- if not zipped_list:
- return tuple([[]] * n)
- else:
- if not all(isinstance(x, tuple) and len(x) == n for x in zipped_list):
- raise ValueError
- return zip(*zipped_list)
- def unzip_from_url(zip_url, extraction_base_path):
- got_zipfile = None
- try:
- got_zipfile = wget.download(zip_url)
- print('') # just because wget progress-bar finishes a line with no EOL
- unzip_file(got_zipfile, extraction_base_path)
- finally:
- if zipfile:
- os.remove(got_zipfile)
- def unzip_file(zip_path, extraction_base_path):
- if zip_path.endswith('.tar.gz'):
- with tarfile.open(zip_path, mode='r:gz') as tfile:
- tfile.extractall(extraction_base_path)
- else:
- zfile = zipfile.ZipFile(zip_path)
- zfile.extractall(extraction_base_path)
- def make_feature_list(text):
- return [x.strip() for x in text.split("\n") if x.strip()]
- def evaluate(predicted_knowledge, gold_knowledge):
- """Computes evaluation metrics for a predicted knowledge with respect to a
- gold (or reference) knowledge. Returns a dictionary with the results.
- """
- # ignore predicted facts with no evidence:
- predicted_positives = set([p for p in predicted_knowledge.keys() if p.segment])
- gold_positives = set([p for p, b in gold_knowledge.items() if b])
- correct_positives = predicted_positives & gold_positives
- result = {}
- result['correct'] = correct = len(correct_positives)
- result['predicted'] = predicted = len(predicted_positives)
- result['gold'] = gold = len(gold_positives)
- if predicted > 0:
- result['precision'] = precision = float(correct) / predicted
- else:
- result['precision'] = precision = 1.0
- if gold > 0:
- result['recall'] = recall = float(correct) / gold
- else:
- result['recall'] = recall = 1.0
- if precision + recall > 0.0:
- result['f1'] = 2 * precision * recall / (precision + recall)
- else:
- result['f1'] = 0.0
- return result
- def csv_to_iepy(filepath):
- print ('Importing Documents to IEPY from {}'.format(filepath))
- from iepy.data.db import DocumentManager
- if filepath.endswith(".gz"):
- fin = gzip.open(filepath, "rt")
- else:
- fin = open(filepath, "rt")
- reader = csv.DictReader(fin)
- expected_fnames = ['document_id', 'document_text']
- if not set(reader.fieldnames).issuperset(expected_fnames):
- msg = "Couldn't find the expected field names on the provided csv {}"
- sys.exit(msg.format(expected_fnames))
- name = os.path.basename(filepath)
- docdb = DocumentManager()
- seen = set()
- i = 0
- while True:
- try:
- d = next(reader)
- except StopIteration:
- break
- except csv.Error as error:
- logger.warn("Couldn't load document: {}".format(error))
- continue
- i += 1
- doc_id = d["document_id"]
- if doc_id in seen:
- continue
- seen.add(doc_id)
- docdb.create_document(
- identifier=doc_id,
- text=d["document_text"],
- metadata={"input_filename": name},
- update_mode=True
- )
- print ('Added {} documents'.format(i))
|