-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
480 lines (365 loc) · 14.7 KB
/
main.py
File metadata and controls
480 lines (365 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# ***********************************
# URL shortener running a Flask stack
# Created by Leon Sandøy for Conmodo
# ***********************************
# built-ins
import os
import sys
from urllib.parse import urlparse
import requests
import string
import json
# third party
import psycopg2
from flask import Flask, render_template, request, redirect, url_for, abort
# user defined
from secrets import WOT_API_KEY, POSTGRES_PASS, DB_TABLE
def connect_to_pg():
    """
    Open a connection to the local 'short_url' PostgreSQL database.

    The connection is made as user 'postgres' on localhost, using the
    password stored in POSTGRES_PASS.

    Returns the psycopg2 connection object.
    Raises a ValueError (carrying the original exception as its second
    argument) if anything goes wrong while connecting.
    """
    try:
        # the DSN is assembled inside the try so that any failure while
        # building or using it is reported the same way
        dsn = "dbname=short_url user=postgres host=localhost password={0}".format(POSTGRES_PASS)
        return psycopg2.connect(dsn)
    except Exception as failure:
        raise ValueError('Unable to connect to PostgreSQL database:', failure)
def get_domain(url, keep_schema=False):
    """
    Finds the domain (network location) for a provided url.

    urlparse() only recognises a network location when it is introduced
    by '//', so a url without a schema (e.g. 'telegraph.co.uk/news') is
    parsed a second time with '//' prepended. Without that step such urls
    would yield an empty string.

    Parameters:
        url          the url to inspect, with or without a schema.
        keep_schema  if True and the url has a schema, the result is
                     prefixed with '<schema>://'.

    Returns the domain as a string, or '' if no domain could be found.

    get_domain('short.beardfist.com') -> 'short.beardfist.com'
    get_domain('http://www.beardfist.com/stuff') -> 'www.beardfist.com'
    get_domain('telegraph.co.uk/news') -> 'telegraph.co.uk'
    get_domain('http://www.beardfist.com/stuff', keep_schema=True) -> 'http://www.beardfist.com'
    """
    parsed = urlparse(url)

    # schemeless input: everything ended up in .path, so re-parse it as a
    # network-path reference ('//host/path') to recover the host.
    if not parsed.scheme and not parsed.netloc:
        try:
            parsed = urlparse('//' + url)
        except ValueError:
            # urlparse rejects some malformed hosts (e.g. unbalanced '[');
            # fall back to the first parse, which has an empty netloc.
            pass

    domain = ''
    if keep_schema and parsed.scheme:
        domain += parsed.scheme + '://'
    domain += parsed.netloc
    return domain
def get_url_string(url):
    """
    Gets the variable string part at the
    end of the shortened URLs, i.e. the url's path
    without its leading '/'.

    urlparse() puts a schemeless url entirely into .path, which used to
    make the example below return 'hort.beardfist.com/abD12'. Schemeless
    input is therefore re-parsed with '//' prepended so the host is
    separated from the path first.

    Parameters:
        url  a short url, with or without a schema.

    Returns the path without the leading '/', or '' if the url has no path.

    get_url_string('short.beardfist.com/abD12') -> 'abD12'
    get_url_string('http://short.beardfist.com/abD12') -> 'abD12'
    """
    parsed = urlparse(url)

    if not parsed.scheme and not parsed.netloc:
        try:
            parsed = urlparse('//' + url)
        except ValueError:
            # malformed host part; keep the result of the first parse
            pass

    # slicing off the leading '/'
    return parsed.path[1:]
def validate_short_url(url):
    """
    Decides whether the provided url looks like a short url
    handed out by this site.

    A url qualifies when the domain of the current request occurs in the
    url's network location and the url has a path longer than just '/'.
    Must be called while handling a request, because it reads request.url.

    Returns True or False
    """
    candidate = urlparse(url)
    own_domain = get_domain(request.url)

    # wrong host: not one of ours
    if own_domain not in candidate.netloc:
        return False

    # a missing path or a bare '/' carries no short string
    return len(candidate.path) > 1
def validate_schema(url):
    """
    Checks if the url has either http or https schema.

    If it has a valid http or https schema, it returns the url unaltered.
    The comparison is case-insensitive ('HTTP://...' is accepted), since
    URL schemes are case-insensitive.
    If it doesn't have a schema, it prepends 'http://' and returns the url.
    If it has a different schema than http or https, the function raises a UserWarning.
    This is to prevent sketchy schemas like data: from being allowed.

    How the schema is detected:
      * if the url contains '://', the schema is everything before it;
      * otherwise, if the first punctuation character in the url is a
        colon, the schema is everything before that colon
        (e.g. 'data:text/html' -> 'data');
      * otherwise the url is considered to have no schema
        (e.g. 'www.beardfist.com:8080' -> no schema, because '.' comes first).

    Parameters:
        url  the url to check.

    Returns the url, possibly with 'http://' prepended.
    Raises UserWarning if the schema is anything other than http or https.
    NOTE(review): the warning text contains HTML and the raw, user-supplied
    schema; make sure the template escapes it appropriately.
    """
    # determine which schema the url has
    if '://' in url:
        schema = url.partition('://')[0]
    else:
        # only the FIRST punctuation character matters
        first_punctuation = next((char for char in url if char in string.punctuation), None)
        schema = url.partition(':')[0] if first_punctuation == ':' else ''

    # return a working url or raise UserWarning
    if schema.lower() in ('http', 'https'):
        return url
    if schema == '':
        return 'http://' + url
    raise UserWarning(
        'Illegal schema <b>{0}</b> detected in URL. Only <i>http and https</i> are permitted.'.format(schema))
def validate_url(url, timeout=10):
    """
    Check to see if the url actually resolves by sending a GET request to it.

    This is to prevent junk data in our databases.

    Parameters:
        url      the url to request; expected to already carry a schema.
        timeout  seconds to wait for the remote server before giving up.
                 Without a timeout a slow or silent host would block the
                 request handler indefinitely.

    Returns a string of the form '<status code>: <reason>',
    e.g. '200: OK' or '404: Not Found'.
    Raises UserWarning if the request fails for any reason
    (unresolvable host, refused connection, timeout, invalid url, ...).

    NOTE(review): this makes the server fetch an arbitrary user-supplied
    url; consider restricting internal/private addresses.
    """
    # try to connect to the url
    try:
        response = requests.get(url, timeout=timeout)
    # generic catch because requests can raise
    # a surprising number of different errors.
    except Exception as e:
        raise UserWarning('Could not resolve <b>{0}</b>. Make sure the URL is valid.'.format(url)) from e

    # return the status code together with its reason phrase
    return str(response.status_code) + ': ' + response.reason
def safe_check(url):
    """
    Uses the Web of Trust API to check if
    a url is potentially dangerous.

    Only the domain of the url is sent to WOT. Every category id WOT
    reports for it is looked up in unwanted_WOT_categories; ids that are
    not listed there are ignored.

    Returns True if WOT has no data for the domain, or if none of the
    reported categories is an unwanted one.
    Raises a UserWarning naming the unwanted categories otherwise, and also
    when the WOT reply cannot be parsed as JSON.
    Errors raised by requests itself (no connection, timeout) are not
    caught here.

    We're doing this to try to prevent people from
    using this service to scam others.
    """
    # WOT doesn't deal well with long URLS, so let's just feed it the domain
    domain = get_domain(url)
    response = requests.get(
        'http://api.mywot.com/0.4/public_link_json2?hosts={0}/&callback=process&key={1}'.format(domain, WOT_API_KEY),
        timeout=10)
    # NOTE: the API key is deliberately not printed or logged; it is a secret.

    # strips away excess process() wrapper
    json_data = response.text[8:-1]

    # load the data with the json library
    try:
        report = json.loads(json_data)
    except ValueError as e:
        raise UserWarning("Critical JSON failure - Probably an expired WOT API key.") from e

    try:
        # the reply is keyed by host; we only asked about one host
        categories = report[next(iter(report))]['categories']
    except (KeyError, StopIteration):
        return True  # WOT has no data for this domain.

    # Collect only the categories we object to. A category we don't know
    # about must not hide an unwanted one reported alongside it.
    flagged = []
    for category_id in categories:
        try:
            flagged.append(unwanted_WOT_categories[int(category_id)])
        except (KeyError, ValueError):
            continue

    if not flagged:
        return True  # the page is categorically safe.

    # build error message
    raise UserWarning('We don\'t trust this page. This page may contain {0}.'.format(', '.join(flagged).lower()))
def next_short_string(prev_string=None, protected=False):
    """
    Produces the short string that follows prev_string.

    Strings are counted like numbers whose digits are the symbols of
    allowed_characters (a-z, then A-Z, then 0-9): the last symbol is
    advanced by one, and whenever a symbol wraps around from '9' to 'a'
    the symbol to its left is advanced as well. If even the first symbol
    wraps around, an extra 'a' is appended to the end.

    Parameters:
        prev_string  the most recently used string. If it is empty or
                     None, 'a' is returned straight away (without
                     consulting 'protected').
        protected    optional collection of strings that must never be
                     returned. If the result is in it, the function calls
                     itself again until it finds one that is not.

    Returns the next string in the series.

    next_short_string('a') -> 'b'
    next_short_string('abz') -> 'abA'
    next_short_string('9') -> 'aa'
    next_short_string('aBCf99') -> 'aBCgaa'
    next_short_string('9999') -> 'aaaaa'
    """
    def successor(symbol):
        """
        Returns the symbol that follows 'symbol' in allowed_characters,
        wrapping around from the last one to the first.
        successor('a') -> 'b', successor('z') -> 'A', successor('9') -> 'a'
        """
        position = allowed_characters.find(symbol) + 1
        return allowed_characters[position % maxchar_limit]

    # nothing to continue from: the series starts at 'a'
    if not prev_string:
        return 'a'

    symbols = list(prev_string)

    # walk from the right, carrying to the left for as long as a symbol wraps to 'a'
    index = len(symbols) - 1
    while index >= 0:
        symbols[index] = successor(symbols[index])
        if symbols[index] != 'a':
            break
        index -= 1
    else:
        # every symbol, including the first, wrapped around: grow the string
        symbols.append('a')

    candidate = ''.join(symbols)

    # skip over protected strings by moving on to the one after
    if protected and candidate in protected:
        return next_short_string(candidate, protected)
    return candidate
def remove_non_ascii(s):
    """
    Returns a copy of the string s in which every
    character with a code point of 128 or above
    has been dropped. ASCII characters are kept
    in their original order.
    """
    ascii_only = [char for char in s if ord(char) <= 127]
    return "".join(ascii_only)
## INIT BLOCK ##
# Everything below runs once, when the module is imported.

# instantiate the Flask app
app = Flask(__name__)

# connecting to the postgres database
# (connect_to_pg raises ValueError if the database cannot be reached,
# which aborts the import)
pg = connect_to_pg()
# a single cursor, created here and used by all the view functions below
cursor = pg.cursor()

# allowed short_string characters; their order here (a-z, A-Z, 0-9) is the
# order in which next_short_string steps through them
allowed_characters = string.ascii_lowercase + string.ascii_uppercase + string.digits
maxchar_limit = len(allowed_characters)

# stuff we don't want to be associated with:
# Web of Trust category id -> description, used by safe_check to build
# the message shown when a url is rejected
unwanted_WOT_categories = {101: 'Malware or viruses', 103: 'Phishing attempts', 104: 'Scams',
                           105: 'Potentially illegal elements', 203: 'Suspicious elements',
                           204: 'Hate, discrimination', 205: 'Spam', 206: 'Potentially unwanted programs'}

# protected url paths: strings that must never be handed out as a short
# string because they are routes of this app ('/reverse')
protected_paths = ['reverse']
## MAIN WEB BLOCK ##
@app.route('/', methods=['GET', 'POST'])
def main_page():
    """
    This renders the main page found at root ('/').

    The decorator tells Flask that it should run this
    function whenever someone navigates to '/', and
    that both GET and POST methods are permitted.

    This function then validates a url POSTed to it
    (form field 'long_url'), and then checks if we already shortened it.

    If it's already in our database, it returns
    the existing record.

    If not, it creates a new record and returns that.

    Any UserWarning raised during validation is shown on the page as the
    error message instead. Database errors are rolled back and re-raised.

    Returns the rendered 'index.html' template.
    """
    # init
    url_is_safe = False
    error = False
    short_url = ''
    long_url = ''
    long_url_domain = ''

    # get the input from POST
    if request.method == 'POST':
        long_url = request.form.get('long_url')

    # validate long_url
    if long_url:
        try:
            # strip the url for leading and trailing whitespace
            long_url = long_url.strip()

            # remove non-ascii characters
            long_url = remove_non_ascii(long_url)

            # confirm that the schema is http or https
            long_url = validate_schema(long_url)  # may raise UserWarning

            # confirm that the url is valid and can be resolved
            url_status = validate_url(long_url)  # may raise UserWarning
            if '404' in url_status:
                raise UserWarning('The domain <b>{0}</b> exists, but this specific URL gives a 404 error.'.format(
                    get_domain(long_url)))

            # confirm that it doesn't contain malicious content
            url_is_safe = safe_check(long_url)  # may raise UserWarning

            # confirm that the long_url isn't already one of our own short urls
            if (request.url in long_url
                    and request.url != long_url):
                raise UserWarning('You cannot simplify a URL that\'s already been simplified.')

        except UserWarning as e:
            error = str(e)
            long_url = ''
            url_is_safe = False

    # url has been validated
    if url_is_safe:
        try:
            # first let's check if we've already shortened this URL
            cursor.execute("SELECT short_url FROM url_table WHERE long_url = %s;", (long_url,))
            existing = cursor.fetchone()

            if existing is not None:
                short_url = existing[0]
            else:
                # which string was used in the latest entry
                cursor.execute('SELECT short_url FROM url_table WHERE id=(select max(id) from url_table)')
                latest = cursor.fetchone()
                # an empty table has no latest entry; next_short_string
                # then starts the series from the beginning
                last_used_string = latest[0] if latest else None

                # generate the next string in the series
                short_url = next_short_string(last_used_string, protected_paths)

                # add it to the database
                cursor.execute("INSERT INTO url_table (SHORT_URL, LONG_URL, HITS)"
                               "VALUES (%s, %s, 0);", (short_url, long_url))
                pg.commit()
        except psycopg2.Error:
            # the connection is shared; don't leave it stuck in a failed
            # transaction for the requests that follow
            pg.rollback()
            raise

        # now prepend the domain itself
        short_url = request.url + short_url

        # set long_url_domain
        long_url_domain = get_domain(long_url)

    # display the page
    return render_template('index.html', short_url=short_url, long_url=long_url, long_url_domain=long_url_domain,
                           error=error)
@app.route('/reverse', methods=['GET', 'POST'])
def reverse_page():
    """
    This renders the reverse page found at '/reverse'.

    This function validates a shortened url POSTed to it
    (form field 'short_url'), and then checks if it exists in the database.

    If it does, it returns a table with records about the entry.
     _________________________________________________________
    | Short URL |       Long URL        |   Created    | Hits |
    |___________|_______________________|______________|______|
    |   'aGd'   |   www.beardfist.com   |  23.01.2017  | 241  |
    |___________|_______________________|______________|______|

    If it is not found, returns an error message.
    A database error during the lookup is rolled back and reported
    with that same error message.

    Returns the rendered 'reverse.html' template.
    """
    # init
    valid_short_url = False
    error = False
    hits = False
    created = False
    short_url = ''
    long_url = ''

    # get the input from POST
    if request.method == 'POST':
        short_url = request.form.get('short_url')

    # validate short_url
    if short_url:
        try:
            # strip the url for leading and trailing whitespace
            short_url = short_url.strip()

            # confirm that the schema is http or https
            short_url = validate_schema(short_url)  # may raise UserWarning

            # confirm that this is a short_url created by this app
            valid_short_url = validate_short_url(short_url)
            if not valid_short_url:
                raise UserWarning('Invalid short URL. We can only reverse URLs created by us.')
        except UserWarning as e:
            error = str(e)

    # url validated for reverse lookup
    if valid_short_url:
        # get the string itself
        url_string = get_url_string(short_url)

        try:
            # let's see if it exists in our database
            cursor.execute("SELECT * FROM url_table WHERE short_url = %s;", (url_string,))
            row = cursor.fetchone()
        except psycopg2.Error:
            # the connection is shared; clear the failed transaction so
            # later requests can still use it
            pg.rollback()
            row = None

        if row is None:
            # it doesn't exist. return an error message.
            error = 'The short URL has valid syntax, but was not found in our database.'
        else:
            # save the data so we can send it to the page.
            # NOTE(review): the positions rely on the column order of
            # url_table (presumably id, short_url, long_url, created, hits) - confirm.
            hits = row[4]
            long_url = row[2]
            created = row[3].strftime("%d.%m.%Y")

    # display the page
    return render_template('reverse.html', short_url=short_url, long_url=long_url, error=error, hits=hits,
                           created=created)
@app.route('/<short_url>')
def destination_redirect(short_url):
    """
    This redirects the user to the long URL that our
    short URLs are related to in the database.

    It then increments the 'hits' property by 1.

    If it cannot find the short URL in the database
    (or the lookup fails), it returns status code 404 to the browser.

    Parameters:
        short_url  the path segment captured by the route, i.e. the
                   short string without the leading '/'.
    """
    # Tries to fetch the long_url based on short_url.
    # The query parameters must be a sequence: note the trailing comma.
    # '(short_url)' is just the string itself, which psycopg2 rejects for
    # every short string longer than one character.
    try:
        cursor.execute("SELECT long_url FROM url_table WHERE short_url = %s;", (short_url,))
        row = cursor.fetchone()
    except psycopg2.Error:
        # the connection is shared; clear the failed transaction so
        # later requests can still use it
        pg.rollback()
        row = None

    # unknown short string (or an empty long_url): nothing to redirect to
    if not row or not row[0]:
        return abort(404)
    long_url = row[0]

    # Increments hits by one and then redirects the user
    cursor.execute("UPDATE url_table SET hits = hits + 1 WHERE short_url = %s;", (short_url,))
    pg.commit()
    return redirect(long_url)
# Script entry point: start the Flask app with app.run(), but only when this
# file is executed directly and not when it is imported as a module.
if __name__ == '__main__':
    app.run()