Temporal Python SDK synchronous vs. asynchronous Activity implementations
The Temporal Python SDK supports multiple ways of implementing an Activity:
- Asynchronously using asyncio
- Synchronously multithreaded using concurrent.futures.ThreadPoolExecutor
- Synchronously multiprocess using concurrent.futures.ProcessPoolExecutor and multiprocessing.managers.SyncManager
It is important to implement your Activities using the correct method; otherwise, your application may fail in sporadic and unexpected ways. Which one you should use depends on your use case. This section provides guidance to help you choose the best approach.
The Python Asynchronous Event Loop and Blocking Calls
First, let's look at how async event loops work in Python. The Python async
event loop runs in a thread and executes all tasks in its thread. When any
task is running in the event loop, the loop is blocked and no other tasks can be
running at the same time within that event loop. Whenever a task executes an
await expression, the task is suspended, and the event loop begins or resumes
execution of another task.
This means that the event loop can only pass the flow of control when the await
keyword is executed. If a program makes a blocking call, such as one that reads
from a file, makes a synchronous request to a network service, waits for user input,
or anything else that blocks the execution, the entire event loop must wait until
that execution has completed.
Blocking the async event loop in Python turns your asynchronous program
into a synchronous program that executes serially, defeating the entire purpose
of using asyncio. It can also lead to deadlocks and unpredictable behavior
that leaves tasks unable to execute. Debugging these issues can be difficult
and time-consuming, as the source of the blocking call is not always
immediately evident.
For this reason, Python developers must be extra careful not to make blocking calls from within an asynchronous Activity, and should use an async-safe library to perform such operations instead.
For example, making an HTTP call with the popular requests library within an
asynchronous Activity would lead to blocking your event loop. If you want to make
an HTTP call from within an asynchronous Activity, you should use an async-safe HTTP library
such as aiohttp or httpx. Otherwise, use a synchronous Activity.
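The effect of a blocking call on the event loop can be observed with the standard library alone. The following is a minimal sketch, not Temporal code: the names heartbeat, blocking_task, cooperative_task, and max_heartbeat_gap are hypothetical, and time.sleep() stands in for any blocking call such as a requests.get(). A background "heartbeat" task records timestamps, and the longest gap between them shows how long the loop was stalled.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def heartbeat(ticks: list[float], interval: float = 0.05) -> None:
    """Record a timestamp every `interval` seconds until cancelled."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_task(duration: float) -> None:
    # time.sleep() never yields to the event loop, so no other task can run.
    time.sleep(duration)


async def cooperative_task(duration: float) -> None:
    # asyncio.sleep() suspends this task and lets the loop run other tasks.
    await asyncio.sleep(duration)


async def max_heartbeat_gap(
    task: Callable[[float], Awaitable[None]], duration: float = 0.5
) -> float:
    """Run `task` next to the heartbeat; return the longest gap between ticks."""
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)      # let the heartbeat record its first tick
    await task(duration)
    await asyncio.sleep(0.06)   # let the heartbeat tick again afterwards
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)  # absorb the cancellation
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))
```

Calling asyncio.run(max_heartbeat_gap(blocking_task)) should report a gap of roughly the full blocking duration, while asyncio.run(max_heartbeat_gap(cooperative_task)) should stay close to the heartbeat interval. In a Worker, the stalled "heartbeat" corresponds to every other async Activity and the Worker's own communication with the server.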
Python SDK Worker Execution Architecture
Python Workers have the following components for executing code:
- Your event loop, which runs Tasks from async Activities plus the rest of the Temporal Worker, such as communicating with the server.
- An executor for executing Activity Tasks from synchronous Activities. A thread pool executor is recommended.
- A thread pool executor for executing Workflow Tasks.
See Also: docs for worker.__init__()
Activities
- Asynchronous Activities and the Temporal Worker SDK code both run in the default asyncio event loop or whatever event loop you give the Worker.
- Synchronous Activities run in the activity_executor.
Workflows
Workflow Tasks run in threads because they have the following three properties:
- They are CPU bound.
- They need to be timed out for deadlock detection.
- They need to not block other Workflow Tasks.
The workflow_task_executor is the thread pool these Tasks are run on.
The fact that Workflow Tasks run in a thread pool can be confusing at first because Workflow Definitions are async.
The key differentiator is that the async in Workflow Definitions isn't referring to the standard event loop -- it's referring to the Workflow's own event loop.
Each Workflow gets its own “Workflow event loop,” which is deterministic, and described in the Python SDK blog.
The Workflow event loop doesn't constantly loop -- it just gets cycled through during a Workflow Task to make as much progress as possible on all of its futures.
When it can no longer make progress on any of its futures, then the Workflow Task is complete.
Number of CPU cores
The only ways to use more than one core in a Python Worker (considering Python's GIL) are:
- Run more than one Worker Process.
- Run synchronous Activities in a process pool executor (although a thread pool executor is the recommended choice).
A Worker infrastructure option: Separate Activity and Workflow Workers
To reduce the risk of event loops or executors getting blocked, some users choose to deploy separate Workers for Workflow Tasks and Activity Tasks.
Activity Definition
By default, Activities should be synchronous rather than asynchronous. You should only make an Activity asynchronous if you are certain that it doesn't block the event loop.
This is because if you have blocking code in an async def function,
it blocks your event loop and the rest of Temporal, which can cause bugs that are
hard to diagnose, including freezing your Worker and blocking Workflow progress
(because Temporal can't tell the server that Workflow Tasks are completing).
Synchronous Activities avoid this problem because they run in the
activity_executor (see the docs for worker.__init__()) rather than in the
global event loop. This has two benefits:
- There's no risk of accidentally blocking the global event loop.
- If you have multiple Activity Tasks running in a thread pool rather than an event loop, one bad Activity Task can't slow down the others; this is because the OS scheduler preemptively switches between threads, which the event loop coordinator doesn't do.
See Also: "Types of Activities" section of Python SDK README
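The isolation that a thread pool provides can be illustrated without Temporal. The following is a minimal sketch under stated assumptions: run_mixed_tasks and task are hypothetical names, and time.sleep() stands in for any blocking call that an Activity might make. One slow task and three quick tasks are submitted to the same ThreadPoolExecutor, and the function returns the order in which they finish.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_mixed_tasks(slow_seconds: float = 0.3) -> list[str]:
    """Run one slow and three quick blocking tasks; return their completion order."""
    finished: list[str] = []
    lock = threading.Lock()

    def task(name: str, seconds: float) -> None:
        time.sleep(seconds)  # stands in for any blocking call
        with lock:
            finished.append(name)

    with ThreadPoolExecutor(max_workers=4) as executor:
        # The slow task is submitted first, yet it does not hold up the others.
        executor.submit(task, "slow", slow_seconds)
        for i in range(3):
            executor.submit(task, f"quick-{i}", 0.0)
    # Leaving the `with` block waits for every submitted task to finish.
    return finished
```

Because each task runs on its own thread, the quick tasks are expected to finish before the slow one even though the slow one started first. The same code run as coroutines on a single event loop, with the blocking time.sleep() left in place, would make every quick task wait for the slow one.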
How to implement Synchronous Activities
The following code is a synchronous Activity Definition.
It takes a name (str) as input and returns a
customized greeting (str) as output.
It makes a call to a microservice, and when making this call,
you'll notice that it uses the requests library. This is safe to do in
synchronous Activities.
import urllib.parse

import requests
from temporalio import activity


class TranslateActivities:
    @activity.defn
    def greet_in_spanish(self, name: str) -> str:
        greeting = self.call_service("get-spanish-greeting", name)
        return greeting

    # Utility method for making calls to the microservices
    def call_service(self, stem: str, name: str) -> str:
        base = f"http://localhost:9999/{stem}"
        url = f"{base}?name={urllib.parse.quote(name)}"
        response = requests.get(url)
        return response.text
The preceding example doesn't share a session across the Activity, so
__init__ was removed. While requests does have the ability to create sessions,
it's currently unknown if they're thread safe. Because __init__ is no longer
present or needed, a case could be made for implementing the Activities not as
a class but as decorated functions, as shown here:
import urllib.parse

import requests
from temporalio import activity


@activity.defn
def greet_in_spanish(name: str) -> str:
    greeting = call_service("get-spanish-greeting", name)
    return greeting


# Utility function for making calls to the microservices
def call_service(stem: str, name: str) -> str:
    base = f"http://localhost:9999/{stem}"
    url = f"{base}?name={urllib.parse.quote(name)}"
    response = requests.get(url)
    return response.text
Whether to implement Activities as class methods or functions is a design choice left up to the developer when cross-activity state isn't needed. Both are equally valid implementations.
How to run Synchronous Activities on a Worker
When running synchronous Activities, the Worker
needs to have an activity_executor. Temporal
recommends using a ThreadPoolExecutor as shown here:
from concurrent.futures import ThreadPoolExecutor

from temporalio.worker import Worker

with ThreadPoolExecutor(max_workers=42) as executor:
    worker = Worker(
        # ...
        activity_executor=executor,
        # ...
    )
How to Implement Asynchronous Activities
The following code is an implementation of the preceding Activity, but as an asynchronous Activity Definition.
It makes a call to a microservice, accessed through HTTP, to request this
greeting in Spanish. This Activity uses the aiohttp library to make an
async-safe HTTP request. Using the requests library here would have resulted in
a blocking call within the async event loop, which would block the entire
event loop. For more in-depth information about this issue, refer to the
Python asyncio documentation.
The following code also implements the Activity Definition as a class, rather than a
function. The aiohttp library requires an established Session to perform the
HTTP request. It would be inefficient to establish a Session every time an
Activity is invoked, so instead this code accepts a Session object as an instance
parameter and makes it available to the methods. This approach will also be
beneficial when the execution is over and the Session needs to be closed.
In this example, the Activity supplies the name in the URL and retrieves the greeting from the body of the response.
import urllib.parse

import aiohttp
from temporalio import activity
from temporalio.exceptions import ApplicationError


class TranslateActivities:
    def __init__(self, session: aiohttp.ClientSession):
        self.session = session

    @activity.defn
    async def greet_in_spanish(self, name: str) -> str:
        greeting = await self.call_service("get-spanish-greeting", name)
        return greeting

    # Utility method for making calls to the microservices
    async def call_service(self, stem: str, name: str) -> str:
        base = f"http://localhost:9999/{stem}"
        url = f"{base}?name={urllib.parse.quote(name)}"
        async with self.session.get(url) as response:
            translation = await response.text()
            if response.status >= 400:
                raise ApplicationError(
                    f"HTTP Error {response.status}: {translation}",
                    # We want to have Temporal automatically retry 5xx but not 4xx
                    non_retryable=response.status < 500,
                )
            return translation
How to run synchronous code from an asynchronous Activity
If your Activity is asynchronous and you don't want to change it to synchronous, but you need to run blocking code inside it, you can use one of the following Python utility functions to run synchronous code from an asynchronous function:
- loop.run_in_executor(), which is also mentioned in the "running blocking code" section of the "developing with asyncio" guide
- asyncio.to_thread()
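Both functions can be sketched with the standard library alone. In the following example, blocking_lookup is a hypothetical helper in which time.sleep() stands in for blocking I/O, and the two coroutines are plain async functions rather than decorated Activities, so that the sketch runs without a Temporal Worker. Note that asyncio.to_thread() requires Python 3.9 or later.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_lookup(name: str) -> str:
    """Hypothetical blocking helper; time.sleep() stands in for blocking I/O."""
    time.sleep(0.2)
    return f"Hola, {name}!"


async def greet_with_to_thread(name: str) -> str:
    # Runs the blocking call in the event loop's default thread pool.
    return await asyncio.to_thread(blocking_lookup, name)


async def greet_with_executor(name: str, executor: ThreadPoolExecutor) -> str:
    # Runs the blocking call in an executor that the caller controls.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_lookup, name)


async def main() -> list[str]:
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Both blocking calls run on worker threads, so the event loop stays free.
        return await asyncio.gather(
            greet_with_to_thread("Ana"),
            greet_with_executor("Luis", executor),
        )
```

Running asyncio.run(main()) returns both greetings. loop.run_in_executor() is the more flexible option because it accepts any executor, while asyncio.to_thread() is the shorter spelling when the default thread pool is sufficient.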
When Should You Use Async Activities
Asynchronous Activities have many advantages, such as a potential speedup in execution. However, as discussed above, making unsafe calls within the async event loop can cause sporadic and difficult-to-diagnose bugs. For this reason, we recommend using asynchronous Activities only when you are certain that your Activities are async safe and don't make blocking calls.
If you experience bugs that you think may be a result of an unsafe call being made in an asynchronous Activity, convert it to a synchronous Activity and see if the issue resolves.