Strava Activities Project - Data Preparation and Server Side Processing

Summary

This personal project displays my up-to-date Strava fitness activity information on an interactive, mobile-friendly data dashboard, using Leaflet to display geographical data and Chart.js to display graphs. Data can be filtered and explored using buttons, date selections, searches, and by selecting geographical data.

Strava is a fitness tracking mobile application which logs GPS data recorded during outdoor fitness activities. I have been logging rides, walks, runs, and hikes to the application for years and have accumulated over 400 total activities. Strava provides an API to access these records along with additional data that the service calculates. I saw this as an opportunity to expand my server- and client-side development skills by accessing, processing, and presenting the data in a map/dashboard.

Historic Strava data were initially pulled from the Strava API and processed using Python, then a webhook subscription was created which updates my server when new activities are available for processing. Strava activity data are processed in Python using PostGIS functions to remove private areas and to simplify geometries to reduce file sizes, at the cost of spatial accuracy. Data are pre-calculated and served to the Leaflet map in the TopoJSON format to further reduce file sizes and server response times.

The description below discusses the Strava activity processing workflow and server-side processing scripts. You can view the Python files for this project in its GitHub project folder.

I have not yet finished the write-up for the client-side JavaScript/HTML aspect of this project, but the JavaScript source code and HTML are available within my Flask Application folder.

Access Activities on Strava API - stravalib

The Python library stravalib provides useful functions to query the Strava API and parse results into Python objects. Instead of using the library's documentation for my server's authorization and authentication, I ended up following this guide on Medium's Analytics Vidhya, which was clearer and provided example code for refreshing the API access token. Initially I followed the guide's method of Pickling credentials; however, I made the process scalable, in case I want to add more athletes in the future, and removed the dependence on local files by storing the credentials in a database.

This process uses SQLAlchemy to access authentication details stored in Postgres, generates and updates the access token if needed, then populates an authorized stravalib client instance for an athlete:


def getAuth():
		"""
		Loads Strava client authentication details from Postgres and creates an authorized client instance.
		Checks if access token is expired, if so it is refreshed and updated.

		Returns
		-------
		client. Stravalib client instance. Contains the access token to the Strava API for the athlete; the athlete ID is hard coded for now.
		"""
		# Build empty stravalib client instance
		client = Client()
		# create db session
		session = Session()
		# Hard coded athlete id
		athleteID = 7170058
		authDict = {}
		# Load tokens and expiration time from Postgres
		query = session.query(athletes).filter(athletes.athlete_id == athleteID)
		for i in query:
				authDict["Access_Token"] = i.access_token
				authDict["Expiration"] = i.access_token_exp
				authDict["Refresh_Token"] = i.refresh_token
		# Check if access token has expired, if so request a new one and update Postgres
		if time.time() > authDict["Expiration"]:
				refresh_response = client.refresh_access_token(client_id=int(os.environ.get('STRAVA_CLIENT_ID')),
																											 client_secret=os.environ.get('STRAVA_CLIENT_SECRET'),
																											 refresh_token=authDict["Refresh_Token"])
				# Update access token and expiration date
				session.query(athletes).filter(athletes.athlete_id == athleteID). \
						update({athletes.access_token: refresh_response['access_token'],
										athletes.access_token_exp: refresh_response['expires_at']})
				# Commit update
				session.commit()
				# Set Strava auth details
				client.access_token = refresh_response['access_token']
				client.refresh_token = authDict["Refresh_Token"]
				client.token_expires_at = refresh_response['expires_at']
		else:
				# Access token is up-to-date, set client details
				client.access_token = authDict["Access_Token"]
				client.refresh_token = authDict["Refresh_Token"]
				client.token_expires_at = authDict["Expiration"]
		# Close out session
		session.close()
		return client
							

The guide's original process uses the Pickle file created during the one-time authentication; it is called before requests to the Strava API and updates the file whenever the token must be refreshed:


	from application.stravalib.client import Client
	import os
	import time
	import pickle

	def gettoken():
		# Build empty stravalib client instance
		client = Client()
		# Load access token from the Pickle file
		with open(os.path.join(app.root_path, 'access_token.pickle'), 'rb') as f:
		    access_token = pickle.load(f)
		# Check if access token has expired
		if time.time() > access_token['expires_at']:
		    # Use client ID, secret, and refresh token to generate a new access token with Strava API
		    refresh_response = client.refresh_access_token(client_id=os.getenv("STRAVA_CLIENT_ID"),
		                                                   client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
		                                                   refresh_token=access_token['refresh_token'])
		    # Open Pickle file and update with new access token
		    with open(os.path.join(app.root_path, 'access_token.pickle'), 'wb') as f:
		        pickle.dump(refresh_response, f)
		    # Set new access token in client instance
		    client.access_token = refresh_response['access_token']
		    # Set refresh token in client instance
		    client.refresh_token = refresh_response['refresh_token']
		    # Set access token expiration time for client instance
		    client.token_expires_at = refresh_response['expires_at']
		# Access token is still valid, set token in client instance
		else:
		    client.access_token = access_token['access_token']
		    client.refresh_token = access_token['refresh_token']
		    client.token_expires_at = access_token['expires_at']
		return client
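
For completeness, the Pickle file itself comes from the one-time OAuth authorization. A minimal sketch of that step, assuming stravalib's standard flow (the redirect URI and the code captured from the redirect are placeholders):


	import os
	import pickle
	from stravalib.client import Client

	client = Client()
	# Generate the URL to visit in a browser and approve access for the application
	url = client.authorization_url(client_id=os.getenv("STRAVA_CLIENT_ID"),
	                               redirect_uri='http://localhost:5000/authorization',
	                               scope=['read_all', 'activity:read_all'])
	print(url)
	# Strava redirects back with a one-time code; exchange it for the token dictionary
	# code_from_redirect: placeholder for the code parameter captured from the redirect URL
	token_response = client.exchange_code_for_token(client_id=os.getenv("STRAVA_CLIENT_ID"),
	                                                client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
	                                                code=code_from_redirect)
	# Persist the dictionary (access_token, refresh_token, expires_at) for gettoken() to load
	with open('access_token.pickle', 'wb') as f:
	    pickle.dump(token_response, f)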
						

Now that I have full-scope access to my account through the Strava API, I can begin downloading activities. The API, through stravalib, offers a few different ways to download activities.

There are also options to access routes, segments, efforts, and other account details.
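
For instance, a few of the stravalib entry points used or considered in this project; a sketch, where client is an authorized instance from getAuth() and actId is a placeholder activity ID:


	# Summary records for recent activities (stravalib handles API pagination)
	for summary in client.get_activities(limit=10):
	    print(summary.id, summary.name, summary.type)
	# Full attributes and per-second data streams for a single activity
	activity = client.get_activity(actId)
	streams = client.get_activity_streams(actId, types=['time', 'latlng'])
	# Other account data exposed through the API
	athlete = client.get_athlete()
	routes = client.get_routes()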

My first goal was to download all my historic activities on Strava and add them to a Postgres/PostGIS database. Considering the API methods available, I decided on the following approach:

Use the List Athlete Activities method, with the after date set to before I started using Strava, to return the activity IDs of all my recorded activities, then generate a list of these IDs.


def getListIds(client, days):
  """
  Gets a list of all Strava Activity IDs since (days) ago from Strava API.

  Parameters
  ----------
  client. Stravalib model client object. Contains access token to Strava API for the user.
  days. Int. How many days to look back, queries all activities since this calculated date.

  Returns
  -------
  List. List of int IDs of all strava activities for the user.
  """
  # use current datetime and timedelta to calculate previous datetime
  after = datetime.today() - timedelta(days=days)
  # after = datetime(year=2019, month=8, day=1)
  actList = []
  # Get all activities since after time and add to list
  acts = client.get_activities(after=after)
  for i in acts:
      actList.append(i.id)
  return actList
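
For the initial bulk download, the look-back window just needs to predate my first recorded activity; a hypothetical call:


  client = getAuth()
  # A ~12 year window comfortably predates my first recorded activity
  histIds = getListIds(client, days=365 * 12)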
					

Iterate over the activity ID list, passing each activity ID into Get Activity and Get Activity Streams. Parse the results by structuring the data, removing uninteresting or null details, calculating ancillary data, and combining the GPS coordinates and times, provided as seconds since the start of the activity, into the PostGIS EWKT LINESTRINGM format. Even though I bring in the time information as an M-value, I am not using this time dimension in this project.


def getFullDetails(client, actId):
    """
    Gets the full details of Strava activities using get_activity() to query flat data and get_activity_streams() to get
    GPS coordinates and times. Coordinates are formatted to be inserted in PostGIS following ST_GeomFromEWKT.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to strava API for the user.
    actId. Int. Activity ID.

    Returns
    -------
    Dict. Activity and coordinate information formatted to be inserted into Postgres/PostGIS.
    """

    # Raise the logger level to suppress debug messages, which aren't important and pollute the console
    Log = logging.getLogger()
    Log.setLevel('ERROR')
    # Stream data to get from activity streams
    types = ['time', 'latlng', 'altitude', 'velocity_smooth', 'grade_smooth', "distance", "heartrate", "cadence", "temp"]
    # Get activity details as a dictionary
    act = client.get_activity(actId).to_dict()
    # Get the activity stream details for the activity id
    stream = client.get_activity_streams(actId, types=types)
    # Get athlete ID directly from API call, instead of digging into the nested result provided by get_activity
    athId = client.get_athlete().id
    # Extract latlng and time information from activity stream
    latlng = stream['latlng'].data
    time = stream['time'].data
    lineStringData = []
    wktList = []
    # Iterate over time and latlng streams, combining them into a list containing sublists with lat, lng, time
    for i in range(0, len(latlng)):
        # Create new entry, swapping (lat, lon) to (lon, lat), then append time,
        # provided as seconds since the start of the activity
        # Alternative: convert the offset to a UTC datetime timestamp instead:
        # newEntry = [latlng[i][1], latlng[i][0], (starttime + timedelta(seconds=time[i])).timestamp()]
        newEntry = [latlng[i][1], latlng[i][0], time[i]]
        # Append data as nested list
        lineStringData.append(newEntry)
        # Take newEntry list and create a string with a space delimiter between list items, add to list of wkt
        # This formats data to be friendly with geoalchemy ST_GeomFromEWKT
        wktList.append(" ".join(str(v) for v in newEntry))
        # print(wktList)
    # Format entire list to be friendly with geoalchemy ST_GeomFromEWKT
    sep = ", "
    wktStr = f"SRID=4326;LINESTRINGM({sep.join(wktList)})"
    # Add lat, lng, time as geom key to dict
    act['geom'] = lineStringData
    act['actId'] = actId
    act['geom_wkt'] = wktStr
    # Add athlete id to dict
    act['athlete_id'] = athId
    # Extend type to account for mtb and road rides
    act['type_extended'] = None
    # Calculate type of riding activity, using GearIDs
    if act['gear_id'] in ["b4317610", "b2066194"]:
        act['type_extended'] = "Mountain Bike"
    elif act['gear_id'] == "b5970935":
        act['type_extended'] = "Road Cycling"
    elif act['type'] == "Walk":
        act['type_extended'] = "Walk"
    elif act['type'] == "Run":
        act['type_extended'] = "Run"
    elif act['type'] == "Hike":
        act['type_extended'] = "Walk"
    # Wahoo Bolt provides additional data, check if populated, if not set to null
    wahooList = ["average_temp", "has_heartrate", "max_heartrate", "average_heartrate", "average_cadence"]
    for i in wahooList:
        if act[i] == "":
            act[i] = None
    # List of dictionary keys to remove, these are null or uninteresting
    remove_keys = ['guid', 'external_id', 'athlete', 'location_city', 'location_state', 'location_country',
                   'kudos_count', 'comment_count', 'athlete_count', 'photo_count', 'total_photo_count', 'map',
                   'trainer', 'commute', 'gear', 'device_watts', 'has_kudoed', 'best_efforts',
                   'segment_efforts', 'splits_metric', 'splits_standard', 'weighted_average_watts',
                   'suffer_score',
                   'embed_token', 'trainer', 'photos', 'instagram_primary_photo', 'partner_logo_url',
                   'partner_brand_tag', 'from_accepted_tag', 'segment_leaderboard_opt_out', 'highlighted_kudosers',
                   'laps']
    # Iterate over dict keys, removing unnecessary/unwanted keys
    for key in list(act.keys()):
        if key in remove_keys:
            del (act[key])
    return {"act": act, "stream": stream}
					

Next, insert full activity data into Postgres:


def insertOriginalAct(actDict):
    """
    Inserts new activity into database, POSTed by Strava webhook update or by manually triggering process activity
    event route.

    Parameters
    ----------
    actDict. Dict. Generated by StravaWebHook.handle_sub_update() or by getStravaActivities.processActs().

    Returns
    -------
    Nothing. Data are inserted into Postgres/PostGIS.
    """
    insert = strava_activities(actID=actDict['actId'], upload_id=actDict['upload_id'], name=actDict['name'],
                               distance=actDict['distance'], moving_time=actDict['moving_time'],
                               elapsed_time=actDict['elapsed_time'],
                               total_elevation_gain=actDict['total_elevation_gain'],
                               elev_high=actDict['elev_high'], elev_low=actDict['elev_low'], type=actDict['type'],
                               start_date=actDict['start_date'], start_date_local=actDict['start_date_local'],
                               timezone=actDict['timezone'], utc_offset=actDict['utc_offset'],
                               start_latlng=actDict['start_latlng'], end_latlng=actDict['end_latlng'],
                               start_latitude=actDict['start_latitude'], start_longitude=actDict['start_longitude'],
                               achievement_count=actDict['achievement_count'], pr_count=actDict['pr_count'],
                               private=actDict['private'], gear_id=actDict['gear_id'],
                               average_speed=actDict['average_speed'], max_speed=actDict['max_speed'],
                               average_watts=actDict['average_watts'], kilojoules=actDict['kilojoules'],
                               description=actDict['description'], workout_type=actDict['workout_type'],
                               calories=actDict['calories'], device_name=actDict['device_name'],
                               manual=actDict['manual'], athlete_id=actDict['athlete_id'],
                               type_extended=actDict['type_extended'], avgtemp=actDict['average_temp'],
                               has_heartrate=actDict['has_heartrate'], average_cadence=actDict["average_cadence"],
                               average_heartrate=actDict['average_heartrate'], max_heartrate=actDict['max_heartrate'],
                               geom=actDict['geom_wkt'])
    session = Session()
    session.add(insert)
    session.commit()
    session.close()
    application.logger.debug(f"New webhook update for activity {actDict['actId']} has been added to Postgres!")
					

Obfuscate Sensitive Locations - SQLAlchemy/GeoAlchemy2

Now I have the details and coordinates of every Strava activity on my account stored in my Postgres database, ready to be served to a Leaflet application. This creates another problem, however: since I stored the full coordinate information for each activity, personal locations such as my home and the homes of friends and family would be visible if I shared the data publicly. Strava's solution to this issue is to allow users to create privacy zones, which remove any sections of publicly visible activities that start or end within the zones. This protection is bypassed in my dataset since I queried the full coordinates of my activities using full-scope access.

To maintain my privacy, I decided to create my own privacy zones in QGIS and store them in my database. A second, public-friendly dataset was generated using SQLAlchemy and GeoAlchemy2 PostGIS functions, which removed all sections that cross these privacy areas. Also, since the dataset from Strava contains a coordinate vertex for roughly every second of recorded time, I simplified the data to reduce the overall number of vertices.
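
The privacy zones are stored in an AOI table alongside the activity data; a minimal sketch of the model (the column names are inferred from the queries below, so treat the exact definition as an assumption):


from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from geoalchemy2 import Geometry

Base = declarative_base()

class AOI(Base):
    """Areas of interest drawn in QGIS; rows flagged privacy = 'Yes' are privacy zones."""
    __tablename__ = 'AOI'
    id = Column(Integer, primary_key=True)
    privacy = Column(String)  # 'Yes' marks a privacy zone, filtered on in the queries below
    geom = Column(Geometry(geometry_type='POLYGON', srid=4326))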

Here you can see the SQLAlchemy/GeoAlchemy2 ORM expressions used to initially populate the obfuscated, public-friendly table:


# import GeoAlchemy2 and extended SQLAlchemy functions
from sqlalchemy import func as sqlfunc
# import session factory
from application import Session
# Table holding privacy zone polygons (assumed module path)
from application.models import AOI
# Table holding all geometry and attribute data from Strava API
from application.models import strava_activities
# Table holding masked, public friendly, data
from application.models import strava_activities_masked

def processActivitiesPublic(recordID):
	"""
	Processes Strava activity by simplifying geometry and removing private areas. This prepares the activity to be
	shared publicly on a Leaflet map. These functions greatly reduce the number of vertices, reducing JSON file size,
	and process the data to be TopoJSON friendly, preventing geometries from failing to be converted.

	Parameters
	----------
	recordID. Int. Strava activity record ID.

	Returns
	-------
	Nothing. Data are processed and committed to PostgreSQL/PostGIS database.
	"""

	# Create database session
	session = Session()
	simplifyFactor = 15     # ST_Simplify tolerance, in meters
	geometricProj = 32610   # projected coordinate system used for geometric calculations
	webSRID = 4326          # WGS 1984, needed for display in Leaflet
	nonNodedSnap = 0.0001   # fine grid snap, in meters, avoids non-noded intersection errors
	gridSnap = 3            # coarse grid snap, in meters
	collectionExtract = 3   # extract polygons from the collected geometry
	# Create CTE to query privacy zone polygons, combine them, extract polygons, and transform to geometricProj
	privacy_cte = session.query(sqlfunc.ST_Transform(sqlfunc.ST_CollectionExtract(sqlfunc.ST_Collect(AOI.geom),
		collectionExtract), geometricProj).label("priv_aoi")).filter(AOI.privacy == "Yes").cte("privacy_aoi")

	# Processes all records in the strava_activities table, used for initial masked table setup only
	privacyClipQuery = session.query(strava_activities.actID, sqlfunc.ST_AsEWKB(
		sqlfunc.ST_Transform(
			sqlfunc.ST_MakeValid(
				sqlfunc.ST_Multi(
					sqlfunc.ST_Simplify(
						sqlfunc.ST_SnapToGrid(
							sqlfunc.ST_Difference(
								sqlfunc.ST_SnapToGrid(
									sqlfunc.ST_Transform(strava_activities.geom, geometricProj),
									nonNodedSnap),
								privacy_cte.c.priv_aoi),
							gridSnap),
						simplifyFactor))),
			webSRID)))
	for i in privacyClipQuery:
		session.add(strava_activities_masked(actID=i[0], geom=i[1]))
	session.commit()
	session.close()
					

The above ORM select query is equivalent to the following PostgreSQL/PostGIS SQL select query:


WITH privacy_cte as
(
   SELECT
      ST_Transform(ST_CollectionExtract(ST_Collect("AOI".geom), 3), 32610) as priv_aoi
   FROM
      "AOI"
   where
      "AOI".privacy = 'Yes'
)
SELECT
   strava_activities."actID",
   ST_AsEWKB(ST_Transform(ST_MakeValid(ST_Multi(ST_Simplify(ST_SnapToGrid(ST_Difference(ST_SnapToGrid(ST_Transform(strava_activities.geom, 32610), 0.0001), privacy_cte.priv_aoi), 5), 15))), 4326))
FROM
   strava_activities,
   privacy_cte;
					

This query does the following:

  1. Create a common table expression (CTE) to select the privacy zone geometries. The CTE combines all privacy zones into a single multi-part polygon, ensuring that ST_Difference calculates the difference between each activity and the privacy zones only once. If the privacy zones were not combined, the difference between each privacy zone record and the activity would be calculated separately, producing duplicated results.
    1. Select AOI polygons flagged as privacy zones.
    2. Combine polygons into a single multi-part polygon contained inside a geometry collection (ST_Collect).
    3. Extract multi-polygon from geometry collection (ST_CollectionExtract). Even though this collection only contains the multi-polygon, it still needs to be extracted.
    4. Transform the geometry to the projected coordinate system geometricProj (ST_Transform). Using a projected coordinate system allows for faster geometric calculations and lets PostGIS function parameters be specified in meters, since these parameters use the geometry's unit system.
  2. Select strava_activities activity linestring geometry based on Record ID and transform (ST_Transform) to geometricProj.
  3. Snap activity linestrings to a 0.0001 m grid (ST_SnapToGrid, variant 3). This solves a non-noded intersection error when running ST_Difference; see this StackExchange thread for an explanation of the problem and its solution.
  4. Calculate difference (ST_Difference) between activity linestring and privacy zone CTE result. ST_Difference subtracts geometry B from A, removing the vertices from A that are within B and segments that touch B.
  5. Snap activity linestring vertices to a 5 m grid (ST_SnapToGrid, variant 3). This cleans up some messy areas by combining and removing excess vertices while also reducing the resulting geometry's memory/file size, and it solves geometric errors when exporting data to the TopoJSON format. However, the resulting linestring geometries have a step-shaped appearance resembling the grid.
  6. Simplify activity linestring with a 15m tolerance (ST_Simplify). This further removes messy areas and bends in the linestring by removing vertices to create longer straight line segments. This provides large reductions in resulting geometry memory/file sizes and mitigates the step-shaped results created by ST_SnapToGrid.
  7. Convert linestrings to multi-linestrings (ST_Multi). Geometries in the strava_activities table are stored as linestrings since activity data provided by Strava are contiguous and don't need to be stored in a multi-part format. However, ST_Difference may create multi-linestrings that must be stored as such, so all geometries are converted to this format.
  8. Fix any invalid activity linestring geometries (ST_MakeValid) that were generated during prior processing.
  9. Transform the activity linestring geometry (ST_Transform) back into WGS 1984, SRID 4326, which is required for display in Leaflet.
  10. Convert the linestring geometry representation to Extended Well-Known Binary (ST_AsEWKB). This ensures that the data can be easily inserted into the strava_activities_masked table.
  11. Query the Activity ID of the strava_activities record, which will be inserted as a foreign key in the strava_activities_masked table.
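
Note that processActivitiesPublic() accepts a recordID while the query above processes every activity during initial setup. For webhook-driven updates of a single activity, the same expression presumably just gains a filter; a sketch:


	# Sketch: restrict the masking query to a single activity for webhook updates
	singleActQuery = privacyClipQuery.filter(strava_activities.actID == recordID)
	for actID, maskedGeom in singleActQuery:
		session.add(strava_activities_masked(actID=actID, geom=maskedGeom))
	session.commit()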

Process Activity Streams - GeoAlchemy2 & Boto3

Next, it's time to query the Strava Activity Stream data. These data are recorded every second and contain time, distance, elevation, latlng, and external sensor data. This type of data lends itself well to a tabular format, and I wanted these data available in a CSV so they can be viewed as a profile over the course of an activity.

The full details of the activity are passed into this function, or are queried if not provided. I recently acquired a bike computer which records additional data that are made available through the API; because of this, I query these additional stream details for all activities, including those not recorded with the computer. If stream data are absent, the API returns nothing for that particular stream type. The following uses the results of the getFullDetails function shown above:


def generateAndUploadCSVStream(client, actID, activity=None):
  """
  Generates and uploads a privacy zone masked Strava Stream CSV.

  @param client: stravalib client instance with valid access token
  @param actID: Int. Activity ID of Strava activity to process
  @param activity: Dictionary. Optional. Dictionary of full Strava Activity details, generated if not provided
  @return: Nothing. Uploads file to S3 Bucket
  """
  if not activity:
      # Get all activity details for newly created activity, including stream data
      activity = getFullDetails(client, actID)
  # Create in-memory buffer csv of stream data
  csvBuff = StravaAWSS3.writeMemoryCSV(activity["stream"])
  # Get WKT formatted latlng stream data
  wktStr = formatStreamData(activity["stream"])
  # Get list of coordinates which cross privacy areas, these will be removed from the latlng stream CSV data
  removeCoordList = DBQueriesStrava.getIntersectingPoints(wktStr)
  # Trim/remove rows from latlng CSV stream which have coordinates that intersect the privacy areas
  trimmedMemCSV = trimStreamCSV(removeCoordList, csvBuff)
  # Upload trimmed buffer csv to AWS S3 bucket
  StravaAWSS3.uploadToS3(trimmedMemCSV, activity["act"]["actId"])
				

Next, the activity stream data are written into a CSV stored in an in-memory buffer:


def writeMemoryCSV(streamData):
  """
  Converts the activity stream data dictionary to an in-memory text buffer, avoiding the need to write a local file
  since the data will be uploaded to S3.

  :param streamData: Dict. Formatted Strava Stream Data with lat/longs removed
  :return: In-memory text buffer. Activity stream CSV
  """
  # Create in-memory text buffer
  memOutput = StringIO()
  dataDict = {}
  # stream types to include, latlngs in privacy zones will be removed
  csvTypes =  ['time', 'latlng', 'altitude', 'velocity_smooth', 'grade_smooth', "distance", "heartrate", "cadence", "temp"]
  # Extract data from stream dictionary
  for streamType in csvTypes:
      try:
          dataDict[streamType] = streamData[streamType].data
      except KeyError:
          application.logger.debug(f"The stream type {streamType} doesn't exist, skipping")
  # Iterate over latlngs, which is a list of [lat, lng] pairs, converting each to a "lat,lng" string
  # Note: this mutates the stream data in place, which formatStreamData() relies on afterwards
  for c, i in enumerate(dataDict['latlng']):
      dataDict['latlng'][c] = ",".join(str(x) for x in i)
  # See: https://stackoverflow.com/questions/23613426/write-dictionary-of-lists-to-a-csv-file
  # open buffer and populate with csv data
  writer = csv.writer(memOutput)
  # Write column names
  writer.writerow(dataDict.keys())
  # Each key:value(list) in the dictionary is a column; zip(*dataDict.values()) transposes
  # the column lists into rows for the CSV writer, see link above for details
  writer.writerows(zip(*dataDict.values()))
  return memOutput
				

This helper function formats the point coordinates into an Extended Well-Known Text string:


def formatStreamData(stream):
  """
  Formats Strava Activity Stream latlng data into an EWKT string. The string is constructed using string manipulation;
  consider finding a library which can convert a list of coordinates into EWKT or WKT.

  @param stream: Strava Activity Stream with latlng data
  @return: String. EWKT representation of Strava Activity Stream data.
  """
  # Pull out latlngs; these were already joined into "lat,lng" strings by writeMemoryCSV()
  latlng = stream['latlng'].data
  # Format first part of EWKT LINESTRING String, in 4326, WGS1984
  wktStr = f"SRID=4326;LINESTRING("
  #  Iterate over latlng records
  for c, i in enumerate(latlng):
      # Split based on comma
      lat, lng = latlng[c].split(",")
      # Make string of new lat lng value
      newEntry = f"{lat} {lng},"
      # Add new record to existing string
      wktStr += newEntry
  # Remove last comma
  wktStr = wktStr[:-1]
  # Close out wktStr
  wktStr += ")"
  return wktStr
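
As the docstring suggests, a geometry library could replace the manual string building. A sketch using Shapely, which is an assumption since Shapely is not otherwise part of this project:


from shapely.geometry import LineString

def formatStreamDataShapely(stream):
    """Sketch: build the same EWKT string with Shapely's WKT writer."""
    coords = []
    for pair in stream['latlng'].data:
        # latlngs arrive here as "lat,lng" strings; swap to (lng, lat) floats
        lat, lng = pair.split(",")
        coords.append((float(lng), float(lat)))
    return f"SRID=4326;{LineString(coords).wkt}"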
			

The previously generated EWKT string is used in a GeoAlchemy2 PostGIS query to determine which point coordinates reside within privacy areas:


def getIntersectingPoints(wktStr):
    """
    Takes an EWKT string of a Strava Activity Stream's latlngs and returns a list of the points which reside within
    the privacy areas.
    @param wktStr: String. EWKT representation of Strava Activity Stream latlngs
    @return: List of strings. Points are returned as WGS 1984 coordinate strings in the format lon,lat
    """
    # geometricProj = 32610
    collectionExtract = 3
    # Open session
    session = Session()
    # Get coordinates from within privacy zones
    try:
        # Create a labeled common table expression to query privacy zones geometries collected into a single multi-polygon
        privacy_cte = session.query(
            sqlfunc.ST_CollectionExtract(
            sqlfunc.ST_Collect(AOI.geom), collectionExtract).label("ctelab")).filter(
            AOI.privacy == "Yes").cte()
        # Take provided EWKT string and convert to GeoAlchemy geometry
        lineString = sqlfunc.ST_GeomFromEWKT(wktStr)

        # Get a list of points from the linestring which fall inside the privacy zone
        # ST_DumpPoints returns a row per point geometry, each converted to a text representation using ST_AsText
        pointQuery = session.query(sqlfunc.ST_AsText(sqlfunc.ST_DumpPoints(sqlfunc.ST_Intersection(lineString, privacy_cte.c.ctelab)).geom))
        coordinateList = []
        for i in pointQuery:
            # strip out the WKT parts of the coordinates, only want list of [lon,lat]
            coordinateList.append(formatPointResponse(i))
    finally:
        session.close()
    return coordinateList
			

These overlapping points, and their corresponding data, are removed from the buffer CSV:


def trimStreamCSV(coordList, memCSV):
    """
    Trims all records from the Strava stream CSV that fall within privacy zones, ensuring that the stream data do
    not reveal locations within sensitive areas. Coordinates are included in the stream data so that they can be
    used to draw point markers on the map on chart mouseover.

    @param coordList: List. Coordinates which fall within privacy zones
    @param memCSV: StringIO CSV. Contains original, unaltered activity stream details
    @return: StringIO CSV. Memory CSV with sensitive locations removed
    """

    # see https://stackoverflow.com/a/41978062
    # Reset seek to 0 for memory CSV, after writing it the file pointer is still at the end and must be reset
    memCSV.seek(0)
    # Open original memory csv with a reader
    reader = csv.reader(memCSV)
    # Create new memory CSV to hold results
    trimmedMemOutput = StringIO()
    # Create csv writer on memory csv
    trimmedWriter = csv.writer(trimmedMemOutput)
    # Iterate over original CSV
    for c, row in enumerate(reader):
        # Write header row
        if c == 0:
            trimmedWriter.writerow(row)
        else:
            # split row into [lat, lng]
            coord = row[1].split(",")
            # Check whether the lat or lng value appears anywhere in the removal coordinate list
            latCheck = any(coord[0] in x for x in coordList)
            lngCheck = any(coord[1] in x for x in coordList)
            # Write the row unless both lat and lng matched a privacy zone coordinate
            if not latCheck or not lngCheck:
                trimmedWriter.writerow(row)
    return trimmedMemOutput
				

Finally, the buffer CSV is uploaded to an S3 Bucket where it can be shared publicly (currently the Flask application grants temporary access to individual activities as needed):


def connectToS3():
    """
    Establish connection to AWS S3 using environmental variables.

    :return: S3 service client.
    """
    s3_client = boto3.client(service_name='s3',
                             aws_access_key_id=os.getenv("BOTO3_Flask_ID"),
                             aws_secret_access_key=os.getenv("BOTO3_Flask_KEY"))
    return s3_client

def uploadToS3(file, actID=None):
  """
  Uploads file to S3 Bucket. This bucket is not public but all activities are accessible to the public through the API
  with pre-signed temporary URLs. If the actID is None then the input is the TopoJSON file.

  :param file: Buffer/memory file to be uploaded, either JSON or CSV.
  :param actID: Strava Activity ID, used to name uploaded file, if empty then TopoJSON is assumed, which has a static
  name
  :return:
  Nothing, file is uploaded
  """

  # Get bucket details from environmental variable
  bucket = os.getenv("S3_TRIMMED_STREAM_BUCKET")
  # Establish connection to S3 API
  conn = connectToS3()

  try:
      # conn.put_object(Body=memCSV.getvalue(), Bucket=bucket, Key=fileName, ContentType='application/vnd.ms-excel')
      if actID:
          # Add in-memory buffer csv to bucket
          # Using getvalue() with put_object() works around the StringIO object not being
          # compatible with other boto3 upload methods
          fileName = f"stream_{actID}.csv"
          conn.put_object(Body=file.getvalue(), Bucket=bucket, Key=fileName)
      else:
          # Add in-memory buffer TopoJSON file to bucket, file name is static
          fileName = "topoJSONPublicActivities.json"
          conn.put_object(Body=file, Bucket=bucket, Key=fileName)
  except Exception as e:
      application.logger.error(f"Upload to S3 bucket failed in the error: {e}")
  finally:
      # Close in-memory buffer file, removing it from memory
      file.close()
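
The temporary public access mentioned above relies on pre-signed URLs; a minimal sketch, with the five-minute expiry being an arbitrary choice for illustration:


def getPresignedStreamURL(actID, expiration=300):
    """Sketch: create a temporary, shareable URL for an activity's stream CSV."""
    conn = connectToS3()
    return conn.generate_presigned_url('get_object',
                                       Params={'Bucket': os.getenv("S3_TRIMMED_STREAM_BUCKET"),
                                               'Key': f"stream_{actID}.csv"},
                                       ExpiresIn=expiration)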
				

Prepare Data for Leaflet - TopoJSON

GeoJSON is a standard and convenient format for transferring geospatial data over the web, especially since it's supported by Leaflet. However, it's not very efficient at storing data, largely because it stores a full list of coordinates and contains unnecessary spacing. Currently, my masked GeoJSON dataset exports to a 2.8 MB JSON file, which is a fairly large file to transfer on every page load. Fortunately, there's the TopoJSON format which, in addition to encoding a topology (not useful for this multi-linestring dataset), stores coordinates as deltas from an origin coordinate, greatly reducing the amount of stored information. Using the topojson Python library allowed me to reduce the JSON file size to about 1.3 MB, still large but just under half the original size. While other encoding techniques are available, this format meets the project's needs: it reduces file size, is easily usable in Leaflet, and retains all the attribute information needed in the web map/viewer.

Process to generate TopoJSON:


	def createStravaPublicActTopoJSON():
			"""
			Creates an in-memory TopoJSON file containing all database stored Strava Activities. This file will be uploaded to an
			S3 Bucket, replacing the existing file. A pre-generated file is used to speed up response time, as generating the
			file may take a few seconds. This function is called whenever a new subscription update adds a new activity to the
			database or when triggered on the admin page.

			Returns
			-------
			In memory TopoJSON file.
			"""
			# Create Postgres connection
			session = Session()
			# Query geom as GeoJSON and other attribute information
			query = session.query(sqlfunc.ST_AsGeoJSON(strava_activities_masked.geom, 5),
														strava_activities.name,
														strava_activities.actID,
														strava_activities.type,
														strava_activities.distance,
														strava_activities.private,
														strava_activities.calories,
														strava_activities.start_date,
														strava_activities.elapsed_time,
														strava_activities.moving_time,
														strava_activities.average_watts,
														strava_activities.start_date_local,
														strava_activities.total_elevation_gain,
														strava_activities.average_speed,
														strava_activities.max_speed,
														strava_activities.type_extended,
														strava_activities.has_heartrate,
														strava_activities.average_cadence,
														strava_activities.max_heartrate,
														strava_activities.average_heartrate,
														strava_gear.gear_name) \
					.join(strava_activities_masked.act_rel) \
					.join(strava_activities.gear_rel, isouter=True) \
					.order_by(strava_activities.start_date.desc())
			features = []
			for row in query:
					# Build a dictionary of the attribute information
					propDict = {"name": row.name, "actID": row.actID, "type": row.type, "distance": round(row.distance),
											"private": row.private, "calories": round(row.calories),
											"startDate": row.start_date_local.isoformat(),
											"elapsed_time": row.elapsed_time.seconds, "total_elevation_gain": round(row.total_elevation_gain),
											"average_speed": round(row.average_speed, 1), "max_speed": row.max_speed,
											"gear_name": row.gear_name,
											"type_extended": row.type_extended, "moving_time": row.moving_time.seconds,
											"average_watts": row.average_watts,"has_heartrate":row.has_heartrate,
											"average_cadence":row.average_cadence, "max_heartrate":row.max_heartrate,
											"average_heartrate":row.average_heartrate}
					# Take ST_AsGeoJSON() result and load as geojson object
					geojsonGeom = geojson.loads(row[0])
					# Build the feature and add to feature list
					features.append(Feature(geometry=MultiLineString(geojsonGeom), properties=propDict))
			session.close()
			# Build the feature collection result
			feature_collection = FeatureCollection(features)
			# Create TopoJSON from the GeoJSON Feature Collection. Don't compute a topology, which doesn't matter for
			# polylines, and prequantize the data, which reduces file size at the cost of processing time.
			# prequantize 1e7 is used over the default, 1e6, to avoid errors in which data were placed in the South Pacific Ocean
			return tp.Topology(feature_collection, topology=False, prequantize=10000000).to_json()
					

This script queries the masked activities as GeoJSON, loads and parses each record into a GeoJSON MultiLineString Feature, combines all records into a Feature Collection, and finally creates the TopoJSON file, which is uploaded to an S3 Bucket using the upload function shown above.

The Topology function is very picky about incoming geometries and kept removing records without an explanation as to why, even though they passed PostGIS ST_MakeValid and ST_IsValid. All original, non-masked, GeoJSON records converted properly, so I assume that ST_Difference caused geometries to break during conversion. The additional processing steps during masking, in particular ST_SnapToGrid, appear to have resolved these issues. However, they may need more fine-tuning to ensure that no geometries fail to convert to TopoJSON in the future.

Get New Activities - Strava API Webhook

Now that all my data have been processed and made available to the application, I need to keep the dataset up-to-date with newly added activities. To accomplish this I created a Strava webhook/push subscription using stravalib. This enables my server to receive updates from the Strava API whenever I add a new activity, without needing to poll the API for changes. An update is sent whenever a new activity is added; an existing activity's title, type, or privacy is changed; or the account revokes access to the application. As this is my own account, I do not handle requests to revoke application authorization. Also note that new activity updates include activity IDs only; it's my server's responsibility to call the API for any further details.
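
Per Strava's webhook documentation, each update arrives as a small JSON body POSTed to the callback URL; a representative payload, with illustrative values taken from Strava's docs:


# Representative webhook update body, parsed by Flask into a dict
updateContent = {
    "object_type": "activity",      # "activity" or "athlete"
    "object_id": 1360128428,        # activity ID (or athlete ID)
    "aspect_type": "update",        # "create", "update", or "delete"
    "updates": {"title": "Messy"},  # changed fields for update events
    "owner_id": 134815,             # athlete ID
    "subscription_id": 120475,      # matches the subscription ID stored in my database
    "event_time": 1516126040        # epoch seconds
}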

While stravalib has functions dedicated to webhooks, they are minimally documented with no examples provided. Also, as of the time I started work on this project, the version of stravalib on PyPI, 0.10.2, did not support the newest version of the Strava API. Fortunately, the stravalib team has an updated version on their GitHub page which supports it.

Here is the conceptual process of creating a new webhook subscription:

The server issues a subscription creation POST request to the Strava API containing the client ID and secret, an optional user/server-created verification token, and a required callback URL configured to handle GET and POST requests. The verification token is used by the server to confirm that further setup requests are being sent by Strava. The creation request can be made in stravalib using the following (taken from an administration page I created, called from an HTML POST request):


def addwebhooksub():
    """
    Adds a new Strava webhook subscription to the database and Strava API. Kicks off callback verification process.
    Called by Strava Activity admin page inputs.
    """
    # Get POST request info
    # athID = int(request.form['athID'])
    # callbackurl = str(request.form['callbackURL'])
    # Generate 14 character verify token string
    verifytoken = secrets.token_hex(7)
    # Insert token into database; the row will be updated with the subID if successful, otherwise it will be deleted
    DBQueriesStrava.insertVerifyToken(verifytoken)
    # Get Strava API access credentials
    client = OAuthStrava.getAuth()
    try:
        # Send request to create webhook subscription, will be given the new subscription ID in response
        application.logger.debug(f"Callback url is {os.getenv('FULL_STRAVA_CALLBACK_URL')}")
        response = client.create_subscription(client_id=os.getenv("STRAVA_CLIENT_ID"),
                                              client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
                                              callback_url=os.getenv('FULL_STRAVA_CALLBACK_URL'),
                                              verify_token=verifytoken)
        application.logger.debug(f"New sub id is {response.id}, updating database")
        # Update database with new sub id
        DBQueriesStrava.updateSubId(response.id, verifytoken)
        return Response(status=200)
    except Exception as e:
        application.logger.error(f"Webhook subscription creation failed with the error: {e}")
        DBQueriesStrava.deleteVerifyTokenRecord(verifytoken)
        return Response(status=400)
				

The above script kicks off the process of creating a new webhook subscription using the client information, a pre-generated verification token, and a full callback URL. After the subscription has been successfully created (the callback handlers are shown further below), the stravalib create_subscription function issues a GET request to the Strava API to get the ID of the newly created subscription. This ID is used to update the subscription entry in my database and to verify that webhook subscription updates are from Strava. The following updates the active subscription with the ID; since an active webhook covers all athletes an application is authorized to access, this record's foreign key is applied to all athletes:


def updateSubId(subId, verifytoken):
    """
    Updates webhook subscriptions table with the new subscription id provided by Strava then updates all athletes
    with the new subID foreign key.
    @param subId: Int. Webhook subscription ID provided by Strava API
    @param verifytoken: String. Script generated verification token
    @return: Nothing. Database is updated
    """
    session = Session()
    try:
        # Update recently created record which only has the verify token populated
        session.query(webhook_subs).filter(webhook_subs.verify_token == verifytoken) \
            .update({webhook_subs.sub_id: subId, webhook_subs.activesub: "Yes"})
        session.commit()
        # Get the primary key from the new webhook subscription record
        record = session.query(webhook_subs).filter(webhook_subs.verify_token == verifytoken).first()
        # Update all athletes with the new subscription entry foreign key
        session.query(athletes).update({athletes.sub_id: record.id})
        session.commit()
        session.close()
    except Exception as e:
        application.logger.debug(f"Update Strava athlete sub Id failed with the exception: {e}")
				

During initial setup, a GET request is sent to the server's callback URL; after successful setup, POST requests are issued when specific account updates occur. The following code handles Strava API webhook subscription GET and POST requests:


@stravaActDashAPI_BP.route(os.environ.get("STRAVA_CALLBACK_URL"), methods=['GET', 'POST'])
def subCallback():
    """
    Strava subscription callback URL.

    Returns
    -------
    GET request:
        JSON, echoed Strava challenge text.
    POST request:
        Success code if data are successfully added to Postgres/PostGIS. Strava must receive a 200 code in response to
        POST.
    """
    res = WebHookFunctionsStrava.handleSubCallback(request)
    if res:
        return res
    else:
        return Response(status=200)

def handleSubCallback(request):
    """
    Handles requests to Strava subscription callback URL.

    GET:
        Webhook Subscription Creation Process:
            The callback URL is sent a GET request containing a challenge code. This code is sent back to the
            requester to verify the callback.

             The initial request to create a new webhook subscription is then provided with verification and
             the new subscription ID.
    POST:
        Webhook subscription update message. Sent when an activity on a subscribed account is created, updated, or
        deleted, or when a privacy related profile setting is changed.

        All update messages are inserted into Postgres.

        Currently, only activity creation and update events are handled; additional development is needed to handle
        delete events.

    Returns
    -------
    GET request:
        JSON, echoed Strava challenge text.
    POST request:
        Success code if data are successfully added to Postgres/PostGIS. Strava must receive a 200 code in response to
        POST.
    """
    # Get application access credentials
    client = OAuthStrava.getAuth()
    # Check if request is a GET callback request, part of webhook subscription process
    if request.method == 'GET':
        # Extract challenge and verification tokens
        callBackContent = request.args.get("hub.challenge")
        callBackVerifyToken = request.args.get("hub.verify_token")
        # Form callback response as dict
        callBackResponse = {"hub.challenge": callBackContent}
        # Check if verification tokens match, i.e. if GET request is from Strava
        if DBQueriesStrava.checkVerificationToken(callBackVerifyToken):
            # Verification succeeded, return challenge code as dict
            # Using Flask Response API automatically converts it to JSON with HTTP 200 success code
            return callBackResponse
        else:
            # Verification failed, raise error
            raise ValueError('Strava token verification failed, no match found.')
    # POST request containing webhook subscription update message, new activity or other change to Strava account
    elif request.method == 'POST':
        try:
            # Convert JSON body to dict
            callbackContent = json.loads(request.data, strict=False)
            # Call function to handle update message and process new activity, if applicable
            handleSubUpdate(client, callbackContent)
        except Exception as e:
            application.logger.error(f"Strava subscription update failed with the error {e}")
				

Now that the subscription has been created and callbacks are handled, update messages can be processed. The following code processes the Strava subscription update messages by inserting them into Postgres then triggers a threaded function for activity processing, if applicable:


def handleSubUpdate(client, updateContent):
    """
    Handles Strava webhook subscription update. This function is called by a valid Strava POST request to the webhook
    subscription callback URL.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to strava API for the user.
    updateContent. Dict. POST request JSON data formatted by Flask as a dict.

    Returns
    -------
    Nothing. Data are inserted into Postgres/PostGIS.
    """

    # Parse update information into a model using stravalib
    update = client.handle_subscription_update(updateContent)
    # Verify that the athlete(s) and subscription ID contained in the message are in Postgres
    if DBQueriesStrava.checkAthleteAndSub(update.owner_id, update.subscription_id):
        application.logger.debug("Sub update from Strava appears valid")
        # Insert subscription update message details into Postgres
        DBQueriesStrava.insertSubUpdate(update)
        # Verify that the update is an activity creation event
        if update.aspect_type == "create" and update.object_type == "activity":
            application.logger.debug("This is an activity create event, creating thread to process activity")
            try:
                # Create a thread to handle async processing of the activity and its derivatives
                # Threading lets the long-running activity processing continue while a quick 200 code is sent to the Strava API
                Thread(target=APIFunctionsStrava.singleActivityProcessing, args=(client, update.object_id)).start()
            except Exception as e:
                application.logger.error(f"Creating a thread to process new activity failed with in the error: {e}")
        elif update.aspect_type == "update" and update.object_type == "activity":
            application.logger.debug("This is an activity update event, updating existing record")
            # Update existing activity title
            DBQueriesStrava.updateExistingActivity(update)
        else:
            # TODO: Add logic to handle delete events
            application.logger.debug("Sub update message contains a delete event, skipping request")
            pass
    else:
        application.logger.debug("POST request is invalid, user ID or subscription ID don't match those in database!")
				

Insert subscription update details into Postgres:


def insertSubUpdate(content):
		"""

		Inserts Strava webhook subscription data into Postgres database. This information will be used to get full activity
		information from another query.

		Parameters
		----------
		content. Subscription Update object of Strava webhook update generated by Stravalib

		Returns
		-------
		Nothing. Updates database.
		"""
		# Verify if the activity title is in the update data; if not, set it to None. Some activities may have empty titles.
		if "title" in content.updates.keys():
				title = content.updates['title']
				application.logger.debug(f"Title of new activity is {title}")
		else:
				title = None
		session = Session()
		insert = sub_update(aspect=content.aspect_type, event_time=datetime.fromtimestamp(content.event_time.timestamp()),
												object_id=content.object_id, object_type=content.object_type, owner_id=content.owner_id,
												subscription_id=content.subscription_id,
												update_title=title)
		session.add(insert)
		session.commit()
		session.close()
		application.logger.debug(f"New webhook update has been added to Postgres!")
					

The Strava API requires a success response within 2 seconds, or else it will attempt 2 more requests before timing out. Since my processing currently exceeds this time allowance, I needed a way to process activities asynchronously. I did not want to spend the time setting up background processing and task queuing, so I decided to go with a multithreaded approach, which allows the data processing to occur concurrently. While not truly asynchronous, this enables Flask to return a 200 success code while still working on the threaded process:


def singleActivityProcessing(client, actID):
  """
  Processes a single Strava Activity by placing the full activity in the database, making a simplified and masked public
  version, and by creating a privacy masked stream CSV which is added to a S3 Bucket. Finally a TopoJSON of the
  public activities is generated and uploaded to the S3 Bucket.

  @param client: stravalib client instance with valid access token
  @param actID: Int. ID of Strava Activity to be processed
  @return: Email. Message states if process was successful or failed
  """

  try:
      application.logger.debug("Getting full activity details")
      # Get all activity details for newly created activity, including stream data
      activity = getFullDetails(client, actID)
      application.logger.debug("Inserting activity details")
      # Insert original, non-masked, coordinates and attribute details into Postgres/PostGIS
      DBQueriesStrava.insertOriginalAct(activity['act'])
      # Calculate masked, publicly sharable, activities and insert into Postgres masked table
      application.logger.debug("Processing and inserting masked geometries")
      DBQueriesStrava.processActivitiesPublic(activity["act"]["actId"])
      # Handle CSV stream processing
      generateAndUploadCSVStream(client, actID, activity)
      # Create topojson file
      topoJSON = DBQueriesStrava.createStravaPublicActTopoJSON()
      # Upload topoJSON to AWS S3
      StravaAWSS3.uploadToS3(topoJSON)
      application.logger.debug("Strava activity has been processed!")
  except Exception as e:
      application.logger.error(f"Handling and inserting new webhook activity inside a thread failed with the error {e}")
      # Raise another exception, this will signal the route function to return an error 500
      raise()
				

Final Thoughts

Now a process flow is set up to automatically process new Strava activities for consumption by a public-facing dashboard that uses Leaflet to display geographic data. The most recent version of my dashboard is visible at the top of this page, and a full-page dashboard is available here. I haven't had the motivation to finish the write-up for the client-side HTML/JavaScript for this project, however the GitHub repo project folder can be found here.

This was a fun, challenging, and rewarding project to work on. I got my first experience working with GeoAlchemy and PostGIS functions to manipulate spatial data. I also learned, through much trial and error, that spatial datasets need to be aggregated for some PostGIS functions to return the desired results.