Commit a4586b98 authored by J. Fernando Sánchez

Separate gsicrawler into gsicrawler+orchestrator

parent 1b8bdd5a
@@ -21,7 +21,7 @@ For convenience, this script is included in the gsicrawler container.
You just need to copy your backup file (a jsonlines file named `lateralbackup.jsons`) in the gsicrawler folder, and run this command:
```
docker-compose exec gsicrawler python loadBackup.py
docker-compose exec orchestrator python loadBackup.py
```
@@ -30,14 +30,16 @@ Services
This demo includes the following services:
* Luigi orchestrator (gsicrawler). This service provides both the task executor and a web interface to check the status of your workflows.
The tasks are executed periodically according to the period in `crontasks.py`.
* Luigi orchestrator (orchestrator). This service provides both the task executor and a web interface to check the status of your workflows.
The tasks are executed periodically according to the period in `tasks.py:Main`.
By default, the period is 24h.
The web interface shows the status of the tasks, and it is available on http://localhost:8082
* GSICrawler: a service that retrieves data from different sources (e.g. Twitter, Facebook). It is available on http://localhost:5000, with a Flower monitoring UI on http://localhost:5555.
* Elasticsearch: the official Elasticsearch image. It is available on localhost:19200.
* Senpy, used for sentiment and semantic analysis. It is mapped to http://localhost:5001/.
* Somedi dashboard (sefarad): a website developed with Sefarad. It displays the data stored in Elasticsearch.
It is available on http://localhost:8080.
* Redis: a dependency of GSICrawler.
The docker-compose definition adds all these services to the same network, so they can communicate with each other using the service name, without exposing external ports.
The endpoints used in each service (e.g. the elasticsearch endpoint in the gsicrawler service) are configurable through environment variables.
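For instance, a service attached to this network could reach Elasticsearch through its service name and the `ES_HOST`/`ES_PORT` environment variables. The snippet below is only an illustrative sketch (it is not part of the repository), assuming the compose definition above:
```
import os
import requests

# Service name on the docker-compose network and the port exposed inside it
# (defaults taken from the compose file above).
es_host = os.environ.get('ES_HOST', 'elasticsearch')
es_port = os.environ.get('ES_PORT', '9200')

# Query the cluster health endpoint to verify connectivity.
resp = requests.get('http://{}:{}/_cluster/health'.format(es_host, es_port))
print(resp.json())
```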
......
@@ -12,5 +12,3 @@ services:
ES_ENDPOINT_EXTERNAL: "localhost:19200"
volumes:
- ./dashboard:/usr/src/app
networks:
- sefarad-network
version: '2.1'
services:
elasticsearch:
image: "docker.elastic.co/elasticsearch/elasticsearch:5.5.2"
ulimits:
memlock:
soft: -1
hard: -1
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "xpack.security.enabled=false"
- "http.cors.enabled=true"
- 'http.cors.allow-origin=*'
volumes:
- somedi:/usr/share/elasticsearch/data/
ports:
- 19200:9200
- 19300:9300
networks:
- sefarad-network
senpy:
image: gsiupm/senpy-taiger:1.0.0
ports:
- "5000:5000"
networks:
- sefarad-network
sefarad:
build: ./dashboard/
image: registry.cluster.gsi.dit.upm.es/sefarad/dashboard-somedi/web:${VERSION}
@@ -37,20 +8,28 @@ services:
environment:
- ES_ENDPOINT_EXTERNAL=${ES_ENDPOINT_EXTERNAL}
- FUSEKI_ENDPOINT_EXTERNAL=${FUSEKI_ENDPOINT_EXTERNAL}
networks:
- sefarad-network
gsicrawler:
build: ./gsicrawler/
image: registry.cluster.gsi.dit.upm.es/sefarad/dashboard-somedi/gsicrawler:${VERSION}
orchestrator:
image: registry.cluster.gsi.dit.upm.es/social/somedi/orchestrator:${VERSION}
build: ./orchestrator
volumes:
- ./gsicrawler/:/usr/src/app
- "./orchestrator/tasks.py:/soneti/tasks.py"
- "./backup.jsons:/soneti/backupjsons"
ports:
- "8082:8082"
environment:
- "GSICRAWLER_ENDPOINT=http://gsicrawler:5000/api/v1"
- "SENPY_ENDPOINT=http://senpy:5000/api/"
- "ES_HOST=elasticsearch"
# "External" and accessory Services
gsicrawler:
image: gsiupm/gsicrawler
ports:
- "8082:8082"
networks:
- sefarad-network
- "5000:5000"
- "5555:5555"
environment:
- ES_ENDPOINT=${ES_ENDPOINT-elasticsearch}
- GSICRAWLER_BROKER=redis://redis:6379/
- GSICRAWLER_RESULT_BACKEND=redis://redis:6379/
- ES_HOST=${ES_ENDPOINT-elasticsearch}
- ES_PORT=${ES_PORT-9200}
- FUSEKI_ENDPOINT=${FUSEKI_ENDPOINT}
- FUSEKI_PORT=${FUSEKI_PORT}
@@ -62,15 +41,33 @@ services:
- TWITTER_ACCESS_TOKEN_SECRET=${TWITTER_ACCESS_TOKEN_SECRET}
- FACEBOOK_APP_ID=${FACEBOOK_APP_ID}
- FACEBOOK_APP_SECRET=${FACEBOOK_APP_SECRET}
- SENPY_ENDPOINT=${SENPY_ENDPOINT}
- SENPY_ALGORITHM=${SENPY_ALGORITHM:-sentiment-taiger3c}
depends_on:
- elasticsearch
networks:
sefarad-network:
driver: bridge
senpy:
image: gsiupm/senpy-taiger:1.0.0
command:
- "--default-plugins"
ports:
- "5001:5000"
redis:
image: redis
elasticsearch:
image: "docker.elastic.co/elasticsearch/elasticsearch:5.5.2"
ulimits:
memlock:
soft: -1
hard: -1
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "xpack.security.enabled=false"
- "http.cors.enabled=true"
- 'http.cors.allow-origin=*'
volumes:
- es-data:/usr/share/elasticsearch/data/
ports:
- 9200:9200
- 9300:9300
volumes:
somedi:
es-data:
FROM python:3
RUN apt-get update && apt-get install -y gettext
ADD requirements.txt .
RUN pip install -r requirements.txt
WORKDIR /usr/src/app
ADD . /usr/src/app
ADD run-cron.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/run-cron.sh
ENTRYPOINT ["run-cron.sh"]
CMD ["cron","100", "twitter_query=lateral+#restaurante", "twitter_query=@rest_lateral", "twitter_query=@lateral+madrid", "twitter_query=@lateral+barcelona", "facebook=restauranteslateral", "timeline=rest_lateral"]
#CMD ["cron","100", "twitter_query=lateral+#restaurante"]
import datetime
import json
import random
import imp
import re
import requests
import os
import time
import luigi
from luigi.contrib.esindex import CopyToIndex
from scrapers.facebookScrapper import getFBPageFeedData
from scrapers.twitter import retrieve_tweets
from scrapers.twitter import retrieve_timeline
from scrapers.twitter import retrieve_old
from analyzers.analysis import semanticAnalysis
ES_ENDPOINT = os.environ['ES_ENDPOINT']
ES_PORT = os.environ['ES_PORT']
FUSEKI_PORT = os.environ['FUSEKI_PORT']
FUSEKI_ENDPOINT = os.environ['FUSEKI_ENDPOINT']
print('ES connection: {} : {}'.format(ES_ENDPOINT, ES_PORT))
class ScrapyTask(luigi.Task):
"""
Scrapes data from the configured source (Twitter query, Twitter timeline, or Facebook page) and writes it to a local file in JSON lines format.
"""
#: the date parameter.
#date = luigi.DateParameter(default=datetime.date.today())
#field = str(random.randint(0,10000)) + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
url = luigi.Parameter()
id = luigi.Parameter()
website = luigi.Parameter()
analysisType = luigi.Parameter()
num = luigi.Parameter()
def run(self):
"""
Writes data in JSON format into the task's output target.
The data objects have the following attributes:
* `_id` is the default Elasticsearch id field,
* `text`: the text,
* `date`: the day when the data was created.
"""
#today = datetime.date.today()
print(self.analysisType)
filePath = '/tmp/_scrapy-%s.json' % self.id
#scraperImported = imp.load_source(self.website, 'scrapers/%s.py' % (self.website))
#scraperImported.startScraping(self.url, filePath)
print(self.url, filePath, self.num)
if('facebook' in self.website):
print("facebook")
getFBPageFeedData (self.url,self.num,filePath)
if('timeline' in self.website):
print('timeline')
retrieve_timeline(self.url, filePath, self.num)
if('twitter_query' in self.website):
print('query')
retrieve_old(self.url, filePath, self.num)
if('all' in self.website):
getFBPageFeedData (self.url,self.num,filePath)
retrieve_tweets(self.url, filePath, self.num)
self.set_status_message("Scraped!")
def output(self):
"""
Returns the target output for this task.
In this case, a successful execution of this task will create a file on the local filesystem.
:return: the target output for this task.
:rtype: object (:py:class:`luigi.target.Target`)
"""
return luigi.LocalTarget(path='/tmp/_scrapy-%s.json' % self.id)
class AnalysisTask(luigi.Task):
"""
Runs semantic and sentiment analysis on the scraped items and writes the enriched data to a local file in JSON lines format.
"""
#: the date parameter.
#date = luigi.DateParameter(default=datetime.date.today())
#field = str(random.randint(0,10000)) + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
url = luigi.Parameter()
id = luigi.Parameter()
website = luigi.Parameter()
analysisType = luigi.Parameter()
num = luigi.Parameter()
def requires(self):
"""
This task's dependencies:
* :py:class:`~.ScrapyTask`
:return: object (:py:class:`luigi.task.Task`)
"""
return ScrapyTask(self.url, self.id, self.website, self.analysisType, self.num)
def run(self):
"""
Writes data in JSON format into the task's output target.
The data objects have the following attributes:
* `_id` is the default Elasticsearch id field,
* `text`: the text,
* `date`: the day when the data was created.
"""
with self.output().open('w') as output:
with self.input().open('r') as infile:
for j,line in enumerate(infile):
i = json.loads(line)
self.set_status_message("Lines read: %d" % j)
i = semanticAnalysis(i)
output.write(json.dumps(i))
output.write('\n')
def output(self):
"""
Returns the target output for this task.
In this case, a successful execution of this task will create a file on the local filesystem.
:return: the target output for this task.
:rtype: object (:py:class:`luigi.target.Target`)
"""
return luigi.LocalTarget(path='/tmp/_analyzed-%s.json' % self.id)
class FusekiTask(luigi.Task):
"""
This task loads JSON data contained in a :py:class:`luigi.target.Target` and inserts it into the Fuseki platform as semantic (JSON-LD) data.
"""
#: date task parameter (default = today)
url = luigi.Parameter()
id = luigi.Parameter()
website = luigi.Parameter()
analysisType = luigi.Parameter()
num = luigi.Parameter()
#file = str(random.randint(0,10000)) + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
def requires(self):
"""
This task's dependencies:
* :py:class:`~.AnalysisTask`
:return: object (:py:class:`luigi.task.Task`)
"""
return AnalysisTask(self.url, self.id, self.website, self.analysisType, self.num)
def run(self):
"""
Receives the analyzed data and indexes it in Fuseki.
"""
f = []
with self.input().open('r') as infile:
with self.output().open('w') as outfile:
for i, line in enumerate(infile):
self.set_status_message("Lines read: %d" % i)
w = json.loads(line)
#print(w)
try:
w["schema:headline"] = {"@value": w["schema:headline"], "@language": w["language_detected"]}
except KeyError:
# Fall back to the article body when the entry has no headline
w["schema:articleBody"] = {"@value": w["schema:articleBody"], "@language": w["language_detected"]}
f.append(w)
f = json.dumps(f)
self.set_status_message("JSON created")
#print(f)
#g = Graph().parse(data=f, format='json-ld')
r = requests.put('http://{fuseki}:{port}/somedi/data'.format(fuseki=FUSEKI_ENDPOINT,
port=FUSEKI_PORT),
headers={'Content-Type':'application/ld+json'},
data=f)
self.set_status_message("Data sent to fuseki")
outfile.write(f)
def output(self):
"""
Returns the target output for this task.
In this case, a successful execution of this task will create a file on the local filesystem.
:return: the target output for this task.
:rtype: object (:py:class:`luigi.target.Target`)
"""
return luigi.LocalTarget(path='/tmp/_n3-%s.json' % self.id)
class Elasticsearch(CopyToIndex):
"""
This task loads JSON data contained in a :py:class:`luigi.target.Target` into an ElasticSearch index.
This task's input will be the target returned by :py:meth:`~.AnalysisTask.output`.
This class uses :py:meth:`luigi.contrib.esindex.CopyToIndex.run`.
After running this task you can run:
.. code-block:: console
$ curl "localhost:9200/example_index/_search?pretty"
to see the indexed documents.
To see the update log, run
.. code-block:: console
$ curl "localhost:9200/update_log/_search?q=target_index:example_index&pretty"
To cleanup both indexes run:
.. code-block:: console
$ curl -XDELETE "localhost:9200/example_index"
$ curl -XDELETE "localhost:9200/update_log/_query?q=target_index:example_index"
"""
#: date task parameter (default = today)
url = luigi.Parameter()
id = luigi.Parameter()
website = luigi.Parameter()
analysisType = luigi.Parameter()
num = luigi.Parameter()
#: the name of the index in ElasticSearch to be updated.
index = luigi.Parameter()
#: the name of the document type.
doc_type = luigi.Parameter()
#: the host running the ElasticSearch service.
host = ES_ENDPOINT
#: the port used by the ElasticSearch service.
port = ES_PORT
#: timeout for ES post
timeout = 100
print(host,port)
def requires(self):
"""
This task's dependencies:
* :py:class:`~.AnalysisTask`
:return: object (:py:class:`luigi.task.Task`)
"""
return AnalysisTask(self.url, self.id, self.website, self.analysisType, self.num)
class PipelineTask(luigi.Task):
#: date task parameter (default = today)
url = luigi.Parameter()
id = luigi.Parameter()
website = luigi.Parameter()
analysisType = luigi.Parameter()
num = luigi.Parameter()
#: the name of the index in ElasticSearch to be updated.
index = luigi.Parameter()
#: the name of the document type.
doc_type = luigi.Parameter()
#: the host running the ElasticSearch service.
host = ES_ENDPOINT
#: the port used by the ElasticSearch service.
port = ES_PORT
def requires(self):
"""
This task's dependencies:
* :py:class:`~.Elasticsearch`
:return: object (:py:class:`luigi.task.Task`)
"""
#yield FusekiTask(self.url, self.id, self.website, self.analysisType, self.num)
index=self.index
doc_type=self.doc_type
yield Elasticsearch(self.url, self.id, self.website, self.analysisType, self.num, index, doc_type)
if __name__ == "__main__":
luigi.run()
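# A possible way (not part of the original code) to launch the whole pipeline
# programmatically instead of relying on the cron entrypoint; the parameter
# values below are purely illustrative:
#
#   luigi.build([PipelineTask(url='restauranteslateral', id='lateral',
#                             website='facebook', analysisType='sentiment',
#                             num='10', index='somedi', doc_type='post')],
#               local_scheduler=True)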
import requests
import json
import os
import urllib
import time
#base_url = 'http://127.0.0.1:5000/?algo=sentiment140&i=%s'
API_KEY_MEANING_CLOUD = os.environ.get('API_KEY_MEANING_CLOUD')
SENPY_ENDPOINT = os.environ.get('SENPY_ENDPOINT', 'localhost:5000')
SENPY_ALGORITHM = os.environ.get('SENPY_ALGORITHM', 'sentiment140')
senpy_context = {
'@vocab': 'http://senpy.cluster.gsi.dit.upm.es/ns/',
'analysis': {'@container': '@set',
'@id': 'AnalysisInvolved',
'@type': '@id'},
'dc': 'http://dublincore.org/2012/06/14/dcelements#',
'emoml': 'http://www.gsi.dit.upm.es/ontologies/onyx/vocabularies/emotionml/ns#',
'emotions': {'@container': '@set', '@id': 'onyx:hasEmotionSet'},
'entities': {'@id': 'me:hasEntities'},
'entries': {'@container': '@set', '@id': 'prov:used'},
'marl': 'http://www.gsi.dit.upm.es/ontologies/marl/ns#',
'me': 'http://www.mixedemotions-project.eu/ns/model#',
'nif': 'http://persistence.uni-leipzig.org/nlp2rdf/ontologies/nif-core#',
'onyx': 'http://www.gsi.dit.upm.es/ontologies/onyx/ns#',
'onyx:conversionFrom': {'@type': '@id'},
'onyx:conversionTo': {'@type': '@id'},
'onyx:hasEmotion': {'@container': '@set'},
'onyx:hasEmotionCategory': {'@type': '@id'},
'onyx:usesEmotionModel': {'@type': '@id'},
'options': {'@container': '@set'},
'plugins': {'@container': '@set'},
'prov': 'http://www.w3.org/ns/prov#',
'prov:wasGeneratedBy': {'@type': '@id'},
'sentiments': {'@container': '@set', '@id': 'marl:hasOpinion'},
'suggestions': {'@container': '@set', '@id': 'me:hasSuggestions'},
'topics': {'@id': 'dc:subject'},
'wna': 'http://www.gsi.dit.upm.es/ontologies/wnaffect/ns#',
'xsd': 'http://www.w3.org/2001/XMLSchema#'
}
senpy_context.update({
'dbps':'http://www.openlinksw.com/schemas/dbpedia-spotlight#',
'dbpedia':'http://dbpedia.org/resource/',
'dbpedia-owl': 'http://dbpedia.org/ontology/',
'schema':'http://schema.org/',
'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
'marl:hasPolarity': {
'@type': '@id'
},
'rdfs:subClassOf': {
'@type': '@id'
},
'dbps:types': {
'@type': '@id'
},
'dbps:anyURI': {
'@type': '@id'
}
})
def getContext():
return senpy_context
# r = requests.get("http://latest.senpy.cluster.gsi.dit.upm.es/api/contexts/Context.jsonld")
# senpy_context = r.json()["@context"]
# senpy_context.update({
# 'dbps':'http://www.openlinksw.com/schemas/dbpedia-spotlight#',
# 'dbpedia':'http://dbpedia.org/resource/',
# 'dbpedia-owl': 'http://dbpedia.org/ontology/',
# 'schema':'http://schema.org/',
# 'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
# 'marl:hasPolarity': {
# '@type': '@id'
# },
# 'rdfs:subClassOf': {
# '@type': '@id'
# },
# 'dbps:types': {
# '@type': '@id'
# },
# 'dbps:anyURI': {
# '@type': '@id'
# }
# })
# return senpy_context
def semanticAnalysis(i, endpoint=SENPY_ENDPOINT, sleep=0.2, algo=SENPY_ALGORITHM, lang='es', **kwargs):
i["@context"] = getContext()
REQUEST_LONG = 3000
i_len = len(i["schema:articleBody"])
number_of_requests = (len(i["schema:articleBody"])//REQUEST_LONG)
entities_arr = []
sentiments_arr = []
topics_arr = []
# The article body is analyzed in chunks of REQUEST_LONG characters to keep each Senpy request small.
for k in range(0, number_of_requests+1):
if i_len - int(REQUEST_LONG*(k+1)) > 0:
r = requests.post(endpoint+'/api/', data={'algo': algo, 'lang': lang, 'apiKey': API_KEY_MEANING_CLOUD, 'i': i["schema:articleBody"][REQUEST_LONG*k:REQUEST_LONG*(k+1)], **kwargs})
else:
# Last (possibly shorter) chunk: slice to the end of the text instead of dropping the final character.
r = requests.post(endpoint+'/api/', data={'algo': algo, 'lang': lang, 'apiKey': API_KEY_MEANING_CLOUD, 'i': i["schema:articleBody"][REQUEST_LONG*k:], **kwargs})
time.sleep(sleep)
r = r.json()
if not 'entries' in r:
continue
if type(r["entries"][0]["entities"]) is dict:
r["entries"][0]["entities"] = [r["entries"][0]["entities"]]
for x, index in enumerate(r["entries"][0]["entities"]):
index["nif:beginIndex"] = str(int(index["nif:beginIndex"]) + (REQUEST_LONG*k))
index["nif:endIndex"] = str(int(index["nif:endIndex"]) + (REQUEST_LONG*k))
if index["@type"] == 'ODENTITY_City':
try:
geor = requests.get("https://dbpedia.org/sparql?default-graph-uri=http%3A%2F%2Fdbpedia.org&query=select+%3Fcoordinates+where+%7B%0D%0A%0D%0Adbr%3A{}+georss%3Apoint+%3Fcoordinates%0D%0A%0D%0A%7D&format=application%2Fsparql-results%2Bjson".format(index.get("marl:describesObject", "").split('/')[-1]))
coords = geor.json()['results']['bindings'][0]['coordinates']['value'].split()
index['latitude'] = coords[0]
index['longitude'] = coords[1]
except (IndexError, json.decoder.JSONDecodeError):
pass
if index["@type"] in ['ODENTITY_Person', 'ODENTITY_FullName']:
try:
peopler = requests.get("https://dbpedia.org/sparql?default-graph-uri=http%3A%2F%2Fdbpedia.org&query=select+%3Fimage+where+%7B%0D%0A++dbr%3A{}+++dbo%3Athumbnail+%3Fimage%0D%0A%0D%0A%7D+LIMIT+100&format=application%2Fsparql-results%2Bjson&CXML_redir_for_subjs=121&CXML_redir_for_hrefs=&timeout=30000&debug=on&run=+Run+Query+".format(index.get("marl:describesObject", "").split('/')[-1]))
index['dbo:thumbnail'] = peopler.json()['results']['bindings'][0]['image']['value']
except (IndexError, json.decoder.JSONDecodeError):
pass
entities_arr.append(index)
if type(r["entries"][0]["sentiments"]) is dict:
r["entries"][0]["sentiments"] = [r["entries"][0]["sentiments"]]
for x, index in enumerate(r["entries"][0]["sentiments"]):
index["@id"] = i["@id"]+"#Sentiment{num}".format(num=x+k)
if x + k == 0:
i["marl:hasPolarity"] = index["marl:hasPolarity"]
if 'nif:beginIndex' in index:
index["nif:beginIndex"] = str(int(index["nif:beginIndex"]) + (REQUEST_LONG*k))
if 'nif:endIndex' in index:
index["nif:endIndex"] = str(int(index["nif:endIndex"]) + (REQUEST_LONG*k))
sentiments_arr.append(index)
if type(r["entries"][0]["topics"]) is dict:
r["entries"][0]["topics"] = [r["entries"][0]["topics"]]
for x, index in enumerate(r["entries"][0]["topics"]):