Simple Queue on mongodb

Mongodb is interesting. For someone like me who is entirely into memcached, mongodb is a wonderful {key, value} kinda-datastore with fabulous capabilities. (psst.. in reality it is not a {key, value} store.)

I was playing around with it and suddenly wanted to see what kinds of data structures can be implemented on top of mongodb. For example, say, a queue: how would one implement a queue with mongodb as the back end?

Let us state the requirements to pin down what I mean by “queue on mongodb”:

  1. Support multiple queues
  2. Append a message to the tail of a queue – Enqueue
  3. Consume a message from the head of a queue – Dequeue

We could complicate the requirements, but for now let this be. Let us elaborate on them with an example. Consider 2 queues, say, q1 and q2. Then:

q1.enqueue('first message')
q1.enqueue('second message')
q1.enqueue('third message')

should give:

q1 = [ 'first message' , 'second message', 'third message' ]

Similarly for q2:

q2 = ['queue2 first message', 'queue2 second message']

And then q1.dequeue() should return ‘first message’ and q2.dequeue() should return ‘queue2 first message’.
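
Before involving mongodb, the expected behaviour can be pinned down with a minimal in-memory sketch in python. The class name QueueSet and its method names are hypothetical, chosen only for illustration; nothing here talks to a database.

```python
from collections import defaultdict, deque


class QueueSet:
    """In-memory model of the requirements: many named FIFO queues."""

    def __init__(self):
        # One deque per queue name, created on first use.
        self._queues = defaultdict(deque)

    def enqueue(self, name, message):
        # Append the message to the tail of the named queue.
        self._queues[name].append(message)

    def dequeue(self, name):
        # Remove and return the head; None when the queue is empty.
        queue = self._queues[name]
        return queue.popleft() if queue else None


qs = QueueSet()
for msg in ('first message', 'second message', 'third message'):
    qs.enqueue('q1', msg)
qs.enqueue('q2', 'queue2 first message')
qs.enqueue('q2', 'queue2 second message')

print(qs.dequeue('q1'))  # first message
print(qs.dequeue('q2'))  # queue2 first message
```

Both approaches below should behave like this model, with mongodb holding the state instead of a python dict.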

With my limited experience, I found 2 ways to do it.

Each queue is a collection

This is the very first approach that came to my mind. A primitive approach, in fact. The steps are as follows:

  • create a collection per queue.
  • (enqueue) insert the message into the collection.
  • (dequeue) using the ‘atomic’ findAndModify, remove and return the oldest message from the collection.

Enqueue

This operation is simple.

> db.q1.insert({'val': 'first message'})
> db.q1.insert({'val': 'second message'})
> db.q1.insert({'val': 'third message'})

And, this is how it is stored internally. Each message becomes a document in the collection.

> db.q1.find()
{ "_id" : ObjectId("51d2baaa3402f8b2dec93888"), "val" : "first message" }
{ "_id" : ObjectId("51d2bab73402f8b2dec93889"), "val" : "second message" }
{ "_id" : ObjectId("51d2babe3402f8b2dec9388a"), "val" : "third message" }

Similarly for the 2nd queue.

> db.q2.insert({'val': 'queue2 first message'})
> db.q2.insert({'val': 'queue2 second message'})
> db.q2.find()
{ "_id" : ObjectId("51d2bafd3402f8b2dec9388b"), "val" : "queue2 first message" }
{ "_id" : ObjectId("51d2bb013402f8b2dec9388c"), "val" : "queue2 second message" }

So, the mongodb collections are:

> show collections
q1
q2

Dequeue

This is again straightforward. Use ‘findAndModify’ with remove: true, sorted by _id in ascending order, to return the oldest document and remove it in one atomic step.

> db.q1.findAndModify({query: {}, sort: {_id: 1}, remove: true})
{ "_id" : ObjectId("51d2baaa3402f8b2dec93888"), "val" : "first message" }

Verification

> db.q1.find()
{ "_id" : ObjectId("51d2bab73402f8b2dec93889"), "val" : "second message" }
{ "_id" : ObjectId("51d2babe3402f8b2dec9388a"), "val" : "third message" }

Voila! A simple queue is implemented. 🙂
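
The same two operations can be sketched from python. This is a minimal sketch, assuming a recent PyMongo where insert_one and find_one_and_delete are available; the helper names enqueue and dequeue are hypothetical, and db is assumed to be a PyMongo database object.

```python
def enqueue(db, queue_name, message):
    # One collection per queue; each message becomes its own document.
    db[queue_name].insert_one({'val': message})


def dequeue(db, queue_name):
    # Atomically find and delete the document with the smallest _id.
    # Assumption: _id is the default ObjectId, whose ordering only
    # approximates insertion order when several clients insert at once.
    doc = db[queue_name].find_one_and_delete({}, sort=[('_id', 1)])
    return doc['val'] if doc is not None else None
```

With a mongodb server running, db could be obtained as pymongo.MongoClient()['test'], after which dequeue(db, 'q1') should hand back the messages in the order they were enqueued and None once the queue is empty.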

Pros
  1. Performance should be better, especially under concurrent access, since each message is a separate document.
  2. Each message enqueued/dequeued can be large, up to the size limit of a single document.
Cons
  1. Too many collections.
  2. Too many index entries. Every message gets its own entry in the _id index, even though we never look up a message by its _id.

Single collection & each queue is a document

Since mongodb supports arrays, I considered a different (not really complex) approach. Here:

  • create a single collection for all the queues, say, q.
  • for each queue, create a new document.
  • (enqueue) push the message into the corresponding document of this common collection (q)
  • (dequeue) using the ‘atomic’ findAndModify, pop the oldest message from the corresponding document of this common collection (q)

Create

First create a document for each of the queues:

> db.q.insert({name: 'q1'})
> db.q.insert({name: 'q2'})

This means there is just one collection with 2 documents:

> show collections
q

> db.q.find()
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "name" : "q1" }
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "name" : "q2" }

Enqueue

Once the queue is created, each message is appended to an array inside the document named after the queue. So,

> db.q.update( {name: 'q1'}, {$push: {elem: 'first message'}} )
> db.q.update( {name: 'q1'}, {$push: {elem: 'second message'}} )
> db.q.update( {name: 'q1'}, {$push: {elem: 'third message'}} )

To list only the elements of queue named ‘q1’:

> db.q.find({'name':'q1'})
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }

So what is the state of the entire collection? The second queue, named ‘q2’, exists but has no messages enqueued yet.

> db.q.find()
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "name" : "q2" }
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }

So, now to q2:

> db.q.update( {name: 'q2'}, {$push: {elem: 'queue2 first message'}} )
> db.q.update( {name: 'q2'}, {$push: {elem: 'queue2 second message'}} )
> db.q.find({'name':'q2'})
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "elem" : [ "queue2 first message", "queue2 second message" ], "name" : "q2" }

The entire collection reflects this:

> db.q.find()
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "elem" : [ "queue2 first message", "queue2 second message" ], "name" : "q2" }

Dequeue

Very similar to the first approach; the only addition is the reference to the queue name, ‘q1’ or ‘q2’.

> db.q.findAndModify( {query: {name: 'q1'}, update: {$pop: {elem: -1}}} )
{
    "_id" : ObjectId("51d2bf353402f8b2dec9388d"),
    "elem" : [
        "first message",
        "second message",
        "third message"
    ],
    "name" : "q1"
}

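Unfortunately it is not easy to tell from the output what happened, because findAndModify returns the entire document as it was before the update (pass new: true to get the updated document instead). The dequeued message is the first entry of the returned ‘elem’ array. If we check the document afterwards, we see: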

> db.q.find({'name':'q1'})
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "second message", "third message" ], "name" : "q1" }

So, voila! A simple queue is created, again!! 🙂

Pros
  1. Only one collection.
  2. The number of index entries equals the number of queues, which is definitely a better number than the number of messages in the queue system.
Cons
  1. The number of entries per queue is limited by the maximum size of a document (16MB).
  2. Performance might take a hit, as every operation, even for different queues, hits the same collection.
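
To get a feel for the first con, here is a rough sketch that estimates how many bytes an array of string messages occupies once encoded as BSON. It follows the BSON layout for arrays of strings (a length prefix, then per element a type byte, the array index as a null-terminated key, and a length-prefixed, null-terminated string). The function name is hypothetical, and the estimate covers the ‘elem’ array alone, not the rest of the document.

```python
MAX_BSON_DOCUMENT_BYTES = 16 * 1024 * 1024  # the 16MB per-document limit


def bson_string_array_size(messages):
    """Estimate the BSON size in bytes of an array holding only strings."""
    size = 4 + 1  # int32 length prefix + trailing null byte of the array
    for index, message in enumerate(messages):
        size += 1                                     # element type byte
        size += len(str(index)) + 1                   # index as a C-string key
        size += 4 + len(message.encode('utf-8')) + 1  # length, bytes, null
    return size


# Example: a queue holding 1000 messages of 100 bytes each.
used = bson_string_array_size(['x' * 100] * 1000)
print(used, 'bytes used of', MAX_BSON_DOCUMENT_BYTES)
```

So the per-message overhead is small, and the limit is mostly a cap on the total bytes of all messages waiting in one queue.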

Bonus

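Since the mongo shell output in the last approach was hard to read, I tested the same from within python. find_and_modify returns the document as it was before the update, so the first entry of its ‘elem’ array is the message that was just dequeued. Here is the session, without further explanation: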

>>> import pymongo
>>> c = pymongo.Connection()
>>> db = c['test']
>>> db['q'].find_and_modify(query={'name':'q1'}, update={'$pop': {'elem':-1}})['elem']
[u'first message', u'second message', u'third message']

>>> db['q'].find({'name':'q1'}).next()
{u'_id': ObjectId('51d2c2973402f8b2dec9388f'), u'elem': [u'second message', u'third message'], u'name': u'q1'}

>>> db['q'].find_and_modify(query={'name':'q1'}, update={'$pop': {'elem':-1}})['elem'][0]
u'second message'

>>> db['q'].find({'name':'q1'}).next()
{u'_id': ObjectId('51d2c2973402f8b2dec9388f'), u'elem': [u'third message'], u'name': u'q1'}
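
pymongo.Connection and find_and_modify, as used in the session above, belong to older PyMongo releases; in current releases MongoClient and find_one_and_update are the counterparts. Below is a minimal sketch of the single-collection approach with those newer calls. The helper names are hypothetical, and coll is assumed to be a PyMongo collection such as pymongo.MongoClient()['test']['q'].

```python
def create_queue(coll, name):
    # One document per queue; the 'elem' array appears on first $push.
    coll.insert_one({'name': name})


def enqueue(coll, name, message):
    # $push appends the message to the tail of the 'elem' array.
    coll.update_one({'name': name}, {'$push': {'elem': message}})


def dequeue(coll, name):
    # $pop with -1 removes the first array element. By default the call
    # returns the document as it was BEFORE the update, so the head of
    # the returned array is the message that was just removed.
    doc = coll.find_one_and_update({'name': name}, {'$pop': {'elem': -1}})
    if doc is None or not doc.get('elem'):
        return None  # unknown queue, or nothing left to dequeue
    return doc['elem'][0]
```

Relying on the pre-update document keeps the dequeue to a single atomic call: the message is read and removed in the same operation, as in the shell version.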

 
