AWS Firehose, S3 and Pandas

We ingest a lot of log messages via AWS Firehose into jsonl files on S3. A new file is created either every 5 minutes or when the buffer size threshold is reached. To make processing these files easier, they are stored in partitioned folders. For us this looks like: logger=name-of-logger/year=2023/month=05/day=14/.
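
As a minimal sketch, the partition prefix for a given day could be built with a small helper. The function name partition_prefix and its parameters are assumptions for illustration; they are not part of Firehose or any library.

```python
import datetime


def partition_prefix(logger: str, day: datetime.date) -> str:
    """Build the key=value partition prefix for one logger and one day."""
    # Month and day are zero-padded to match the folder names on S3.
    return f"logger={logger}/year={day.year}/month={day.month:02}/day={day.day:02}/"


prefix = partition_prefix("name-of-logger", datetime.date(2023, 5, 14))
print(prefix)  # logger=name-of-logger/year=2023/month=05/day=14/
```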

We want to process these files either in Python scripts or in Jupyter notebooks without parsing each of the json files every time. The first step to reduce S3 traffic is to write one parquet file per day on the following morning. We run a CodePipeline for this, but it could of course be done differently. Every json file is read and put into a pandas dataframe. Currently there is enough memory to hold all logs of a day in a single dataframe, but this is something that should be monitored. A few new fields are added to the dataframe for easier querying later. After this the whole dataframe is written to a file and later uploaded to S3 into a different folder structure. That folder structure again contains the logger and the date, the same way the original log records are stored.

Example store command:

df.to_parquet("processed/logger=name-of-logger/year=2023/month=05/day=14/data.parquet.gzip", compression="gzip")
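
The daily conversion described above could look roughly like the following sketch. It is not our actual pipeline code: the helper name jsonl_to_dataframe, the sample records, and the derived has_language column are made up for illustration, and the S3 download is replaced by in-memory strings so the example runs without AWS access.

```python
import io

import pandas as pd


def jsonl_to_dataframe(jsonl_chunks: list[str]) -> pd.DataFrame:
    """Parse several jsonl documents (one per Firehose file) into one dataframe."""
    frames = [pd.read_json(io.StringIO(chunk), lines=True) for chunk in jsonl_chunks]
    return pd.concat(frames, ignore_index=True)


# Two hypothetical Firehose files, each holding one JSON object per line.
file_a = '{"user": "u1", "language": "de"}\n{"user": "u2", "language": ""}\n'
file_b = '{"user": "u1", "language": "en"}\n'

df = jsonl_to_dataframe([file_a, file_b])

# Example of a derived field that makes later queries easier (hypothetical).
df["has_language"] = df["language"] != ""

# The real job would now write the dataframe, for example:
# df.to_parquet("data.parquet.gzip", compression="gzip")
print(df)
```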

Additionally, we calculate aggregates for some fields and store them in separate files, for example the number of users or languages, filtered on conditions we want to plot later. These aggregated dataframes are stored with the same folder structure, but with different filenames. The filenames can be filtered later by using path_suffix in read_parquet.
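
A sketch of such an aggregate, under the assumption that the daily dataframe has user and language columns. The sample data and the file name languages.parquet.gzip are illustrative, not our real names.

```python
import pandas as pd

# Hypothetical daily log dataframe.
df = pd.DataFrame({
    "user": ["u1", "u2", "u1", "u3"],
    "language": ["de", "", "en", "de"],
})

# Keep only rows matching the condition we want to plot later,
# then count the records per language.
languages = (
    df[df["language"] != ""]
    .groupby("language")
    .size()
    .reset_index(name="count")
    .sort_values("count", ascending=False, ignore_index=True)
)

# Number of distinct users on that day.
user_count = df["user"].nunique()

# Stored next to the daily file under a different name, for example:
# languages.to_parquet("languages.parquet.gzip", compression="gzip")
print(languages)
print(user_count)
```

Reading only these files later would then mean passing path_suffix="languages.parquet.gzip" to read_parquet.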

To use all this in a script or a notebook, we use awswrangler.

Annotated example usage:

import awswrangler as wr
import pandas as pd
import datetime

# use previous day
date = datetime.date.today() - datetime.timedelta(days=1)

bucket = "the-name-of-the-bucket"
df = wr.s3.read_parquet(
   f"s3://{bucket}/processed/logger=name-of-logger/year={date.year}/month={date.month:02}/day={date.day:02}",
   path_suffix="data.parquet.gzip",
   dataset=True,
)

df_lang = df[df.language != ""]

# plot all languages by count for this day
x = df_lang.groupby(["language"])["language"].agg(count='count').sort_values("count", ascending=False)
x.reset_index().plot.bar(x="language", y="count");

Snippets for some date magic we use:

# example data
df = pd.DataFrame([[2023, 5, 17], [2023, 5, 22]], columns=["year", "month", "day"])

# add a date field based on year, month, day fields
df["date"] = pd.to_datetime([f'{y}-{m}-{d}' for y, m, d in zip(df.year, df.month, df.day)])

# add a date field for the start of the week: %U is the week number with Sunday
# as the first day of the week, and the appended "-0" (%w) selects that Sunday
df["week"] = pd.to_datetime(df.date.dt.strftime('%Y-%U') + "-0", format='%Y-%U-%w')

# result
#     year    month   day     date            week
# 0   2023    5       17      2023-05-17      2023-05-14
# 1   2023    5       22      2023-05-22      2023-05-21
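
The same two columns can also be derived without building strings. This is an alternative sketch, not what we run: pd.to_datetime accepts a dataframe with year, month and day columns, and the week start is computed by subtracting the number of days since the most recent Sunday.

```python
import pandas as pd

df = pd.DataFrame([[2023, 5, 17], [2023, 5, 22]], columns=["year", "month", "day"])

# Assemble the date directly from the year, month, day columns.
df["date"] = pd.to_datetime(df[["year", "month", "day"]])

# dayofweek is Monday=0 ... Sunday=6, so (dayofweek + 1) % 7 is the
# number of days since the most recent Sunday.
days_since_sunday = (df["date"].dt.dayofweek + 1) % 7
df["week"] = df["date"] - pd.to_timedelta(days_since_sunday, unit="D")

print(df)
```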