from future.utils import iteritems
from .models import Error
from .models import Error, Results, Entry, from_string
import logging
logger = logging.getLogger(__name__)
"algorithm": {
"aliases": ["algorithm", "a", "algo"],
"aliases": ["algorithms", "a", "algo"],
"required": False,
"outformat": {
"@id": "outformat",
"aliases": ["outformat", "o"],
"default": "json-ld",
"required": True,
"options": ["json-ld", "turtle"],
"description": ("Algorithms that will be used to process the request."
"It may be a list of comma-separated names."),
"expanded-jsonld": {
"@id": "expanded-jsonld",
"aliases": ["expanded", "expanded-jsonld"],
"aliases": ["expanded"],
"required": True,
"default": 0
"emotionModel": {
"@id": "emotionModel",
"aliases": ["emotionModel", "emoModel"],
"required": False
"with_parameters": {
"aliases": ['withparameters',
"options": "boolean",
"default": False,
"required": True
"plugin_type": {
"@id": "pluginType",
"description": 'What kind of plugins to list',
"aliases": ["pluginType", "plugin_type"],
"aliases": ["pluginType"],
"required": True,
"default": "analysisPlugin"
"conversion": {
"@id": "conversion",
"description": "How to show the elements that have (not) been converted",
"outformat": {
"@id": "outformat",
"aliases": ["o"],
"default": "json-ld",
"required": True,
"options": ["filtered", "nested", "full"],
"default": "full"
"options": ["json-ld", "turtle"],
"help": {
"@id": "help",
"description": "Show additional help to know more about the possible parameters",
"aliases": ["help", "h"],
"aliases": ["h"],
"required": True,
"options": ["True", "False"],
"default": "False"
"options": "boolean",
"default": False
"emotionModel": {
"@id": "emotionModel",
"aliases": ["emoModel"],
"required": False
"conversion": {
"@id": "conversion",
"description": "How to show the elements that have (not) been converted",
"required": True,
"options": ["filtered", "nested", "full"],
"default": "full"
"inHeaders": {
"aliases": ["inHeaders", "headers"],
"aliases": ["headers"],
"required": True,
"default": "0"
"default": False,
"options": "boolean"
"plugin_folder": {
"aliases": ["plugin_folder", "folder"],
"aliases": ["folder"],
"required": True,
"default": "."
"input": {
"@id": "input",
"aliases": ["i", "input"],
"aliases": ["i"],
"required": True,
"help": "Input text"
"informat": {
"@id": "informat",
"aliases": ["f", "informat"],
"required": False,
"default": "text",
"options": ["turtle", "text", "json-ld"],
"intype": {
"@id": "intype",
"aliases": ["intype", "t"],
"aliases": ["t"],
"required": False,
"default": "direct",
"options": ["direct", "url", "file"],
"informat": {
"@id": "informat",
"aliases": ["f"],
"required": False,
"default": "text",
"options": ["turtle", "text", "json-ld"],
"language": {
"@id": "language",
"aliases": ["language", "l"],
"aliases": ["l"],
"required": False,
"prefix": {
"@id": "prefix",
"aliases": ["prefix", "p"],
"aliases": ["p"],
"required": True,
"default": "",
"urischeme": {
"@id": "urischeme",
"aliases": ["urischeme", "u"],
"aliases": ["u"],
"required": False,
"default": "RFC5147String",
"options": "RFC5147String"
# Resolve aliases, defaults and allowed values for a dict of request parameters.
#
# NOTE(review): two variants of this function are interleaved in this span (a
# single-spec signature and a variadic ``*specs`` one) and the indentation has
# been lost, so the comments below describe each run of statements in turn.
def parse_params(indict, spec=NIF_PARAMS):
logger.debug("Parsing: {}\n{}".format(indict, spec))
def parse_params(indict, *specs):
# Called without any spec: fall back to NIF_PARAMS.
if not specs:
specs = [NIF_PARAMS]
logger.debug("Parsing: {}\n{}".format(indict, specs))
# Results are collected in a shallow copy of the input.
outdict = indict.copy()
# --- single-spec variant ---
for param, options in iteritems(spec):
if param[0] != "@": # Exclude json-ld properties
for alias in options.get("aliases", []):
# A value supplied under an alias is stored under the parameter's own name.
if alias in indict:
outdict[param] = indict[alias]
if param not in outdict:
# Missing, required and without a default: record the parameter as wrong.
if options.get("required", False) and "default" not in options:
wrong_params[param] = spec[param]
if "default" in options:
outdict[param] = options["default"]
# A value outside the spec's "options" is recorded as wrong as well.
if "options" in spec[param] and \
outdict[param] not in spec[param]["options"]:
wrong_params[param] = spec[param]
# --- variadic variant: the same walk, repeated for every spec ---
for spec in specs:
for param, options in iteritems(spec):
if param[0] != "@": # Exclude json-ld properties
for alias in options.get("aliases", []):
# Replace each alias with the correct name of the parameter
# NOTE(review): ``alias is not param`` compares identity, not equality;
# presumably ``!=`` was intended -- confirm.
if alias in indict and alias is not param:
outdict[param] = indict[alias]
# NOTE(review): this removes the alias from the caller's dict; only
# outdict is a copy, indict is the object that was passed in.
del indict[alias]
if param not in outdict:
if options.get("required", False) and "default" not in options:
wrong_params[param] = spec[param]
if "default" in options:
outdict[param] = options["default"]
elif "options" in spec[param]:
# "boolean" options are coerced: None, True, 'true' and '1' become True,
# anything else becomes False.
if spec[param]["options"] == "boolean":
outdict[param] = outdict[param] in [None, True, 'true', '1']
elif outdict[param] not in spec[param]["options"]:
wrong_params[param] = spec[param]
# NOTE(review): wrong_params is written to above, but its initialisation is
# not visible in this excerpt.
# Any recorded problem aborts parsing with an Error listing the offending specs.
if wrong_params:
logger.debug("Error parsing: %s", wrong_params)
message = Error(
errors={param: error
for param, error in iteritems(wrong_params)})
raise message
# A string-valued 'algorithm' is split on commas into a list of names.
if 'algorithm' in outdict and isinstance(outdict['algorithm'], str):
outdict['algorithm'] = outdict['algorithm'].split(',')
return outdict
def get_extra_params(request, plugin=None):
    '''Return the request parameters, extended with a plugin's extra ones.

    :param request: object whose ``parameters`` attribute is a dict-like
        with a ``copy()`` method.
    :param plugin: optional mapping; its ``'extra_params'`` entry (default:
        empty dict) is used as the spec handed to ``parse_params``.
    :returns: a copy of ``request.parameters``; when *plugin* is given, the
        values parsed against its extra spec are merged into that copy.
    :raises Error: propagated from ``parse_params`` when parsing fails.
    '''
    params = request.parameters.copy()
    if plugin:
        extra_params = parse_params(params, plugin.get('extra_params', {}))
        # The parsed values used to be computed and then dropped, so the
        # plugin-specific parameters never reached the caller.
        params.update(extra_params)
    return params
def parse_call(params):
    '''Return a results object based on the parameters used in a call/request.

    The parameters are first parsed against ``NIF_PARAMS``. The ``informat``
    value then selects how the ``input`` value is turned into results:

    * ``'text'``: a fresh ``Results`` holding one ``Entry`` whose
      ``nif:isString`` is the input.
    * ``'json-ld'``: the input is deserialised with ``from_string``.

    :param params: raw parameter dict.
    :returns: the results object, with the parsed parameters stored in its
        ``parameters`` attribute.
    :raises NotImplementedError: for any other ``informat`` value.
    :raises Error: propagated from ``parse_params``.
    '''
    params = parse_params(params, NIF_PARAMS)
    informat = params['informat']
    if informat == 'text':
        results = Results()
        entry = Entry(nif__isString=params['input'])
        # Attach the entry; otherwise the input text never reaches the
        # results (assumes Results exposes a list-like ``entries``).
        results.entries.append(entry)
    elif informat == 'json-ld':
        results = from_string(params['input'], cls=Results)
    else:
        # ``NotImplemented`` is a constant, not an exception class: calling
        # it raised a TypeError instead of reporting the real problem.
        raise NotImplementedError(
            'Informat {} is not implemented'.format(informat))
    results.parameters = params
    return results
from flask import (Blueprint, request, current_app, render_template, url_for,
from .models import Error, Response, Plugins, read_schema
from .api import WEB_PARAMS, API_PARAMS, CLI_PARAMS, NIF_PARAMS, parse_params
from .models import Error, Response, Help, Plugins, read_schema
from . import api
from .version import __version__
from functools import wraps
raise Error(message="Invalid data")
return indict
def index():
    '''Render ``index.html``, passing the package version to the template.'''
    page = render_template("index.html", version=__version__)
    return page
......@@ -75,20 +76,16 @@ def basic_api(f):
def decorated_function(*args, **kwargs):
raw_params = get_params(request)
headers = {'X-ORIGINAL-PARAMS': json.dumps(raw_params)}
# Get defaults
web_params = parse_params({}, spec=WEB_PARAMS)
api_params = parse_params({}, spec=API_PARAMS)
outformat = 'json-ld'
print('Getting request:')
web_params = parse_params(raw_params, spec=WEB_PARAMS)
api_params = parse_params(raw_params, spec=API_PARAMS)
if hasattr(request, 'params'):
params = api.parse_params(raw_params, api.WEB_PARAMS, api.API_PARAMS)
if hasattr(request, 'parameters'):
request.params = api_params
request.parameters = params
response = f(*args, **kwargs)
except Error as ex:
response = ex
if current_app.debug:
in_headers = web_params['inHeaders'] != "0"
expanded = api_params['expanded-jsonld']
outformat = api_params['outformat']
in_headers = params['inHeaders']
expanded = params['expanded-jsonld']
outformat = params['outformat']
return response.flask(
prefix=url_for('.api', _external=True),
prefix=url_for('.api_root', _external=True),
......@@ -115,14 +112,14 @@ def basic_api(f):
# Root endpoint of the API blueprint; the route accepts both POST and GET.
#
# NOTE(review): an ``api()`` variant (reads ``request.params``) and an
# ``api_root()`` variant (reads ``request.parameters``) are interleaved in
# this span and the indentation has been lost.
@api_blueprint.route('/', methods=['POST', 'GET'])
def api():
phelp = request.params.get('help')
# This variant compares against the string "True", not a boolean.
if phelp == "True":
# Help request: the API and NIF parameter specs merged into a single dict.
dic = dict(API_PARAMS, **NIF_PARAMS)
response = Response(dic)
def api_root():
# This variant uses the 'help' parameter as a plain truth value.
if request.parameters['help']:
dic = dict(api.API_PARAMS, **api.NIF_PARAMS)
response = Help(parameters=dic)
return response
# Regular request: the analysis is delegated to the Senpy instance on the app.
response = current_app.senpy.analyse(**request.params)
req = api.parse_call(request.parameters)
response = current_app.senpy.analyse(req)
return response
# Return the plugins of the app's Senpy instance, filtered by plugin type.
def plugins():
sp = current_app.senpy
# NOTE(review): ptype is assigned twice, first from request.params and then
# from request.parameters; read top to bottom, only the second value is used.
ptype = request.params.get('plugin_type')
ptype = request.parameters.get('plugin_type')
plugins = sp.filter_plugins(plugin_type=ptype)
# The filter result is used as a mapping; only its values go into the answer.
dic = Plugins(plugins=list(plugins.values()))
return dic
import sys
from .models import Error
from .api import parse_params, CLI_PARAMS
from .extensions import Senpy
from . import api
def argv_to_dict(argv):
    '''Turn a list of command-line arguments into a dictionary of options.

    Every argument that starts with ``-`` becomes a key, with all leading
    and trailing dashes stripped. Its value is the following argument,
    unless that argument is missing, empty, or itself starts with ``-``; in
    those cases the option is a bare flag and maps to ``True``. Arguments
    that do not start with ``-`` are only ever consumed as values.

    :param argv: sequence of argument strings.
    :returns: dict mapping option names to their value or ``True``.
    '''
    cli_dict = {}
    for i, arg in enumerate(argv):
        # startswith() is safe on empty strings, where indexing [0] is not.
        if not arg.startswith('-'):
            continue
        key = arg.strip('-')
        value = argv[i + 1] if len(argv) > i + 1 else None
        if not value or value[0] == '-':
            cli_dict[key] = True
        else:
            cli_dict[key] = value
    return cli_dict
def parse_cli(argv):
    '''Parse command-line arguments against the CLI parameter spec.

    :param argv: sequence of argument strings.
    :returns: tuple ``(cli_params, cli_dict)``: the parameters as parsed by
        ``parse_params`` against ``CLI_PARAMS``, and the raw dictionary
        produced by ``argv_to_dict``.
    :raises Error: propagated from ``parse_params``.
    '''
    cli_dict = argv_to_dict(argv)
    # The spec is passed positionally: that works whether parse_params takes
    # a single ``spec`` argument or a variadic ``*specs``, whereas the
    # ``spec=`` keyword is rejected by the variadic form.
    cli_params = parse_params(cli_dict, CLI_PARAMS)
    return cli_params, cli_dict
# Entry point used for unit testing: turns argv into parameters, builds a
# Senpy instance on the requested plugin folder (default plugins disabled),
# runs the analysis and returns its result.
#
# NOTE(review): an older flow (parse_cli + sp.analyse(**cli_dict)) and a newer
# one (api.parse_params + api.parse_call + sp.analyse(request)) are interleaved
# in this span; the docstring's closing quotes and the remaining arguments of
# the api.parse_params(...) call are not visible in this excerpt.
def main_function(argv):
'''This is the method for unit testing
cli_params, cli_dict = parse_cli(argv)
plugin_folder = cli_params['plugin_folder']
params = api.parse_params(argv_to_dict(argv),
plugin_folder = params['plugin_folder']
sp = Senpy(default_plugins=False, plugin_folder=plugin_folder)
res = sp.analyse(**cli_dict)
request = api.parse_call(params)
res = sp.analyse(request)
return res
from future import standard_library
from . import plugins
from . import plugins, api
from .plugins import SenpyPlugin
from .models import Error, Entry, Results, from_string
from .models import Error
from .blueprints import api_blueprint, demo_blueprint, ns_blueprint
from .api import API_PARAMS, NIF_PARAMS, parse_params
from threading import Thread
......@@ -72,22 +71,20 @@ class Senpy(object):
logger.debug("Not a folder: %s", folder)
def _find_plugins(self, params):
def _get_plugins(self, request):
if not self.analysis_plugins:
raise Error(
message=("No plugins found."
" Please install one."))
api_params = parse_params(params, spec=API_PARAMS)
algos = None
if "algorithm" in api_params and api_params["algorithm"]:
algos = api_params["algorithm"].split(',')
elif self.default_plugin:
algos = [, ]
raise Error(
message="No default plugin found, and None provided")
algos = request.parameters.get('algorithm', None)
if not algos:
if self.default_plugin:
algos = [, ]
raise Error(
message="No default plugin found, and None provided")
plugins = list()
for algo in algos:
......@@ -108,66 +105,46 @@ class Senpy(object):
return plugins
def _get_params(self, params, plugin=None):
    '''Parse *params* against the NIF spec, plus a plugin's extra spec.

    :param params: raw parameter dict.
    :param plugin: optional mapping; its ``'extra_params'`` entry (default:
        empty dict) is used as an additional spec.
    :returns: the parameters parsed against ``NIF_PARAMS``; when *plugin* is
        given, the values parsed against its extra spec are merged in.
    :raises Error: propagated from ``parse_params``.
    '''
    # Specs are passed positionally so the call works whether parse_params
    # takes a single ``spec`` argument or a variadic ``*specs``.
    nif_params = parse_params(params, NIF_PARAMS)
    if plugin:
        extra_params = plugin.get('extra_params', {})
        specific_params = parse_params(params, extra_params)
        # The plugin-specific values used to be computed and then dropped.
        nif_params.update(specific_params)
    return nif_params
def _get_entries(self, params):
    '''Build a ``Results`` object from the ``input`` parameter.

    * ``informat == 'text'``: a fresh ``Results`` holding one ``Entry``
      whose ``text`` is the input.
    * ``informat == 'json-ld'``: the input is deserialised with
      ``from_string``.

    :param params: parsed parameter dict with ``'informat'`` and ``'input'``.
    :returns: the results object.
    :raises NotImplementedError: for any other ``informat`` value.
    '''
    informat = params['informat']
    if informat == 'text':
        results = Results()
        entry = Entry(text=params['input'])
        # Attach the entry; otherwise the input text never reaches the
        # results (assumes Results exposes a list-like ``entries``).
        results.entries.append(entry)
    elif informat == 'json-ld':
        results = from_string(params['input'], cls=Results)
    else:
        # ``NotImplemented`` is a constant, not an exception class: calling
        # it raised a TypeError instead of reporting the real problem.
        raise NotImplementedError(
            'Informat {} is not implemented'.format(informat))
    return results
# Generator that pushes entries through a chain of plugins, one plugin per
# recursion level, and yields whatever comes out of the last one.
#
# NOTE(review): two signatures are interleaved in this span -- one taking
# (entries, plugins, nif_params) and one taking (entries, req, plugins) -- and
# the indentation has been lost.
def _process_entries(self, entries, plugins, nif_params):
def _process_entries(self, entries, req, plugins):
# No plugins left: the entries are yielded as they are.
# NOTE(review): presumably followed by a return; none is visible here.
if not plugins:
for i in entries:
yield i
# The first plugin handles the entries; the rest are applied recursively.
plugin = plugins[0]
specific_params = self._get_params(nif_params, plugin)
specific_params = api.get_extra_params(req, plugin)
# The (entries, req, plugins) variant logs the plugin and the parameters it
# was run with on the request's ``analysis`` list.
req.analysis.append({'plugin': plugin,
'parameters': specific_params})
results = plugin.analyse_entries(entries, specific_params)
for i in self._process_entries(results, plugins[1:], nif_params):
for i in self._process_entries(results, req, plugins[1:]):
yield i
# Run the entries of a response through the plugin chain and return the response.
#
# NOTE(review): both ``for`` loops below appear without a body in this excerpt,
# so what is done per plugin and per processed entry cannot be confirmed here.
def _process_response(self, resp, plugins, nif_params):
# The original entries are set aside and the response's list is emptied
# (presumably to be refilled with the processed entries -- confirm).
entries = resp.entries
resp.entries = []
for plug in plugins:
for i in self._process_entries(entries, plugins, nif_params):
return resp
# NOTE(review): two variants of this method are interleaved in this span -- one
# taking keyword parameters (**api_params) and one taking an already processed
# request -- and the indentation, the docstring quotes and the ``try:`` line
# are not visible in this excerpt. The unquoted prose lines below are the
# remains of the two docstrings.
def analyse(self, **api_params):
def analyse(self, request):
Main method that analyses a request, either from CLI or HTTP.
It uses a dictionary of parameters, provided by the user.
It takes a processed request, provided by the user, as returned
by api.parse_call().
# --- keyword-parameter variant: find plugins, parse parameters, build entries ---
logger.debug("analysing with params: {}".format(api_params))
plugins = self._find_plugins(api_params)
nif_params = self._get_params(api_params)
resp = self._get_entries(nif_params)
if 'with_parameters' in api_params:
resp.parameters = nif_params
logger.debug("analysing request: {}".format(request))
resp = self._process_response(resp, plugins, nif_params)
self.convert_emotions(resp, plugins, nif_params)
logger.debug("Returning analysis result: {}".format(resp))
# --- request variant: the request object doubles as the result container ---
entries = request.entries
request.entries = []
plugins = self._get_plugins(request)
results = request
# NOTE(review): the body of this loop is not visible in this excerpt.
for i in self._process_entries(entries, results, plugins):
if 'with_parameters' not in results.parameters:
del results.parameters
logger.debug("Returning analysis result: {}".format(results))
# Anything that is not already an Error is wrapped in an Error with status 500
# before being logged and re-raised.
except (Error, Exception) as ex:
if not isinstance(ex, Error):
ex = Error(message=str(ex), status=500)
msg = "Error during analysis: {} \n\t{}".format(ex,
ex = Error(message=msg, status=500)
logger.exception('Error returning analysis result')
raise ex
return resp
# The request variant replaces each analysis record by the id of its plugin.
results.analysis = [i['plugin'].id for i in results.analysis]
return results
def _conversion_candidates(self, fromModel, toModel):
candidates = self.filter_plugins(plugin_type='emotionConversionPlugin')
......@@ -180,7 +157,7 @@ class Senpy(object):
# logging.debug('Found candidate: {}'.format(candidate))
yield candidate
def convert_emotions(self, resp, plugins, params):
def convert_emotions(self, resp):
Conversion of all emotions in a response **in place**.
In addition to converting from one model to another, it has
......@@ -188,6 +165,8 @@ class Senpy(object):
Needless to say, this is far from an elegant solution, but it works.
@todo refactor and clean up
plugins = [i['plugin'] for i in resp.analysis]
params = resp.parameters
toModel = params.get('emotionModel', None)
if not toModel:
......@@ -215,7 +194,8 @@ class Senpy(object):
for j in i.emotions:
plugname = j['prov:wasGeneratedBy']
candidate = candidates[plugname]
resp.analysis.append({'plugin': candidate,
'parameters': params})
for k in candidate.convert(j, fromModel, toModel, params):
k.prov__wasGeneratedBy =
if output == 'nested':
......@@ -224,7 +204,6 @@ class Senpy(object):
i.emotions = newemotions
resp.entries = newentries
resp.analysis = list(set(resp.analysis))
def default_plugin(self):
obj = self
if hasattr(obj, "jsonld"):
obj = obj.jsonld()
jsonschema.validate(obj, self.schema)
def __str__(self):
return str(self.serialize())
return d
_subtypes = {}
def register(rsubclass, rtype=None):
    '''Record *rsubclass* in the module-level ``_subtypes`` registry.

    The entry is keyed by *rtype* when it is truthy, otherwise by the
    class's ``__name__``.
    '''
    key = rtype if rtype else rsubclass.__name__
    _subtypes[key] = rsubclass
_subtypes = {}
def from_dict(indict, cls=None):
if not cls:
target = indict.get('@type', None)
......@@ -286,15 +286,31 @@ def from_json(injson):
return from_dict(indict)
# Build a new class named after a JSON schema and register it under *name*.
#
# NOTE(review): two variants are interleaved in this span -- one that creates
# the class first and attaches '@type', 'schema' and 'class_name' with setattr,
# and one that collects the attributes in a dict and creates the class last --
# and the indentation has been lost.
# NOTE(review): the second signature inserts ``schema`` before ``schema_file``,
# which shifts positional arguments relative to the first signature.
def from_schema(name, schema_file=None, base_classes=None):
def from_schema(name, schema=None, schema_file=None, base_classes=None):
base_classes = base_classes or []
# Default schema file name is '<name>.json'.
schema_file = schema_file or '{}.json'.format(name)
# Class name is *name* with its first character upper-cased.
class_name = '{}{}'.format(name[0].upper(), name[1:])
newclass = type(class_name, tuple(base_classes), {})
setattr(newclass, '@type', name)
setattr(newclass, 'schema', read_schema(schema_file))
setattr(newclass, 'class_name', class_name)
# NOTE(review): this tests the string literal 'schema_file', not the variable,
# so the condition is always true; presumably ``schema_file`` was meant.
# The remaining arguments of the os.path.join(...) call are not visible here.
if '/' not in 'schema_file':
schema_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
schema_path = 'file://' + schema_file
# NOTE(review): the loaded JSON overwrites the ``schema`` argument, so a schema
# passed by the caller is not used by the visible code.
with open(schema_file) as f:
schema = json.load(f)
dct = {}
# The resolver is given the schema's file:// location as its base URI.
resolver = jsonschema.RefResolver(schema_path, schema)
dct['@type'] = name
dct['_schema_file'] = schema_file
dct['schema'] = schema
dct['_validator'] = jsonschema.Draft4Validator(schema, resolver=resolver)
newclass = type(class_name, tuple(base_classes), dct)
# Make the new class discoverable by its schema name.
register(newclass, name)
return newclass
......@@ -334,6 +351,9 @@ class Error(SenpyMixin, Exception):
self._error = _ErrorModel(message=message, *args, **kwargs)
self.message = message
def validate(self, obj=None):
def __getitem__(self, key):
return self._error[key]
Note that this method may yield an annotated entry or a list of
entries (e.g. in a tokenizer)
text = entry['text']
text = entry['nif:isString']
params = copy.copy(parameters)
params['input'] = text
results = self.analyse(**params)
from senpy.models import Entry
from nltk.tokenize.punkt import PunktSentenceTokenizer
from nltk.tokenize.simple import LineTokenizer
import nltk
class SplitPlugin(AnalysisPlugin):
def activate(self):'punkt')
def analyse_entry(self, entry, params):
chunker_type = params.get("delimiter", "sentence")
original_text = entry.get('nif:isString', None)
if chunker_type == "sentence":
tokenizer = PunktSentenceTokenizer()
if chunker_type == "paragraph":
tokenizer = LineTokenizer()
chars = tokenizer.span_tokenize(original_text)
for i, chunk in enumerate(tokenizer.tokenize(original_text)):
e = Entry()
e['nif:isString'] = chunk
if = + "#char={},{}".format(chars[i][0], chars[i][1])
yield e
test_cases = [
'entry': {
'nif:isString': 'Hello. World.'
'params': {
'delimiter': 'sentence',
'expected': [
'nif:isString': 'Hello.'
'nif:isString': 'World.'
'entry': {
"id": ":test",