Leveraging Python and SQL for Real-Time Analytics During MLB Games
Chapter 1: Introduction to Project Development
Are you currently on the lookout for a job? Stand out by creating a personal project with the help of my complimentary 5-page project ideation guide.
In the last three years, Major League Baseball (MLB) has made remarkable changes to improve the pace of its games. With average game durations exceeding three hours, MLB implemented a pitch clock, similar to the NFL's 40-second play clock. This adjustment has brought the average game time down to roughly 2.5 hours.
Coincidentally, this timeframe also aligned with my efforts to develop a project that automated reporting for Learning SQL. As a co-editor for a publication dedicated to nurturing the next wave of SQL developers, it felt somewhat disingenuous that we lacked a thorough and dependable source for data-driven reporting.
As I detailed in part I, I utilized the Unofficial Medium API to outline and ingest the necessary data for various metrics to evaluate our content's performance effectively.
Section 1.1: Project Architecture
While part I focused on the broader conceptualization and architecture, this section will delve deeper into the code, simplifying the perceived complexities associated with constructing a professional-grade analytics dashboard.
The components of my data pipeline include:
- Over 200 lines of Python code
- 4 API endpoints
- 37 lines of SQL
- 4 BigQuery tables
- 2 BigQuery views
These elements come together to produce a report that typically spans 3 to 4 pages, depending on the level of detail I choose to include in my insights.
Here, I will outline the key functions that drive the script's primary operations, particularly the functions handling the main GET requests.
For clarity, I categorize my request functions based on the type of data being retrieved, such as get_articles and get_writers, as well as by the number of requests made.
Here’s a glimpse of what a single request function looks like, demonstrating one of the simplest ETL functions you can craft, especially since accessing the data does not necessitate any additional keys.
import requests
import logging

def make_single_request(url: str):
    logging.info(f"Making request for {url}...")
    # `token` holds the API auth headers; it is defined elsewhere in the script
    req = requests.get(url, headers=token)
    data = req.json()
    return data
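Since this bare-bones version never checks the response status, a slightly more defensive variation can be useful. The sketch below is illustrative only; the function name and the explicit headers argument are not part of my actual script:

import logging
import requests

def make_single_request_safe(url: str, headers: dict) -> dict:
    """Fetch a single endpoint and return the parsed JSON body, raising on HTTP errors."""
    logging.info(f"Making request for {url}...")
    resp = requests.get(url, headers=headers, timeout=30)
    resp.raise_for_status()  # fail loudly on 4xx/5xx instead of parsing an error payload
    return resp.json()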
I can efficiently retrieve the latest article ID through a single request, as article IDs remain static. As mentioned in part I, I maintain a table with these IDs within my BigQuery project.
import config as cfg
import pandas as pd

articles = make_single_request(cfg.article_url)
latest_article_id = articles["publication_articles"][0]

latest_article_id_df = pd.DataFrame()
latest_article_id_df = latest_article_id_df.append(
    {"id": latest_article_id, "dt_updated": pd.Timestamp.utcnow()},
    ignore_index=True,
)
latest_article_id_df = latest_article_id_df[["id", "dt_updated"]]
While I use Pandas' append method here and throughout my script, be aware that it was deprecated in pandas 1.4 and removed in 2.0, so it's not a pattern I'd recommend for production code.
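For reference, the same one-row frame can be built without append at all by constructing it directly from a list of records; a minimal sketch:

import pandas as pd

# Build the one-row frame directly instead of appending to an empty DataFrame
latest_article_id_df = pd.DataFrame(
    [{"id": latest_article_id, "dt_updated": pd.Timestamp.utcnow()}],
    columns=["id", "dt_updated"],
)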
To handle multiple IDs, whether article or writer IDs, I constructed an iterative request against the corresponding API endpoints.
The processes for articles and writers are quite similar, but I will illustrate the writer's process since it involves a bit more complexity.
Given that a writer may publish multiple times (which we actively encourage), there are instances where a writer ID could appear more than once within the article data. To avoid duplicate entries, I ensure that I'm only retrieving unique writers each time this function executes.
import pandas as pd

def get_writers(lsql_dat: pd.DataFrame):
    writers = lsql_dat["author"].to_list()
    distinct_writers = list(set(writers))
From this list, I can loop through only unique writer IDs:
for w in distinct_writers:
    logging.info(f"Making request for writer {w}...")
    # assumes writer_url is the base user endpoint, so the writer ID is appended to target each writer
    writer_req = requests.get(f"{writer_url}/{w}", headers=token)
    logging.info(f"Request status: {writer_req.status_code}")
    writer_data = writer_req.json()
To transform this into a useful format, I append it to an empty DataFrame:
writer_df = pd.DataFrame()

for w in distinct_writers:
    logging.info(f"Making request for writer {w}...")
    # assumes writer_url is the base user endpoint, so the writer ID is appended to target each writer
    writer_req = requests.get(f"{writer_url}/{w}", headers=token)
    logging.info(f"Request status: {writer_req.status_code}")
    writer_data = writer_req.json()

    _id = writer_data["id"]
    username = writer_data["username"]
    fullname = writer_data["fullname"]
    bio = writer_data["bio"]
    followers = writer_data["followers_count"]

    writer_df = writer_df.append({
        "id": _id,
        "user_name": username,
        "full_name": fullname,
        "bio": bio,
        "followers": followers
    }, ignore_index=True)

writer_df["dt_updated"] = pd.Timestamp.utcnow()
And wrap it in a concise function:
def get_writers(lsql_dat: pd.DataFrame):
    writers = lsql_dat["author"].to_list()
    distinct_writers = list(set(writers))

    writer_df = pd.DataFrame()

    for w in distinct_writers:
        logging.info(f"Making request for writer {w}...")
        # assumes writer_url is the base user endpoint, so the writer ID is appended to target each writer
        writer_req = requests.get(f"{writer_url}/{w}", headers=token)
        logging.info(f"Request status: {writer_req.status_code}")
        writer_data = writer_req.json()

        _id = writer_data["id"]
        username = writer_data["username"]
        fullname = writer_data["fullname"]
        bio = writer_data["bio"]
        followers = writer_data["followers_count"]

        writer_df = writer_df.append({
            "id": _id,
            "user_name": username,
            "full_name": fullname,
            "bio": bio,
            "followers": followers
        }, ignore_index=True)

    writer_df["dt_updated"] = pd.Timestamp.utcnow()

    return writer_df
This function can then be referenced in the primary function:
def learning_sql(event, context):
    # Get writers (article_df comes from the earlier get_articles step, not shown here)
    writer_df = get_writers(article_df)

    # Upload to BQ
    if len(writer_df) > 0:
        logging.info(f"Uploading to {cfg.dataset}.{cfg.writer_table}...")
        upload_to_bq(writer_df, cfg.dataset, cfg.writer_table, "WRITE_APPEND", cfg.writer_schema)
        logging.info(f"{cfg.writer_table} updated. All tables updated for {cfg.today}")
    else:
        logging.info("No data returned. Please retry...")

if __name__ == "__main__":
    # Allows running the cloud function locally with dummy event arguments
    learning_sql("", "")
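For context, upload_to_bq is a small helper that wraps the BigQuery client; it isn't shown above. A minimal sketch, assuming the schema from config is a list of bigquery.SchemaField objects and the write disposition is passed through as a string, looks roughly like this:

from google.cloud import bigquery

def upload_to_bq(df, dataset, table, write_disposition, schema):
    # Load a DataFrame into project.dataset.table with the given schema and write disposition
    client = bigquery.Client()
    table_id = f"{client.project}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(schema=schema, write_disposition=write_disposition)
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # block until the load job completes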
Now that the Python side is complete, it's time to consider how to structure and present the data in SQL.
If you read part I, you’ll recognize the final query that acts as my view definition:
SELECT
  wi.full_name AS writer,
  author AS writer_id,
  claps,
  ai.id AS article_id,
  last_modified_at AS last_modified,
  publication_id AS pub_id,
  published_at AS published_at,
  ROUND(reading_time, 2) AS reading_time,
  responses_count,
  title,
  url,
  voters AS clappers,
  word_count,
  wi.followers AS followers,
  DATE(ai.dt_updated) AS dt_updated
FROM
  `ornate-reef-332816.learning_sql.article_info` ai
LEFT JOIN
  `ornate-reef-332816.learning_sql.writer_info` wi
ON
  ai.author = wi.id
WHERE
  DATE(ai.dt_updated) = CURRENT_DATE()
ORDER BY
  voters DESC
Despite the extensive Python work and numerous API requests needed to build the underlying tables, the SQL side of things remains relatively straightforward.
The primary transformations here relate to naming conventions; since I intend to import this into a dashboard, I can always hide the original field names if necessary. However, creating a polished final product with clear field names was a significant objective from the outset. This clarity aids in writing additional queries or views.
The JOIN in this query links my article table to my writer table on a shared writer ID, which is why retrieving only unique writer IDs earlier was so important.
Finally, I’ll incorporate a date filter to reflect the most recent data. To keep it simple, I use a DATE() function to convert my timestamp into a more readable format for filtering.
To maintain a continuous count of followers, I have a separate view:
SELECT
  followers
FROM
  `ornate-reef-332816.learning_sql.follower_count`
WHERE
  DATE(dt_updated) = CURRENT_DATE()
While I could have simply applied a MAX() function in Looker, I prefer to perform my transformations "under the hood" rather than in the UI.
With the coding phase wrapped up, the final step is to automate the process.
I must schedule:
- The cloud function (using cron for this) that updates all tables.
- The Looker dashboard (distributed via email weekly).
After nine innings, the end product emerges.
Note: Except for follower counts, which are public information, I've obscured some figures to safeguard our data and writer identities.
The final outcome—accessible, automated reporting—makes all the effort worthwhile. One of the aspects I find fascinating about data engineering is that, in addition to creating data sources, the essence of the role lies in discovering and developing intuitive, automated solutions.
While I enjoy the challenge of assembling and organizing data in my daily tasks, the ability to compile data in a meaningful way refines a core skill: Extracting value from data.
The broader lesson from this project is that when faced with a daunting task, the best approach is to "time box" your work by allocating specific time to complete it and powering through.
In my case, I dedicated nearly three hours straight to this project. However, I’ll admit I did take a brief pause for a seventh-inning stretch.
Lastly, I’d appreciate your feedback. Please take a moment to complete a 3-question survey on how I can assist you beyond this blog. All responses will receive a free gift.
Chapter 2: Automating Data Reporting
This video tutorial covers the fundamentals of Python for beginners, providing a comprehensive overview in just one hour.
In this video, we explore the basics of variables in Python, perfect for absolute beginners.