Python Save File in a Thread
A friend asked me how to run compute on a Raspberry Pi while saving data to a file in the background. The whole program will not be async, so the simplest way is to use old-school threading.
The first version uses the threading module from the standard library. The data is an image downloaded from picsum.photos and then saved in a thread without blocking. Some logging is added to show execution order and timings.
import logging
import threading
import time
from pathlib import Path

import requests

logger = logging.getLogger(__name__)


def get_random_image():
    r = requests.get("https://picsum.photos/1000/1000")
    return r.content


def save_data(name, data):
    logger.info("start save")
    with Path(name).open("wb") as fp:
        fp.write(data)
    logger.info("end save")


def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    # the download happens here, in the main thread; only the save runs in the background
    x = threading.Thread(target=save_data, args=("image.jpg", get_random_image()))
    x.start()
    logger.info("start compute")
    # add some real compute here instead
    time.sleep(1)
    logger.info("end compute")
    # actually not needed; the program will wait until the thread finishes
    x.join()


if __name__ == "__main__":
    main()
When run, this is printed:
2024-04-16 22:19:10,489 start save
2024-04-16 22:19:10,489 start compute
2024-04-16 22:19:10,490 end save
2024-04-16 22:19:11,490 end compute
Compute ends last because sleeping for one second is actually slower than saving the file to the SSD in my notebook.
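A side note, not from the original: the join() at the end is optional only because the thread is non-daemonic, so the interpreter waits for it before exiting anyway. With a daemon thread the program could exit mid-write, and join() becomes mandatory. A minimal sketch, reusing save_data and get_random_image from above:

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    x = threading.Thread(
        target=save_data,
        args=("image.jpg", get_random_image()),
        daemon=True,  # daemon threads are killed abruptly at interpreter exit
    )
    x.start()
    logger.info("start compute")
    time.sleep(1)
    logger.info("end compute")
    x.join()  # now required; otherwise the file could be cut off mid-write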
The second version uses ThreadPoolExecutor from the concurrent.futures module. The ThreadPoolExecutor executes code in a pool of threads, just like the previous example, but with a newer API. Here is the same example again, using concurrent.futures.ThreadPoolExecutor:
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import requests

logger = logging.getLogger(__name__)


def get_random_image():
    r = requests.get("https://picsum.photos/1000/1000")
    return r.content


def save_data(name, data):
    logger.info("start save")
    with Path(name).open("wb") as fp:
        fp.write(data)
    logger.info("end save")


def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    # start a thread pool with a single worker thread
    x = ThreadPoolExecutor(1)
    x.submit(save_data, "image.jpg", get_random_image())
    logger.info("start compute")
    # add some real compute here instead
    time.sleep(1)
    logger.info("end compute")
    # shut down the executor safely; wait=True is the default
    x.shutdown()


if __name__ == "__main__":
    main()
Running this code printed the following for me:
2024-04-16 22:38:25,391 start save
2024-04-16 22:38:25,391 start compute
2024-04-16 22:38:25,392 end save
2024-04-16 22:38:26,392 end compute
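One caveat the example above glosses over: submit() returns a concurrent.futures.Future, and an exception raised inside save_data (a full disk, a permission error) is stored on that Future instead of being printed. Calling result() blocks until the save is done and re-raises any such exception. A small sketch of the relevant lines, reusing the functions from above:

x = ThreadPoolExecutor(1)
future = x.submit(save_data, "image.jpg", get_random_image())
logger.info("start compute")
time.sleep(1)
logger.info("end compute")
future.result()  # waits for the save and re-raises exceptions from save_data
x.shutdown()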
The final option is to replace ThreadPoolExecutor with ProcessPoolExecutor. The API works the same, but ProcessPoolExecutor uses the multiprocessing module, which starts extra processes. Extra processes help with the Global Interpreter Lock, but all data moved between them has to be serializable. For us this is a binary stream of content (the image) and a filename, so there is no issue here. More complex data structures may need some additional serialization work to be moved around.
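For completeness, here is a sketch of the full program with the executor swapped out. One caveat not mentioned above: on platforms that spawn worker processes instead of forking (Windows, and macOS by default), the child process does not inherit the logging configuration from main(), so the save messages may not show up there.

import logging
import time
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

import requests

logger = logging.getLogger(__name__)


def get_random_image():
    r = requests.get("https://picsum.photos/1000/1000")
    return r.content


# save_data must live at module level so the worker process can import it
def save_data(name, data):
    logger.info("start save")
    with Path(name).open("wb") as fp:
        fp.write(data)
    logger.info("end save")


def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    x = ProcessPoolExecutor(1)
    # the filename and the image bytes are pickled and sent to the worker
    x.submit(save_data, "image.jpg", get_random_image())
    logger.info("start compute")
    # add some real compute here instead
    time.sleep(1)
    logger.info("end compute")
    # wait=True is the default, so this waits for the save to finish
    x.shutdown()


if __name__ == "__main__":
    main()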
Using a ProcessPoolExecutor returns nearly the same results as the previous versions:
2024-04-16 22:44:24,757 start compute
2024-04-16 22:44:24,759 start save
2024-04-16 22:44:24,759 end save
2024-04-16 22:44:25,757 end compute
I would probably use the ThreadPoolExecutor while developing my program and replace it later with ProcessPoolExecutor if it is actually faster without breaking anything.
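Since both classes implement the same concurrent.futures.Executor interface, that later swap can be reduced to a single line. A hypothetical sketch; the use_processes flag is my own addition, not part of the original program:

from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def make_executor(use_processes: bool = False) -> Executor:
    # the rest of the program only relies on submit() and shutdown(),
    # which both classes provide, so nothing else has to change
    cls = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    return cls(1)

main() would then call make_executor() once and flip the flag only after measuring that processes are actually faster for the workload.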