The queue mentioned earlier is a First-In-First-Out (FIFO) queue, where the primary item to enter the queue is the primary to be retrieved. That is suitable when all tasks within the queue have the identical priority.
Nonetheless, consider the next situation:
Suppose there may be a queue with tasks waiting in line, each requiring a protracted processing time.
An error log or VIP user access is a high-priority task that needs immediate attention. What should we do?
That is where asyncio.PriorityQueue
comes into play.
Briefly describe asyncio.PriorityQueue’s implementation
Unlike FIFO queues based on lists, asyncio.PriorityQueue
is predicated on heaps. It’s built using a binary tree structure.
It’s possible you’ll be acquainted with binary search trees, which be certain that essentially the most minor node is at all times the leftmost node.
Nonetheless, the binary tree in asyncio.PriorityQueue
ensures that essentially the most minor node is at all times at the highest, so the best priority node is permanently removed first.
Real-world example with asyncio.PriorityQueue
Let’s illustrate the usage of asyncio.PriorityQueue
with a real-world scenario that exists in practice.
Imagine we have now an order service API. The API takes time for every order to process, but we will’t keep users waiting too long.
So when a user places an order, the API first puts the order right into a queue, allowing a background task to process it asynchronously while immediately returning a message to the user.
This API accepts orders from two forms of users: regular users and VIP users. It must be certain that VIP user orders are processed with the best priority.
To maintain the educational curve low for readers, in this instance, we’ll use aiohttp
to implement the server. The particular code is as follows:
First, we define an enumeration marking the 2 categories: regular users and VIP users.
Next, we use dataclass
to define a user’s order, which comprises the user type and order processing duration. The order duration shouldn’t be considered in priority sorting.
Then we define the patron method process_order_worker
, which retrieves orders from the queue and simulates the order processing.
Don’t forget to make use of queue.task_done()
to inform the queue that we finished processing the order.
Following that, we implement the order API using aiohttp
. This API responds to user requests, generates an order object, and places it within the asyncio.PriorityQueue
.
It then immediately returns a response to the user, avoiding user wait time.
When this system starts, we use create_order_queue
to initialize the queue and order consumption tasks.
When this system ends, we use destroy_order_queue
to be certain that all orders within the queue are processed and the background tasks are closed appropriately.
queue.join()
will wait for all the info within the queue to be processed. asyncio.wait_for
sets a timeout of 20 seconds, after which it would now not wait queue.join()
to finish.
We will test this implementation using PyCharm’s HTTP Request:
As you may see, the 2 high-priority tasks are processed as expected. Perfect!