AWS Firehose, S3 and Pandas

We ingest a lot of log messages via AWS Firehose into jsonl files on S3. A new file is created either every 5 minutes or when a threshold is reached. To make processing of these files easier they are stored in partitioned folders, which for us looks like: logger=name-of-logger/year=2023/month=05/day=14/.
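The partition prefix for a given day can be built from a `datetime.date` (a small sketch; the logger name is a placeholder):

```python
import datetime

def partition_prefix(logger: str, day: datetime.date) -> str:
    """Build the Hive-style partition path used for one day's log files."""
    return f"logger={logger}/year={day.year}/month={day.month:02}/day={day.day:02}/"

print(partition_prefix("name-of-logger", datetime.date(2023, 5, 14)))
# logger=name-of-logger/year=2023/month=05/day=14/
```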

We want to process these files in Python scripts or Jupyter notebooks without re-reading each of the json files every time. The first step to reduce S3 traffic is to write one parquet file per day on the following morning. We run a CodePipeline for this, but it could of course be done differently. Every json file is read and appended to a pandas dataframe. Currently there is enough memory to hold all logs of a day in a single dataframe, but this is something that should be monitored. A few new fields are added to the dataframe for easier querying later. After this the whole dataframe is written to a file and uploaded to S3 under a different folder structure. That folder structure again contains the logger and the date, the same way the original log records are stored.

Example store command:

df.to_parquet("processed/logger=name-of-logger/year=2023/month=05/day=14/data.parquet.gzip", compression="gzip")
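The read-and-combine step before that store command can be sketched as follows, assuming the day's jsonl files have already been downloaded into a local folder (the folder layout mirrors the S3 partitioning; names are placeholders):

```python
import glob
import pandas as pd

def read_day(folder: str) -> pd.DataFrame:
    """Read every jsonl file of one day's partition into a single dataframe."""
    frames = [pd.read_json(path, lines=True) for path in sorted(glob.glob(f"{folder}/*.jsonl"))]
    return pd.concat(frames, ignore_index=True)

# df = read_day("raw/logger=name-of-logger/year=2023/month=05/day=14")
# then add the extra fields and write the parquet file as shown above
```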

Additionally we calculate aggregates for some fields and store them in separate files, for example the number of users or languages, filtered on conditions we want to plot later. These aggregated dataframes are stored in the same folder structure, but with different filenames. These filenames can be filtered later by using path_suffix in read_parquet.
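One such aggregate could look like this: counting records per language and writing the result next to the day's data under a different filename (the field and filenames are assumptions for illustration):

```python
import pandas as pd

# example log records for one day
df = pd.DataFrame({"language": ["de", "en", "de", "fr", "de"]})

# number of records per language
agg = df.groupby("language").size().reset_index(name="count")

# stored under the same partition folder, but with a filename that can be
# selected later via path_suffix, e.g.:
# agg.to_parquet("processed/logger=name-of-logger/year=2023/month=05/day=14/languages.parquet.gzip", compression="gzip")
```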

To use all this in a script or a notebook we use awswrangler.

Annotated example usage:

import awswrangler as wr
import pandas as pd
import datetime

# use previous day
date = datetime.date.today() - datetime.timedelta(days=1)

bucket = "the-name-of-the-bucket"
df = wr.s3.read_parquet(
    f"s3://{bucket}/processed/logger=name-of-logger/year={date.year}/month={date.month:02}/day={date.day:02}",
    path_suffix="data.parquet.gzip",
    dataset=True,
)

df_lang = df[df.language != ""]

# plot all languages by count for this day
x = df_lang.groupby(["language"])["language"].agg(count='count').sort_values("count", ascending=False)
x.reset_index().plot.bar(x="language", y="count");

Snippets for some date magic we use:

# example data
df = pd.DataFrame([[2023, 5, 17], [2023, 5, 22]], columns=["year", "month", "day"])

# add a date field based on year, month, day fields
df["date"] = pd.to_datetime([f'{y}-{m}-{d}' for y, m, d in zip(df.year, df.month, df.day)])

# add a date field for the start of each row's week: %U numbers weeks
# starting on Sunday, and %w=0 picks the Sunday of that week
df["week"] = pd.to_datetime(df.date.dt.strftime('%Y-%U') + "-0", format='%Y-%U-%w')

# result
#     year    month   day     date            week
# 0   2023    5       17      2023-05-17      2023-05-14
# 1   2023    5       22      2023-05-22      2023-05-21

Draw and Save a Polygon on a map

Drawing a Polygon and Saving it to GeoJSON is the next item on my exploration list.

First I tried a few Leaflet drawing plugins and settled on Leaflet Geoman. Some things I learned by fiddling around with Leaflet and Leaflet Geoman:

Add custom polygon buttons

Add a button to the draw toolbar that draws green polygons:

map.pm.Toolbar.copyDrawControl('Polygon', {
  name: 'DrawEntry',
  block: 'draw',
  title: 'Draw Entry Polygon',
});
map.pm.Draw.DrawEntry.setPathOptions({ color: 'green' });

The icon is still a polygon icon. To change it I would need an SVG for a new icon.

Save a drawn polygon to GeoJSON

I want to save this to a database later and may need a bit more meta information, e.g. the color of the polygon. But this is enough for today:

JSON.stringify(L.PM.Utils.findLayers(map)[0].toGeoJSON())

Load a polygon to the map

Add two exported polygons to the map, one as entry (green) and one as exit (violet).

var example = [
  // entry
  {"type":"Feature","properties":{"type": "entry"},"geometry":{"type":"Polygon","coordinates":[[[9.003386,48.683486],[9.003185,48.68335],[9.003226,48.683275],[9.003769,48.683492],[9.003679,48.68359],[9.003386,48.683486]]]}},
  // exit
  {"type":"Feature","properties":{"type": "exit"},"geometry":{"type":"Polygon","coordinates":[[[9.002798,48.683225],[9.002868,48.683157],[9.002577,48.683058],[9.002519,48.683127],[9.002798,48.683225]]]}}
]
L.geoJSON(example, {
  style: function(feature) {
    if (feature.properties) {
      switch (feature.properties.type) {
        case 'entry': return {color: "green"};
        case 'exit': return {color: "violet"};
      }
    }
  }
}).addTo(map);

The style function changes the color based on the "type" property in the GeoJSON. If "type" is missing or neither "entry" nor "exit", the color stays at the Leaflet default, which is blue.

Full Example

I used stadiamaps to have a higher zoom level (level 20) than the default openstreetmap map.

Only a screenshot for now:

screenshot

The two polygon icons look the same, but the first one draws entry-colored polygons and the second one exit-colored ones.

Datasette and Raster Tiles

Using simple png-based tiles seems like the best choice for my planned pet project. But the posts and GitHub repositories about converting vector tiles to raster tiles are either years old or commercial. After fiddling around with some of the code out there I gave up and started using stadiamaps raster tiles.

But now I wanted to use them in Datasette, first as a preview in the table view by using datasette-leaflet-geojson. To get a preview map in the table view, the dataset needs a field containing GeoJSON.

Generate a dataset using sqlite-utils with a valid geojson field:

echo '{"name": "Vaihingen / Hauptstrasse", "map": {"type": "Point", "coordinates": [9.1116853, 48.7297610]}}' | \
sqlite-utils insert demo.db cycle-rental -
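The same record can also be inserted from Python, here sketched with the stdlib sqlite3 module instead of sqlite-utils; the map column is stored as a JSON string, which should be equivalent to what the command above produces (table and field names as in the example):

```python
import json
import sqlite3

conn = sqlite3.connect("demo.db")
conn.execute('CREATE TABLE IF NOT EXISTS "cycle-rental" (name TEXT, map TEXT)')
record = {
    "name": "Vaihingen / Hauptstrasse",
    # GeoJSON stored as a JSON string, so datasette-leaflet-geojson can pick it up
    "map": json.dumps({"type": "Point", "coordinates": [9.1116853, 48.7297610]}),
}
conn.execute('INSERT INTO "cycle-rental" (name, map) VALUES (:name, :map)', record)
conn.commit()
conn.close()
```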

Opened in Datasette table view this looks like:

osm

I changed the tile server to stadiamaps because I don't want to use Openstreetmap tiles without a license later.

The same dataset with stadiamaps:

stadiamaps

If the 200k credits are not enough later I will think about a different solution.