Table of Contents

  1. Part 1: running the function with a timeout
  2. Part 2: a timeout decorator

Creating a function timeout decorator in Python

Guide, Programming, Python

May 02, 2024

I had some Python code that did some processing and called external programs, which could run for a very long time and potentially never terminate. I wanted a simple way to run a Python function with a timeout and, if it completed within the time limit, get back its result. Ideally, I'd only have to decorate a function with @timeout(number_of_seconds) and have the rest done automatically. Read on for my solution to this, and a discussion of the problems I ran into. All code in this post is Python 3.

First, let’s plan out what we want to do. The code in question looks something like this:

import sys
import time

def might_get_stuck(arg):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)

    # when computation completes, return a result:
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    # and we get stuck when we call the function:
    result = might_get_stuck(arg)
    print(result)

The function might_get_stuck could potentially run for a very long (or infinite) amount of time, i.e. it might not terminate. So, we want to run that function for some amount of time, and if it exceeds that time, we want it to stop running and continue at the next statement. We also want to detect whether that function timed out, or completed successfully; if the latter, we want to get its return value.

Part 1: running the function with a timeout

There are a few different ways we can run a function with a timeout in Python; I decided to use the multiprocessing module, which is part of the standard library. Starting the function in a Process lets us give it a timeout; after that, we can tell it to terminate, and if it still continues running, we can kill the process:

import sys
import time
import multiprocessing

# The same as before
def might_get_stuck(arg):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)

    # when computation completes, return a result:
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]

    # Now, we start the function as a process
    proc = multiprocessing.Process(target=might_get_stuck, args=(arg,))
    proc.start()

    # Let it run, and see if it finishes within 2 seconds
    proc.join(timeout=2)

    # If it's still running
    if proc.is_alive():
        # Ask it nicely to stop
        proc.terminate()
        # If it refuses for 5 seconds
        proc.join(timeout=5)
        if proc.is_alive():
            # Ask it less nicely to stop
            proc.kill()
            proc.join()

    # Decide what to do based on the exit code
    if proc.exitcode == 0:
        # print result? how do we get it?
        pass
    else:
        print('terminated early')

The problem is that proc.join() always returns None, so we have no way of obtaining the potential return value of might_get_stuck(arg). For this, we have to add a pipe, which lets the subprocess send data back to the controlling process:

import sys
import time
import multiprocessing, multiprocessing.connection
from typing import Optional


# This time, we give the function a connection,
# through which it can send its return value.
def might_get_stuck(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)

    # when computation completes, return a result:
    if snd is not None:
        # send the result through the connection, and close it
        snd.send(arg)
        snd.close()

    # Keep the return, so we can also use the function in a non-timeout manner
    # (without providing the connection, `snd`)
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]

    # Open a pipe
    rcv, snd = multiprocessing.Pipe()

    # And pass the sending end to the process that'll run the function
    proc = multiprocessing.Process(target=might_get_stuck, args=(arg,), kwargs={'snd': snd})

    # Same as before: start, wait, try to kill it
    proc.start()
    proc.join(timeout=2)

    if proc.is_alive():
        proc.terminate()
        proc.join(timeout=5)
        if proc.is_alive():
            proc.kill()
            proc.join()

    if proc.exitcode == 0:
        # Now if the process terminated without timing out,
        # we can read its return value from the pipe:
        result = rcv.recv()
        rcv.close()
        print(result)
    else:
        # process was terminated or failed in some other way
        print('terminated early')

We made snd an optional parameter, so the function can also be called without a pipe.

Now we know that if the process's exit code is 0 (it exited cleanly), there's data in the pipe we can read; otherwise, it terminated early.
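A note on what "terminated early" looks like: on Unix, multiprocessing reports a process killed by a signal with a negative exitcode, the negated signal number. A minimal sketch (assuming a Unix-like OS; the function name spin is just for illustration):

```python
import multiprocessing
import signal
import time

def spin():
    # stand-in for a long-running task
    time.sleep(60)

if __name__ == '__main__':
    proc = multiprocessing.Process(target=spin)
    proc.start()
    proc.terminate()
    proc.join()
    # terminate() sends SIGTERM; a process killed by a signal
    # exits with code -signum, so we expect -signal.SIGTERM (-15)
    print(proc.exitcode == -signal.SIGTERM)  # True on Unix
```

So a non-zero exit code covers both our terminate/kill path and the case where the function itself crashed.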

So, now we’ve basically achieved the goal: we can run a function with a timeout, and either we get its return value, or we handle its early termination. However, it’s very verbose: we have to create a process, a pipe, pass the right arguments, etc. It would be much nicer if we could just add a decorator to any function, e.g. @timeout(2), and have everything set up automatically. Well, how convenient – Python has decorators.

Part 2: a timeout decorator

If you’re new to decorators, I wrote a post about them at an earlier point. A decorator is a function that takes a function and returns a decorated (wrapped) function. A decorator with an argument adds one more level: a function that takes the argument and returns a parametrized decorator (wrapper) function, which in turn returns the decorated (wrapped) function.

A quick recap of decorators

A decorator lets you add code to run before and/or after a function.

Let’s say you have a function to_decorate:

def to_decorate():
    print('Inside to_decorate')

You could decorate a function manually like this:

def decorator(func):
    def runner():
        print("Decorated!")
        func()
    return runner

def to_decorate():
    print('Inside to_decorate')

# Note here: we need two function calls
decorator(to_decorate)()

Or you could use the decorator symbol (@):

def decorator(func):
    def runner():
        print("Decorated!")
        func()
    return runner

@decorator
def to_decorate():
    print('Inside to_decorate')

# And here we only need one function call
to_decorate()

The result is the same, but the decorator annotation simplifies the call site, especially when it comes to decorators with arguments. Compare this:

def decorator(arg):
    def parametrize(func):
        def runner():
            print(f"Decorated with {arg}!")
            func()
        return runner
    return parametrize

def to_decorate():
    print('Inside to_decorate')

# 3 function calls, and we need to pass `42` every time.
decorator(42)(to_decorate)()

With the version using the annotation:

def decorator(arg):
    def parametrize(func):
        def runner():
            print(f"Decorated with {arg}!")
            func()
        return runner
    return parametrize

@decorator(42)
def to_decorate():
    print('Inside to_decorate')

# Only 1 function call without any parameters
to_decorate()

So for our timeout decorator, we need 3 levels of defs. Here’s a first attempt (which is broken, and I’ll explain why):

import sys
import time
import multiprocessing, multiprocessing.connection
from typing import Optional, Union, Callable, Any
import functools

# This is a union of what our possibly-timed-out function returns,
# and an int for when we need to return the exit code.
# (in practice, don't use `Any`, but your actual return type)
TimeoutReturnType = Union[int, Any]

# The timeout takes a parameter for the number of seconds.
# It's a function (Callable) that takes a function (Callable) and returns a function (Callable).
# Remember: it's a wrapper.
def timeout(n: float) -> Callable[[Callable[..., TimeoutReturnType]], Callable[..., TimeoutReturnType]]:
    # The first inner function takes the possibly-running-forever function, and returns a function.
    # Remember: it's also a wrapper.
    def decorate(nonterminating_func: Callable[..., TimeoutReturnType]) -> Callable[..., TimeoutReturnType]:
        # `functools.wraps` helps with preserving some metadata about the wrapped function
        @functools.wraps(nonterminating_func)
        def wrapper(*args, **kwargs) -> TimeoutReturnType:
            # Same as we saw before
            rcv, snd = multiprocessing.Pipe()
            proc = multiprocessing.Process(target=nonterminating_func, args=(*args,), kwargs={'snd': snd, **kwargs})
            proc.start()

            proc.join(timeout=n)

            if proc.is_alive():
                proc.terminate()
                proc.join(timeout=10)
                if proc.is_alive():
                    proc.kill()
                    proc.join()

            # Assertion needed for typechecking
            assert proc.exitcode is not None, "process should be terminated at this point"

            if proc.exitcode == 0:
                result = rcv.recv()
                rcv.close()
                return result

            else:
                return proc.exitcode
        return wrapper
    return decorate


# Let's use it.
# We want this function to time out in 2 seconds
@timeout(2)
def might_get_stuck(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    # Same as before
    time.sleep(600)

    if snd is not None:
        snd.send(arg)
        snd.close()
    return arg


if __name__ == '__main__':
    arg = sys.argv[1]

    # We can just call it directly, and all the multiprocessing/piping
    # will be handled by the decorator.
    result: TimeoutReturnType = might_get_stuck(arg)

    # Result is an int (in our case) if it's an exit code.
    if isinstance(result, int):
        print('terminated early')
    else:
        print(result)

First, for type hinting, we define a return type TimeoutReturnType for a function that can time out: it's either an int (the exit code, in case of early termination) or Any (in practice, the return type of whatever function we call). Note that Union[int, Any] collapses to Any as far as a type checker is concerned, and if your function itself returns an int you can't tell a result apart from an exit code, so in real code substitute your function's actual return type. Otherwise, it's basically the standard format for decorators, except we need to add an assertion that proc.exitcode is not None to satisfy the type checker: exitcode is None while the process is still running, but at the point of the assertion we know it will have terminated, either nicely or after some convincing. We use functools.wraps to preserve the wrapped function's name, docstring, and other metadata.
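To make the functools.wraps point concrete, here's a minimal sketch of what it preserves (the names decorator and documented are just for illustration):

```python
import functools

def decorator(func):
    # copies __name__, __doc__, __wrapped__, etc.
    # from func onto the wrapper
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@decorator
def documented():
    """Does something."""

# Without @functools.wraps, these would report the
# wrapper's metadata ('wrapper', None) instead.
print(documented.__name__)  # documented
print(documented.__doc__)   # Does something.
```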

The problem is, this won’t work. The error message is sort of cryptic: _pickle.PicklingError: Can't pickle <function might_get_stuck at 0x10296e440>: it's not the same object as __main__.might_get_stuck. You see, as part of starting a process, multiprocessing does a Popen. And somewhere in there, the function that is about to be spawned as a new process gets pickled.

Here’s a list of things that can be pickled. And there it is, black on white (or white on black if you’re a leet programmer with a dark theme):

  • “functions (built-in and user-defined) accessible from the top level of a module”.

Functions can only be pickled if they're accessible at the top level of their module, because functions are pickled by reference: pickle stores the module and function name, then checks that looking that name up yields the very same object. Our target is the original, undecorated function, but the name might_get_stuck in __main__ now refers to the wrapper returned by the decorator, a different object, hence the error. So, we need a small hack: store the original (undecorated) function in a module-level dictionary, and run it via a top-level wrapper function that can be pickled, looking the original up by name. Yes, it's a bit ugly and may have some edge cases, but it's abstracted away by the decorator and I couldn't come up with a better way to handle this.
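You can see the underlying limitation without any multiprocessing at all. A minimal sketch (the names top_level and make_nested are just for illustration):

```python
import pickle

def top_level():
    return 42

def make_nested():
    # defined inside another function, so NOT accessible
    # from the top level of the module
    def nested():
        return 42
    return nested

if __name__ == '__main__':
    # a top-level function pickles fine (by reference to its name)
    pickle.dumps(top_level)

    # a nested function does not
    try:
        pickle.dumps(make_nested())
        print("picklable")
    except Exception:
        print("not picklable")  # this is what gets printed
```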

import sys
import time
import multiprocessing, multiprocessing.connection
from typing import Optional, Union, Callable, Any
import functools

# Same as before
TimeoutReturnType = Union[int, Any]

# Stores the original, undecorated functions
_original_functions = {}

# Given a function name, runs the corresponding undecorated function
def _timeout_func_runner(name: str, *args, **kwargs) -> Any:
    return _original_functions[name](*args, **kwargs)

# Mostly the same as before...
def timeout(n: float) -> Callable[[Callable[..., TimeoutReturnType]], Callable[..., TimeoutReturnType]]:
    def decorate(nonterminating_func: Callable[..., TimeoutReturnType]) -> Callable[..., TimeoutReturnType]:
        # ...except for this: store the original function, using its name as a key
        _original_functions[nonterminating_func.__name__] = nonterminating_func

        @functools.wraps(nonterminating_func)
        def wrapper(*args, **kwargs) -> TimeoutReturnType:
            rcv, snd = multiprocessing.Pipe()

            # ...and this: instead of calling the nonterminating function directly,
            # call it via the dictionary.
            proc = multiprocessing.Process(target=_timeout_func_runner, args=(nonterminating_func.__name__, *args,), kwargs={'snd': snd, **kwargs})
            proc.start()

            proc.join(timeout=n)

            if proc.is_alive():
                proc.terminate()
                proc.join(timeout=10)
                if proc.is_alive():
                    proc.kill()
                    proc.join()

            assert proc.exitcode is not None, "process should be terminated at this point"

            if proc.exitcode == 0:
                result = rcv.recv()
                rcv.close()
                return result

            else:
                return proc.exitcode
        return wrapper
    return decorate

# Our calling code does not change at all:
@timeout(2)
def might_get_stuck(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)

    # when computation completes, return a result:
    if snd is not None:
        snd.send(arg)
        snd.close()
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    result: TimeoutReturnType = might_get_stuck(arg)
    if isinstance(result, int):
        print('terminated early')
    else:
        print(result)

And now it works!
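To see both outcomes side by side, here's a condensed, self-contained sketch of the same decorator (type hints omitted) applied to a function that finishes in time and one that doesn't; the names fast and slow are just for illustration:

```python
import functools
import multiprocessing
import time

_original_functions = {}

def _timeout_func_runner(name, *args, **kwargs):
    return _original_functions[name](*args, **kwargs)

def timeout(n):
    def decorate(func):
        _original_functions[func.__name__] = func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            rcv, snd = multiprocessing.Pipe()
            proc = multiprocessing.Process(
                target=_timeout_func_runner,
                args=(func.__name__, *args),
                kwargs={'snd': snd, **kwargs})
            proc.start()
            proc.join(timeout=n)
            if proc.is_alive():
                proc.terminate()
                proc.join()
            if proc.exitcode == 0:
                result = rcv.recv()
                rcv.close()
                return result
            return proc.exitcode
        return wrapper
    return decorate

@timeout(2)
def fast(arg, snd=None):
    # completes well within the timeout
    if snd is not None:
        snd.send(arg)
        snd.close()
    return arg

@timeout(1)
def slow(arg, snd=None):
    # exceeds the timeout
    time.sleep(60)
    if snd is not None:
        snd.send(arg)
        snd.close()
    return arg

if __name__ == '__main__':
    print(fast('done'))                # the function's return value
    print(isinstance(slow('x'), int))  # True: we got an exit code instead
```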