Commit 9292a6bb authored by militarpancho
Browse files

Added number of items as a Luigi parameter

parent 86e2435b
......@@ -39,6 +39,8 @@ class ScrapyTask(luigi.Task):
analysisType = luigi.Parameter()
num = luigi.Parameter()
def run(self):
"""
......@@ -54,9 +56,9 @@ class ScrapyTask(luigi.Task):
#scraperImported = imp.load_source(self.website, 'scrapers/%s.py' % (self.website))
#scraperImported.startScraping(self.url, filePath)
print(self.url, filePath)
retrieveCnnNews(self.url, 10, filePath)
retrieveNytimesNews(self.url, 10, filePath)
retrieve_tweets(self.url, filePath, 10)
retrieveCnnNews(self.url, self.num, filePath)
retrieveNytimesNews(self.url, self.num, filePath)
retrieve_tweets(self.url, filePath, self.num)
def output(self):
"""
......@@ -86,13 +88,15 @@ class AnalysisTask(luigi.Task):
analysisType = luigi.Parameter()
num = luigi.Parameter()
def requires(self):
"""
This task's dependencies:
* :py:class:`~.SenpyTask`
:return: object (:py:class:`luigi.task.Task`)
"""
return ScrapyTask(self.url, self.id, self.analysisType)
return ScrapyTask(self.url, self.id, self.analysisType, self.num)
def run(self):
......@@ -133,6 +137,8 @@ class FusekiTask(luigi.Task):
id = luigi.Parameter()
analysisType = luigi.Parameter()
num = luigi.Parameter()
#file = str(random.randint(0,10000)) + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
def requires(self):
......@@ -141,7 +147,7 @@ class FusekiTask(luigi.Task):
* :py:class:`~.SenpyTask`
:return: object (:py:class:`luigi.task.Task`)
"""
return AnalysisTask(self.url, self.id, self.analysisType)
return AnalysisTask(self.url, self.id, self.analysisType, self.num)
def run(self):
"""
......@@ -202,6 +208,8 @@ class Elasticsearch(CopyToIndex):
analysisType = luigi.Parameter()
num = luigi.Parameter()
#: the name of the index in ElasticSearch to be updated.
index = luigi.Parameter()
#: the name of the document type.
......@@ -219,7 +227,7 @@ class Elasticsearch(CopyToIndex):
* :py:class:`~.SenpyTask`
:return: object (:py:class:`luigi.task.Task`)
"""
return AnalysisTask(self.url, self.id, self.analysisType)
return AnalysisTask(self.url, self.id, self.analysisType, self.num)
class PipelineTask(luigi.Task):
......@@ -231,6 +239,8 @@ class PipelineTask(luigi.Task):
analysisType = luigi.Parameter()
num = luigi.Parameter()
#: the name of the index in ElasticSearch to be updated.
index = luigi.Parameter()
#: the name of the document type.
......@@ -247,12 +257,12 @@ class PipelineTask(luigi.Task):
:return: object (:py:class:`luigi.task.Task`)
"""
yield FusekiTask(self.url, self.id, self.analysisType)
yield FusekiTask(self.url, self.id, self.analysisType, self.num)
index=self.index
doc_type=self.doc_type
yield Elasticsearch(self.url, self.id, self.analysisType, index, doc_type)
yield Elasticsearch(self.url, self.id, self.analysisType, self.num, index, doc_type)
if __name__ == "__main__":
#luigi.run(['--task', 'Elasticsearch'])
......
......@@ -6,20 +6,19 @@ import sys
s = sched.scheduler(time.time, time.sleep)
def runevery():
if len(sys.argv) > 1:
if sys.argv[1] == "tutorial2":
print("Tutorial 2 STARTED!")
identifier = time.time()
command = 'python -m luigi --module tutorialtask CrawlerTask --url "{url}" --id "{id}"'.format(url="isis",id=identifier)
subprocess.call(command.split(), shell= False)
elif sys.argv[1] == "tutorial3":
print("Tutorial 3 STARTED!")
identifier = time.time()
command = 'python -m luigi --module tutorialtask PipelineTask --index tutorial --doc-type news --url "{url}" --id "{id}"'.format(url="isis",id=identifier)
subprocess.call(command.split(), shell= False)
if sys.argv[1] == "tutorial2":
print("Tutorial 2 STARTED!")
identifier = time.time()
command = 'python -m luigi --module tutorialtask CrawlerTask --url "{url}" --id "{id}" --num 10'.format(url="isis",id=identifier)
subprocess.call(command.split(), shell= False)
elif sys.argv[1] == "tutorial3":
print("Tutorial 3 STARTED!")
identifier = time.time()
command = 'python -m luigi --module tutorialtask PipelineTask --index tutorial --doc-type news --url "{url}" --id "{id}" --num 10'.format(url="isis",id=identifier)
subprocess.call(command.split(), shell= False)
else:
identifier = time.time()
command = 'python -m luigi --module analysistask PipelineTask --index gsicrawler --doc-type news --url "{url}" --id "{id}" --analysisType "sentiments,emotions"'.format(url="isis",id=identifier)
command = 'python -m luigi --module analysistask PipelineTask --index gsicrawler --doc-type news --url "{url}" --id "{id}" --analysisType "sentiments,emotions" --num {num}'.format(url="isis",id=identifier, num=int(sys.argv[1]))
subprocess.call(command.split(), shell= False)
s.enter(86400, 1, runevery) #Change here your cron time
s.enter(2, 1, runevery)
......
......@@ -2,4 +2,4 @@
envsubst < /usr/src/app/luigienv.cfg > /usr/src/app/luigi.cfg;
luigid --background --pidfile /tmp/pidfile --logdir /tmp &
sleep 20;
python crontasks.py
\ No newline at end of file
python crontasks.py 10
......@@ -18,6 +18,7 @@ def retrieve_tweets(query, filePath, count=200):
api = tweepy.API(auth)
max_tweets = count
max_tweets = int(max_tweets)
print(max_tweets)
searched_tweets = []
last_id = -1
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment