[LRUG] Queuing systems

Sean O'Halpin sean.ohalpin at gmail.com
Wed Sep 7 08:04:26 PDT 2011


On Wed, Sep 7, 2011 at 10:14 AM, Neil Middleton
<neil.middleton at gmail.com> wrote:
> Simply put - I've got a requirement to capture a crap load of data from an
> API and stuff it all into a queue to be processed later on.  We would be
> receiving hundreds of new items per second, over a sustained period.
> We're looking at hosting on EngineYard so ideally need to stick with what's
> available there.
> We cannot really lose any of the received data before processing.
> Any ideas?
> Neil
>

Hi,

There's no simple answer to this but I can give you some advice (based
on an entirely different set of constraints but what the heck).

The first thing is to question whether you really need to process
absolutely everything or whether there is an acceptable level of data
loss.
For example, in the twitter processing pipeline I presented at LRUG
(thanks for the feedback Mr Jaba!) we decided that losing the
occasional tweet was no big deal as we were only going for aggregated
results to rank BBC links.
That meant we could get away with not using persistence along the
pipeline. A simple test showed that our setup could sustain up to 20K
tweets/sec at some points.
Using persistence slowed that down to about 2K/sec (using rabbitmq
with acks, via the amqp, eventmachine and bunny gems).

That "at some points" is important. Your pipeline will be able to
sustain throughput only at the rate of the slowest stage in it. You
need to parallelize as much as possible. Don't forget your backend.
You need to consider the lifetime of the data from the beginning to
the end of its life. It's great if you manage to do all the processing
at the front end but if you end up with a huge database table you
can't process in any reasonable amount of time, you're out of luck.
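
To make the "slowest stage" point concrete, here is a rough sketch of
the arithmetic in Ruby. The stage names, per-worker rates and worker
counts are made-up illustrative numbers, not measurements from the
pipeline described above.

```ruby
# Hypothetical stages; rates are messages/sec per worker (invented figures).
Stage = Struct.new(:name, :rate_per_worker, :workers) do
  # Total messages/sec this stage can handle with all its workers.
  def capacity
    rate_per_worker * workers
  end
end

# The pipeline as a whole can only sustain the rate of its slowest stage.
def bottleneck(stages)
  stages.min_by(&:capacity)
end

stages = [
  Stage.new("ingest", 20_000, 1),
  Stage.new("parse",   5_000, 2),
  Stage.new("store",   1_000, 3),
]

slowest = bottleneck(stages)
puts "sustained throughput: #{slowest.capacity}/sec, limited by #{slowest.name}"
# prints "sustained throughput: 3000/sec, limited by store"

# Adding workers to the slowest stage just moves the bottleneck elsewhere.
stages[2].workers = 12
slowest = bottleneck(stages)
puts "after scaling store: #{slowest.capacity}/sec, limited by #{slowest.name}"
# prints "after scaling store: 10000/sec, limited by parse"
```

The same calculation applies to the backend: if the database stage is
the minimum, no amount of front-end tuning raises the sustained rate.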

There are multiple points of failure: where the data comes into the
system (the ingest), the processes manipulating the data (message
handlers), the message broker itself and any systems such as databases
that your overall system depends on.

The most difficult one to address is the ingest, i.e. where you are
consuming data from the API. If the API does not provide a restart
mechanism (like the Twitter firehose does for example), then you'll
have a hard time recovering from losing the process reading from it.
One thing you can do to mitigate the problem is to do as little as
possible at the ingest stage. It should concentrate on grabbing the
data and chucking it onto your internal system as fast as possible
where you can engineer more resilience. The less it does, the less
there is to go wrong. Once you've got data onto your message queue,
you can persist it then use as many processes as you like to do the
work. Still, you need to ask the question: can the application survive
the loss of some data from the API? If the answer is yes, then
reconsider your need to process absolutely everything.
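
A minimal sketch of the "thin ingest" idea, using Ruby's in-memory
Queue as a stand-in for the internal message queue. In a real system
the ingest would publish to a persistent broker queue instead; the
method name run_pipeline and the worker_count parameter are
hypothetical, chosen only for this illustration.

```ruby
# The ingest loop only reads and enqueues. All slow work happens in
# worker threads on the other side of the queue.
def run_pipeline(source, worker_count: 4, &handler)
  queue   = Queue.new   # stand-in for the message broker
  results = Queue.new

  workers = Array.new(worker_count) do
    Thread.new do
      # :done is a sentinel telling a worker to stop.
      while (item = queue.pop) != :done
        results << handler.call(item)
      end
    end
  end

  # Ingest: grab the data and hand it off, nothing else.
  source.each { |item| queue << item }

  # One sentinel per worker, then wait for them all to drain the queue.
  worker_count.times { queue << :done }
  workers.each(&:join)

  Array.new(results.size) { results.pop }
end

processed = run_pipeline(1..10, worker_count: 3) { |n| n * 2 }
puts processed.sort.inspect
```

Results come back in whatever order the workers finish, which is why
the example sorts them before printing.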

Using persistent queues and acks will get you a high level of
resilience to message handler failure and to some degree the failure
of the message broker itself. But you are still likely to lose data if
your single rabbitmq instance goes down (e.g. due to a pipeline stall
and erlang running out of memory). So, you'll need to use a cluster of
rabbitmq instances. That's not so straightforward to do in any
'transparent' way. It's easier if your application processes know how
to fail over themselves. (But if anyone has a foolproof setup for
transparent clustered rabbitmq instances, I would love to hear from
you.)

As Chris says, you need to design your systems to be idempotent
wherever possible. That enables you to handle duplicate messages
passing through your pipeline. You can't avoid duplication so don't
try. A simple example is where a process takes too long over a
message, so the broker times out waiting for the ACK and resends the
message to another process. The first process then completes the
processing and sends the data on down the pipeline. The second process
also completes and hey presto a duplicate message in your pipeline.
This is an inherent characteristic of distributed processing so your
system needs to be able to handle it. Idempotence makes it safe to
have multiple processes working in parallel.
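
One common way to get idempotence is to remember which message ids
have already been applied and ignore repeats. The sketch below assumes
every message carries a unique :id, which the original discussion does
not specify; the class name and the example.com link are hypothetical.
The in-memory Set is only for illustration, since with several worker
processes the "seen" record would have to live in a shared store.

```ruby
require "set"

# Hypothetical handler: counts link mentions, applying each message once.
class IdempotentCounter
  attr_reader :counts

  def initialize
    @seen   = Set.new       # ids already applied (shared store in real life)
    @counts = Hash.new(0)
  end

  # Returns true if the message was applied, false if it was a duplicate.
  def handle(message)
    # Set#add? returns nil when the id is already present.
    return false unless @seen.add?(message[:id])
    @counts[message[:link]] += 1
    true
  end
end

counter = IdempotentCounter.new
msg = { id: "m1", link: "http://example.com/a" }
counter.handle(msg)   # applied
counter.handle(msg)   # redelivered copy, ignored
puts counter.counts["http://example.com/a"]   # prints 1
```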

Where you have components that cannot be stateless, e.g. where you are
accumulating data for summing, counting, sorting, etc., you need to
handle the persistence yourself, i.e. the message queue won't do this
for you. The simplest way to handle this is to accumulate messages in
a database then send a triggering message to indicate that the data
should be processed. If you do it all in memory, you risk losing the
lot. I recommend that you don't try to roll your own persistence
mechanism (speaking from experience).

Finally, if you are using persistence and ack with resends, you need
to consider the problem of the 'poison letter'. That is a message
which causes the same processing stage to fail repeatedly and so keeps
getting resent by the broker. I think AMQP has a header saying how
many times a message has been resent. You could use this to reject
messages over a certain threshold. (Someone please correct me if I'm
mixing AMQP up with something else here.)
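
Whatever the broker provides, the threshold idea can be sketched at
the application level. As far as I know, AMQP 0-9-1 deliveries carry
only a boolean "redelivered" flag rather than a count, so this sketch
keeps its own per-message counter; treat that as an assumption to
check against your broker. The class name, the :id field and the
:ack/:retry return values are all hypothetical.

```ruby
# Hypothetical consumer wrapper: counts failed attempts per message id
# and sets a message aside once it has failed max_attempts times.
class PoisonGuard
  attr_reader :dead_letters

  def initialize(max_attempts: 3)
    @max_attempts = max_attempts
    @attempts     = Hash.new(0)   # in-memory; a shared store in real life
    @dead_letters = []
  end

  # Returns :ack when the broker should drop the message (handled or
  # set aside) and :retry when it should be redelivered.
  def process(message)
    yield message
    @attempts.delete(message[:id])
    :ack
  rescue StandardError
    @attempts[message[:id]] += 1
    return :retry if @attempts[message[:id]] < @max_attempts
    # Too many failures: park the message for inspection and stop retrying.
    @attempts.delete(message[:id])
    @dead_letters << message
    :ack
  end
end

guard  = PoisonGuard.new(max_attempts: 3)
poison = { id: "m42", body: "not json" }
outcomes = Array.new(3) do
  guard.process(poison) { |m| raise ArgumentError, "cannot parse #{m[:body]}" }
end
puts outcomes.inspect          # prints [:retry, :retry, :ack]
puts guard.dead_letters.size   # prints 1
```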

That's not everything but I reckon it's enough for now :)

Good luck!

Regards,
Sean


