Commit 41aa142c authored by J. Fernando Sánchez's avatar J. Fernando Sánchez

Refactored conversion and postprocessing

parent b4873013
......@@ -3,10 +3,8 @@ from .models import Error, Results, Entry, from_string
import logging
logger = logging.getLogger(__name__)
boolean = [True, False]
API_PARAMS = {
"algorithm": {
"aliases": ["algorithms", "a", "algo"],
......@@ -140,6 +138,15 @@ NIF_PARAMS = {
}
}
BUILTIN_PARAMS = {}
for d in [
NIF_PARAMS, CLI_PARAMS, WEB_PARAMS, PLUGINS_PARAMS, EVAL_PARAMS,
API_PARAMS
]:
for k, v in d.items():
BUILTIN_PARAMS[k] = v
def parse_params(indict, *specs):
if not specs:
......@@ -164,7 +171,7 @@ def parse_params(indict, *specs):
continue
if "options" in options:
if options["options"] == boolean:
outdict[param] = outdict[param] in [None, True, 'true', '1']
outdict[param] = str(outdict[param]).lower() in ['true', '1']
elif outdict[param] not in options["options"]:
wrong_params[param] = spec[param]
if wrong_params:
......@@ -180,11 +187,19 @@ def parse_params(indict, *specs):
return outdict
def parse_extra_params(request, plugin=None):
def parse_extra_params(request, plugins=None):
plugins = plugins or []
params = request.parameters.copy()
if plugin:
extra_params = parse_params(params, plugin.get('extra_params', {}))
params.update(extra_params)
for plugin in plugins:
if plugin:
extra_params = parse_params(params, plugin.get('extra_params', {}))
for k, v in extra_params.items():
if k not in BUILTIN_PARAMS:
if k in params: # Set by another plugin
del params[k]
else:
params[k] = v
params['{}.{}'.format(plugin.name, k)] = v
return params
......@@ -194,12 +209,12 @@ def parse_call(params):
params = parse_params(params, NIF_PARAMS)
if params['informat'] == 'text':
results = Results()
entry = Entry(nif__isString=params['input'],
id='#') # Use @base
entry = Entry(nif__isString=params['input'], id='#') # Use @base
results.entries.append(entry)
elif params['informat'] == 'json-ld':
results = from_string(params['input'], cls=Results)
else: # pragma: no cover
raise NotImplementedError('Informat {} is not implemented'.format(params['informat']))
raise NotImplementedError('Informat {} is not implemented'.format(
params['informat']))
results.parameters = params
return results
......@@ -197,7 +197,9 @@ def api_root(plugin):
plugin = plugin.replace('+', '/')
plugin = plugin.split('/')
req.parameters['algorithm'] = tuple(plugin)
return current_app.senpy.analyse(req)
results = current_app.senpy.analyse(req)
results.analysis = set(i.id for i in results.analysis)
return results
@api_blueprint.route('/evaluate/', methods=['POST', 'GET'])
......
......@@ -6,7 +6,6 @@ from future import standard_library
standard_library.install_aliases()
from . import plugins, api
from .plugins import Plugin, evaluate
from .models import Error, AggregatedEvaluation
from .blueprints import api_blueprint, demo_blueprint, ns_blueprint
......@@ -17,7 +16,6 @@ import copy
import errno
import logging
from . import gsitk_compat
logger = logging.getLogger(__name__)
......@@ -25,6 +23,7 @@ logger = logging.getLogger(__name__)
class Senpy(object):
""" Default Senpy extension for Flask """
def __init__(self,
app=None,
plugin_folder=".",
......@@ -50,7 +49,7 @@ class Senpy(object):
self.add_folder('plugins', from_root=True)
else:
# Add only conversion plugins
self.add_folder(os.path.join('plugins', 'conversion'),
self.add_folder(os.path.join('plugins', 'postprocessing'),
from_root=True)
self.app = app
if app is not None:
......@@ -115,6 +114,7 @@ class Senpy(object):
raise AttributeError("Not a folder or does not exist: %s", folder)
def _get_plugins(self, request):
'''Get a list of plugins that should be run for a specific request'''
if not self.analysis_plugins:
raise Error(
status=404,
......@@ -132,33 +132,32 @@ class Senpy(object):
plugins = list()
for algo in algos:
algo = algo.lower()
if algo == 'conversion':
continue # Allow 'conversion' as a virtual plugin, which does nothing
if algo not in self._plugins:
msg = ("The algorithm '{}' is not valid\n"
"Valid algorithms: {}").format(algo,
self._plugins.keys())
logger.debug(msg)
raise Error(
status=404,
message=msg)
raise Error(status=404, message=msg)
plugins.append(self._plugins[algo])
return plugins
def _process_entries(self, entries, req, plugins):
def _process(self, req, pending, done=None):
"""
Recursively process the entries with the first plugin in the list, and pass the results
to the rest of the plugins.
"""
if not plugins:
for i in entries:
yield i
return
plugin = plugins[0]
specific_params = api.parse_extra_params(req, plugin)
req.analysis.append({'plugin': plugin,
'parameters': specific_params})
results = plugin.analyse_entries(entries, specific_params)
for i in self._process_entries(results, req, plugins[1:]):
yield i
done = done or []
if not pending:
return req
plugin = pending[0]
results = plugin.process(req, conversions_applied=done)
if plugin not in results.analysis:
results.analysis.append(plugin)
return self._process(results, pending[1:], done)
def install_deps(self):
plugins.install_deps(*self.plugins())
......@@ -170,72 +169,14 @@ class Senpy(object):
by api.parse_call().
"""
logger.debug("analysing request: {}".format(request))
entries = request.entries
request.entries = []
plugins = self._get_plugins(request)
results = request
for i in self._process_entries(entries, results, plugins):
results.entries.append(i)
self.convert_emotions(results)
logger.debug("Returning analysis result: {}".format(results))
results.analysis = [i['plugin'].id for i in results.analysis]
request.parameters = api.parse_extra_params(request, plugins)
results = self._process(request, plugins)
logger.debug("Got analysis result: {}".format(results))
results = self.postprocess(results)
logger.debug("Returning post-processed result: {}".format(results))
return results
def _get_datasets(self, request):
if not self.datasets:
raise Error(
status=404,
message=("No datasets found."
" Please verify DatasetManager"))
datasets_name = request.parameters.get('dataset', None).split(',')
for dataset in datasets_name:
if dataset not in self.datasets:
logger.debug(("The dataset '{}' is not valid\n"
"Valid datasets: {}").format(dataset,
self.datasets.keys()))
raise Error(
status=404,
message="The dataset '{}' is not valid".format(dataset))
dm = gsitk_compat.DatasetManager()
datasets = dm.prepare_datasets(datasets_name)
return datasets
@property
def datasets(self):
self._dataset_list = {}
dm = gsitk_compat.DatasetManager()
for item in dm.get_datasets():
for key in item:
if key in self._dataset_list:
continue
properties = item[key]
properties['@id'] = key
self._dataset_list[key] = properties
return self._dataset_list
def evaluate(self, params):
logger.debug("evaluating request: {}".format(params))
results = AggregatedEvaluation()
results.parameters = params
datasets = self._get_datasets(results)
plugins = self._get_plugins(results)
for eval in evaluate(plugins, datasets):
results.evaluations.append(eval)
if 'with_parameters' not in results.parameters:
del results.parameters
logger.debug("Returning evaluation result: {}".format(results))
return results
def _conversion_candidates(self, fromModel, toModel):
candidates = self.plugins(plugin_type='emotionConversionPlugin')
for candidate in candidates:
for pair in candidate.onyx__doesConversion:
logging.debug(pair)
if pair['onyx:conversionFrom'] == fromModel \
and pair['onyx:conversionTo'] == toModel:
yield candidate
def convert_emotions(self, resp):
"""
Conversion of all emotions in a response **in place**.
......@@ -244,11 +185,12 @@ class Senpy(object):
Needless to say, this is far from an elegant solution, but it works.
@todo refactor and clean up
"""
plugins = [i['plugin'] for i in resp.analysis]
plugins = resp.analysis
params = resp.parameters
toModel = params.get('emotionModel', None)
if not toModel:
return
return resp
logger.debug('Asked for model: {}'.format(toModel))
output = params.get('conversion', None)
......@@ -257,7 +199,8 @@ class Senpy(object):
try:
fromModel = plugin.get('onyx:usesEmotionModel', None)
candidates[plugin.id] = next(self._conversion_candidates(fromModel, toModel))
logger.debug('Analysis plugin {} uses model: {}'.format(plugin.id, fromModel))
logger.debug('Analysis plugin {} uses model: {}'.format(
plugin.id, fromModel))
except StopIteration:
e = Error(('No conversion plugin found for: '
'{} -> {}'.format(fromModel, toModel)),
......@@ -266,6 +209,7 @@ class Senpy(object):
e.parameters = params
raise e
newentries = []
done = []
for i in resp.entries:
if output == "full":
newemotions = copy.deepcopy(i.emotions)
......@@ -274,8 +218,7 @@ class Senpy(object):
for j in i.emotions:
plugname = j['prov:wasGeneratedBy']
candidate = candidates[plugname]
resp.analysis.append({'plugin': candidate,
'parameters': params})
done.append({'plugin': candidate, 'parameters': params})
for k in candidate.convert(j, fromModel, toModel, params):
k.prov__wasGeneratedBy = candidate.id
if output == 'nested':
......@@ -284,12 +227,80 @@ class Senpy(object):
i.emotions = newemotions
newentries.append(i)
resp.entries = newentries
return resp
def _conversion_candidates(self, fromModel, toModel):
candidates = self.plugins(plugin_type=plugins.EmotionConversion)
for candidate in candidates:
for pair in candidate.onyx__doesConversion:
logging.debug(pair)
if candidate.can_convert(fromModel, toModel):
yield candidate
def postprocess(self, response):
'''
Transform the results from the analysis plugins.
It has some pre-defined post-processing like emotion conversion,
and it also allows plugins to auto-select themselves.
'''
response = self.convert_emotions(response)
for plug in self.plugins(plugin_type=plugins.PostProcessing):
if plug.check(response, response.analysis):
response = plug.process(response)
return response
def _get_datasets(self, request):
if not self.datasets:
raise Error(
status=404,
message=("No datasets found."
" Please verify DatasetManager"))
datasets_name = request.parameters.get('dataset', None).split(',')
for dataset in datasets_name:
if dataset not in self.datasets:
logger.debug(("The dataset '{}' is not valid\n"
"Valid datasets: {}").format(
dataset, self.datasets.keys()))
raise Error(
status=404,
message="The dataset '{}' is not valid".format(dataset))
dm = gsitk_compat.DatasetManager()
datasets = dm.prepare_datasets(datasets_name)
return datasets
@property
def datasets(self):
self._dataset_list = {}
dm = gsitk_compat.DatasetManager()
for item in dm.get_datasets():
for key in item:
if key in self._dataset_list:
continue
properties = item[key]
properties['@id'] = key
self._dataset_list[key] = properties
return self._dataset_list
def evaluate(self, params):
logger.debug("evaluating request: {}".format(params))
results = AggregatedEvaluation()
results.parameters = params
datasets = self._get_datasets(results)
plugins = self._get_plugins(results)
for eval in plugins.evaluate(plugins, datasets):
results.evaluations.append(eval)
if 'with_parameters' not in results.parameters:
del results.parameters
logger.debug("Returning evaluation result: {}".format(results))
return results
@property
def default_plugin(self):
if not self._default or not self._default.is_activated:
candidates = self.plugins(plugin_type='analysisPlugin',
is_activated=True)
candidates = self.plugins(
plugin_type='analysisPlugin', is_activated=True)
if len(candidates) > 0:
self._default = candidates[0]
else:
......@@ -299,7 +310,7 @@ class Senpy(object):
@default_plugin.setter
def default_plugin(self, value):
if isinstance(value, Plugin):
if isinstance(value, plugins.Plugin):
if not value.is_activated:
raise AttributeError('The default plugin has to be activated.')
self._default = value
......@@ -351,7 +362,8 @@ class Senpy(object):
logger.info("Activating plugin: {}".format(plugin.name))
if sync or not getattr(plugin, 'async', True) or getattr(plugin, 'sync', False):
if sync or not getattr(plugin, 'async', True) or getattr(
plugin, 'sync', False):
return self._activate(plugin)
else:
th = Thread(target=partial(self._activate, plugin))
......@@ -374,7 +386,8 @@ class Senpy(object):
self._set_active(plugin, False)
if sync or not getattr(plugin, 'async', True) or not getattr(plugin, 'sync', False):
if sync or not getattr(plugin, 'async', True) or not getattr(
plugin, 'sync', False):
self._deactivate(plugin)
else:
th = Thread(target=partial(self._deactivate, plugin))
......
from future import standard_library
standard_library.install_aliases()
from future.utils import with_metaclass
from functools import partial
......@@ -10,7 +9,6 @@ import os
import re
import pickle
import logging
import copy
import pprint
import inspect
......@@ -26,7 +24,6 @@ from .. import api
from .. import gsitk_compat
from .. import testing
logger = logging.getLogger(__name__)
......@@ -46,16 +43,19 @@ class PluginMeta(models.BaseMeta):
if doc:
attrs['description'] = doc
else:
logger.warn(('Plugin {} does not have a description. '
'Please, add a short summary to help other developers').format(name))
logger.warning(
('Plugin {} does not have a description. '
'Please, add a short summary to help other developers'
).format(name))
cls = super(PluginMeta, mcs).__new__(mcs, name, bases, attrs)
if alias in mcs._classes:
if os.environ.get('SENPY_TESTING', ""):
raise Exception(('The type of plugin {} already exists. '
'Please, choose a different name').format(name))
raise Exception(
('The type of plugin {} already exists. '
'Please, choose a different name').format(name))
else:
logger.warn('Overloading plugin class: {}'.format(alias))
logger.warning('Overloading plugin class: {}'.format(alias))
mcs._classes[alias] = cls
return cls
......@@ -87,10 +87,12 @@ class Plugin(with_metaclass(PluginMeta, models.Plugin)):
if info:
self.update(info)
self.validate()
self.id = 'endpoint:plugins/{}_{}'.format(self['name'], self['version'])
self.id = 'endpoint:plugins/{}_{}'.format(self['name'],
self['version'])
self.is_activated = False
self._lock = threading.Lock()
self._directory = os.path.abspath(os.path.dirname(inspect.getfile(self.__class__)))
self._directory = os.path.abspath(
os.path.dirname(inspect.getfile(self.__class__)))
data_folder = data_folder or os.getcwd()
subdir = os.path.join(data_folder, self.name)
......@@ -118,7 +120,8 @@ class Plugin(with_metaclass(PluginMeta, models.Plugin)):
if x not in self:
missing.append(x)
if missing:
raise models.Error('Missing configuration parameters: {}'.format(missing))
raise models.Error(
'Missing configuration parameters: {}'.format(missing))
def get_folder(self):
return os.path.dirname(inspect.getfile(self.__class__))
......@@ -129,22 +132,60 @@ class Plugin(with_metaclass(PluginMeta, models.Plugin)):
def deactivate(self):
pass
def process(self, request, **kwargs):
"""
An implemented plugin should override this method.
Here, we assume that a process_entries method exists."""
newentries = list(
self.process_entries(request.entries, request.parameters))
request.entries = newentries
return request
def process_entries(self, entries, parameters):
for entry in entries:
self.log.debug('Processing entry with plugin {}: {}'.format(
self, entry))
results = self.process_entry(entry, parameters)
if inspect.isgenerator(results):
for result in results:
yield result
else:
yield results
def process_entry(self, entry, parameters):
"""
This base method is here to adapt plugins which only
implement the *process* function.
Note that this method may yield an annotated entry or a list of
entries (e.g. in a tokenizer)
"""
raise NotImplementedError(
'You need to implement process, process_entries or process_entry in your plugin'
)
def test(self, test_cases=None):
if not test_cases:
if not hasattr(self, 'test_cases'):
raise AttributeError(('Plugin {} [{}] does not have any defined '
'test cases').format(self.id,
inspect.getfile(self.__class__)))
raise AttributeError(
('Plugin {} [{}] does not have any defined '
'test cases').format(self.id,
inspect.getfile(self.__class__)))
test_cases = self.test_cases
for case in test_cases:
try:
self.test_case(case)
self.log.debug('Test case passed:\n{}'.format(pprint.pformat(case)))
self.log.debug('Test case passed:\n{}'.format(
pprint.pformat(case)))
except Exception as ex:
self.log.warn('Test case failed:\n{}'.format(pprint.pformat(case)))
self.log.warning('Test case failed:\n{}'.format(
pprint.pformat(case)))
raise
def test_case(self, case, mock=testing.MOCK_REQUESTS):
if 'entry' not in case and 'input' in case:
entry = models.Entry(_auto_id=False)
entry.nif__isString = case['input']
case['entry'] = entry
entry = models.Entry(case['entry'])
given_parameters = case.get('params', case.get('parameters', {}))
expected = case.get('expected', None)
......@@ -152,21 +193,25 @@ class Plugin(with_metaclass(PluginMeta, models.Plugin)):
responses = case.get('responses', [])
try:
params = api.parse_params(given_parameters, self.extra_params)
request = models.Response()
request.parameters = api.parse_params(given_parameters,
self.extra_params)
request.entries = [
entry,
]
method = partial(self.analyse_entries, [entry, ], params)
method = partial(self.process, request)
if mock:
res = list(method())
res = method()
else:
with testing.patch_all_requests(responses):
res = list(method())
res = method()
if not isinstance(expected, list):
expected = [expected]
utils.check_template(res, expected)
for r in res:
r.validate()
utils.check_template(res.entries, expected)
res.validate()
except models.Error:
if should_fail:
return
......@@ -203,40 +248,26 @@ class Analysis(Plugin):
A subclass of Plugin that analyses text and provides an annotation.
'''
def analyse(self, *args, **kwargs):
raise NotImplementedError(
'Your plugin should implement either analyse or analyse_entry')
def analyse(self, request, parameters):
return super(Analysis, self).process(request)
def analyse_entry(self, entry, parameters):
""" An implemented plugin should override this method.
This base method is here to adapt old style plugins which only
implement the *analyse* function.
Note that this method may yield an annotated entry or a list of
entries (e.g. in a tokenizer)
"""
text = entry['nif:isString']
params = copy.copy(parameters)
params['input'] = text
results = self.analyse(**params)
for i in results.entries:
def analyse_entries(self, entries, parameters):
for i in super(Analysis, self).process_entries(entries, parameters):
yield i
def analyse_entries(self, entries, parameters):
for entry in entries:
self.log.debug('Analysing entry with plugin {}: {}'.format(self, entry))
results = self.analyse_entry(entry, parameters)
if inspect.isgenerator(results):
for result in results:
yield result
else:
yield results
def process(self, request, **kwargs):
return self.analyse(request, request.parameters)
def test_case(self, case):
if 'entry' not in case and 'input' in case:
entry = models.Entry(_auto_id=False)
entry.nif__isString = case['input']
case['entry'] = entry
super(Analysis, self).test_case(case)
def process_entries(self, entries, parameters):
for i in self.analyse_entries(entries, parameters):
yield i
def process_entry(self, entry, parameters, **kwargs):
if hasattr(self, 'analyse_entry'):
for i in self.analyse_entry(entry, parameters):
yield i
else:
super(Analysis, self).process_entry(entry, parameters, **kwargs)
AnalysisPlugin = Analysis
......@@ -247,7 +278,20 @@ class Conversion(Plugin):
A subclass of Plugins that convert between different annotation models.
e.g. a conversion of emotion models, or normalization of sentiment values.
'''
pass
def process(self, response, plugins=None, **kwargs):
plugins = plugins or []
newentries = []
for entry in response.entries:
newentries.append(
self.convert_entry(entry, response.parameters, plugins))
response.entries = newentries
return response
def convert_entry(self, entry, parameters, conversions_applied):
raise NotImplementedError(
'You should implement a way to convert each entry, or a custom process method'
)
ConversionPlugin = Conversion
......@@ -284,12 +328,28 @@ class EmotionConversion(Conversion):
'''
A subclass of Conversion that converts emotion annotations using different models
'''
pass
def can_convert(self, fromModel, toModel):
'''
Whether this plugin can convert from fromModel to toModel.
If fromModel is None, it is interpreted as "any Model"
'''
for pair in self.onyx__doesConversion:
if (pair['onyx:conversionTo'] == toModel) and \
((fromModel is None) or (pair['onyx:conversionFrom'] == fromModel)):
return True
return False
EmotionConversionPlugin = EmotionConversion
class PostProcessing(Plugin):
def check(self, request, plugins):
'''Should this plugin be run for this request?'''
return False
class Box(AnalysisPlugin):
'''