Simple Queue on mongodb
Mongodb is interesting. For someone like me who is entirely into memcached, mongodb is a wonderful {key, value} kinda-datastore with fabulous capabilities. (psst.. in reality it is not a {key, value} store.)
I was playing around with it and suddenly wanted to see what kinds of data structures can be implemented on top of mongodb. For example, a queue: how would you implement a queue with mongodb as the back end?
Let us state the requirements to pin down what I mean by “queue on mongodb”:
- Support multiple queues
- Append a message to the tail of a queue – Enqueue
- Consume a message from the head of a queue – Dequeue
We could complicate the requirements, but for now let this be. To illustrate them with an example, consider 2 queues, say, q1 and q2. So, the calls:
q1.enqueue('first message')
q1.enqueue('second message')
q1.enqueue('third message')
should give:
q1 = [ 'first message' , 'second message', 'third message' ]
Similarly for q2:
q2 = ['queue2 first message', 'queue2 second message']
And then q1.dequeue() should return ‘first message’ and q2.dequeue() should return ‘queue2 first message’.
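To make the expected behaviour concrete before bringing mongodb into it, here is a small in-memory model of the requirements. It is only an illustration: the class name QueueSet and its method signatures are made up here, and nothing in it talks to mongodb.

```python
from collections import deque


class QueueSet:
    """In-memory model of the requirements: several named FIFO queues."""

    def __init__(self):
        self._queues = {}  # queue name -> deque of messages

    def enqueue(self, name, message):
        # Append to the tail; the queue is created on first use.
        self._queues.setdefault(name, deque()).append(message)

    def dequeue(self, name):
        # Remove and return the head, or None if the queue is empty or unknown.
        queue = self._queues.get(name)
        return queue.popleft() if queue else None


queues = QueueSet()
for text in ("first message", "second message", "third message"):
    queues.enqueue("q1", text)
for text in ("queue2 first message", "queue2 second message"):
    queues.enqueue("q2", text)

print(queues.dequeue("q1"))  # first message
print(queues.dequeue("q2"))  # queue2 first message
```

Both mongodb approaches below are attempts to get this same behaviour with the data living in the database.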
With my limited experience, I found 2 ways to do it.
Each queue is a collection
This is the very first approach that came to my mind. A primitive approach, in fact. The steps are as follows:
- create a collection per queue.
- (enqueue) insert the message to the collection.
- (dequeue) using the ‘atomic’ findAndModify, remove the oldest message from the collection.
Enqueue
This operation is simple.
> db.q1.insert({'val': 'first message'})
> db.q1.insert({'val': 'second message'})
> db.q1.insert({'val': 'third message'})
And, this is how it is stored internally. Each message becomes a document in the collection.
> db.q1.find()
{ "_id" : ObjectId("51d2baaa3402f8b2dec93888"), "val" : "first message" }
{ "_id" : ObjectId("51d2bab73402f8b2dec93889"), "val" : "second message" }
{ "_id" : ObjectId("51d2babe3402f8b2dec9388a"), "val" : "third message" }
Similarly for the 2nd queue.
> db.q2.insert({'val': 'queue2 first message'})
> db.q2.insert({'val': 'queue2 second message'})
> db.q2.find()
{ "_id" : ObjectId("51d2bafd3402f8b2dec9388b"), "val" : "queue2 first message" }
{ "_id" : ObjectId("51d2bb013402f8b2dec9388c"), "val" : "queue2 second message" }
So, the mongodb collections are:
> show collections
q1
q2
Dequeue
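This is again straightforward. Use ‘findAndModify’ with remove: true to fetch and delete the oldest element in one atomic step. Sorting on _id in ascending order picks the oldest message, because the default ObjectId values start with a timestamp and so roughly follow insertion order.
> db.q1.findAndModify({query: {}, sort: {_id: 1}, remove: true})
{ "_id" : ObjectId("51d2baaa3402f8b2dec93888"), "val" : "first message" }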
Verification
> db.q1.find()
{ "_id" : ObjectId("51d2bab73402f8b2dec93889"), "val" : "second message" }
{ "_id" : ObjectId("51d2babe3402f8b2dec9388a"), "val" : "third message" }
Voila! A simple queue is implemented. 🙂
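From Python, the two operations of this approach could be sketched as below. Treat it as a sketch under assumptions: db is expected to be a PyMongo database object (for example pymongo.MongoClient()['test']), the two function names are made up for illustration, and the ordering relies on default ObjectId _id values, which only approximate insertion order when several clients insert at the same time.

```python
def enqueue_to_collection(db, queue_name, message):
    # One collection per queue; each message becomes its own document.
    db[queue_name].insert_one({"val": message})


def dequeue_from_collection(db, queue_name):
    # Atomically find and delete the document with the smallest _id.
    # Assumption: _id is a default ObjectId, so ascending _id is
    # (approximately) oldest first.
    doc = db[queue_name].find_one_and_delete({}, sort=[("_id", 1)])
    return doc["val"] if doc is not None else None
```

An empty queue simply returns None here, because find_one_and_delete returns None when nothing matches.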
Pros
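- Performance should be better, especially under concurrent access, since each enqueue or dequeue touches only one small document.
- Each message can be very large: every message is its own document, so it gets the full document size limit to itself.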
Cons
- Too many collections.
- Too many index entries: every message gets its own entry in the _id index. Since we never look up an individual message, indexing each one seems a wasted effort.
Single collection & each queue is a document
Since mongodb supports arrays, I considered a different (not really complex) approach. Here:
- create a single collection for all the queues, say, q.
- for each queue, create a new document.
- (enqueue) push the message into the corresponding document of this common collection (q).
- (dequeue) using the ‘atomic’ findAndModify, pop the oldest message from the corresponding document of this common collection (q).
Create
First create a document for each of the queues:
> db.q.insert({name: 'q1'})
> db.q.insert({name: 'q2'})
This means there is just one collection with 2 documents:
> show collections
q
> db.q.find()
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "name" : "q1" }
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "name" : "q2" }
Enqueue
Once the queue is created, each message is appended to an array inside the document named after the queue. So,
> db.q.update( {name: 'q1'}, {$push: {elem: 'first message'}} )
> db.q.update( {name: 'q1'}, {$push: {elem: 'second message'}} )
> db.q.update( {name: 'q1'}, {$push: {elem: 'third message'}} )
To list only the elements of queue named ‘q1’:
> db.q.find({'name':'q1'})
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }
So what is the state of the entire collection? The second queue, named ‘q2’, exists, but nothing has been enqueued under its name yet.
> db.q.find()
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "name" : "q2" }
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }
So, now to q2:
> db.q.update( {name: 'q2'}, {$push: {elem: 'queue2 first message'}} )
> db.q.update( {name: 'q2'}, {$push: {elem: 'queue2 second message'}} )
> db.q.find({'name':'q2'})
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "elem" : [ "queue2 first message", "queue2 second message" ], "name" : "q2" }
The entire collection reflects this:
> db.q.find()
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }
{ "_id" : ObjectId("51d2bf513402f8b2dec9388e"), "elem" : [ "queue2 first message", "queue2 second message" ], "name" : "q2" }
Dequeue
Very similar to the first approach; the only addition here is the reference to the queue name, ‘q1’ or ‘q2’.
> db.q.findAndModify( {query: {name: 'q1'}, update: {$pop: {elem: -1}}} )
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "first message", "second message", "third message" ], "name" : "q1" }
Unfortunately the output does not make it obvious what happened: by default findAndModify returns the entire document as it was before the update, so the dequeued message is the first entry of elem. If we check the document afterwards, we see:
> db.q.find({'name':'q1'})
{ "_id" : ObjectId("51d2bf353402f8b2dec9388d"), "elem" : [ "second message", "third message" ], "name" : "q1" }
So, voila! A simple queue is created, again!! 🙂
Pros
- Only one collection.
- The number of index entries equals the number of queues, which is definitely a better number than the number of messages in the queue system.
Cons
- The number of entries per queue is limited by the maximum size of a document (16 MB).
- Performance might take a hit, as every lookup, even for different queues, reaches the same collection.
Bonus
Since in the last approach the mongo shell output was hard to interpret, I tested the same from within python. Here is the output, without further explanation:
>>> import pymongo
>>> c = pymongo.Connection()
>>> db = c['test']
>>> db['q'].find_and_modify(query={'name':'q1'}, update={'$pop': {'elem':-1}})['elem']
[u'first message', u'second message', u'third message']
>>> db['q'].find({'name':'q1'}).next()
{u'_id': ObjectId('51d2c2973402f8b2dec9388f'), u'elem': [u'second message', u'third message'], u'name': u'q1'}
>>> db['q'].find_and_modify(query={'name':'q1'}, update={'$pop': {'elem':-1}})['elem'][0]
u'second message'
>>> db['q'].find({'name':'q1'}).next()
{u'_id': ObjectId('51d2c2973402f8b2dec9388f'), u'elem': [u'third message'], u'name': u'q1'}
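A note for anyone trying this today: newer PyMongo releases no longer have pymongo.Connection or find_and_modify; their counterparts are MongoClient and find_one_and_update. Below is a minimal sketch of the second approach with those names. The function names are made up for illustration, and queues is assumed to be the PyMongo collection holding one document per queue (for example pymongo.MongoClient()['test']['q']).

```python
def enqueue_to_document(queues, name, message):
    # Append to the tail of the queue's array; upsert=True creates the
    # queue document on first use.
    queues.update_one({"name": name}, {"$push": {"elem": message}}, upsert=True)


def dequeue_from_document(queues, name):
    # $pop with -1 removes the first array element. By default the call
    # returns the document as it was BEFORE the update, so the dequeued
    # message is still at index 0 of the returned array.
    before = queues.find_one_and_update({"name": name}, {"$pop": {"elem": -1}})
    if before is None or not before.get("elem"):
        return None  # unknown or empty queue
    return before["elem"][0]
```

Reading the head from the pre-update document is what the [0] in the session above does; it means the pop and the read happen in one atomic call instead of two.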