Push to Redis via VPN

In the previous blog post I described how I run Redis on a Raspberry Pi Zero W in my local network. Now I want to push messages from the Internet to this Redis instance.

For simplicity I will use Flask on Fly.io; the Flask code will be very similar to the version that saves to S3, which I posted a few weeks ago. The Flask service on Fly.io is connected via WireGuard to the Pi Zero in my local network.

As a first step we need to connect the app to my local Redis. I followed the instructions in the Fly.io docs on bridge deployments. The blueprint shows how to create a WireGuard config on your notebook and then run this config on the system you want to connect to the Fly.io instances.

My walkthrough:

# this assumes the app is already launched and deployed!
# create the wireguard configuration
fly wireguard create personal ams zero5 fly0.conf
# copy to my Pi Zero
scp fly0.conf pi@zero5.local:.
# login there via ssh
ssh pi@zero5.local
# install wireguard
sudo apt -y install wireguard
# copy the conf to the correct place
sudo cp fly0.conf /etc/wireguard/fly0.conf
# and start the vpn
sudo systemctl enable --now wg-quick@fly0.service
# test the tunnel by pinging a Fly.io internal address
ping _api.internal
# leave the Pi Zero
logout

# optional: verify from the other side
fly ssh console
# first we need ping
apt install -y iputils-ping
# now ping the Pi Zero
ping zero5._peer.internal
# leave the fly machine
logout
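
On top of ping I can check that Redis itself is reachable over the tunnel. A minimal sketch to run inside the Fly machine, assuming redis-py is installed there (it is part of the app image anyway, since the Flask app below uses it); the password is a placeholder for the one set on my Redis:

from redis import Redis

# zero5 is the peer name from the wireguard config above,
# the password is a placeholder
r = Redis(
    "zero5._peer.internal",
    6379,
    password="my-redis-password",
    socket_connect_timeout=5,
)
print(r.ping())  # True if the tunnel and Redis are up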

The code for the Flask app, using RQ as the message queue:

import json
import os

from flask import Flask, request
from redis import Redis
from rq import Connection, Queue
from werkzeug.middleware.proxy_fix import ProxyFix

from .utils import check_signature

app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)

def job(dataset, uid=None):
    _host = os.environ.get("REDIS_HOST")
    _password = os.environ.get("REDIS_PASSWORD")
    with Connection(Redis(_host, 6379, password=_password)):
        # allow each job up to 24h to run
        q = Queue("default", default_timeout=3600 * 24)
        q.enqueue(
            "work.process_webhook",
            kwargs={
                "_meta": {"uid": uid if uid else dataset.get("uid")},
                "_data": dataset,
            },
            result_ttl=0,  # no return value
        )

@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "GET":
        return "nothing to see here"
    else:
        uid = request.args.get("uid")
        data = request.data
        signature = request.headers.get("X-MYAX-SIGNATURE")
        secret = os.environ.get("AX_WEBHOOK_SECRET")
        if check_signature(signature, data, secret):
            print("signature valid", flush=True)
            dataset = json.loads(data)
            if "id" in dataset:
                dataset["document_id"] = dataset.pop("id")
            job(dataset, uid)

    return "OK"

The request is received and sent as a message into the queue with as little code as needed. For Redis two secrets need to be set via flyctl: REDIS_HOST and REDIS_PASSWORD. In my example REDIS_HOST is zero5._peer.internal. Additionally, AX_WEBHOOK_SECRET has to be set, just like in the S3 version, to verify the HMAC signature. The check_signature function is the same one as in the S3 version too.
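
A sketch of such a check, assuming the X-MYAX-SIGNATURE header carries a hex-encoded SHA-256 HMAC of the raw request body:

import hashlib
import hmac

def check_signature(signature, data, secret):
    # sketch: recompute the SHA-256 HMAC of the raw body and
    # compare it to the received header value in constant time
    if not signature or not secret:
        return False
    expected = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature, expected)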

The worker in my local network processes the jobs from the Redis queue. For this demo the job simply writes the message to disk.

We enqueued the job with the name work.process_webhook, so we need a work.py with a function process_webhook. For example:

import json

def process_webhook(**kwargs):
    # write the payload to <uid>.json in the current directory
    with open(kwargs["_meta"]["uid"] + ".json", "w") as f:
        json.dump(kwargs["_data"], f)

This saves everything in _data into a JSON file. To run the worker that fetches the jobs and calls process_webhook, we start it like this:

rq worker default --url redis://:REDIS_PASSWORD@REDIS_HOST/

The work.py has to be in the same folder. I used the same Redis password here, but as the Redis host the IP address of the Pi in my local network. The colon before the password is important: it marks the empty username in the URL.
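
Alternatively the worker can be started from Python instead of the rq command line, which is handy when wrapping it into a systemd service. A small sketch, with the host and password as placeholders:

from redis import Redis
from rq import Worker

# placeholders: the local IP of the Pi and the Redis password
redis = Redis("192.168.0.5", 6379, password="my-redis-password")
worker = Worker(["default"], connection=redis)
worker.work()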

In my tests a message takes a few seconds to go through, but I haven't lost a job yet. The 24-hour timeout should be more than enough for everything I plan here.
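
To see whether messages pile up, the queue can be inspected from any machine that reaches the Redis. A small sketch, again with placeholders:

from redis import Redis
from rq import Queue

q = Queue("default", connection=Redis("192.168.0.5", 6379, password="my-redis-password"))
print(q.count)    # number of jobs waiting
print(q.job_ids)  # their ids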