diff --git a/setup.py b/setup.py index 81c1798..21c75d2 100644 --- a/setup.py +++ b/setup.py @@ -37,7 +37,7 @@ packages=["stravaweblib"], python_requires=">=3.4.0", install_requires=[ - "stravalib>=0.6.6,<1.0.0", - "beautifulsoup4>=4.6.0,<5.0.0", + "stravalib>=0.10.4,<1.0.0", + "beautifulsoup4>=4.9.0,<5.0.0", ], ) diff --git a/stravaweblib/model.py b/stravaweblib/model.py new file mode 100644 index 0000000..cc048f4 --- /dev/null +++ b/stravaweblib/model.py @@ -0,0 +1,497 @@ +#!/usr/bin/env python + +import enum +from datetime import date, datetime + +from stravalib.attributes import (Attribute, DateAttribute, TimestampAttribute, + TimeIntervalAttribute, LocationAttribute) +from stravalib.model import (BaseEntity, BoundEntity, LoadableEntity as _LoadableEntity, + IdentifiableEntity, EntityCollection, EntityAttribute, + Athlete as _Athlete, Bike as _Bike) +from stravalib import unithelper as uh + + +def _parse_component_date(date_str): + if not date_str: + return None + if date_str.lower() == "since beginning": + # Different from no date, but don't know exactly when it was + return datetime.utcfromtimestamp(0).date() + try: + return datetime.strptime(date_str, "%b %d, %Y").date() + except ValueError: + return None + +def _decode_unicode_escapes(s): + """Decodes unicode escapes (\xFFFF) enbeddded in a string""" + return s.encode("utf-8").decode("unicode_escape") + + +def _dict_modify(d, prev, target, overwrite=True, default=None, fcn=None): + """Translate the prev key to target + + Only non-None values will be set + + if overwrite is true, the target key will be overwritten even if something truthy is already there + default controls if anything should be used if the prev key is not available + l is a lambda function that the value will be passed through before being set. 
class FrameType(enum.Enum):
    """The bike frame types known to this module"""

    MOUNTAIN_BIKE = 1
    CROSS_BIKE = 2
    ROAD_BIKE = 3
    TIME_TRIAL_BIKE = 4

    def __str__(self):
        # "ROAD_BIKE" -> "Road Bike"
        return str(self.name).replace("_", " ").title()

    @classmethod
    def from_str(cls, s):
        """Convert a human-readable frame type into a `FrameType`

        Matching ignores case as well as leading, trailing, and repeated
        whitespace (scraped table cells are often padded). The abbreviation
        "TT" is accepted for "Time Trial".

        :param s: The name of the frame type (ie. "Road Bike") or an existing
                  `FrameType` (returned as-is)
        :type s: str or :class:`FrameType`

        :raises KeyError: If the name doesn't match a known frame type
        """
        if isinstance(s, cls):
            return s
        # split() with no arguments strips the ends and collapses whitespace runs
        key = "_".join(s.split()).upper()
        return cls[key.replace("TT_", "TIME_TRIAL_")]
`Attribute(int)`) + :param fcn: This function will be called the first time the attribute + is requested. The result will be set as the attribute value. + :param key: The key of the attribute in the lazyload cache. The lazyload + cache is stored on the parent class. When this attribute is + requested and the key in not in the cache, the `load_attribute` + function on the parent class is called and the result is + added to the cache. Any future accesses will return the value + from the cache. If the key is not in the cache, `None` is + returned. + :param property: Don't store the result of the lazy load + + Special cases: + - If a lazy-loaded attribute is None, lazy-loading will be attempted + each time it is accessed. This allows for null values to be updated + with new data. + - If the load_attribute function returns None for a property, it will + not be attempted again. + + """ + if not (bool(fcn) ^ bool(key)): + raise ValueError("One of fcn or key (not both) is required") + self._property = property + self._fcn = fcn + self._key = key + # Mimic the child Attribute's properties + super().__init__( + type_=attr.type, + resource_states=attr.resource_states, + units=attr.units + ) + + def __get__(self, obj, clazz): + if obj is None or not (self._property or self.data.get(obj) is None): + return super().__get__(obj, clazz) + + if self._fcn: + # Call the provided function to load the attribute + value = self._fcn(obj) + if value is not None and not self._property: + self.__set__(obj, value) + return value + elif self._key: + if not hasattr(obj, "_lazyload_cache"): + obj._lazyload_cache = {} + + # Use obj.load_attribute() to ensure the object is in the cache + if self._key not in obj._lazyload_cache: + obj._lazyload_cache.update(obj.load_attribute(self._key) or {}) + + # Don't set it on the object, keep accessing out of the cache + return obj._lazyload_cache.get(self._key, None) + + raise AssertionError("No fcn or key?") + + def __set__(self, obj, val): + if 
class _ScrapedBikeData(LoadableEntity):
    """Mixin class to add weight and components to a Bike"""

    # Both attributes are filled from the lazyload cache, which is populated
    # by `load_attribute` the first time either of them is requested.
    components = LazyLoaded(EntityCollection(ScrapedBikeComponent), key="components")
    weight = LazyLoaded(Attribute(float, units=uh.kg), key="weight")

    def load_attribute(self, key):
        """Expand the bike with more details using scraping

        :param key: The lazy-loaded attribute that was requested. The bike
                    details are fetched regardless of which key triggered
                    the load.

        :return: The result of `get_bike_details` on the bound client, or an
                 empty dict if the bike has no id to look up.
        """
        if self.id is None:
            # Nothing to look up; an empty result leaves the cache untouched
            return {}
        self.assert_bind_client()
        return self.bind_client.get_bike_details(self.id)

    def components_on_date(self, on_date):
        """Get bike components installed on the specified date

        :type on_date: None or datetime.date or datetime.datetime
                       (datetimes will lose time-precision)

        :return: `self.components` unchanged when `on_date` is None,
                 otherwise a list of the components whose added/removed
                 range includes the date (empty if no components could be
                 loaded).
        """
        components = self.components
        if on_date is None:
            return components

        if isinstance(on_date, datetime):
            on_date = on_date.date()

        # `components` is None when nothing could be lazy-loaded; treat that
        # as "no components" instead of failing while iterating over None.
        # A missing added/removed date is treated as an open-ended range.
        return [
            c for c in (components or ())
            if (c.added or date.min) <= on_date <= (c.removed or date.max)
        ]
class ScrapedActivity(LoadableEntity):
    """
    Represents an Activity (ride, run, etc.) that was scraped from the website

    The attributes are compatible with stravalib.model.Activity where they exist
    """

    name = Attribute(str)
    description = Attribute(str)
    type = Attribute(str)
    workout_type = Attribute(str)

    start_date = TimestampAttribute()
    distance = Attribute(float)
    moving_time = TimeIntervalAttribute()
    elapsed_time = TimeIntervalAttribute()
    total_elevation_gain = Attribute(float)
    suffer_score = Attribute(int)
    calories = Attribute(float)
    gear_id = Attribute(str)

    # True if the activity has GPS coordinates
    # False for trainers, manual activities, etc
    has_latlng = Attribute(bool)

    trainer = Attribute(bool)
    commute = Attribute(bool)
    private = Attribute(bool)
    flagged = Attribute(bool)

    # These require scraping the activity page, so they're loaded on demand
    manual = LazyLoaded(Attribute(bool), key="manual")
    photos = LazyLoaded(EntityCollection(ScrapedActivityPhoto), key="photos")
    device_name = LazyLoaded(Attribute(str), key="device_name")

    def load_attribute(self, key):
        """Load the lazy attributes by scraping the activity page

        :param key: The lazy-loaded attribute that was requested
        :return: The result of `get_extra_activity_details` on the bound
                 client for the keys handled here, otherwise whatever the
                 parent class returns.
        """
        if key not in {"manual", "photos", "device_name"}:
            return super().load_attribute(key)

        self.assert_bind_client()
        return self.bind_client.get_extra_activity_details(self.id)

    @property
    def total_photo_count(self):
        """The number of photos on the activity (0 if none could be loaded)"""
        # `photos` is None when the activity page had no photo data
        return len(self.photos or ())

    def from_dict(self, d):
        """Translate the scraped keys to stravalib-compatible ones and load them

        Note that `d` is modified in place.
        """
        # Only 1 of these will set the gear_id
        _dict_modify(d, "bike_id", "gear_id", fcn="b{}".format)
        _dict_modify(d, "athlete_gear_id", "gear_id", fcn="g{}".format)

        _dict_modify(d, "start_time", "start_date")
        _dict_modify(d, "distance_raw", "distance")
        _dict_modify(d, "moving_time_raw", "moving_time")
        _dict_modify(d, "elapsed_time_raw", "elapsed_time")
        # The target must match the attribute declared above, otherwise the
        # value never ends up on the object
        _dict_modify(d, "elevation_gain_raw", "total_elevation_gain")

        return super().from_dict(d)
class ScrapedAthlete(_AthleteData):
    """
    Represents Athlete data scraped from the website

    The attributes are compatible with stravalib.model.Athlete where they exist
    """
    firstname = Attribute(str)
    lastname = Attribute(str)

    profile = Attribute(str)
    city = Attribute(str)
    state = Attribute(str)
    country = Attribute(str)
    location = LocationAttribute()

    def from_dict(self, d):
        """Translate the scraped keys to stravalib-compatible ones and load them

        Note that `d` is modified in place.
        """
        # Merge geo subdict into the main dict (tolerating a null value)
        d.update(d.pop("geo", None) or {})

        _dict_modify(d, "photo", "profile_medium")
        _dict_modify(d, "photo_large", "profile")
        _dict_modify(d, "first_name", "firstname")
        _dict_modify(d, "last_name", "lastname")
        _dict_modify(d, "gender", "sex")
        _dict_modify(d, "lat_lng", "location")

        # According to some code returned in the HTML, Strava computes the
        # display name by joining the first and last names with a space.
        # Here we make an attempt to break the display name back up into its
        # parts. This is only for compatibility with the stravalib API - you
        # should always use obj.name
        name = d.pop("name", None)
        if name and "firstname" not in d and "lastname" not in d:
            # total guess: assume more last names have spaces than first
            first, _, last = name.strip().partition(" ")
            d["firstname"] = first
            # A single-word display name has no last name to set
            last = last.strip()
            if last:
                d["lastname"] = last

        return super().from_dict(d)
# Regexes for pulling information out of the activity details page
#
# Dots that are part of a literal JavaScript namespace (ie. Strava.Models) are
# escaped so they only match an actual "." instead of any character.
PHOTOS_REGEX = re.compile(r"var\s+photosJson\s*=\s*(\[.*\]);")
ATHLETE_REGEX = re.compile(
    r"var\s+currentAthlete\s*=\s*new\s+Strava\.Models\.CurrentAthlete\(({.*})\);"
)
CHALLENGE_IDS_REGEX = re.compile(
    r"var\s+trophiesAnalyticsProperties\s*=\s*{.*challenge_id:\s*\[(\[[\d\s,]*\])\]"
)
# Group 1 is the page view prefix (ie. "Manual"), group 2 the quoted second argument
PAGE_VIEW_REGEX = re.compile(
    r"pageView\s*=\s*new\s+Strava\.Labs\.Activities\.Pages\.(\S+)PageView"
    r"\([\"']?\d+[\"']?,\s*[\"']([^\"']+)"
)
CHALLENGE_REGEX = re.compile(
    r"var\s+challenge\s*=\s*new\s+Strava\.Models\.Challenge\(({.*})\);"
)
# Matches date ranges like "Jan 01, 2020 to Jan 31, 2020"
CHALLENGE_DATE_REGEX = re.compile(r"(\S{3} \d{2}, \d{4}) to (\S{3} \d{2}, \d{4})")

# Everything that isn't a digit or a decimal point
NON_NUMBERS = re.compile(r'[^\d\.]')
def request(self, method, service, *args, **kwargs):
    """Request a URL from Strava

    :param method: The HTTP method to use (ie. "GET")
    :param service: The URL to send the request to without the base URL.
                    Leading slashes are ignored so that both "about" and
                    "/about" request the same page.

    Any other arguments are passed through to the session's `request`
    method, and its result is returned.
    """
    # Strip leading slashes so the joined URL never contains "//" in the path
    url = "https://www.strava.com/{}".format(str(service).lstrip("/"))
    return self._session.request(method, url, *args, **kwargs)
logged in or not. """ - login_html = self._session.get("{}/about".format(BASE_URL)).text - soup = BeautifulSoup(login_html, 'html.parser') + soup = BeautifulSoup(self.request_get("about").text, 'html5lib') try: head = soup.head @@ -142,8 +155,8 @@ def _login_with_jwt(self, jwt): def _login_with_password(self, email, password): """Log into the website using a username and password""" - resp = self._session.post( - "{}/session".format(BASE_URL), + resp = self.request_post( + "session", allow_redirects=False, data={ "email": email, @@ -152,9 +165,203 @@ def _login_with_password(self, email, password): **self.csrf } ) - if not resp.is_redirect or resp.next.url == "{}/login".format(BASE_URL): + if not resp.is_redirect or resp.next.url.endswith("/login"): raise stravalib.exc.LoginFailed("Couldn't log in to website, check creds") + def get_extra_activity_details(self, activity_id): + """Scapes the full activity page for various details + + Returns a dict of the properties + """ + __log__.debug("Getting extra information for activity %s", activity_id) + resp = self.request_get("activities/{}".format(activity_id)) + if not resp.ok: + raise stravalib.exc.Fault("Failed to load activity page to get details") + + ret = {} + + soup = BeautifulSoup(resp.text, 'html5lib') + + summary = soup.find("div", class_="activity-summary-container") + if summary: + name = summary.find("h1", class_="activity-name") + if name: + ret["name"] = name.text.strip() + description = summary.find("div", class_="activity-description") + if description: + ret["description"] = description.text.strip() + device = summary.find("div", class_="device") + if device: + ret["device_name"] = device.text.strip() + + for script in soup.find_all("script"): + if not script.string: + continue + + m = PAGE_VIEW_REGEX.search(script.string) + if m: + ret["manual"] = m.group(1).lower() == "manual" + ret["type"] = m.group(2) + continue + + m = PHOTOS_REGEX.search(script.string) + if m: + try: + ret["photos"] = 
def get_activities(self, keywords=None, activity_type=None, workout_type=None,
                   commute=False, is_private=False, indoor=False, gear_id=None,
                   before=None, after=None, limit=None):
    """A scraping-based alternative to stravalib.Client.get_activities

    Note that when using multiple parameters they are treated as AND, not OR

    :param keywords: Text to search for
    :param activity_type: The type of the activity. See stravalib.model:Activity.TYPES
    :param workout_type: The type of workout ("Race", "Workout", etc)
    :param commute: Only return activities marked as commutes
    :param is_private: Only return private activities
    :param indoor: Only return indoor/trainer activities
    :param gear_id: Only return activities using this gear

    Parameters for compatibility with stravalib.Client.get_activities:

    :param before: Result will start with activities whose start date is
                   before specified date. (UTC)
    :param after: Result will start with activities whose start date is after
                  specified value. (UTC)
    :param limit: How many maximum activities to return.

    :yield: ScrapedActivity objects

    :raises ValueError: If the activity type, workout type, or gear filters
                        are invalid
    :raises stravalib.exc.Fault: If Strava doesn't return a 200 response
    :raises ScrapingError: If the response isn't the expected JSON

    Since this is a generator, nothing is validated or requested until the
    first item is asked for.
    """

    __log__.debug("Getting activities")
    if activity_type is not None and activity_type not in Activity.TYPES:
        raise ValueError(
            "Invalid activity type. Must be one of: {}".format(",".join(Activity.TYPES))
        )

    if activity_type in ACTIVITY_WORKOUT_TYPES:
        valid_workout_types = ACTIVITY_WORKOUT_TYPES[activity_type]
        workout_type = valid_workout_types.get(workout_type)
        if workout_type is None:
            raise ValueError(
                "Invalid workout type for a {}. Must be one of: {}".format(
                    activity_type,
                    # The None key (no workout type) can't be joined as a str
                    ", ".join(k for k in valid_workout_types if k is not None)
                )
            )
        # NOTE(review): the translated (numeric) workout_type is validated
        # here but is not included in the request parameters below - confirm
        # whether the search is meant to be filtered by it.
    elif workout_type is not None or gear_id is not None:
        raise ValueError(
            "Can only filter using workout type or gear when activity type is one of: {}".format(
                ", ".join(ACTIVITY_WORKOUT_TYPES.keys())
            )
        )

    before = stravalib.Client._utc_datetime_to_epoch(None, before or datetime.max)
    after = stravalib.Client._utc_datetime_to_epoch(None, after or datetime.min)

    num_yielded = 0
    page = 1
    per_page = 20
    search_session_id = uuid.uuid4()

    def conv_bool(value):
        # Boolean filters are sent as "true" when enabled and empty otherwise
        return "true" if value else ""

    while True:
        __log__.debug("Getting page %s of activities", page)
        resp = self.request_get(
            "athlete/training_activities",
            headers={
                "Accept": "text/javascript, application/javascript, application/ecmascript, application/x-ecmascript",
                "X-Requested-With": "XMLHttpRequest",
            },
            params={
                "search_session_id": search_session_id,
                "page": page,
                "per_page": per_page,
                "keywords": keywords,
                "new_activity_only": "false",
                "activity_type": activity_type or "",
                "commute": conv_bool(commute),
                "private_activities": conv_bool(is_private),
                "trainer": conv_bool(indoor),
                "gear": gear_id or "",
                "order": "start_date_local DESC"  # Return in reverse-chronological order
            }
        )
        if resp.status_code != 200:
            raise stravalib.exc.Fault(
                "Failed to list activities (status code {})".format(resp.status_code)
            )
        try:
            data = resp.json()["models"]
        except (ValueError, TypeError, KeyError) as e:
            raise ScrapingError("Invalid JSON response from Strava") from e

        # No results = done
        if not data:
            return

        for activity in data:
            # Respect the limit
            if limit is not None and num_yielded >= limit:
                return

            # Translate workout types from ints back to strings. When no
            # translation exists, the workout type is left unset.
            wt = activity.pop("workout_type", None)
            for k, v in ACTIVITY_WORKOUT_TYPES.get(activity.get("type"), {}).items():
                if wt == v:
                    activity["workout_type"] = k
                    break

            activity = ScrapedActivity(bind_client=self, **activity)

            # Respect the before and after filters
            # Will see activities from newest to oldest so can do less
            # work to limit by time
            ts = activity.start_date.timestamp()
            if ts < after:
                # Activity is older than the `after` bound. Everything that
                # follows is older still, so there are no more results.
                return
            elif ts > before:
                # Activity is newer than the `before` bound, don't yield it
                # but keep going since older activities may still match.
                continue

            yield activity
            num_yielded += 1

        page += 1
def get_activity_data(self, activity_id, fmt=DataFormat.ORIGINAL, json_fmt=None):
    """
    Get a file containing the provided activity's data

    :param activity_id: The activity to export the data of

    :param fmt: The format to request the data in
                (defaults to DataFormat.ORIGINAL)
    :type fmt: :class:`DataFormat` or a value accepted by it

    :param json_fmt: The backup format to request in the event that the
                     `fmt` was DataFormat.ORIGINAL and the request returned
                     a JSON blob (happens for uploads from older mobile apps).
                     Using `DataFormat.ORIGINAL` will cause the JSON blob to
                     be returned.
                     (`None`, the default, is treated as DataFormat.GPX)
    :type json_fmt: :class:`DataFormat` or None

    :return: A namedtuple with `filename` and `content` attributes:
             - `filename` is the filename that Strava suggests for the file
             - `contents` is an iterator that yields file contents as bytes
    :rtype: :class:`ExportFile`

    :raises ValueError: If `fmt` or `json_fmt` isn't a valid DataFormat
    :raises stravalib.exc.Fault: If Strava doesn't return a 200 response
    """
    __log__.debug("Getting data (in %s format) for activity %s", fmt, activity_id)

    fmt = DataFormat(fmt)
    # DataFormat(None) is a ValueError, so the documented default has to be
    # applied explicitly before converting.
    json_fmt = DataFormat.GPX if json_fmt is None else DataFormat(json_fmt)

    resp = self.request_get(
        "activities/{}/export_{}".format(activity_id, fmt),
        stream=True,
        allow_redirects=False
    )

    # Gives a 302 back to the activity URL when trying to export a manual activity
    # TODO: Does this also happen with other errors?
    if resp.status_code != 200:
        raise stravalib.exc.Fault("Status code '{}' received when trying "
                                  "to download an activity"
                                  "".format(resp.status_code))

    # When downloading JSON, the Content-Type header will be set to
    # 'application/json' (parameters such as a charset are ignored here).
    # If the json_fmt is not DataFormat.ORIGINAL, try the download again
    # asking for the json_fmt.
    content_type = resp.headers.get('Content-Type', '').split(';', 1)[0].strip().lower()
    if (fmt == DataFormat.ORIGINAL and json_fmt != fmt and
            content_type == 'application/json'):
        # The streamed body of this response is never read, so release it
        resp.close()
        return self.get_activity_data(activity_id, fmt=json_fmt, json_fmt=DataFormat.ORIGINAL)

    return self._make_export_file(resp, activity_id)
gear_table.find_all("td")][1::2] + ): + if not k: + continue + if k == "weight": + # Strip non-number chars ("kg") + # TODO: other units? + v = float(NON_NUMBERS.sub('', v)) + elif k == "frame_type": + v = FrameType.from_str(v) + ret[k.lower()] = v + + # Get component data + table = None + for t in soup.find_all('table'): + if t.find('thead'): + table = t break else: - raise ValueError("Bike component table not found in the HTML - layout update?") + raise ScrapingError( + "Bike component table not found in the HTML - layout update?" + ) - components = [] + ret["components"] = [] for row in table.tbody.find_all('tr'): cells = row.find_all('td') text = [cell.text.strip() for cell in cells] @@ -300,39 +529,16 @@ def _get_all_bike_components(self, bike_id): component_id = cells[6].find('a', text="Delete")['href'].rsplit("/", 1)[-1] - components.append({ - 'id': component_id, - 'type': text[0], - 'brand': text[1], - 'model': text[2], - 'added': self._parse_date(text[3]), - 'removed': self._parse_date(text[4]), - 'distance': distance - }) - return components - - def get_bike_components(self, bike_id, on_date=None): - """ - Get components for the specified bike - - :param bike_id: The id of the bike to retreive components for - (must start with a "b") - :type bike_id: str - - :param on_date: Only return components on the bike for this day. If - `None`, return all components regardless of date. 
- :type on_date: None or datetime.date or datetime.datetime - """ - components = self._get_all_bike_components(bike_id) - - # Filter by the on_date param - if on_date: - if isinstance(on_date, datetime): - on_date = on_date.date() - return [c for c in components if \ - (c['added'] or date.min) <= on_date <= (c['removed'] or date.max)] - else: - return components + ret["components"].append(ScrapedBikeComponent( + id=component_id, + type=text[0], + brand_name=text[1], + model_name=text[2], + added=text[3], + removed=text[4], + distance=distance + )) + return ret def get_route_data(self, route_id, fmt=DataFormat.GPX): """ @@ -362,10 +568,252 @@ def get_route_data(self, route_id, fmt=DataFormat.GPX): return self._make_export_file(resp, route_id) + def get_all_bikes(self, athlete_id=None): + """Scrape all bike information from Strava + + :yield: `ScrapedBike` objects + """ + # Return minimal information from the athlete page if this isn't the + # currently-logged in athlete. + if int(athlete_id) != self.athlete_id: + return self.get_athlete(athlete_id).bikes + + __log__.debug("Getting all bike data") + resp = self.request_get("athletes/{}/gear/bikes".format(self.athlete_id)) + if not resp.ok: + raise stravalib.exc.Fault("Failed to get list of bikes") + try: + yield from ( + ScrapedBike( + bind_client=self, + id="b{}".format(b.pop("id")), # add "b" to gear id + **b + ) + for b in resp.json() + ) + except (TypeError, ValueError) as e: + raise ScrapingError("Failed to parse bike data") from e + + def get_all_shoes(self, athlete_id=None): + """Scrape all shoe information from Strava + + :yield: `ScrapedShoe` objects + """ + # Return minimal information from the athlete page if this isn't the + # currently-logged in athlete. 
+ if int(athlete_id) != self.athlete_id: + return self.get_athlete(athlete_id).shoes + + __log__.debug("Getting all shoe data") + resp = self.request_get("athletes/{}/gear/shoes".format(self.athlete_id)) + if not resp.ok: + raise stravalib.exc.Fault("Failed to get list of shoes") + try: + yield from (ScrapedShoe(**s) for s in resp.json()) + except (TypeError, ValueError) as e: + raise ScrapingError("Failed to parse shoe data") from e + def get_all_gear(self): + """Scrape all gear information from Strava -# Inherit parent documentation for WebClient.__init__ -WebClient.__init__.__doc__ = stravalib.Client.__init__.__doc__ + \ + :yield: `ScrapedBike` and `ScrapedShoe` objects + """ + yield from self.get_all_bikes() + yield from self.get_all_shoes() + + def get_gear(self, gear_id): + """A scraping-based replacement for `stravalib.Client.get_gear`""" + try: + if gear_id.startswith("b"): + return next(x for x in self.get_all_bikes() if x.id == gear_id) + else: + return next(x for x in self.get_all_shoes() if x.id == gear_id) + except StopIteration: + raise KeyError("No gear with id '{}' found".format(gear_id)) + + def get_athlete(self, athlete_id=None): + """A scraping-based replacement for `stravalib.Client.get_athlete`""" + if athlete_id is None: + athlete_id = self.athlete_id + + athlete_id = int(athlete_id) + + __log__.debug("Getting athlete %s", athlete_id) + resp = self.request_get("athletes/{}".format(athlete_id)) + if not resp.ok: + raise stravalib.exc.Fault("Failed to get athlete {}".format(athlete_id)) + + ret = { + "id": athlete_id, + "photos": [], + "challenges": [], + } + + soup = BeautifulSoup(resp.text, 'html5lib') + + for script in soup.find_all("script"): + data = script.string + if not data: + continue + + # This method only works on the currently-logged in athlete but returns much more data than the above + if athlete_id == self.athlete_id: + m = ATHLETE_REGEX.search(data) + if m: + try: + ret.update(json.loads(m.group(1))) + except (TypeError, 
ValueError) as e: + __log__.error("Failed to parse extracted athlete data", exc_info=True) + continue + + m = CHALLENGE_IDS_REGEX.search(data) + if m: + try: + ret["challenges"] = json.loads(m.group(1)) + except (TypeError, ValueError) as e: + __log__.error("Failed to parse extracted challenge data", exc_info=True) + continue + + m = PHOTOS_REGEX.search(data) + if m: + try: + ret["photos"] = [ScrapedActivityPhoto(**p) for p in json.loads(m.group(1))] + except (TypeError, ValueError) as e: + __log__.error("Failed to parse extracted photo data", exc_info=True) + continue + + if athlete_id != self.athlete_id: + # Get basic profile data for an athlete. + # There are multiple headings depending on the level of access + for heading in soup.find_all("div", class_="profile-heading"): + name = heading.find("h1", class_="athlete-name") + if name: + ret["name"] = name.text.strip() + + location = heading.find("div", class_="location") + if location: + ret["city"], ret["state"], ret["country"] = [x.strip() for x in location.text.split(",", 2)] + + profile = heading.find("img", class_="avatar-img") + if profile: + ret["profile"] = profile["src"] + + # Get basic gear info from the sidebar. + # By providing minimal data for non-logged-in athletes, no more data + # will be lazy-loaded by the bikes and shoes attributes. This is what + # we want since the lazy-load would just call this function again. + # However, when getting the logged in athlete's gear, we don't want to + # set anything since the lazy-load will use the more detailed + # get_all_bikes/gear functions instead of this one. 
+ ret["bikes"] = [] + ret["shoes"] = [] + for gear in soup.select("div.section.stats.gear"): + if "bikes" in gear["class"]: + type_ = "bikes" + cls = ScrapedBike + elif "shoes" in gear["class"]: + type_ = "shoes" + cls = ScrapedShoe + else: + continue + + for row in gear.find("table").find_all("tr"): + name, dist = row.find_all("td") + ret[type_].append(cls( + name=name.text.strip(), + distance=int(float(NON_NUMBERS.sub('', dist.text.strip())) * 1000), + )) + + return ScrapedAthlete(bind_client=self, **ret) + + def get_challenge(self, challenge_id): + """Get data about a challenge""" + __log__.debug("Getting details for challenge %s", challenge_id) + resp = self.request_get("challenges/{}".format(challenge_id)) + if not resp.ok: + raise stravalib.exc.Fault("Failed to get challenge {}".format(challenge_id)) + + data = {} + soup = BeautifulSoup(resp.text, 'html5lib') + react_data = soup.find("div", **{"data-react-class": "Show"}) + if react_data: + # Extract data from the react version of the page + data_str = html.unescape( + react_data["data-react-props"] + .replace(" ", " ") + .replace("\n", "\\n") + ) + try: + data = json.loads(data_str) + except (TypeError, ValueError) as e: + raise ScrapingError("Failed to parse extracted challenge data") from e + + # Get the description + description_html = next(x for x in data["sections"] if x["title"] == "Overview")["content"][0]["text"].replace(" ", "") + data["description"] = BeautifulSoup(description_html, 'html5lib').text + data["name"] = data["header"]["name"] + data["subtitle"] = data["header"]["subtitle"] + data["teaser"] = data["summary"]["challenge"]["title"] + data["badge_url"] = data["header"]["challengeLogoUrl"] + data["share_url"] = "https://www.strava.com/challenges/{}".format(challenge_id) + + m = CHALLENGE_DATE_REGEX.search(data["summary"]["calendar"]["title"]) + if m: + try: + data["start_date"], data["end_date"] = [ + datetime.strptime(x, "%b %d, %Y") for x in m.groups() + ] + except ValueError: + 
__log__.error("Failed to parse dates {}".format(m.groups())) + else: + # Look for the data in the older-style page + for script in soup.find_all("script"): + if not script.string: + continue + + m = CHALLENGE_REGEX.search(script.string) + if not m: + continue + + data_str = html.unescape(m.group(1)) + try: + data = json.loads(data_str) + except (TypeError, ValueError) as e: + raise ScrapingError("Failed to parse extracted challenge data") from e + + desc = soup.find("div", id="desc") + if desc: + data["description"] = desc.text + + if not data: + raise ScrapingError("Failed to scrape challenge data {}".format(challenge_id)) + + data["id"] = challenge_id + + return ScrapedChallenge(**data) + + +class WebClient(stravalib.Client): + """ + An extension to the stravalib Client that fills in some of the gaps in + the official API using web scraping. + + Requires a JWT or both of email and password + """ + + def __new__(cls, *_, **__): + self = super().__new__(cls) + + # Prepend some docstrings with the parent classes one + for fcn in ("__init__", "get_gear", "get_athlete"): + getattr(cls, fcn).__doc__ = getattr(super(), fcn).__doc__ + getattr(cls, fcn).__doc__ + + # Delegate certain methods and properties to the scraper instance + for fcn in ("delete_activity", "get_activity_data", "jwt", "csrf"): + setattr(cls, fcn, cls._delegate(ScrapingClient, fcn)) + return self + + def __init__(self, *args, **kwargs): """ :param email: The email of the account to log into :type email: str @@ -375,6 +823,7 @@ def get_route_data(self, route_id, fmt=DataFormat.GPX): :param jwt: The JWT of an existing session. If not specified, email and password are required. + Can be accessed from the `.jwt` property. :type jwt: str :param csrf: A dict of the form: `{: }`. @@ -382,3 +831,60 @@ def get_route_data(self, route_id, fmt=DataFormat.GPX): Can be accessed from the `.csrf` property. 
:type csrf: dict """ + sc_kwargs = { + k: kwargs.pop(k, None) for k in ("email", "password", "jwt", "csrf") + } + self._scraper = ScrapingClient(**sc_kwargs) + super().__init__(*args, **kwargs) + + if self._scraper.athlete_id != self.get_athlete().id: + raise ValueError("API and web credentials are for different accounts") + + def get_athlete(self, athlete_id=None): + """ + Returned Athletes will have scraped attributes lazily added. + Also, when accessing the bikes attribute, more scraped data will be available + """ + athlete = super().get_athlete(athlete_id) + # TODO: Should make the bind client this instance + # That way scraping/API functions can be mixed + return Athlete(bind_client=self._scraper).from_object(athlete) + + def get_gear(self, gear_id): + """ + Returned Bikes will have scraped attributes lazily added + """ + gear = super().get_gear(gear_id) + if isinstance(gear, _Bike): + # TODO: Should make the bind client this instance + # That way scraping/API functions can be mixed + return Bike(bind_client=self._scraper).from_object(gear) + return gear + + def get_all_gear(self): + """Get all gear information from Strava + + :yield: `stravalib.model.Bike` and `stravalib.model.Shoe` instances + """ + athlete = self.get_athlete() + if athlete.bikes is None and athlete.shoes is None: + __log__.error("Failed to get gear data (missing profile:read_all scope?)") + return + + for gear in athlete.bikes + athlete.shoes: + yield self.get_gear(gear) + + @staticmethod + def _delegate(clazz, name): + func = getattr(clazz, name) + is_prop = isinstance(func, property) + + @functools.wraps(func) + def delegator(self, *args, **kwargs): + if is_prop: + return getattr(self._scraper, name) + return getattr(self._scraper, name)(*args, **kwargs) + + if is_prop: + delegator = property(delegator) + return delegator