Бесконечная очередь и отказ от сообщений в RabbitMQ + Thumper + PHP AMPQlib
По работе столкнулся с одной задачей и решил использовать сервер очереди сообщений RabbitMQ в связке с PHP 5.3 через PHP-ampqlib и библиотеку Thumper.
Чтобы сделать Consumer(worker), который будет обрабатывать бесконечную очередь надо задать $consumer->consume(-1);. Тогда в цикле Thumper, где проверяется нужно ли нам выходить всё будет хорошо и наш обработчик будет работать вечно.
Но появился ещё один интересный вопрос - что делать, если внутри функции обработки сообщения произошел Exception? Я решил, что тогда мы отказываемся от этого сообщение и оно идет к другому обработчику. Реализовывать это лучше расширением класса Consumer:
class MyConsumer extends Consumer { public function processMessage($msg) { try { parent::processMessage($msg); } catch (Exception $e) { echo "Message rejected due to exception.\n".$e->getMessage()."\n"; $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'],true); //throw $e; } } }
Соответственно ваш Consumer должен быть объектом класса MyConsumer.
Если же всё таки нужно кинуть этот Exception выше - убираем комментарий со строки throw $e;