Musings of an Anonymous Geek

Made with only the finest 1's and 0's

PyTPMOTW: py-amqplib

Posted on April 3, 2010 by bkjones

What’s This Module For?

To interact with a queue broker implementing version 0.8 of the Advanced Message Queuing Protocol (AMQP) standard. Copies of various versions of the specification can be found here. At time of writing, 0.10 is the latest version of the spec, but many popular implementations used in production environments today still speak 0.8, presumably awaiting finalization of v1.0 of the spec, which is a work in progress.

What is AMQP?

AMQP is a queuing/messaging protocol that is implemented by server daemons (called ‘brokers’) like RabbitMQ, ActiveMQ, Apache Qpid, Red Hat Enterprise MRG, and OpenAMQ. Though messaging protocols used in the enterprise have historically been proprietary, the AMQP working group has taken a bold and vocal stance that the protocol will be:

  • Broadly applicable for enterprise use
  • Totally open
  • Platform agnostic
  • Interoperable

The working group consists of several huge enterprises with a vested interest in a protocol that meets these requirements. Most are either enterprises that are (or were) victims of the proprietary lock-in that came with what will now likely become ‘legacy’ protocols, or implementers of the protocol who will sell products and services around their implementation. Here’s a brief list of those involved in the AMQP working group:

  • JPMorgan Chase (the initial developers of the protocol, along with iMatix)
  • Goldman Sachs
  • Red Hat Software
  • Cisco Systems
  • Novell

Message brokers can add an awful lot of flexibility to an architecture. They can be used to integrate applications across platforms and languages, to enable asynchronous operations for web front ends, and to modularize and more easily distribute complex processing operations.

Basic Publishing

The first thing to know is that when you code against an AMQP broker, you’re dealing with a hierarchy: a ‘vhost’ contains one or more ‘exchanges’ which themselves can be bound to one or more ‘queues’. Here’s how you can programmatically create an exchange and queue, bind them together, and publish a message:

from amqplib import client_0_8 as amqp

conn = amqp.Connection(userid='guest', password='guest', host='localhost', virtual_host='/', ssl=False)

# Create a channel object, queue, exchange, and binding.
chan = conn.channel()
chan.queue_declare('myqueue', durable=True)
chan.exchange_declare('myexchange', type='direct', durable=True)
chan.queue_bind('myqueue', 'myexchange', routing_key='myq.myx')

# Create an AMQP message object

msg = amqp.Message('This is a test message')
chan.basic_publish(msg, 'myexchange', 'myq.myx')

As far as we know, we have one exchange and one queue on our server right now, and if that’s the case, then technically the routing key I’ve used isn’t required. However, I strongly suggest that you always use a routing key to avoid really odd (and implementation-specific) behavior, like getting multiple copies of a message on the consumer side of the equation, or getting odd exceptions from the server. The routing key can be arbitrary text like I’ve used above, or you can follow a common dotted convention, with ‘.’ separating the parts of the key. Just remember that without the routing key, the minute more than one queue is bound to an exchange, the exchange has no way of knowing which queue to route a message to. Remember: you don’t publish to a queue, you publish to an exchange, and the routing key tells the exchange which queue the message goes in.
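To make the routing idea concrete, here is a minimal in-memory sketch of how a direct exchange decides where a message goes. It does not talk to a broker and is not part of amqplib: the ToyDirectExchange class and the ‘otherqueue’ and ‘other.myx’ names are made up for illustration, and a real broker does far more than this.

```python
from collections import defaultdict


class ToyDirectExchange(object):
    """In-memory stand-in for a direct exchange (hypothetical, not amqplib)."""

    def __init__(self):
        # routing key -> names of the queues bound with that key
        self.bindings = defaultdict(list)
        # queue name -> message bodies delivered so far
        self.queues = defaultdict(list)

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, body, routing_key):
        # Deliver a copy to every queue whose binding key equals the routing key.
        matched = self.bindings.get(routing_key, [])
        for queue in matched:
            self.queues[queue].append(body)
        return list(matched)


exchange = ToyDirectExchange()
exchange.bind('myqueue', 'myq.myx')
exchange.bind('otherqueue', 'other.myx')

# Only the queue bound with the matching key receives the message.
delivered_to = exchange.publish('This is a test message', 'myq.myx')

# A key that no binding uses matches nothing in this toy model.
unrouted = exchange.publish('Nobody is bound with this key', 'no.such.key')
```

In this toy model the publisher never names a queue. The binding key is the only thing connecting a published message to ‘myqueue’.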

Basic Consumption

Now that we’ve published a message, how do we get our hands on it? There are two methods: ‘basic_get’, which will ‘get’ a single message from the queue, and ‘basic_consume’, which technically doesn’t get *any* messages: it registers a handler with the server and tells it to send messages along as they arrive, which is great for high-volume messaging operations.

Here’s the ‘basic_get’ version of a client to grab the message we just published:

msg = chan.basic_get(queue='myqueue', no_ack=False)
# basic_get returns None if the queue is empty, so check before acking.
if msg is not None:
    chan.basic_ack(msg.delivery_tag)

In the above, I’ve used the same channel I used to publish the message to get it back again using the basic_get operation. I then acknowledged receipt of the message by sending the server a ‘basic_ack’, passing along the delivery_tag the server included as part of the incoming message.
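If you want everything currently sitting in a queue, the same get-then-ack pattern can be wrapped in a loop. The sketch below is one way to do it, not the only one. The drain() helper is a name I made up, and FakeChannel and FakeMessage are hypothetical stand-ins so the example runs without a broker. The only channel calls it relies on are basic_get and basic_ack, used the same way as above.

```python
def drain(chan, queue):
    """Fetch and acknowledge messages one at a time until the queue is empty."""
    bodies = []
    while True:
        msg = chan.basic_get(queue=queue, no_ack=False)
        if msg is None:
            # Nothing left to fetch.
            break
        bodies.append(msg.body)
        # Ack only after we have kept the body.
        chan.basic_ack(msg.delivery_tag)
    return bodies


# Hypothetical stand-ins so the sketch runs without a broker.
class FakeMessage(object):
    def __init__(self, body, delivery_tag):
        self.body = body
        self.delivery_tag = delivery_tag


class FakeChannel(object):
    def __init__(self, bodies):
        self.pending = [FakeMessage(b, i + 1) for i, b in enumerate(bodies)]
        self.acked = []

    def basic_get(self, queue, no_ack=False):
        return self.pending.pop(0) if self.pending else None

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


fake = FakeChannel(['first', 'second'])
drained = drain(fake, 'myqueue')
```

With a real connection you would pass the chan object from the publishing example in place of the fake one.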

Consuming Mass Quantities

Using basic_consume takes a little more thought than basic_get, because basic_consume does nothing more than register a method with the server to tell it to start sending messages down the pipe. Once that’s done, however, it’s up to you to do a chan.wait() to wait for messages to show up, and find some elegant way of breaking out of this wait() operation. I’ve seen and used different techniques myself, and the right thing will depend on the application.

The basic_consume method also requires a callback method which is called for each incoming message, and is passed the amqp.Message object when it arrives.

Here’s a bit of code that defines a callback method, calls basic_consume, and does a chan.wait():

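consumer_tag = 'foo'

def process(msg):
    # Called once for each message the server delivers.
    txt = msg.body
    # Acknowledge receipt, since basic_consume defaults to no_ack=False.
    chan.basic_ack(msg.delivery_tag)
    if '-1' in txt:
        # Sentinel message: stop consuming and close the channel.
        print('Got -1')
        chan.basic_cancel(consumer_tag)
        chan.close()
    else:
        print('Got message!')

# Consume from the queue declared in the publishing example.
chan.basic_consume('myqueue', callback=process, consumer_tag=consumer_tag)
while True:
    print('Waiting for the next message...')
    try:
        chan.wait()
    except IOError as out:
        print('Got an IOError: %s' % out)
        break
    if not chan.is_open:
        print('Done processing. Later')
        break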

So, basic_consume tells the server ‘Start sending any and all messages!’. The server registers a consumer under the name given by the consumer_tag argument, or, if you don’t supply one, it assigns one, which becomes the return value of basic_consume(). I define one here because I don’t want to run into race conditions where I want to call basic_cancel() with a consumer_tag variable that doesn’t exist yet, or is out of scope, or whatever. In the callback, I look for a sentinel message whose body contains ‘-1’, and at that point I call basic_cancel (passing in the consumer_tag so the server knows who to stop sending messages to), and I close the channel. The ‘while True’ loop checks the channel’s status after each wait() and exits if it’s no longer open.

The above example starts to uncover some issues with py-amqplib. It’s not clear how errors coming back from the server are handled, as opposed to errors caused by the processing code, for example. It’s also a little clumsy trying to determine the logic for breaking out of the loop. In this case there’s a sentinel message sent to the queue representing the final message, at which point our ‘process()’ callback closes the channel, but then the loop has to check the channel’s status to move forward. Just returning False from process() doesn’t break out of the while loop, because nothing is looking at the return value from that function. We could have our process() function raise an error of its own as well, which might be a bit more elegant, if also a bit more work.
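Here is a rough sketch of that last idea: the callback raises an exception when it sees the sentinel, and the code around the wait() loop catches it and does the cleanup in one place. It assumes that an exception raised inside the callback propagates out of chan.wait(), which is worth verifying against your version of amqplib. StopConsuming, consume_until_sentinel(), FakeChannel and FakeMessage are all names I made up, and the fakes exist only so the example runs without a broker. Acknowledgements are left out to keep the focus on the exit logic.

```python
class StopConsuming(Exception):
    """Raised by the callback when the sentinel arrives (hypothetical name)."""


def consume_until_sentinel(chan, queue, handle, consumer_tag='foo'):
    def process(msg):
        if '-1' in msg.body:
            # Signal the loop below instead of closing the channel here.
            raise StopConsuming()
        handle(msg.body)

    chan.basic_consume(queue, callback=process, consumer_tag=consumer_tag)
    try:
        while True:
            chan.wait()
    except StopConsuming:
        # All of the cleanup lives in one place.
        chan.basic_cancel(consumer_tag)
        chan.close()


# Hypothetical stand-ins so the sketch runs without a broker.
class FakeMessage(object):
    def __init__(self, body):
        self.body = body


class FakeChannel(object):
    def __init__(self, bodies):
        self.pending = list(bodies)
        self.callback = None
        self.is_open = True
        self.cancelled = []

    def basic_consume(self, queue, callback=None, consumer_tag=''):
        self.callback = callback
        return consumer_tag

    def wait(self):
        # Hand one pending message to the registered callback per call.
        self.callback(FakeMessage(self.pending.pop(0)))

    def basic_cancel(self, consumer_tag):
        self.cancelled.append(consumer_tag)

    def close(self):
        self.is_open = False


seen = []
fake = FakeChannel(['one', 'two', '-1', 'never delivered'])
consume_until_sentinel(fake, 'myqueue', seen.append)
```

The trade-off is the one mentioned above: the loop no longer has to poll the channel’s status, but you have to define and catch your own exception type.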

Moving Ahead

What I’ve covered here handles perhaps 90% of the common cases for amqplib, but there’s plenty more you can do with it. There are various exchange types, including fanout exchanges and topic exchanges, which can facilitate more interesting messaging and pub/sub models. To learn more about them, here are a couple of places to go for information:

Broadcasting your logs with RabbitMQ and Python
Rabbits and Warrens
RabbitMQ FAQ section “Messaging Concepts: Exchanges”
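As a small taste of what topic exchanges do, here is a toy matcher for topic binding patterns. It is a sketch of the matching rule only, where ‘*’ stands for exactly one dot-separated word and ‘#’ for zero or more words. It is not code from amqplib or from any broker, and the topic_matches() name and the ‘logs.*’ style keys are my own examples.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    return _match(pattern.split('.'), routing_key.split('.'))


def _match(pat, words):
    if not pat:
        # The pattern is used up, so the key must be used up too.
        return not words
    head, rest = pat[0], pat[1:]
    if head == '#':
        # '#' may absorb zero or more words; try every possible split.
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        # '*' matches any single word; otherwise the words must be equal.
        return _match(rest, words[1:])
    return False


one_word = topic_matches('logs.*', 'logs.error')
too_many_words = topic_matches('logs.*', 'logs.error.db')
any_depth = topic_matches('logs.#', 'logs.error.db')
```

A fanout exchange is simpler still: it ignores the routing key and copies each message to every queue bound to it.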
