Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/dags/the_reading_machine.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import os
from airflow import DAG
from airflow import configuration as conf
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from datetime import timedelta
from thereadingmachine.utils.airflow_extra import BashWrapperOperator

# Load configuration
process_directory = os.path.join(conf.get('core', 'process_folder'))
airflow_start_date = os.getenv('AIRFLOW_START_DATE')

# Set configure
default_args = {
'owner': 'michael',
'depends_on_past': False,
'start_date': datetime.strptime(airflow_start_date, '%Y-%m-%d'),
'start_date': days_ago(1),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a good practice.

Copy link
Copy Markdown
Contributor Author

@mkao006 mkao006 Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an old one, although dynamic dates are not recommended in the documentation but there are no other solutions.

This is fairly standard practice as far as I am aware and there is a reason why the function days_ago actually exist. Unless you know of any other solution.

'email': ['michael.kao@fao.org'],
'email_on_failure': True,
'email_on_retry': False,
Expand Down
12 changes: 8 additions & 4 deletions deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ Follow this
to manually launch an instance and setup Docker.


Currently, the ideal instance size has not yet been determined,
further optimisation is required. However, there are two known changes
required for the deployment.
Currently, the minimum instance size is `t2.large`. It has been tested
that `t2.medium` does not have sufficient memory for the process
`article_processing`.

After launching and ssh into the instance, update and install Docker as per tutorial.

Expand Down Expand Up @@ -100,9 +100,13 @@ and require clean up. This can be done with the following command on
the instance.

```
docker system prune -a -f
echo "0 0 * * * docker system prune -a -f" | crontab -
```

This will schedule a cron job to clean up redudant space used by
Docker everyday at midnight.


Please see the [community
forum](https://forums.docker.com/t/some-way-to-clean-up-identify-contents-of-var-lib-docker-overlay/30604/2)
for more information.
3 changes: 0 additions & 3 deletions pipeline/article_scraper/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,3 @@

# 'bloomberg' removed temporarily.
ctr.scrap_articles(env.spiders)

# Save output file
ctr.save_json_to_db(env.spiders)
1 change: 0 additions & 1 deletion set_env_var.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@ export SCRAPER_FILE_PREFIX=blog_articles
export WEBAPP_DIR=$PROJECT_HOME/webapp
export WEBAPP_PLOT_DIR=$WEBAPP_DIR/templates/static/plotly
export PYTHONPATH=$PROJECT_HOME:$PYTHONPATH
export AIRFLOW_START_DATE=`date +%Y-%m-%d`
10 changes: 9 additions & 1 deletion thereadingmachine/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlalchemy import create_engine

# Spiders for scraper
spiders = ['worldgrain', 'euractiv', 'agrimoney']
spiders = ['worldgrain', 'euractiv']

# Data directory and engine
data_dir = os.environ['DATA_DIR']
Expand All @@ -26,6 +26,14 @@
market_force_table = 'MarketForce'

# Table field type
raw_article_field_type = {
'source': sqlalchemy.types.Unicode(length=255),
'title': sqlalchemy.types.Unicode(length=255),
'date': sqlalchemy.types.Date(),
'link': sqlalchemy.types.Unicode(length=255),
'article': sqlalchemy.types.UnicodeText
}

processed_article_field_type = {
'id': sqlalchemy.types.Integer(),
'date': sqlalchemy.types.Date(),
Expand Down
52 changes: 33 additions & 19 deletions thereadingmachine/modeller/compute_market_force.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from bisect import bisect_left
from plotly.offline import plot
from sklearn.linear_model import ElasticNetCV
from statsmodels.nonparametric.smoothers_lowess import lowess
from statsmodels.distributions.empirical_distribution import ECDF


Expand Down Expand Up @@ -95,18 +94,11 @@ def estimate_sentiment_weights(model_data, response_variable):
''' Estimate the model coefficient of the sentiment time series.
'''

model = ElasticNetCV(n_alphas=100,
model = ElasticNetCV(n_alphas=100, l1_ratio=1,
tol=1e-7, max_iter=1e7, cv=10, n_jobs=-1,
fit_intercept=False, normalize=True)
fit_intercept=True, normalize=True)
topic_variables = get_topic_variables()
demeaned_response = model_data['response'] - model_data['response'].mean()
smoothed_response = lowess(demeaned_response,
range(len(model_data['response'])),
return_sorted=False,
frac=0.1)
normalised_response = pd.Series(smoothed_response).diff().fillna(0)

model.fit(model_data[topic_variables], normalised_response)
model.fit(model_data[topic_variables], model_data[response_variable])
return model.coef_


Expand Down Expand Up @@ -183,18 +175,40 @@ def create_sentiment_traffic_light(data, commodity_col='commodity',

sent_level_color = [param.div_col_pallete[i - 1]
for i in data[sent_level_col]]
traffic_light = [go.Scatter(x=data[commodity_col],
y=[0] *
len(data[commodity_col]),
marker={'color': sent_level_color,
'size': 100},
mode='markers')]

num_class = len(param.div_col_pallete)
num_commodity = len(data[commodity_col].unique())

legend_x = [num_commodity] * num_class
legend_y = np.linspace(-0.7, 0.7, 5).tolist()
legend_marker_size = [40] * num_class
legend_text = ['Very Bearish', 'Bearish',
'Neutral', 'Bullish', 'Very Bullish']
data_x = range(num_commodity)
data_y = [0] * num_commodity
data_marker_size = [150] * num_commodity
plot_marker = {'color': sent_level_color + param.div_col_pallete,
'size': data_marker_size + legend_marker_size}

traffic_light = [go.Scatter(x=data_x + legend_x,
y=data_y + legend_y,
marker=plot_marker,
mode='markers+text')]

legend_annotation = [dict(x=lx - 0.25, y=ly, text=lt, showarrow=False)
for lx, ly, lt in zip(legend_x, legend_y, legend_text)]

layout = go.Layout(yaxis=dict(range=[-1, 1],
showgrid=False,
zeroline=False,
showticklabels=False),
xaxis=dict(showgrid=False, tickfont=dict(size=15)),
margin=go.Margin(l=40, r=40, b=50, t=50, pad=0))
xaxis=dict(range=[-0.5, num_commodity + 0.5],
showgrid=False, tickfont=dict(size=15),
zeroline=False,
ticktext=data[commodity_col].tolist() + [''],
tickvals=range(0, num_commodity + 1)),
annotations=legend_annotation,
margin=go.Margin(l=40, r=10, b=50, t=50, pad=0))
fig = go.Figure(data=traffic_light, layout=layout)
out_file_name = 'sentiment_traffic_light.html'
plot(fig, filename=os.path.join(env.plot_output_dir, out_file_name),
Expand Down
2 changes: 0 additions & 2 deletions thereadingmachine/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
# Model parameters
filter_coef = 1
sentiment_scale = 50
bootstrapIteration = 75
forecast_period = 0
holdout_period = 180

# RNN Model parameters
feature_size = 100
Expand Down
5 changes: 3 additions & 2 deletions thereadingmachine/processor/article_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ def scraper_post_processing(raw_articles, model_start_date, id_col='id',
param.irrelevant_link)]

# Subset the data only after the model_start_date
processed_articles = processed_articles[processed_articles[date_col]
> model_start_date]
processed_articles = processed_articles[
(processed_articles[date_col] > model_start_date) &
(processed_articles[date_col] <= datetime.today())]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using today's date will behave funny, I'd refer to airflow execution date to enforce reproducibility between runs


return processed_articles

Expand Down
63 changes: 0 additions & 63 deletions thereadingmachine/scraper/article_scraper.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,7 @@
import os
import pandas as pd
import sqlalchemy
from datetime import datetime
from datetime import timedelta
from sqlalchemy import create_engine
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings


# Configuration
data_dir = os.environ['DATA_DIR']
target_data_table = 'RawArticle'
engine = create_engine('sqlite:///{0}/the_reading_machine.db'.format(data_dir))
scraper_file_prefix = os.environ['SCRAPER_FILE_PREFIX']
scraper_output_path = os.path.join(data_dir, 'scraper_output')

if not os.path.exists(scraper_output_path):
os.makedirs(scraper_output_path)


field_type = {'source': sqlalchemy.types.Unicode(length=255),
'title': sqlalchemy.types.Unicode(length=255),
'date': sqlalchemy.types.Date(),
'link': sqlalchemy.types.Unicode(length=255),
'article': sqlalchemy.types.UnicodeText}


def scrap_articles(spiders):
''' Function to start the scrapers.
'''
Expand All @@ -35,42 +11,3 @@ def scrap_articles(spiders):
process.crawl(spider)

process.start()


def save_json_to_db(spiders, date_col='date'):
''' Function to load the scraped article and save it back to the database.
'''

flattened_article_df = pd.DataFrame()
today = datetime.today()
for spider in spiders:
current_file_name = '{}_{}_{}.jsonl'.format(
scraper_file_prefix, today.strftime('%Y_%m_%d'), spider)
current_file_path = os.path.join(
scraper_output_path, current_file_name)

if os.path.isfile(current_file_path):
current_source = pd.read_json(current_file_path, lines=True)
flattened_article_df = (
flattened_article_df
.append(current_source, ignore_index=True))
else:
raise TypeError(
'source file for spider "{}" does not exist'.format(spider))

flattened_article_df.to_sql(con=engine,
name=target_data_table,
index=False,
if_exists='replace',
dtype=field_type)
# Delete old files
#
# We delete files from two days ago, so that we still have data scraped
# yester for recover.
two_days_ago = today - timedelta(days=2)
old_file_name = '{}_{}_{}.jsonl'.format(
scraper_file_prefix, two_days_ago.strftime('%Y_%m_%d'), spider)
old_file_path = os.path.join(
scraper_output_path, old_file_name)
if os.path.isfile(old_file_path):
os.remove(old_file_path)
45 changes: 16 additions & 29 deletions thereadingmachine/scraper/news_scraper/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,10 @@
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html


import os
import json
import threading
import pandas as pd
from scrapy.exceptions import DropItem
from datetime import datetime

data_dir = os.environ['DATA_DIR']
scraper_file_prefix = os.environ['SCRAPER_FILE_PREFIX']
scraper_output_path = os.path.join(data_dir, 'scraper_output')

if not os.path.exists(scraper_output_path):
os.makedirs(scraper_output_path)
import thereadingmachine.environment as env


class DuplicatesPipeline(object):
Expand Down Expand Up @@ -69,35 +61,30 @@ def process_item(self, item, spider):
raise DropItem('Empty Article in %s' % item)


class AmisJsonPipeline(object):
class AmisScrapePipeline(object):

def __init__(self):
self.datafiles = {}
self.data_list = []
self.today = datetime.today()

def open_spider(self, spider):
if spider.name not in self.datafiles.keys():
target_file_name = '{}_{}_{}.jsonl'.format(
scraper_file_prefix, self.today.strftime('%Y_%m_%d'), spider.name)
target_file_path = os.path.join(
scraper_output_path, target_file_name)
self.datafiles[spider.name] = open(target_file_path, 'a')
self.lock = threading.Lock()

def process_item(self, item, spider):
# spider.logger.info('Processing Item: ' + item['title'])
self.lock.acquire()
try:
item_dict = dict(item)
item_dict['source'] = spider.name
line = json.dumps(item_dict, ensure_ascii=False) + '\n'
self.datafiles[spider.name].write(line)
# spider.logger.info('Written Item: ' + item['title'])
self.data_list.append(item_dict)
except (UnicodeDecodeError, UnicodeEncodeError):
raise DropItem('Formatting Error in %s' % item)
self.lock.release()
return item

def close_spider(self, spider):
for datafile in self.datafiles.values():
datafile.close()
scraped_data = pd.DataFrame(self.data_list)
scraped_data['article'] = scraped_data['article'].apply(
lambda x: x.decode('utf-8'))
scraped_data['title'] = scraped_data['title'].apply(
lambda x: x.decode('utf-8'))
scraped_data['date'] = pd.to_datetime(scraped_data['date'])
scraped_data.to_sql(con=env.engine,
name=env.raw_article_table,
index=False,
if_exists='append',
dtype=env.raw_article_field_type)
7 changes: 6 additions & 1 deletion thereadingmachine/scraper/news_scraper/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
NEWSPIDER_MODULE = 'thereadingmachine.scraper.news_scraper.spiders'


# Logging
LOG_STDOUT = False
LOG_LEVEL = 'ERROR'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit harsh as a default, consider using environment variables instead and defaulting to something milder.


# Only scrap data that is new
SCRAPE_ONLY_NEW = True

# Crawl responsibly by identifying yourself (and your website) on the
# user-agent
Expand All @@ -42,5 +47,5 @@
ITEM_PIPELINES = {
'thereadingmachine.scraper.news_scraper.pipelines.DuplicatesPipeline': 100,
'thereadingmachine.scraper.news_scraper.pipelines.SanitizeArticlePipeline': 300,
'thereadingmachine.scraper.news_scraper.pipelines.AmisJsonPipeline': 500
'thereadingmachine.scraper.news_scraper.pipelines.AmisScrapePipeline': 500
}
Loading