Temporal Python SDK synchronous vs. asynchronous Activity implementations

The Temporal Python SDK supports multiple ways of implementing an Activity: asynchronously, using asyncio, or synchronously, using a thread pool or process pool executor.

It is important to implement your Activities using the correct method; otherwise, your application may fail in sporadic and unexpected ways. Which one you should use depends on your use case. This section provides guidance to help you choose the best approach.

The Python Asynchronous Event Loop and Blocking Calls

First, let's look at how async event loops work in Python. The Python async event loop runs in a thread and executes all tasks in its thread. When any task is running in the event loop, the loop is blocked and no other tasks can be running at the same time within that event loop. Whenever a task executes an await expression, the task is suspended, and the event loop begins or resumes execution of another task.

This means that the event loop can only pass the flow of control when the await keyword is executed. If a program makes a blocking call, such as one that reads from a file, makes a synchronous request to a network service, waits for user input, or anything else that blocks the execution, the entire event loop must wait until that execution has completed.

Blocking the async event loop in Python turns your asynchronous program into a synchronous program that executes serially, defeating the entire purpose of using asyncio. It can also lead to deadlocks and to unpredictable behavior in which tasks are unable to execute. Debugging these issues can be difficult and time-consuming, because the source of the blocking call is not always immediately evident.
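To make the effect concrete, the following sketch (plain asyncio, not part of the Temporal SDK) runs a heartbeat task next to a task that either blocks with time.sleep or yields with asyncio.sleep. All function names and the 0.2-second duration are illustrative assumptions. While the blocking version runs, the heartbeat cannot tick at all; while the cooperative version waits, the heartbeat keeps ticking.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Record a timestamp every `interval` seconds until cancelled."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_task() -> None:
    # Blocking call: the event loop cannot switch to any other task.
    time.sleep(0.2)


async def cooperative_task() -> None:
    # Awaiting suspends this task and lets the event loop run others.
    await asyncio.sleep(0.2)


async def count_ticks_during(task: Callable[[], Awaitable[None]]) -> int:
    """Run a heartbeat next to `task` and count ticks recorded while it ran."""
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start and record its first tick
    start = time.monotonic()
    await task()
    end = time.monotonic()
    hb.cancel()
    return sum(1 for t in ticks if start < t < end)


if __name__ == "__main__":
    print("ticks during blocking task:", asyncio.run(count_ticks_during(blocking_task)))
    print("ticks during cooperative task:", asyncio.run(count_ticks_during(cooperative_task)))
```

In a Temporal Worker, the "heartbeat" role is played by everything else that shares the event loop, including other async Activities and the Worker's communication with the server.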

Because of this, Python developers must take extra care not to make blocking calls from within an asynchronous Activity, and should use an async-safe library to perform these actions instead.

For example, making an HTTP call with the popular requests library within an asynchronous Activity would lead to blocking your event loop. If you want to make an HTTP call from within an asynchronous Activity, you should use an async-safe HTTP library such as aiohttp or httpx. Otherwise, use a synchronous Activity.

Python SDK Worker Execution Architecture

Python Workers have the following components for executing code:

  • Your event loop, which runs Tasks from async Activities plus the rest of the Temporal Worker, such as communicating with the server.
  • An executor for executing Activity Tasks from synchronous Activities. A thread pool executor is recommended.
  • A thread pool executor for executing Workflow Tasks.

See Also: docs for worker.__init__()

Activities

  • Asynchronous Activities and the Temporal Worker SDK code both run in the default asyncio event loop, or in whatever event loop you give the Worker.
  • Synchronous Activities run in the activity_executor.

Workflows

Workflow Tasks run in threads because they have the following three properties:

  • They are CPU bound.
  • They need to be timed out for deadlock detection.
  • They must not block other Workflow Tasks.

The workflow_task_executor is the thread pool these Tasks are run on. The fact that Workflow Tasks run in a thread pool can be confusing at first because Workflow Definitions are async. The key differentiator is that the async in Workflow Definitions isn't referring to the standard event loop -- it's referring to the Workflow's own event loop. Each Workflow gets its own “Workflow event loop,” which is deterministic, and described in the Python SDK blog. The Workflow event loop doesn't constantly loop -- it just gets cycled through during a Workflow Task to make as much progress as possible on all of its futures. When it can no longer make progress on any of its futures, then the Workflow Task is complete.
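The idea of a loop that is only cycled until nothing can make further progress can be illustrated with a toy model. The sketch below is not the SDK's implementation: ToyFuture, ToyWorkflowLoop, and toy_workflow are hypothetical names invented for this illustration, and each call to run_until_stalled() loosely stands in for one Workflow Task.

```python
class ToyFuture:
    """Hypothetical awaitable that is resolved from outside the loop."""

    def __init__(self) -> None:
        self.done = False
        self.result = None

    def resolve(self, value) -> None:
        self.done = True
        self.result = value

    def __await__(self):
        while not self.done:
            yield self  # report which future the coroutine is waiting on
        return self.result


class ToyWorkflowLoop:
    """Steps coroutines only while at least one of them can make progress."""

    def __init__(self) -> None:
        # Maps each unfinished coroutine to the future it waits on
        # (None means the coroutine has not been started yet).
        self.waiting_on = {}

    def spawn(self, coro) -> None:
        self.waiting_on[coro] = None

    def run_until_stalled(self) -> None:
        """Advance every runnable coroutine; return when none can advance."""
        while True:
            ready = [c for c, fut in self.waiting_on.items()
                     if fut is None or fut.done]
            if not ready:
                return
            for coro in ready:
                try:
                    self.waiting_on[coro] = coro.send(None)
                except StopIteration:
                    del self.waiting_on[coro]  # coroutine finished


async def toy_workflow(timer: ToyFuture, activity: ToyFuture, log: list) -> None:
    log.append("started")
    await timer
    log.append("timer fired")
    value = await activity
    log.append(f"activity returned {value}")


def demo() -> list:
    log = []
    timer, activity = ToyFuture(), ToyFuture()
    loop = ToyWorkflowLoop()
    loop.spawn(toy_workflow(timer, activity, log))
    loop.run_until_stalled()  # runs up to the first unresolved future
    timer.resolve(None)
    loop.run_until_stalled()  # passes the timer, then stalls on the activity
    activity.resolve("hola")
    loop.run_until_stalled()  # the workflow coroutine finishes
    return log


if __name__ == "__main__":
    print(demo())
```

Because the loop iterates its coroutines in a fixed order and never consults the clock or threads, replaying the same sequence of resolutions always produces the same log, which is the property the deterministic Workflow event loop relies on.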

Number of CPU cores

The only ways to use more than one core in a Python Worker (considering Python's GIL) are:

  • Run more than one Worker Process.
  • Run the synchronous Activities in a process pool executor, although a thread pool executor is generally recommended.

A Worker infrastructure option: Separate Activity and Workflow Workers

To reduce the risk of event loops or executors getting blocked, some users choose to deploy separate Workers for Workflow Tasks and Activity Tasks.

Activity Definition

By default, Activities should be synchronous rather than asynchronous. You should only make an Activity asynchronous if you are certain that it doesn't block the event loop.

The reason is that blocking code in an async def function blocks your event loop and the rest of Temporal. This can cause bugs that are hard to diagnose, including freezing your Worker and blocking Workflow progress (because Temporal can't tell the server that Workflow Tasks are completing). Synchronous Activities avoid this because they run in the activity_executor (docs for worker.__init__()) rather than in the global event loop, which helps because:

  • There's no risk of accidentally blocking the global event loop.
  • If you have multiple Activity Tasks running in a thread pool rather than an event loop, one bad Activity Task can't slow down the others; this is because the OS scheduler preemptively switches between threads, which the event loop coordinator doesn't do.
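The second point can be illustrated with plain concurrent.futures, outside of Temporal. In this sketch, slow_activity and fast_activity are hypothetical stand-ins for synchronous Activity functions, and time.sleep stands in for a blocking call. Although the slow function is submitted first, the fast one is not held up by it.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_activity(finished: list) -> None:
    time.sleep(0.3)  # stand-in for a slow, blocking call
    finished.append("slow")


def fast_activity(finished: list) -> None:
    finished.append("fast")


def run_both() -> list:
    """Submit the slow function first and report the completion order."""
    finished: list = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(slow_activity, finished)
        executor.submit(fast_activity, finished)
    # Leaving the `with` block waits for both submitted calls to finish.
    return finished


if __name__ == "__main__":
    print(run_both())
```

If the same two functions were written as async def coroutines and slow_activity still called time.sleep, the fast one would have to wait on the shared event loop until the slow one returned.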

See Also: "Types of Activities" section of Python SDK README

How to implement Synchronous Activities

The following code is a synchronous Activity Definition. It takes a name (str) as input and returns a customized greeting (str) as output.

It makes a call to a microservice, and when making this call, you'll notice that it uses the requests library. This is safe to do in synchronous Activities.

import urllib.parse
import requests
from temporalio import activity

class TranslateActivities:

    @activity.defn
    def greet_in_spanish(self, name: str) -> str:
        greeting = self.call_service("get-spanish-greeting", name)
        return greeting

    # Utility method for making calls to the microservices
    def call_service(self, stem: str, name: str) -> str:
        base = f"http://localhost:9999/{stem}"
        url = f"{base}?name={urllib.parse.quote(name)}"

        response = requests.get(url)
        return response.text

The preceding example doesn't share a session across Activities, so the class has no __init__ method. While requests can create sessions, it's currently unclear whether they're thread safe. Because __init__ is neither present nor needed, a case could be made for implementing the Activities as decorated functions rather than as a class, as shown here:

@activity.defn
def greet_in_spanish(name: str) -> str:
    greeting = call_service("get-spanish-greeting", name)
    return greeting

# Utility method for making calls to the microservices
def call_service(stem: str, name: str) -> str:
    base = f"http://localhost:9999/{stem}"
    url = f"{base}?name={urllib.parse.quote(name)}"

    response = requests.get(url)
    return response.text

Whether to implement Activities as class methods or functions is a design choice left up to the developer when cross-activity state isn't needed. Both are equally valid implementations.

How to run Synchronous Activities on a Worker

When running synchronous Activities, the Worker needs to have an activity_executor. Temporal recommends using a ThreadPoolExecutor as shown here:

from concurrent.futures import ThreadPoolExecutor
from temporalio.worker import Worker

with ThreadPoolExecutor(max_workers=42) as executor:
    worker = Worker(
        # ...
        activity_executor=executor,
        # ...
    )

How to Implement Asynchronous Activities

The following code is an implementation of the preceding Activity, but as an asynchronous Activity Definition.

It makes a call to a microservice, accessed through HTTP, to request this greeting in Spanish. This Activity uses the aiohttp library to make an async-safe HTTP request. Using the requests library here would have resulted in blocking code within the async event loop, which would block the entire loop. For more in-depth information about this issue, refer to the Python asyncio documentation.

The following code also implements the Activity Definition as a class, rather than a function. The aiohttp library requires an established Session to perform the HTTP request. It would be inefficient to establish a Session every time an Activity is invoked, so instead this code accepts a Session object as an instance parameter and makes it available to the methods. This approach will also be beneficial when the execution is over and the Session needs to be closed.

In this example, the Activity supplies the name in the URL and retrieves the greeting from the body of the response.

import aiohttp
import urllib.parse
from temporalio import activity
from temporalio.exceptions import ApplicationError

class TranslateActivities:
    def __init__(self, session: aiohttp.ClientSession):
        self.session = session

    @activity.defn
    async def greet_in_spanish(self, name: str) -> str:
        greeting = await self.call_service("get-spanish-greeting", name)
        return greeting

    # Utility method for making calls to the microservices
    async def call_service(self, stem: str, name: str) -> str:
        base = f"http://localhost:9999/{stem}"
        url = f"{base}?name={urllib.parse.quote(name)}"

        async with self.session.get(url) as response:
            translation = await response.text()

            if response.status >= 400:
                raise ApplicationError(
                    f"HTTP Error {response.status}: {translation}",
                    # We want to have Temporal automatically retry 5xx but not 4xx
                    non_retryable=response.status < 500,
                )

            return translation

How to run synchronous code from an asynchronous activity

If your Activity is asynchronous and you don't want to change it to synchronous, but you need to run blocking code inside it, then you can use Python utility functions such as asyncio.to_thread() or loop.run_in_executor() to run the synchronous code in a separate thread without blocking the event loop.
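A minimal sketch of two standard-library utilities that serve this purpose, asyncio.to_thread() (Python 3.9+) and loop.run_in_executor(), is shown below. The blocking_lookup function is a hypothetical stand-in for blocking work such as a requests call, and the coroutine names are illustrative; in a real Worker, the async functions would additionally carry the @activity.defn decorator.

```python
import asyncio
import time


def blocking_lookup(name: str) -> str:
    """Hypothetical stand-in for blocking work, such as a `requests` call."""
    time.sleep(0.1)
    return f"Hola, {name}!"


async def greet_with_to_thread(name: str) -> str:
    # asyncio.to_thread runs the blocking function in a worker thread
    # and suspends this coroutine until the function returns.
    return await asyncio.to_thread(blocking_lookup, name)


async def greet_with_executor(name: str) -> str:
    # loop.run_in_executor does the same; passing None selects the
    # event loop's default thread pool executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_lookup, name)


if __name__ == "__main__":
    print(asyncio.run(greet_with_to_thread("Ana")))
    print(asyncio.run(greet_with_executor("Ana")))
```

In both variants the event loop stays free to run other tasks while the blocking function executes in another thread.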

When Should You Use Async Activities

Asynchronous Activities have advantages, such as a potential speedup in execution. However, as discussed above, making unsafe calls within the async event loop can cause sporadic and difficult-to-diagnose bugs. For this reason, we recommend using asynchronous Activities only when you are certain that your Activities are async safe and don't make blocking calls.

If you experience bugs that you think may be a result of an unsafe call being made in an asynchronous Activity, convert it to a synchronous Activity and see if the issue resolves.