Asynchronous Services
Overview
TL;DR: Any Magento service method can be executed asynchronously, outside of a request lifecycle.
If you've ever worked with the Magento Web APIs, you're probably aware of their async (or bulk) versions. If not, they're simple enough to use: if you prepend /async before /V1 in any API route, the API will be executed asynchronously.
For example:
Sync API
POST /V1/products
Async API
POST /async/V1/products
When you call the synchronous API you get the result right away. In the above case calling this API would create a product and then return some data.
But if you use the async API, you get back a uuid and a status that tells you whether the request was accepted. The request will be fulfilled at some point in the future, and you can query the status of the operation using the uuid that was returned. This is what asynchronous means in this context.
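As a rough illustration of the route convention, here is a minimal sketch that derives the async route from a sync one and builds a status lookup route from a returned uuid. Both function names are hypothetical, and the /V1/bulk/<uuid>/status path is an assumption about the stock bulk status endpoint, so check it against your Magento version.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: turn a sync REST route into its async counterpart
// by inserting "/async" in front of the "/V1" version segment.
function toAsyncRoute(string $syncRoute): string
{
    $route = '/' . ltrim($syncRoute, '/');
    if (strncmp($route, '/V1/', 4) !== 0) {
        throw new InvalidArgumentException("Expected a route starting with /V1/: {$syncRoute}");
    }
    return '/async' . $route;
}

// Hypothetical helper: route used to look up the status of an accepted request.
// The path layout is an assumption, not something stated in this post.
function bulkStatusRoute(string $uuid): string
{
    return '/V1/bulk/' . rawurlencode($uuid) . '/status';
}

echo toAsyncRoute('/V1/products'), PHP_EOL;
```

The helper refuses routes that don't start with /V1/ instead of guessing where the prefix belongs.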
But interestingly, there's no explicit declaration for the asynchronous APIs. Even if you were to create your own custom APIs with the framework, you automatically get an asynchronous version of the API.
Asynchronous Invocation
An asynchronous request's journey starts at \Magento\WebapiAsync\Controller\Rest\AsynchronousRequestProcessor::process. The code that handles it is pretty straightforward.
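// Resolve the request body into one set of service arguments per entity.
$entitiesParamsArray = $this->inputParamsResolver->resolve();
// Work out which topic this route maps to.
$topicName = $this->getTopicName($request);

try {
    // Publish every entity to that topic.
    $asyncResponse = $this->asyncBulkPublisher->publishMass(
        $topicName,
        $entitiesParamsArray
    );
} catch (BulkException $bulkException) {
    // Publishing failed; the exception carries the data to return instead.
    $asyncResponse = $bulkException->getData();
}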
Request params are resolved in typical Magento fashion. Then a topic name is derived from the request, and the resolved params are published to that topic. The response, or the error data if a BulkException is thrown, is then passed back as the result of the async invocation.
We know that RabbitMQ needs to be set up in order for the async/bulk APIs to work. And the AsynchronousRequestProcessor indicates that the message is indeed being published to a topic somewhere.
But if you've worked with the framework-message-queue side of things in Magento, you may have noticed that before a message is published or consumed, it goes through several layers of validation that there is surely, really, definitely a communication.xml and a queue_publisher.xml already defined for the given topic.
Yet there's no explicit async API queue and topic declaration anywhere.
If you search for configuration files, you might find one or two webapi_async.xml files. But these look nothing like an async service declaration. They merely add aliases to existing async routes.
<?xml version="1.0"?>
<!-- vendor/magento/module-catalog/etc/webapi_async.xml -->
<services xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_WebapiAsync:etc/webapi_async.xsd">
    <route url="async/bulk/V1/products/bySku" method="PUT" alias="async/bulk/V1/products"/>
</services>
Service Discovery
The answer is that Magento already knows it from the webapi service declarations. For every route that's declared in a webapi.xml, a corresponding async declaration exists implicitly, even for the custom ones that you write as part of your development.
The publisher and the communication configs are generated from the webapi async config which itself is generated from the webapi config. 1
This means that for every webapi service declaration, the equivalent of writing a communication.xml and a queue_publisher.xml is derived automatically.
<!-- communication.xml -->
<topic
    name="async.magento.catalog.api.productrepositoryinterface.save.POST"
    schema="\Magento\Catalog\Api\ProductRepositoryInterface::save"
    is_synchronous="false"
>
    <handler name="async" type="\Magento\Catalog\Api\ProductRepositoryInterface" method="save"/>
</topic>

<!-- queue_publisher.xml -->
<publisher topic="async.magento.catalog.api.productrepositoryinterface.save.POST">
    <connection name="amqp" exchange="magento" disabled="false" />
</publisher>
Technically, this means that a topic and a publisher declaration like these are all it takes to make ANY service method asynchronous, without exposing it as an API.
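The generated topic name in the example above follows a visible pattern: the async. prefix, the lowercased service interface with dots instead of backslashes, the method name, and the HTTP verb. The sketch below reproduces that pattern; asyncTopicName is a hypothetical helper written for illustration and is not Magento's actual generator, so treat the exact casing rules as an assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical helper that reproduces the naming pattern visible in the
// generated topic above; it is not Magento's own generator code.
function asyncTopicName(string $serviceInterface, string $serviceMethod, string $httpMethod): string
{
    // "\Magento\Catalog\Api\ProductRepositoryInterface"
    //   -> "magento.catalog.api.productrepositoryinterface"
    $service = strtolower(str_replace('\\', '.', ltrim($serviceInterface, '\\')));

    // Prefix with "async.", then append the lowercased method and the upper-case HTTP verb.
    return 'async.' . $service . '.' . strtolower($serviceMethod) . '.' . strtoupper($httpMethod);
}

echo asyncTopicName('\Magento\Catalog\Api\ProductRepositoryInterface', 'save', 'POST'), PHP_EOL;
```

Because the HTTP verb is part of the name, two routes that map different verbs to the same service method end up on different topics.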
Topic Binding
Notice that all the topics have the async. prefix. In Magento_WebapiAsync, there's a queue declaration and an exchange declaration with a binding on it. I won't go over the details of how the queue and bindings are created because, honestly, this is its own topic.
<exchange name="magento" type="topic" connection="amqp">
    <binding id="async.operations.all" topic="async.#" destinationType="queue" destination="async.operations.all"/>
</exchange>
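The above configuration creates a binding in RabbitMQ so that any message whose topic matches async.# is routed to the async.operations.all queue. In a RabbitMQ topic exchange, # is a wildcard that matches zero or more dot-separated words, so async.# matches every topic that starts with async.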
A consumer is also declared for that queue in a queue_consumer.xml:
<consumer
    name="async.operations.all"
    queue="async.operations.all"
    connection="amqp"
    consumerInstance="Magento\AsynchronousOperations\Model\MassConsumer"
/>
Note that there's no handler node here, although one can be specified. The handlers that execute the service are defined dynamically in the communication config when the configurations are generated.
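To make the async.# binding concrete, here is a minimal sketch of how topic-exchange patterns match routing keys: # stands for zero or more words and * for exactly one. The functions topicMatches and matchWords are hypothetical names used only for this illustration; RabbitMQ does the matching itself, and nothing here is Magento code.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration of AMQP topic matching on dot-separated words.
function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, $keyWords);
}

function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        // The pattern is used up: it matches only if the key is used up too.
        return $key === [];
    }

    $head = $pattern[0];
    $rest = array_slice($pattern, 1);

    if ($head === '#') {
        // "#" may swallow zero, one, or many words; try every split.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($rest, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }

    if ($key === []) {
        return false;
    }

    // "*" matches exactly one word; anything else must match literally.
    if ($head === '*' || $head === $key[0]) {
        return matchWords($rest, array_slice($key, 1));
    }

    return false;
}

var_dump(topicMatches('async.#', 'async.magento.catalog.api.productrepositoryinterface.save.POST'));
```

This is why a single binding is enough: every generated topic, and any custom topic that carries the async. prefix, lands in the same queue.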
Conclusion
If you have a service method like \The\Service\Api\PaymentManagementInterface::process, you can execute it asynchronously by declaring a communication.xml and a queue_publisher.xml for a topic with an async. prefix, like so:
<topic name="async.payment_processor.process" schema="\The\Service\Api\PaymentManagementInterface::process"
       is_synchronous="false">
    <handler name="async" type="\The\Service\Api\PaymentManagementInterface" method="process"/>
</topic>

<publisher topic="async.payment_processor.process">
    <connection name="amqp" exchange="magento" disabled="false" />
</publisher>
and then publishing to the topic through the same publishMass call that the AsynchronousRequestProcessor uses:
$this->asyncPublisher->publishMass('async.payment_processor.process', [[$data]]);
And since we're using the built-in async consumer to handle things, there are some nice things that come for free.
- The whole operation gets wrapped into an OperationInterface, just like the async/bulk API invocations.
- You get back a uuid and a status stating whether the operation was accepted, which you can later use to query the existing endpoint and check whether it has completed.
- The operation gets logged in magento_operation with the serialized data, or with an error if it failed to execute, which gives you some visibility into what's being processed and what has failed.
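The nested [[$data]] argument in the publishMass call above is easy to get wrong, so here is a small sketch of how that shape reads: the outer array lists operations, and each inner array is the argument list for one call of the service method. RecordingBulkPublisher is a hypothetical stand-in that only records what it receives; the "one operation per outer entry" behaviour is an assumption about the real publisher, inferred from how the request processor passes one set of params per entity.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the bulk publisher. It only records its input and
// mimics "one operation per entry", which is an assumption about the real one.
final class RecordingBulkPublisher
{
    /** @var array<int, array{topic: string, arguments: array}> */
    public array $operations = [];

    public function publishMass(string $topicName, array $entitiesArray): string
    {
        foreach ($entitiesArray as $arguments) {
            // Each entry is the argument list for one call of the service method.
            $this->operations[] = ['topic' => $topicName, 'arguments' => $arguments];
        }

        // Placeholder identifier; the real call returns a response carrying a uuid.
        return 'fake-uuid';
    }
}

$publisher = new RecordingBulkPublisher();
$payment = ['order_id' => 42, 'amount' => '19.99'];

// One outer entry => one operation; the inner array holds that operation's arguments.
$publisher->publishMass('async.payment_processor.process', [[$payment]]);

// Two outer entries => two separate operations on the same topic.
$publisher->publishMass('async.payment_processor.process', [[$payment], [$payment]]);

echo count($publisher->operations), PHP_EOL;
```

If the service method took two parameters, each inner array would carry two values in the method's parameter order.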
Footnotes
If you're interested in seeing how they are generated, look at \Magento\WebapiAsync\Model\Config for the async API config, \Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader\Communication for the communication config, and \Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader\Publisher for the publisher config.