This personal project displays my up-to-date Strava fitness activity information on an interactive, mobile-friendly data dashboard, using Leaflet to display geographical data and Chart.js to display graphical information. Data can be filtered and explored using buttons, date selections, searches, and by selecting geographical data.
Strava is a fitness tracking mobile application which logs GPS data recorded during outdoor fitness activities. I have been logging rides, walks, runs, and hikes to the application for years and have accumulated over 400 total activities. Strava provides an API to access these records and additional data that the service calculates. I saw this as an opportunity to expand my server- and client-side development skills by accessing, processing, and presenting the data in a map/dashboard.
Historic Strava data were initially pulled from the Strava API and processed using Python, then a webhook subscription was created which updates my server when new activities are available for processing. Strava activity data are processed in Python using PostGIS functions to remove private areas and to simplify geometries to reduce file sizes, at the cost of spatial accuracy. Data are pre-calculated and served to the Leaflet map in the TopoJSON format to further reduce file sizes and server response times.
The description below discusses the Strava activity processing workflow and server-side processing scripts. You can view the Python files for this project in its GitHub project folder.
I have not yet finished the write-up for the client-side JavaScript/HTML aspect of this project, but the JavaScript source code and HTML are available within my Flask Application folder.
The Python library stravalib provides useful functions to query the Strava API and parse results into Python objects. Instead of using the library's documentation for my server's authorization and authentication, I ended up following this Analytics Vidhya guide on Medium, which was clearer and provided example code for refreshing the API access token. Initially I followed the guide's method for pickling credentials; however, I made the process scalable, in case I want to add more athletes in the future, and removed the dependence on local files by storing the credentials in a database.
This process uses SQLAlchemy to access authentication details stored in Postgres, generates and updates the access token if needed, then populates an authorized stravalib client instance for an athlete:
import os
import time

from stravalib.client import Client

# Session (the SQLAlchemy session factory) and the athletes ORM model are imported from
# elsewhere in the application


def getAuth():
    """
    Loads Strava client authentication details from Postgres and creates an authorized client instance.
    Checks if the access token is expired; if so, it is refreshed and updated.

    Returns
    -------
    client. Stravalib model client instance. Contains access token to Strava API for the athlete, ID is hard coded
    for now.
    """
    # Build empty stravalib client instance
    client = Client()
    # Create db session
    session = Session()
    # Hard coded athlete id
    athleteID = 7170058
    authDict = {}
    # Load tokens and expiration time from Postgres
    query = session.query(athletes).filter(athletes.athlete_id == athleteID)
    for i in query:
        authDict["Access_Token"] = i.access_token
        authDict["Expiration"] = i.access_token_exp
        authDict["Refresh_Token"] = i.refresh_token
    # Check if access token has expired, if so request a new one and update Postgres
    if time.time() > authDict["Expiration"]:
        refresh_response = client.refresh_access_token(client_id=int(os.environ.get('STRAVA_CLIENT_ID')),
                                                       client_secret=os.environ.get('STRAVA_CLIENT_SECRET'),
                                                       refresh_token=authDict["Refresh_Token"])
        # Update access token and expiration date
        session.query(athletes).filter(athletes.athlete_id == athleteID). \
            update({athletes.access_token: refresh_response['access_token'],
                    athletes.access_token_exp: refresh_response['expires_at']})
        # Commit update
        session.commit()
        # Set Strava auth details
        client.access_token = refresh_response['access_token']
        client.refresh_token = authDict["Refresh_Token"]
        client.token_expires_at = refresh_response['expires_at']
    else:
        # Access token is up-to-date, set client details
        client.access_token = authDict["Access_Token"]
        client.refresh_token = authDict["Refresh_Token"]
        client.token_expires_at = authDict["Expiration"]
    # Close out session
    session.close()
    return client
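A minimal usage sketch (not part of the project code) shows the client in action; get_athlete() is a standard stravalib call that confirms the token works:

# Usage sketch: obtain an authorized client and verify the token works
client = getAuth()
athlete = client.get_athlete()
print(f"Authenticated to the Strava API as {athlete.firstname} {athlete.lastname}")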
For reference, the earlier Pickle-based version of this process uses the Pickle file created during the one-time authentication; it is called, and the file updated, for all requests to the Strava API:
from application.stravalib.client import Client
import os
import time
import pickle

# app is the Flask application instance, imported from elsewhere in the application


def gettoken():
    # Build empty stravalib client instance
    client = Client()
    # Load access token from the Pickle file
    with open(os.path.join(app.root_path, 'access_token.pickle'), 'rb') as f:
        access_token = pickle.load(f)
    # Check if access token has expired
    if time.time() > access_token['expires_at']:
        # Use client ID, secret, and refresh token to generate a new access token with Strava API
        refresh_response = client.refresh_access_token(client_id=os.getenv("STRAVA_CLIENT_ID"),
                                                       client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
                                                       refresh_token=access_token['refresh_token'])
        # Open Pickle file and update with new access token
        with open(os.path.join(app.root_path, 'access_token.pickle'), 'wb') as f:
            pickle.dump(refresh_response, f)
        # Set new access token in client instance
        client.access_token = refresh_response['access_token']
        # Set refresh token in client instance
        client.refresh_token = refresh_response['refresh_token']
        # Set access token expiration time for client instance
        client.token_expires_at = refresh_response['expires_at']
    else:
        # Access token is still valid, set token in client instance
        client.access_token = access_token['access_token']
        client.refresh_token = access_token['refresh_token']
        client.token_expires_at = access_token['expires_at']
    return client
Now that I have full-scope access to my account through the Strava API, I can begin downloading activities. The API, and stravalib, offer a few different ways to download activities.
There are also options to access routes, segments, efforts, and other account details.
My first goal was to download all my historic activities on Strava and add them to a Postgres/PostGIS database. Considering the API methods available, I decided on the following approach:
Use the List Athlete Activities method with an after date set to before I started using Strava to return the activity IDs of all my recorded activities, then generate a list of these IDs.
from datetime import datetime, timedelta


def getListIds(client, days):
    """
    Gets a list of all Strava Activity IDs since (days) ago from Strava API.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to Strava API for the user.
    days. Int. How many days to look back, queries all activities since this calculated date.

    Returns
    -------
    List. List of int IDs of all Strava activities for the user.
    """
    # Use current datetime and timedelta to calculate previous datetime
    after = datetime.today() - timedelta(days=days)
    # after = datetime(year=2019, month=8, day=1)
    actList = []
    # Get all activities since the after time and add their IDs to the list
    acts = client.get_activities(after=after)
    for i in acts:
        actList.append(i.id)
    return actList
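Putting the two functions together, the one-time historic pull might look like the following sketch; the 4,000-day look-back is an assumed value chosen to pre-date my first activity:

# Sketch of the initial historic pull, assuming a look-back that covers all activities
client = getAuth()
actIds = getListIds(client, days=4000)
print(f"Found {len(actIds)} activities to process")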
Iterate over the activity ID list, passing each activity ID into Get Activity and Get Activity Streams. Parse the results by structuring data, removing uninteresting/null details, calculating ancillary data, and combining the GPS coordinates and time, provided as seconds since the start of the activity, into the PostGIS EWKT LINESTRINGM format. Even though I bring in the time information as an M-value, I am not using this time dimension in this project.
import logging


def getFullDetails(client, actId):
    """
    Gets the full details of Strava activities using get_activity() to query flat data and get_activity_streams()
    to get GPS coordinates and times. Coordinates are formatted to be inserted in PostGIS following ST_GeomFromEWKT.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to Strava API for the user.
    actId. Int. Activity ID.

    Returns
    -------
    Dict. Activity and coordinate information formatted to be inserted into Postgres/PostGIS.
    """
    # Set logger to suppress debug errors, these messages aren't important and pollute the console
    Log = logging.getLogger()
    Log.setLevel('ERROR')
    # Stream data to get from activity streams
    types = ['time', 'latlng', 'altitude', 'velocity_smooth', 'grade_smooth', "distance", "heartrate", "cadence",
             "temp"]
    # Get activity details as a dictionary
    act = client.get_activity(actId).to_dict()
    # Get the activity stream details for the activity id
    stream = client.get_activity_streams(actId, types=types)
    # Get athlete ID directly from API call, instead of digging into the nested result provided by get_activity
    athId = client.get_athlete().id
    # Extract latlng and time information from activity stream
    latlng = stream['latlng'].data
    time = stream['time'].data
    lineStringData = []
    wktList = []
    # Iterate over time and latlng streams, combining them into a list containing sublists of lng, lat, time
    for i in range(0, len(latlng)):
        # Create new entry, swapping (lat, lng) to (lng, lat), then append time, provided as seconds since the
        # start of the activity
        # newEntry = [latlng[i][1], latlng[i][0], (starttime + timedelta(seconds=time[i])).timestamp()]
        newEntry = [latlng[i][1], latlng[i][0], time[i]]
        # Append data as nested list
        lineStringData.append(newEntry)
        # Take newEntry list and create a string with a space delimiter between list items, add to list of wkt
        # This formats data to be friendly with geoalchemy ST_GeomFromEWKT
        wktList.append(" ".join(str(v) for v in newEntry))
    # Format entire list to be friendly with geoalchemy ST_GeomFromEWKT
    sep = ", "
    wktStr = f"SRID=4326;LINESTRINGM({sep.join(wktList)})"
    # Add lng, lat, time as geom key to dict
    act['geom'] = lineStringData
    act['actId'] = actId
    act['geom_wkt'] = wktStr
    # Add athlete id to dict
    act['athlete_id'] = athId
    # Extend type to account for mtb and road rides
    act['type_extended'] = None
    # Calculate type of riding activity, using gear IDs
    if act['gear_id'] in ["b4317610", "b2066194"]:
        act['type_extended'] = "Mountain Bike"
    elif act['gear_id'] == "b5970935":
        act['type_extended'] = "Road Cycling"
    elif act['type'] == "Walk":
        act['type_extended'] = "Walk"
    elif act['type'] == "Run":
        act['type_extended'] = "Run"
    elif act['type'] == "Hike":
        act['type_extended'] = "Walk"
    # Wahoo Bolt provides additional data, check if populated, if not set to null
    wahooList = ["average_temp", "has_heartrate", "max_heartrate", "average_heartrate", "average_cadence"]
    for i in wahooList:
        if act[i] == "":
            act[i] = None
    # List of dictionary keys to remove, these are null or uninteresting
    remove_keys = ['guid', 'external_id', 'athlete', 'location_city', 'location_state', 'location_country',
                   'kudos_count', 'comment_count', 'athlete_count', 'photo_count', 'total_photo_count', 'map',
                   'trainer', 'commute', 'gear', 'device_watts', 'has_kudoed', 'best_efforts',
                   'segment_efforts', 'splits_metric', 'splits_standard', 'weighted_average_watts', 'suffer_score',
                   'embed_token', 'photos', 'instagram_primary_photo', 'partner_logo_url',
                   'partner_brand_tag', 'from_accepted_tag', 'segment_leaderboard_opt_out', 'highlighted_kudosers',
                   'laps']
    # Iterate over dict keys, removing unnecessary/unwanted keys
    for key in list(act.keys()):
        if key in remove_keys:
            del act[key]
    return {"act": act, "stream": stream}
Next, insert full activity data into Postgres:
def insertOriginalAct(actDict):
    """
    Inserts new activity into database, POSTed by Strava webhook update or by manually triggering the process
    activity event route.

    Parameters
    ----------
    actDict. Dict. Generated by StravaWebHook.handle_sub_update() or by getStravaActivities.processActs().

    Returns
    -------
    Nothing. Data are inserted into Postgres/PostGIS.
    """
    insert = strava_activities(actID=actDict['actId'], upload_id=actDict['upload_id'], name=actDict['name'],
                               distance=actDict['distance'], moving_time=actDict['moving_time'],
                               elapsed_time=actDict['elapsed_time'],
                               total_elevation_gain=actDict['total_elevation_gain'],
                               elev_high=actDict['elev_high'], elev_low=actDict['elev_low'], type=actDict['type'],
                               start_date=actDict['start_date'], start_date_local=actDict['start_date_local'],
                               timezone=actDict['timezone'], utc_offset=actDict['utc_offset'],
                               start_latlng=actDict['start_latlng'], end_latlng=actDict['end_latlng'],
                               start_latitude=actDict['start_latitude'], start_longitude=actDict['start_longitude'],
                               achievement_count=actDict['achievement_count'], pr_count=actDict['pr_count'],
                               private=actDict['private'], gear_id=actDict['gear_id'],
                               average_speed=actDict['average_speed'], max_speed=actDict['max_speed'],
                               average_watts=actDict['average_watts'], kilojoules=actDict['kilojoules'],
                               description=actDict['description'], workout_type=actDict['workout_type'],
                               calories=actDict['calories'], device_name=actDict['device_name'],
                               manual=actDict['manual'], athlete_id=actDict['athlete_id'],
                               type_extended=actDict['type_extended'], avgtemp=actDict['average_temp'],
                               has_heartrate=actDict['has_heartrate'], average_cadence=actDict["average_cadence"],
                               average_heartrate=actDict['average_heartrate'], max_heartrate=actDict['max_heartrate'],
                               geom=actDict['geom_wkt'])
    session = Session()
    session.add(insert)
    session.commit()
    session.close()
    application.logger.debug(f"New webhook update for activity {actDict['actId']} has been added to Postgres!")
Now I have the details and coordinates of every Strava activity on my account stored in my Postgres database, ready to be served to a Leaflet application. However, this creates another problem: since I stored the full coordinate information for each activity, any personal locations, such as my home and the homes of friends and family, will be visible if I share the data publicly. Strava's solution to this issue is to allow users to create privacy zones, which remove any sections of publicly visible activities that start or end within the zones. This solution is bypassed in my dataset since I queried the full coordinates of my activities using full-scope access.
To maintain my privacy, I decided to create my own privacy zones in QGIS and store them within my database. A second, public-friendly dataset was generated using SQLAlchemy and GeoAlchemy2 PostGIS functions, which removed all sections that crossed these privacy areas. Also, since the dataset from Strava contains a coordinate vertex for roughly every second of recorded time, I simplified the data to reduce the overall number of vertices.
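The AOI privacy zone table itself isn't shown in this write-up; a minimal sketch of what such a GeoAlchemy2 model could look like, with column names inferred from the queries below, is:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from geoalchemy2 import Geometry

Base = declarative_base()


class AOI(Base):
    # Hypothetical model sketch; the real table stores the QGIS-drawn privacy polygons
    __tablename__ = "AOI"
    id = Column(Integer, primary_key=True)
    # "Yes" flags a polygon as a privacy zone, matching the filters used in the queries below
    privacy = Column(String)
    geom = Column(Geometry(geometry_type="POLYGON", srid=4326))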
Here you can see the SQLAlchemy/GeoAlchemy2 ORM expressions used to initially populate the obfuscated, public-friendly table:
# Import GeoAlchemy2 and extended SQLAlchemy functions
from sqlalchemy import func as sqlfunc
# Import session factory
from application import Session
# Tables holding all geometry and attribute data from the Strava API, and the masked,
# public friendly, data (model import paths assumed)
from application.models import strava_activities, strava_activities_masked
# AOI model holding the privacy zone polygons (import path assumed)
from application.models import AOI


def processActivitiesPublic(recordID):
    """
    Processes Strava activity by simplifying geometry and removing private areas. This prepares the activity to be
    shared publicly on a Leaflet map. These functions greatly reduce the number of vertices, reducing JSON file
    size, and process the data to be TopoJSON friendly, preventing geometries from failing to be converted.

    Parameters
    ----------
    recordID. Int. Strava activity record ID.

    Returns
    -------
    Nothing. Data are processed and committed to PostgreSQL/PostGIS database.
    """
    # Create database session
    session = Session()
    simplifyFactor = 15
    geometricProj = 32610
    webSRID = 4326
    gridSnap = 3
    collectionExtract = 3
    # Snap tolerance applied before ST_Difference to avoid non-noded intersection errors
    nonNodedSnap = 0.0001
    # Create CTE to query privacy zone polygons, combine them, extract polygons, and transform to geometricProj
    privacy_cte = session.query(
        sqlfunc.ST_Transform(
            sqlfunc.ST_CollectionExtract(sqlfunc.ST_Collect(AOI.geom), collectionExtract),
            geometricProj).label("priv_aoi")) \
        .filter(AOI.privacy == "Yes").cte("privacy_aoi")
    # Processes all records in the strava_activities table, used for initial masked table setup only
    privacyClipQuery = session.query(
        strava_activities.actID,
        sqlfunc.ST_AsEWKB(
            sqlfunc.ST_Transform(
                sqlfunc.ST_MakeValid(
                    sqlfunc.ST_Multi(
                        sqlfunc.ST_Simplify(
                            sqlfunc.ST_SnapToGrid(
                                sqlfunc.ST_Difference(
                                    sqlfunc.ST_SnapToGrid(
                                        sqlfunc.ST_Transform(strava_activities.geom, geometricProj),
                                        nonNodedSnap),
                                    privacy_cte.c.priv_aoi),
                                gridSnap),
                            simplifyFactor))),
                webSRID)))
    for i in privacyClipQuery:
        session.add(strava_activities_masked(actID=i[0], geom=i[1]))
    session.commit()
    session.close()
The above ORM select query is equivalent to the following PostgreSQL/PostGIS SQL select query:
WITH privacy_cte AS
(
    SELECT
        ST_Transform(ST_CollectionExtract(ST_Collect("AOI".geom), 3), 32610) AS priv_aoi
    FROM
        "AOI"
    WHERE
        "AOI".privacy = 'Yes'
)
SELECT
    strava_activities."actID",
    ST_AsEWKB(ST_Transform(ST_MakeValid(ST_Multi(ST_Simplify(ST_SnapToGrid(ST_Difference(ST_SnapToGrid(ST_Transform(strava_activities.geom, 32610), 0.0001), privacy_cte.priv_aoi), 5), 15))), 4326))
FROM
    strava_activities,
    privacy_cte;
This query does the following:
1. Collects all privacy zone polygons into a single geometry, extracts the polygons, and transforms them to a meter-based projection (EPSG:32610).
2. Transforms each activity linestring to the same projection and snaps its vertices to a fine grid, avoiding non-noded intersection errors.
3. Uses ST_Difference to remove any portions of an activity that fall within the privacy zones.
4. Snaps the result to a coarser grid and simplifies it with ST_Simplify, greatly reducing the vertex count.
5. Forces the result to a multi-linestring, repairs it with ST_MakeValid, transforms it back to WGS 1984 (EPSG:4326), and returns it as EWKB, ready for insertion into the masked table.
Next, it's time to query the Strava Activity Stream data. These data are recorded every second and contain time, distance, elevation, latlng, and external sensor data. This type of data lends itself well to a tabular format, and I wanted these data available in a CSV so they can be viewed in profile over the course of the activity.
The full details of the activity are passed into this function, or are queried if not provided. I recently acquired a bike computer which records additional data that are made available through the API; because of this, I query all of these additional stream types for all activities, including those which were not recorded with the computer. If the stream data are absent, the API returns nothing for that particular stream type. The following function uses the results from the getFullDetails function shown above:
def generateAndUploadCSVStream(client, actID, activity=None):
    """
    Generates and uploads a privacy zone masked Strava Stream CSV.

    @param client: stravalib client instance with valid access token
    @param actID: Int. Activity ID of Strava activity to process
    @param activity: Dictionary. Optional. Dictionary of full Strava Activity details, generated if not provided
    @return: Nothing. Uploads file to S3 Bucket
    """
    if not activity:
        # Get all activity details for newly created activity, including stream data
        activity = getFullDetails(client, actID)
    # Create in-memory buffer csv of stream data
    csvBuff = StravaAWSS3.writeMemoryCSV(activity["stream"])
    # Get WKT formatted latlng stream data
    wktStr = formatStreamData(activity["stream"])
    # Get list of coordinates which cross privacy areas, these will be removed from the latlng stream CSV data
    removeCoordList = DBQueriesStrava.getIntersectingPoints(wktStr)
    # Trim/remove rows from latlng CSV stream which have coordinates that intersect the privacy areas
    trimmedMemCSV = trimStreamCSV(removeCoordList, csvBuff)
    # Upload trimmed buffer csv to AWS S3 bucket
    StravaAWSS3.uploadToS3(trimmedMemCSV, activity["act"]["actId"])
Next, the activity stream data are written into a CSV stored in the memory buffer:
import csv
from io import StringIO


def writeMemoryCSV(streamData):
    """
    Converts activity stream data dictionary to an in-memory text buffer, avoids needing to write a local file
    since data will be uploaded to S3.

    :param streamData: Dict. Formatted Strava Stream Data with lat/longs removed
    :return: In-memory text buffer. Activity stream CSV
    """
    # Create in-memory text buffer
    memOutput = StringIO()
    dataDict = {}
    # Stream types to include, latlngs in privacy zones will be removed
    csvTypes = ['time', 'latlng', 'altitude', 'velocity_smooth', 'grade_smooth', "distance", "heartrate", "cadence",
                "temp"]
    # Extract data from stream dictionary, skipping stream types the activity doesn't have
    for streamType in csvTypes:
        try:
            dataDict[streamType] = streamData[streamType].data
        except KeyError:
            application.logger.debug(f"The stream type {streamType} doesn't exist, skipping")
    # Iterate over latlngs, which is a list of lat lng pairs, converting each to a "lat,lng" string
    for c, i in enumerate(dataDict['latlng']):
        dataDict['latlng'][c] = ",".join(str(x) for x in i)
    # See: https://stackoverflow.com/questions/23613426/write-dictionary-of-lists-to-a-csv-file
    # Open buffer and populate with csv data
    writer = csv.writer(memOutput)
    # Write column names
    writer.writerow(dataDict.keys())
    # Each key:value(list) in the dictionary is a column; zip(*dataDict.values()) transposes the
    # column lists into per-row tuples, see link above for description
    writer.writerows(zip(*dataDict.values()))
    return memOutput
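The zip(*dataDict.values()) line is the trick the Stack Overflow link describes; a tiny standalone demonstration (with made-up values) shows how it transposes column lists into CSV rows:

# Toy example of the column-to-row transpose used above, values are made up
columns = {"time": [0, 1, 2], "altitude": [100, 101, 103]}
for row in zip(*columns.values()):
    print(row)
# Prints (0, 100), (1, 101), (2, 103): one CSV row per recorded second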
This helper function is used to format the point coordinates into an Extended Well-Known Text string:
def formatStreamData(stream):
    """
    Formats Strava Activity Stream latlng data into an EWKT string. The string is constructed using string
    manipulation, consider finding a library which can convert a list of coordinates into EWKT or WKT.

    @param stream: Strava Activity Stream with latlng data
    @return: String. EWKT representation of Strava Activity Stream data.
    """
    # Pull out latlngs; by this point writeMemoryCSV has converted each entry to a "lat,lng" string
    latlng = stream['latlng'].data
    # Format first part of EWKT LINESTRING string, in 4326, WGS1984
    wktStr = f"SRID=4326;LINESTRING("
    # Iterate over latlng records
    for c, i in enumerate(latlng):
        # Split based on comma
        lat, lng = latlng[c].split(",")
        # Make string of new lat lng value
        newEntry = f"{lat} {lng},"
        # Add new record to existing string
        wktStr += newEntry
    # Remove last comma
    wktStr = wktStr[:-1]
    # Close out wktStr
    wktStr += ")"
    return wktStr
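Following the docstring's own suggestion, a library-based alternative could look like this sketch using Shapely (not used in the project); note it swaps the pairs into the lng lat order WKT conventionally expects:

from shapely.geometry import LineString


def formatStreamDataShapely(stream):
    # Hypothetical alternative, assuming latlng entries are still [lat, lng] lists
    coords = [(pt[1], pt[0]) for pt in stream['latlng'].data]
    return f"SRID=4326;{LineString(coords).wkt}"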
The previously generated EWKT string is used in a GeoAlchemy2 PostGIS query to determine which point coordinates reside within privacy areas:
def getIntersectingPoints(wktStr):
    """
    Takes an EWKT string of a Strava Activity Stream's latlngs and returns a list of the points which reside
    within the privacy areas.

    @param wktStr: String. EWKT representation of Strava Activity Stream latlngs
    @return: List of strings. Points are returned as WGS 1984 coordinate strings in the format lon,lat
    """
    # geometricProj = 32610
    collectionExtract = 3
    # Open session
    session = Session()
    # Get coordinates from within privacy zones
    try:
        # Create a labeled common table expression to query privacy zone geometries collected into a single
        # multi-polygon
        privacy_cte = session.query(
            sqlfunc.ST_CollectionExtract(
                sqlfunc.ST_Collect(AOI.geom), collectionExtract).label("ctelab")).filter(
            AOI.privacy == "Yes").cte()
        # Take provided EWKT string and convert to GeoAlchemy geometry
        lineString = sqlfunc.ST_GeomFromEWKT(wktStr)
        # Get a list of points from the linestring which fall inside the privacy zones
        # ST_DumpPoints provides a point geometry per row which is converted to a text representation with ST_AsText
        pointQuery = session.query(
            sqlfunc.ST_AsText(
                sqlfunc.ST_DumpPoints(sqlfunc.ST_Intersection(lineString, privacy_cte.c.ctelab)).geom))
        coordinateList = []
        for i in pointQuery:
            # Strip out the WKT parts of the coordinates, only want a list of [lon,lat]
            coordinateList.append(formatPointResponse(i))
    finally:
        session.close()
    return coordinateList
These overlapping points, and their corresponding data, are removed from the buffer CSV:
def trimStreamCSV(coordList, memCSV):
    """
    Trims out all records from the Strava stream CSV that fall within privacy zones, ensuring that the stream data
    do not reveal locations within sensitive areas. Coordinates are included in the stream data so they can be
    used to draw point markers on the map on chart mouseover.

    @param coordList: List. Coordinates which fall within privacy zones
    @param memCSV: StringIO CSV. Contains original, unaltered activity stream details
    @return: StringIO CSV. Memory CSV with sensitive locations removed
    """
    # See https://stackoverflow.com/a/41978062
    # Reset seek to 0 for memory CSV, after writing it the file pointer is still at the end and must be reset
    memCSV.seek(0)
    # Open original memory csv with a reader
    reader = csv.reader(memCSV)
    # Create new memory CSV to hold results
    trimmedMemOutput = StringIO()
    # Create csv writer on memory csv
    trimmedWriter = csv.writer(trimmedMemOutput)
    # Iterate over original CSV
    for c, row in enumerate(reader):
        # Write header row
        if c == 0:
            trimmedWriter.writerow(row)
        else:
            # Split row into [lat, lng]
            coord = row[1].split(",")
            # Check if the lat or lng appears in the privacy zone coordinate list
            latCheck = any(coord[0] in x for x in coordList)
            lngCheck = any(coord[1] in x for x in coordList)
            # Unless both the lat and lng were matched to a privacy zone point, write the entire row into the
            # trimmed csv
            if not latCheck or not lngCheck:
                trimmedWriter.writerow(row)
    return trimmedMemOutput
Finally, the buffer CSV is uploaded to an S3 bucket where it can be shared publicly (currently the Flask application grants temporary access to individual activities as needed):
import os

import boto3


def connectToS3():
    """
    Establish connection to AWS S3 using environmental variables.

    :return: S3 service client.
    """
    s3_client = boto3.client(service_name='s3',
                             aws_access_key_id=os.getenv("BOTO3_Flask_ID"),
                             aws_secret_access_key=os.getenv("BOTO3_Flask_KEY"))
    return s3_client


def uploadToS3(file, actID=None):
    """
    Uploads file to S3 Bucket. This bucket is not public, but all activities are accessible to the public through
    the API with pre-signed temporary URLs. If the actID is None then the input is the TopoJSON file.

    :param file: Buffer/memory file to be uploaded, either JSON or CSV.
    :param actID: Strava Activity ID, used to name the uploaded file; if empty then TopoJSON is assumed, which has
        a static name
    :return: Nothing, file is uploaded
    """
    # Get bucket details from environmental variable
    bucket = os.getenv("S3_TRIMMED_STREAM_BUCKET")
    # Establish connection to S3 API
    conn = connectToS3()
    try:
        if actID:
            # Add in-memory buffer csv to bucket
            # Using getvalue() with put_object works around the StringIO object not being compatible with other
            # boto3 object creation methods
            fileName = f"stream_{actID}.csv"
            conn.put_object(Body=file.getvalue(), Bucket=bucket, Key=fileName)
        else:
            # Add in-memory buffer TopoJSON file to bucket, file name is static
            fileName = "topoJSONPublicActivities.json"
            conn.put_object(Body=file, Bucket=bucket, Key=fileName)
    except Exception as e:
        application.logger.error(f"Upload to S3 bucket failed with the error: {e}")
    finally:
        # Close in-memory buffer file, removing it from memory
        file.close()
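The pre-signed temporary URLs mentioned above are generated elsewhere in the Flask application; a minimal sketch of how one could be created with boto3's generate_presigned_url (the one-hour expiry and function name are assumptions) looks like:

def getPresignedStreamURL(actID, expiresIn=3600):
    # Hypothetical sketch: grant temporary public access to one activity's stream CSV
    conn = connectToS3()
    return conn.generate_presigned_url('get_object',
                                       Params={'Bucket': os.getenv("S3_TRIMMED_STREAM_BUCKET"),
                                               'Key': f"stream_{actID}.csv"},
                                       ExpiresIn=expiresIn)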
GeoJSON is a standard and convenient format for transferring geospatial data over the web, especially since it's supported by Leaflet. However, it's not very efficient at storing data, largely because it stores a full list of coordinates and contains unnecessary spacing. Currently, my masked GeoJSON dataset exports out to a 2.8 MB JSON file, which is a fairly large file to transfer on every page load. Fortunately, there's the TopoJSON format, which, in addition to encoding a topology (not useful for this multi-linestring dataset), stores coordinates as deltas from an origin coordinate, resulting in a large reduction of stored information. Using the topojson Python library allowed me to reduce the JSON file size down to about 1.3 MB, still large but under half the original size. While other encoding techniques are available, this format meets the project's needs: it reduces file size, is easily usable in Leaflet, and retains all attribute information which will be needed in the web map/viewer.
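As a toy illustration of the delta encoding (my own example, not the library's internals), quantized coordinates are stored as small offsets from the previous point, which serialize far more compactly than full coordinates:

# Toy illustration of TopoJSON-style delta encoding, values are made up
arc = [[1000, 2000], [1003, 1999], [1005, 2001]]
deltas = [arc[0]] + [[b[0] - a[0], b[1] - a[1]] for a, b in zip(arc, arc[1:])]
# deltas == [[1000, 2000], [3, -1], [2, 2]], small integers encode in fewer characters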
Process to generate TopoJSON:
def createStravaPublicActTopoJSON():
    """
    Creates an in-memory TopoJSON file containing all database stored Strava Activities. This file will be uploaded
    to a S3 Bucket, replacing the existing file. A pre-generated file is used to speed up response time, as
    generating the file may take a few seconds. This function is called whenever a new subscription update adds a
    new activity to the database or when triggered on the admin page.

    Returns
    -------
    In memory TopoJSON file.
    """
    # Create Postgres connection
    session = Session()
    # Query geom as GeoJSON and other attribute information
    query = session.query(sqlfunc.ST_AsGeoJSON(strava_activities_masked.geom, 5),
                          strava_activities.name,
                          strava_activities.actID,
                          strava_activities.type,
                          strava_activities.distance,
                          strava_activities.private,
                          strava_activities.calories,
                          strava_activities.start_date,
                          strava_activities.elapsed_time,
                          strava_activities.moving_time,
                          strava_activities.average_watts,
                          strava_activities.start_date_local,
                          strava_activities.total_elevation_gain,
                          strava_activities.average_speed,
                          strava_activities.max_speed,
                          strava_activities.type_extended,
                          strava_activities.has_heartrate,
                          strava_activities.average_cadence,
                          strava_activities.max_heartrate,
                          strava_activities.average_heartrate,
                          strava_gear.gear_name) \
        .join(strava_activities_masked.act_rel) \
        .join(strava_activities.gear_rel, isouter=True) \
        .order_by(strava_activities.start_date.desc())
    features = []
    for row in query:
        # Build a dictionary of the attribute information
        propDict = {"name": row.name, "actID": row.actID, "type": row.type, "distance": round(row.distance),
                    "private": row.private, "calories": round(row.calories),
                    "startDate": row.start_date_local.isoformat(),
                    "elapsed_time": row.elapsed_time.seconds, "total_elevation_gain": round(row.total_elevation_gain),
                    "average_speed": round(row.average_speed, 1), "max_speed": row.max_speed,
                    "gear_name": row.gear_name, "type_extended": row.type_extended,
                    "moving_time": row.moving_time.seconds, "average_watts": row.average_watts,
                    "has_heartrate": row.has_heartrate, "average_cadence": row.average_cadence,
                    "max_heartrate": row.max_heartrate, "average_heartrate": row.average_heartrate}
        # Take ST_AsGeoJSON() result and load as geojson object
        geojsonGeom = geojson.loads(row[0])
        # Build the feature and add to feature list
        features.append(Feature(geometry=MultiLineString(geojsonGeom), properties=propDict))
    session.close()
    # Build the feature collection result
    feature_collection = FeatureCollection(features)
    # Create TopoJSON of the GeoJSON Feature Collection. Don't create a topology, it doesn't matter for polylines,
    # and prequantize the data, which reduces file size at the cost of processing time.
    # Prequantize 1e7 is used over the default, 1e6, to avoid errors in which data were placed in the South Pacific
    # Ocean
    return tp.Topology(feature_collection, topology=False, prequantize=10000000).to_json()
This script queries the masked activities as GeoJSON, loads and parses each record into a GeoJSON MultiLineString Feature, combines all records into a Feature Collection, and finally creates a TopoJSON file which is uploaded to an S3 bucket using the upload function shown above.
The Topology function is very picky about incoming geometries and kept removing records without an explanation as to why, even though they passed PostGIS ST_MakeValid and ST_IsValid. All original, non-masked GeoJSON records converted properly, so I assume that ST_Difference caused geometries to break during conversion. The additional processing steps during masking, in particular ST_SnapToGrid, appear to have resolved these issues. However, they may need more fine tuning to ensure that no geometries fail to be converted to TopoJSON in the future.
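Since the conversion drops records silently, a simple count comparison can flag the problem. This sketch is my own illustration and assumes the topojson library's default object name of "data":

import json


def countDroppedGeometries(feature_collection, topoJSONStr):
    # Sanity check sketch: compare input features against geometries that survived conversion
    converted = len(json.loads(topoJSONStr)["objects"]["data"]["geometries"])
    expected = len(feature_collection["features"])
    return expected - converted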
Now that all my data have been processed and made available to the application, I need to keep the dataset up-to-date with newly added activities. To accomplish this I created a Strava webhook/push subscription using stravalib. This enables my server to receive updates from the Strava API whenever I add a new activity, without needing to poll the API for changes. An update is sent whenever a new activity is added, an existing activity's title, type, or privacy is changed, or the account revokes access to the application. As this is my own account, I do not handle requests to revoke application authorization. Also note that new activity updates include activity IDs only; it's my server's responsibility to call the API for any further details.
While stravalib has functions dedicated to webhooks, they are minimally documented, with no examples provided. Also, as of the time I started work on this project, the version of stravalib on PyPI, 0.10.2, did not support the newest version of the Strava API. Fortunately, the stravalib team has an updated version on their GitHub page which does.
Here is the conceptual process of creating a new webhook subscription:
The server issues a subscription creation POST request to the Strava API containing the client ID and secret, in addition to an optional user/server-created verification token and a required callback URL configured to handle GET and POST requests. The verification token is used by the server to confirm that further setup requests are being sent by Strava. The creation request can be made in stravalib using the following (this is taken from an administration page I created and is called from an HTML POST request):
def addwebhooksub():
    """
    Adds a new Strava webhook subscription to the database and Strava API. Kicks off callback verification process.
    Called by Strava Activity admin page inputs.
    """
    # Get POST request info
    # athID = int(request.form['athID'])
    # callbackurl = str(request.form['callbackURL'])
    # Generate 14 character verify token string
    verifytoken = secrets.token_hex(7)
    # Insert token into database, will be updated with the subID if successful, otherwise the row will be deleted
    DBQueriesStrava.insertVerifyToken(verifytoken)
    # Get Strava API access credentials
    client = OAuthStrava.getAuth()
    try:
        # Send request to create webhook subscription, will be given the new subscription ID in response
        application.logger.debug(f"Callback url is {os.getenv('FULL_STRAVA_CALLBACK_URL')}")
        response = client.create_subscription(client_id=os.getenv("STRAVA_CLIENT_ID"),
                                              client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
                                              callback_url=os.getenv('FULL_STRAVA_CALLBACK_URL'),
                                              verify_token=verifytoken)
        application.logger.debug(f"New sub id is {response.id}, updating database")
        # Update database with new sub id
        DBQueriesStrava.updateSubId(response.id, verifytoken)
        return Response(status=200)
    except Exception as e:
        # Subscription creation failed, remove the orphaned verify token record
        DBQueriesStrava.deleteVerifyTokenRecord(verifytoken)
        return Response(status=400)
The above script kicks off the process of creating a new webhook subscription using the client information, a pre-generated verification token, and a full callback URL address. After the subscription has been successfully created (the callback handlers are shown further below), the stravalib create_subscription function receives the ID of the newly created subscription in the response. This ID is used to update the subscription entry in my database and to verify that webhook subscription updates are from Strava. The following updates the active subscription with the ID, and since an active webhook includes all athletes an application is authorized to access, this record's foreign key is applied to all athletes:
def updateSubId(subId, verifytoken):
    """
    Updates webhook subscriptions table with the new subscription id provided by Strava, then updates all athletes
    with the new subID foreign key.

    @param subId: Int. Webhook subscription ID provided by Strava API
    @param verifytoken: String. Script generated verification token
    @return: Nothing. Database is updated
    """
    session = Session()
    try:
        # Update recently created record which only has the verify token populated
        session.query(webhook_subs).filter(webhook_subs.verify_token == verifytoken) \
            .update({webhook_subs.sub_id: subId, webhook_subs.activesub: "Yes"})
        session.commit()
        # Get the primary key from the new webhook subscription
        record = session.query(webhook_subs).filter(webhook_subs.verify_token == verifytoken).first()
        # Update all athletes with the new subscription entry foreign key
        session.query(athletes).update({athletes.sub_id: record.id})
        session.commit()
        session.close()
    except Exception as e:
        application.logger.debug(f"Update Strava athlete sub Id failed with the exception: {e}")
During initial setup, a GET request is sent to the server's callback URL; after successful setup, POST requests will be issued when specific account updates occur. The following code is used to handle Strava API webhook subscription GET and POST requests:
@stravaActDashAPI_BP.route(os.environ.get("STRAVA_CALLBACK_URL"), methods=['GET', 'POST'])
def subCallback():
    """
    Strava subscription callback URL.

    Returns
    -------
    GET request:
        JSON, echoed Strava challenge text.
    POST request:
        Success code if data are successfully added to Postgres/PostGIS. Strava must receive a 200 code in
        response to the POST.
    """
    res = WebHookFunctionsStrava.handleSubCallback(request)
    if res:
        return res
    else:
        return Response(status=200)
def handleSubCallback(request):
    """
    Handles requests to the Strava subscription callback URL.

    GET:
        Webhook subscription creation process:
        The callback URL is sent a GET request containing a challenge code. This code is sent back to the
        requester to verify the callback. The initial request to create a new webhook subscription is then
        provided with verification and the new subscription ID.
    POST:
        Webhook subscription update message. Sent when an activity on a subscribed account is created, updated,
        or deleted, or when a privacy related profile setting is changed. All update messages are inserted into
        Postgres. Currently, only activity creation and update events are handled; additional development is
        needed to handle other events.

    Returns
    -------
    GET request:
        JSON, echoed Strava challenge text.
    POST request:
        Success code if data are successfully added to Postgres/PostGIS. Strava must receive a 200 code in
        response to the POST.
    """
    # Get application access credentials
    client = OAuthStrava.getAuth()
    # Check if request is a GET callback request, part of webhook subscription process
    if request.method == 'GET':
        # Extract challenge and verification tokens
        callBackContent = request.args.get("hub.challenge")
        callBackVerifyToken = request.args.get("hub.verify_token")
        # Form callback response as dict
        callBackResponse = {"hub.challenge": callBackContent}
        # Check if verification tokens match, i.e. if the GET request is from Strava
        if DBQueriesStrava.checkVerificationToken(callBackVerifyToken):
            # Verification succeeded, return challenge code as dict
            # Flask automatically converts the dict to JSON with a HTTP 200 success code
            return callBackResponse
        else:
            # Verification failed, raise error
            raise ValueError('Strava token verification failed, no match found.')
    # POST request containing webhook subscription update message, new activity or other change to Strava account
    elif request.method == 'POST':
        try:
            # Convert JSON body to dict
            callbackContent = json.loads(request.data, strict=False)
            # Call function to handle update message and process new activity, if applicable
            handleSubUpdate(client, callbackContent)
        except Exception as e:
            application.logger.error(f"Strava subscription update failed with the error {e}")
Now that the subscription has been created and callbacks are handled, update messages can be processed. The following code processes Strava subscription update messages by inserting them into Postgres and then triggering a threaded function for activity processing, if applicable:
def handleSubUpdate(client, updateContent):
    """
    Handles a Strava webhook subscription update. This function is called by a valid Strava POST request to the
    webhook subscription callback URL.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to Strava API for the user.
    updateContent. Dict. POST request JSON data formatted by Flask as a dict.

    Returns
    -------
    Nothing. Data are inserted into Postgres/PostGIS.
    """
    # Parse update information into a model using stravalib
    update = client.handle_subscription_update(updateContent)
    # Verify that the athlete(s) and subscription ID contained in the message are in Postgres
    if DBQueriesStrava.checkAthleteAndSub(update.owner_id, update.subscription_id):
        application.logger.debug("Sub update from Strava appears valid")
        # Insert subscription update message details into Postgres
        DBQueriesStrava.insertSubUpdate(update)
        # Verify that the update is an activity creation event
        if update.aspect_type == "create" and update.object_type == "activity":
            application.logger.debug("This is an activity create event, creating thread to process activity")
            try:
                # Create a thread to handle async processing of the activity and its derivatives
                # Threading allows the long-running processing to continue while a quick 200 code is sent to the
                # Strava API
                Thread(target=APIFunctionsStrava.singleActivityProcessing, args=(client, update.object_id)).start()
            except Exception as e:
                application.logger.error(f"Creating a thread to process the new activity failed with the error: {e}")
        elif update.aspect_type == "update" and update.object_type == "activity":
            application.logger.debug("This is an activity update event, updating existing record")
            # Update existing activity title
            DBQueriesStrava.updateExistingActivity(update)
        else:
            # Write logic to handle delete events
            application.logger.debug("Sub update message contains a delete event, skipping request")
    else:
        application.logger.debug("POST request is invalid, user ID or subscription ID don't match those in database!")
Insert subscription update details into Postgres:
def insertSubUpdate(content):
    """
    Inserts Strava webhook subscription data into the Postgres database. This information will be used to get full
    activity information from another query.

    Parameters
    ----------
    content. Subscription Update object of Strava webhook update generated by stravalib

    Returns
    -------
    Nothing. Updates database.
    """
    # Verify the activity title is in the update data, if not set to None. Some activities may have empty titles.
    if "title" in content.updates.keys():
        title = content.updates['title']
        application.logger.debug(f"Title of new activity is {title}")
    else:
        title = None
    session = Session()
    insert = sub_update(aspect=content.aspect_type,
                        event_time=datetime.fromtimestamp(content.event_time.timestamp()),
                        object_id=content.object_id, object_type=content.object_type, owner_id=content.owner_id,
                        subscription_id=content.subscription_id,
                        update_title=title)
    session.add(insert)
    session.commit()
    session.close()
    application.logger.debug(f"New webhook update has been added to Postgres!")
The Strava API requires a success response within two seconds, or else it will attempt two more requests before timing out. Since my process currently exceeds this time allowance, I needed a way to process asynchronously. I did not want to spend the time setting up background processing and task queuing; instead, I decided to go with a multithreaded approach, which allows the data processing to occur concurrently. While not truly asynchronous, this enables Flask to return a 200 success code while still working on the threaded process:
def singleActivityProcessing(client, actID):
    """
    Processes a single Strava Activity by placing the full activity in the database, making a simplified and
    masked public version, and by creating a privacy masked stream CSV which is added to a S3 Bucket. Finally a
    TopoJSON of the public activities is generated and uploaded to the S3 Bucket.

    @param client: stravalib client instance with valid access token
    @param actID: Int. ID of Strava Activity to be processed
    @return: Nothing. Logs state whether the process succeeded or failed
    """
    try:
        application.logger.debug("Getting full activity details")
        # Get all activity details for newly created activity, including stream data
        activity = getFullDetails(client, actID)
        application.logger.debug("Inserting activity details")
        # Insert original, non-masked, coordinates and attribute details into Postgres/PostGIS
        DBQueriesStrava.insertOriginalAct(activity['act'])
        # Calculate masked, publicly sharable, activities and insert into Postgres masked table
        application.logger.debug("Processing and inserting masked geometries")
        DBQueriesStrava.processActivitiesPublic(activity["act"]["actId"])
        # Handle CSV stream processing
        generateAndUploadCSVStream(client, actID, activity)
        # Create topojson file
        topoJSON = DBQueriesStrava.createStravaPublicActTopoJSON()
        # Upload topoJSON to AWS S3
        StravaAWSS3.uploadToS3(topoJSON)
        application.logger.debug("Strava activity has been processed!")
    except Exception as e:
        application.logger.error(f"Handling and inserting new webhook activity inside a thread failed with the "
                                 f"error {e}")
        # Re-raise the exception, this signals the route function to return an error 500
        raise
Now a process flow is set up to automatically process new Strava activities, to be consumed by a public-facing dashboard which uses Leaflet to display geographic data. The most recent version of my dashboard is visible at the top of this page, and a full page dashboard is available here. I haven't had the motivation to finish the write-up for the client-side HTML/JavaScript for this project, however the GitHub repo project folder can be found here.
This was a fun, challenging, and rewarding project to work on. I gained my first experience working with GeoAlchemy and PostGIS functions to manipulate spatial data. I also learned, through much trial and error, that spatial datasets need to be aggregated for some PostGIS functions to return the desired results.