Home Artificial Intelligence Unleashing the Power of Python Asyncio’s Queue Introduction The Magical World of asyncio’s Queue The Adventures of the Producer-Consumer Pattern in asyncio Implementing the Producer-Consumer Pattern with asyncio’s Queue Introducing the PriorityQueue Conclusion

Unleashing the Power of Python Asyncio’s Queue Introduction The Magical World of asyncio’s Queue The Adventures of the Producer-Consumer Pattern in asyncio Implementing the Producer-Consumer Pattern with asyncio’s Queue Introducing the PriorityQueue Conclusion

0
Unleashing the Power of Python Asyncio’s Queue
Introduction
The Magical World of asyncio’s Queue
The Adventures of the Producer-Consumer Pattern in asyncio
Implementing the Producer-Consumer Pattern with asyncio’s Queue
Introducing the PriorityQueue
Conclusion

The queue mentioned earlier is a First-In-First-Out (FIFO) queue, where the primary item to enter the queue is the primary to be retrieved. That is suitable when all tasks within the queue have the identical priority.

Nonetheless, consider the next situation:

Suppose there may be a queue with tasks waiting in line, each requiring a protracted processing time.

An error log or VIP user access is a high-priority task that needs immediate attention. What should we do?

Photo by Ethan Hu on Unsplash

That is where asyncio.PriorityQueue comes into play.

Briefly describe asyncio.PriorityQueue’s implementation

Unlike FIFO queues based on lists, asyncio.PriorityQueue is predicated on heaps. It’s built using a binary tree structure.

It’s possible you’ll be acquainted with binary search trees, which be certain that essentially the most minor node is at all times the leftmost node.

Nonetheless, the binary tree in asyncio.PriorityQueue ensures that essentially the most minor node is at all times at the highest, so the best priority node is permanently removed first.

On the left is the binary tree used by PriorityQueue, and on the right is the binary search tree.
On the left is the binary tree utilized by PriorityQueue, and on the precise is the binary search tree. Image by Creator

Real-world example with asyncio.PriorityQueue

Let’s illustrate the usage of asyncio.PriorityQueue with a real-world scenario that exists in practice.

Imagine we have now an order service API. The API takes time for every order to process, but we will’t keep users waiting too long.

So when a user places an order, the API first puts the order right into a queue, allowing a background task to process it asynchronously while immediately returning a message to the user.

This API accepts orders from two forms of users: regular users and VIP users. It must be certain that VIP user orders are processed with the best priority.

VIP orders are processed with the highest priority.
VIP orders are processed with the best priority. Image by Creator

To maintain the educational curve low for readers, in this instance, we’ll use aiohttp to implement the server. The particular code is as follows:

First, we define an enumeration marking the 2 categories: regular users and VIP users.

Next, we use dataclass to define a user’s order, which comprises the user type and order processing duration. The order duration shouldn’t be considered in priority sorting.

Then we define the patron method process_order_worker, which retrieves orders from the queue and simulates the order processing.

Don’t forget to make use of queue.task_done() to inform the queue that we finished processing the order.

Following that, we implement the order API using aiohttp. This API responds to user requests, generates an order object, and places it within the asyncio.PriorityQueue.

It then immediately returns a response to the user, avoiding user wait time.

When this system starts, we use create_order_queue to initialize the queue and order consumption tasks.

When this system ends, we use destroy_order_queue to be certain that all orders within the queue are processed and the background tasks are closed appropriately.

queue.join() will wait for all the info within the queue to be processed. asyncio.wait_for sets a timeout of 20 seconds, after which it would now not wait queue.join() to finish.

We will test this implementation using PyCharm’s HTTP Request:

API prioritizes orders from VIP users whenever possible.
API prioritizes orders from VIP users each time possible. Image by Creator

As you may see, the 2 high-priority tasks are processed as expected. Perfect!

LEAVE A REPLY

Please enter your comment!
Please enter your name here