Бесконечная очередь и отказ от сообщений в RabbitMQ + Thumper + PHP AMPQlib
По работе столкнулся с одной задачей и решил использовать сервер очереди сообщений RabbitMQ в связке с PHP 5.3 через PHP-ampqlib и библиотеку Thumper.
Чтобы сделать Consumer(worker), который будет обрабатывать бесконечную очередь надо задать $consumer->consume(-1);. Тогда в цикле Thumper, где проверяется нужно ли нам выходить всё будет хорошо и наш обработчик будет работать вечно.
Но появился ещё один интересный вопрос - что делать, если внутри функции обработки сообщения произошел Exception? Я решил, что тогда мы отказываемся от этого сообщение и оно идет к другому обработчику. Реализовывать это лучше расширением класса Consumer:
class MyConsumer extends Consumer {
public function processMessage($msg) {
try {
parent::processMessage($msg);
} catch (Exception $e) {
echo "Message rejected due to exception.\n".$e->getMessage()."\n";
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'],true);
//throw $e;
}
}
}
Соответственно ваш Consumer должен быть объектом класса MyConsumer.
Если же всё таки нужно кинуть этот Exception выше - убираем комментарий со строки throw $e;
