Published on

Asynchronous Services

Overview

tldr; Any Magento service method can be executed asynchronously outside of a request lifecycle.

If you've ever worked with the Magento Web APIs, you're probably aware of the async version (or the bulk version) of it. If not, it's simple enough to use. If you prepend /async before /V1 of any API route, the API will be executed asynchronously.

For example:

Sync API
POST /V1/products

Async API
POST /async/V1/products

When you call the synchronous API you get the result right away. In the above case calling this API would create a product and then return some data.

But if you use the async API, you will be returned a uuid and a status that denotes the status of the request. The request will be fullfiled at some point in the future and you can query the status of the operation using the uuid that was returned. This is what asynchronous means in this context.

But interestingly, there's no explicit declaration for the asynchronous APIs. Even if you were to create your own custom APIs with the framework, you automatically get an asynchronous version of the API.

Asynchronous Invocation

An asynchronous request's journey starts at \Magento\WebapiAsync\Controller\Rest\AsynchronousRequestProcessor::process The code that handles it is pretty straightforward.

$entitiesParamsArray = $this->inputParamsResolver->resolve();
$topicName = $this->getTopicName($request);

try {
    $asyncResponse = $this->asyncBulkPublisher->publishMass(
        $topicName,
        $entitiesParamsArray
    );
} catch (BulkException $bulkException) {
    $asyncResponse = $bulkException->getData();
}

Request params are resolved in typical Magento fashion. Then a topic name is retrieved and is published to a queue. The response or errors are then passed back as the result to the async invocation.

We know that RabbitMQ needs to be set up in order for the async/bulk APIs to work. And the AsynchronousRequestProcessor indicates that it is indeed being published to a topic somewhere.

But if you've worked with framework-message-queue side of things in Magento you may have noticed that, before a message is published or consumed, it goes through several layers of validation that there is surely, really, definitely a communication.xml and a queue_publisher.xml defined already for a given topic.

Yet there's no explicit async API queue and topic declaration anywhere.

If you try to search for configuration files you might find one or two webapi_async.xml. But these look nothing like an async service declaration. They're merely to add aliases to existing async routes.

<!-- vendor/magento/module-catalog/etc/webapi_async.xml -->

<?xml version="1.0"?>
<services xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_WebapiAsync:etc/webapi_async.xsd">
    <route url="async/bulk/V1/products/bySku" method="PUT" alias="async/bulk/V1/products"/>
</services>

Service Discovery

The answer is, Magento already knows it from the webapi service declarations. For every route service that's declared in a webapi.xml, there exists a corresponding async declaration implicitly. Even for custom ones that you write as part of your development.

The publisher and the communication configs are generated from the webapi async config which itself is generated from the webapi config. 1

This means, for every webapi service declaration, an equivalent of a writing a communication.xml and a queue_publisher.xml is derived automatically.

<!-- communication.xml -->
<topic
    name="async.magento.catalog.api.productrepositoryinterface.save.POST"
    schema="\Magento\Catalog\Api\ProductRepositoryInterface::save"
    is_synchronous="false"
>
    <handler name="async" type="\Magento\Catalog\Api\ProductRepositoryInterface" method="save"/>
</topic>
<!-- queue_publisher.xml -->
<publisher topic="async.magento.catalog.api.productrepositoryinterface.save.POST">
    <connection name="amqp" exchange="magento" disabled="false" />
</publisher>

Technically, this means that this is all it takes to make ANY service method asynchronous without making it an API.

Topic Binding

Notice all the the topics have the async. prefix. In Magento_WebapiAsync, there's a queue declaration and an exchange declaration with a binding on it. I won't go over the details on how the queue and bindings are created because honestly this is its own topic.

<exchange name="magento" type="topic" connection="amqp">
    <binding id="async.operations.all" topic="async.#" destinationType="queue" destination="async.operations.all"/>
</exchange>

The above configuration creates a binding in RabbitMQ that any message with the topic async.# should be routed to the async.operations.all queue. The async.# is a wildcard in RabbitMQ.

A consumer is also declared for that topic in a queue_consumer.xml

<consumer
    name="async.operations.all"
    queue="async.operations.all"
    connection="amqp"
    consumerInstance="Magento\AsynchronousOperations\Model\MassConsumer"
/>

Note that there's no handler node although that can be specified. The service executors are defined in the communication.xml dynamically when the configurations are generated.

Conclusion

If you have a service method like \The\Service\Api\PaymentManagementInterface::process. You can execute this asynchronously by declaring a communication.xml and a queue_publisher.xml for a topic with an async. prefix like so

<topic name="async.payment_processor.process" schema="\The\Service\Api\PaymentManagementInterface::process"
        is_synchronous="false">
    <handler name="async" type="\The\Service\Api\PaymentManagementInterface" method="process"/>
</topic>
<publisher topic="async.payment_processor.process">
    <connection name="amqp" exchange="magento" disabled="false" />
</publisher>

and then publishing to the queue with the MassPublisher

$this->asyncPublisher->publishMass('async.payment_processor.process', [[$data]])

And since we're using the built-in async consumer handle things, there are some nice things that come for free.

  • The whole operation gets wrapped into an OperationInterface just like the async/bulk API invocations.
  • You get back a uuid and a status stating if the operation was accepted successfully. Which you might use to query the existing endpoint in future to check if it was completed or not.
  • The operation gets logged in magento_operation with the serialized data or an error if it failed to execute which might give you some visibility around what's being processed or failed.

Footnotes

  1. If you're interested in seeing how they are generated look at \Magento\WebapiAsync\Model\Config for Async API, Communication \Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader\Communication, Publisher \Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader\Publisher