May 02, 2024
I had some Python code that did some processing and called external programs, which could run for a very long time and potentially not terminate.
I wanted a simple solution to run some Python function with a timeout, and if it completes within the time limit, to get back its result.
It would be ideal to only have to decorate a function with @timeout(number_of_seconds), and have the rest done automatically.
Read on for my solution to this, and a discussion of the problems I ran into.
Python 3 btw.
First, let’s plan out what we want to do. The code in question looks something like this:
import sys
import time

def might_get_stuck(arg):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)
    # when computation completes, return a result:
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    # and we get stuck when we call the function:
    result = might_get_stuck(arg)
    print(result)
The function might_get_stuck could potentially run for a very long (or infinite) amount of time, i.e. it might not terminate.
So, we want to run that function for some amount of time, and if it exceeds that time, we want it to stop running and continue at the next statement.
We also want to detect whether that function timed out, or completed successfully; if the latter, we want to get its return value.
There are a few different ways we can run a function with a timeout in Python; I decided to use the multiprocessing module, which is part of the standard library.
Starting the function in a Process lets us give it a timeout; after that, we can tell it to terminate, and if it still continues running, we can kill the process:
import sys
import time
import multiprocessing

# The same as before
def might_get_stuck(arg):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)
    # when computation completes, return a result:
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    # Now, we start the function as a process
    proc = multiprocessing.Process(target=might_get_stuck, args=(arg,))
    proc.start()
    # Let it run, and see if it finishes within 2 seconds
    proc.join(timeout=2)
    # If it's still running
    if proc.is_alive():
        # Ask it nicely to stop
        proc.terminate()
        # If it refuses for 5 seconds
        proc.join(timeout=5)
        if proc.is_alive():
            # Ask it less nicely to stop
            proc.kill()
            proc.join()
    # Decide what to do based on the exit code
    if proc.exitcode == 0:
        # print result? how do we get it?
        pass
    else:
        print('terminated early')
The problem is that proc.join() always returns None, so we have no way of obtaining the potential return value of might_get_stuck(arg).
For this, we have to add a pipe, which lets the subprocess send data to the controlling process:
import sys
import time
import multiprocessing, multiprocessing.connection
from typing import Optional

# This time, we give the function a connection,
# through which it can send its return value.
def might_get_stuck(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)
    # when computation completes, return a result:
    if snd is not None:
        # send the result through the connection, and close it
        snd.send(arg)
        snd.close()
    # Keep the return, so we can also use the function in a non-timeout manner
    # (without providing the connection, `snd`)
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    # Open a pipe
    rcv, snd = multiprocessing.Pipe()
    # And pass the sending end to the process that'll run the function
    proc = multiprocessing.Process(target=might_get_stuck, args=(arg,), kwargs={'snd': snd})
    # Same as before: start, wait, try to kill it
    proc.start()
    proc.join(timeout=2)
    if proc.is_alive():
        proc.terminate()
        proc.join(timeout=5)
        if proc.is_alive():
            proc.kill()
            proc.join()
    if proc.exitcode == 0:
        # Now if the process terminated without timing out,
        # we can read its return value from the pipe:
        result = rcv.recv()
        rcv.close()
        print(result)
    else:
        # process was terminated or failed in some other way
        print('terminated early')
We made snd an optional parameter, so the function can also be called without a pipe.
Now we know that if the process’s exit code is 0 (it exited fine), there’s data in the pipe we can read; otherwise, it terminated early.
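As a side note (my own addition, assuming a Unix-like OS): when we terminate or kill the process, its exit code is the negative signal number, which is what the “terminated early” branch ends up seeing. A quick standalone sketch:

# A minimal sketch (not part of the solution) of what proc.exitcode looks like:
# 0 for a normal return, -SIGTERM (-15) after terminate(), -SIGKILL (-9) after kill().
import multiprocessing
import signal
import time

def sleepy():
    time.sleep(600)

if __name__ == '__main__':
    proc = multiprocessing.Process(target=sleepy)
    proc.start()
    proc.join(timeout=1)
    proc.terminate()
    proc.join()
    # On Linux/macOS this prints something like: -15 == -15
    print(proc.exitcode, '==', -int(signal.SIGTERM))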
So, now we’ve basically achieved the goal: we can run a function with a timeout, and either we get its return value, or we handle its early termination.
However, it’s very verbose: we have to create a process, a pipe, pass the right arguments, etc.
It would be much nicer if we could just add a decorator to any function, e.g. @timeout(2), and have everything set up automatically.
Well, how convenient – Python has decorators.
If you’re new to decorators, I wrote a post about them at an earlier point. A decorator is a function that returns a decorated (wrapped) function. A decorator with an argument requires one more level: a function that takes the argument and returns a parametrized decorator (wrapper) function, which in turn returns the decorated (wrapped) function.
Let’s say you have a function to_decorate:
def to_decorate():
    print('Inside to_decorate')
You could decorate a function manually like this:
def decorator(func):
    def runner():
        print("Decorated!")
        func()
    return runner

def to_decorate():
    print('Inside to_decorate')

# Note here: we need two function calls
decorator(to_decorate)()
Or you could use the decorator symbol (@):
def decorator(func):
    def runner():
        print("Decorated!")
        func()
    return runner

@decorator
def to_decorate():
    print('Inside to_decorate')

# And here we only need one function call
to_decorate()
The result is the same, but the decorator annotation simplifies the function call, especially when it comes to decorators with arguments. Compare this:
def decorator(arg):
    def parametrize(func):
        def runner():
            print(f"Decorated with {arg}!")
            func()
        return runner
    return parametrize

def to_decorate():
    print('Inside to_decorate')

# 3 function calls, and we need to pass `42` every time.
decorator(42)(to_decorate)()
With the version using the annotation:
def decorator(arg):
    def parametrize(func):
        def runner():
            print(f"Decorated with {arg}!")
            func()
        return runner
    return parametrize

@decorator(42)
def to_decorate():
    print('Inside to_decorate')

# Only 1 function call without any parameters
to_decorate()
So for our timeout decorator, we need three levels of defs.
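In other words, the shape we’re after is something like this skeleton (just the structure, using the same names as the full attempt below; the bodies follow there):

def timeout(n):                         # level 1: takes the number of seconds
    def decorate(nonterminating_func):  # level 2: takes the function to wrap
        def wrapper(*args, **kwargs):   # level 3: what actually runs in place of the function
            ...                         # start the process, join with a timeout, read the pipe
        return wrapper
    return decorate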
Here’s a first attempt (which is broken, and I’ll explain why):
import sys
import time
import multiprocessing, multiprocessing.connection
from typing import Optional, Union, Callable, Any
import functools

# This is a union of what our possibly-timed-out function returns,
# and an int for when we need to return the exit code.
# (in practice, don't use `Any`, but your actual return type)
TimeoutReturnType = Union[int, Any]

# The timeout takes a parameter for the number of seconds.
# It's a function (Callable) that takes a function (Callable) and returns a function (Callable).
# Remember: it's a wrapper.
def timeout(n: float) -> Callable[[Callable[..., TimeoutReturnType]], Callable[..., TimeoutReturnType]]:
    # The first inner function takes the possibly-running-forever function, and returns a function.
    # Remember: it's also a wrapper.
    def decorate(nonterminating_func: Callable[..., TimeoutReturnType]) -> Callable[..., TimeoutReturnType]:
        # `functools.wraps` helps with preserving some metadata about the wrapped function
        @functools.wraps(nonterminating_func)
        def wrapper(*args, **kwargs) -> TimeoutReturnType:
            # Same as we saw before
            rcv, snd = multiprocessing.Pipe()
            proc = multiprocessing.Process(target=nonterminating_func, args=(*args,), kwargs={'snd': snd, **kwargs})
            proc.start()
            proc.join(timeout=n)
            if proc.is_alive():
                proc.terminate()
                proc.join(timeout=10)
                if proc.is_alive():
                    proc.kill()
                    proc.join()
            # Assertion needed for typechecking
            assert proc.exitcode is not None, "process should be terminated at this point"
            if proc.exitcode == 0:
                result = rcv.recv()
                rcv.close()
                return result
            else:
                return proc.exitcode
        return wrapper
    return decorate

# Let's use it.
# We want this function to time out in 2 seconds
@timeout(2)
def might_get_stuck(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    # Same as before
    time.sleep(600)
    if snd is not None:
        snd.send(arg)
        snd.close()
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    # We can just call it directly, and all the multiprocessing/piping
    # will be handled by the decorator.
    result: TimeoutReturnType = might_get_stuck(arg)
    # Result is an int (in our case) if it's an exit code.
    if isinstance(result, int):
        print('terminated early')
    else:
        print(result)
First, for type hinting, we define a return type TimeoutReturnType for a function that can have a timeout: it either returns an int (the exit code in case of an error) or Any (in practice, the return type of whatever function we call).
Otherwise, it’s basically the standard format for decorators, except we need to add an assertion that proc.exitcode is not None to satisfy the type checker: it can be None if the process proc hasn’t terminated yet, but we know that at the point of the assertion it will have terminated, either nicely or after some convincing.
We use functools.wraps to handle preservation of the function name, arguments, docstring, etc.
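To illustrate (a tiny sketch of my own, not part of the timeout code): without functools.wraps, the decorated function reports the wrapper’s name and loses its docstring.

import functools

def plain_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def wraps_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain_decorator
def first():
    """Docstring of first."""

@wraps_decorator
def second():
    """Docstring of second."""

print(first.__name__, first.__doc__)    # wrapper None
print(second.__name__, second.__doc__)  # second Docstring of second.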
The problem is, this won’t work.
The error message is sort of cryptic: _pickle.PicklingError: Can't pickle <function might_get_stuck at 0x10296e440>: it's not the same object as __main__.might_get_stuck.
You see, as part of starting a process, multiprocessing does a Popen. And somewhere in there, the function that is about to be spawned as a new process gets pickled.
Here’s a list of things that can be pickled. And there it is, black on white (or white on black if you’re a leet programmer with a dark theme):
- “functions (built-in and user-defined) accessible from the top level of a module”.
Functions are pickled by reference, i.e. by module and name, so they can only be pickled if they’re accessible from the top level of a module. In our case, the decorator returns the innermost wrapper and binds it to the top-level name, so the original function we hand to the process is no longer reachable under its own name and can’t be pickled. So, we need to use a small hack to make it reachable from the top level: store the original (undecorated) function in a dictionary, and then run it via that dictionary, through a top-level runner function that can be pickled. Yes, it’s a bit ugly and may have some edge cases, but it’s abstracted away by the decorator and I couldn’t come up with a better way to handle this.
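To make the error message less mysterious, here’s a tiny standalone sketch (my own, not part of the solution) that reproduces it: pickle looks the function up by module and name, and after decoration the top-level name resolves to the wrapper rather than the original function object.

import functools
import pickle

def decorator(func):
    # functools.wraps makes the wrapper report the original's __qualname__
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@decorator
def decorated():
    return 42

# Pickling the wrapper works: looking up __main__.decorated finds this exact object.
pickle.dumps(decorated)

# Pickling the *original* function fails: the lookup by name finds the wrapper instead.
original = decorated.__wrapped__  # functools.wraps stores the undecorated function here
pickle.dumps(original)
# _pickle.PicklingError: Can't pickle <function decorated at 0x...>:
# it's not the same object as __main__.decorated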
import sys
import time
import multiprocessing, multiprocessing.connection
from typing import Optional, Union, Callable, Any
import functools

# Same as before
TimeoutReturnType = Union[int, Any]

# Stores the original, undecorated functions
_original_functions = {}

# Given a function name, runs the corresponding undecorated function
def _timeout_func_runner(name: str, *args, **kwargs) -> Any:
    return _original_functions[name](*args, **kwargs)

# Mostly the same as before...
def timeout(n: float) -> Callable[[Callable[..., TimeoutReturnType]], Callable[..., TimeoutReturnType]]:
    def decorate(nonterminating_func: Callable[..., TimeoutReturnType]) -> Callable[..., TimeoutReturnType]:
        # ...except for this: store the original function, using its name as a key
        _original_functions[nonterminating_func.__name__] = nonterminating_func

        @functools.wraps(nonterminating_func)
        def wrapper(*args, **kwargs) -> TimeoutReturnType:
            rcv, snd = multiprocessing.Pipe()
            # ...and this: instead of calling the nonterminating function directly,
            # call it via the dictionary.
            proc = multiprocessing.Process(target=_timeout_func_runner, args=(nonterminating_func.__name__, *args), kwargs={'snd': snd, **kwargs})
            proc.start()
            proc.join(timeout=n)
            if proc.is_alive():
                proc.terminate()
                proc.join(timeout=10)
                if proc.is_alive():
                    proc.kill()
                    proc.join()
            assert proc.exitcode is not None, "process should be terminated at this point"
            if proc.exitcode == 0:
                result = rcv.recv()
                rcv.close()
                return result
            else:
                return proc.exitcode
        return wrapper
    return decorate

# Our calling code does not change at all:
@timeout(2)
def might_get_stuck(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    # some code that might not terminate.
    # here, simulated by a long sleep
    time.sleep(600)
    # when computation completes, return a result:
    if snd is not None:
        snd.send(arg)
        snd.close()
    return arg

if __name__ == '__main__':
    arg = sys.argv[1]
    result: TimeoutReturnType = might_get_stuck(arg)
    if isinstance(result, int):
        print('terminated early')
    else:
        print(result)
And now it works!
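As a final sanity check (my own sketch, reusing the decorator above), a function that finishes within the limit takes the happy path and we get its return value back:

@timeout(2)
def finishes_quickly(arg, snd: Optional[multiprocessing.connection.Connection] = None):
    result = arg.upper()
    if snd is not None:
        snd.send(result)
        snd.close()
    return result

# finishes_quickly('hello') returns 'HELLO' almost immediately,
# while might_get_stuck('hello') returns a negative exit code
# (e.g. -15, i.e. -SIGTERM) after roughly two seconds.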