Блог веб-программиста

27Авг/110

Бесконечная очередь и отказ от сообщений в RabbitMQ + Thumper + PHP AMPQlib

По работе столкнулся с одной задачей и решил использовать сервер очереди сообщений RabbitMQ в связке с PHP 5.3 через PHP-ampqlib и библиотеку Thumper.

Чтобы сделать Consumer(worker), который будет обрабатывать бесконечную очередь надо задать $consumer->consume(-1);. Тогда в цикле Thumper, где проверяется нужно ли нам выходить всё будет хорошо и наш обработчик будет работать вечно.

Но появился ещё один интересный вопрос - что делать, если внутри функции обработки сообщения произошел Exception? Я решил, что тогда мы отказываемся от этого сообщение и оно идет к другому обработчику. Реализовывать это лучше расширением класса Consumer:

class MyConsumer extends Consumer {
	public function processMessage($msg) {
		try {
			parent::processMessage($msg);
		} catch (Exception $e) {
			echo "Message rejected due to exception.\n".$e->getMessage()."\n";
			$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'],true);
			//throw $e;
		}
	}
}

Соответственно ваш Consumer должен быть объектом класса MyConsumer.
Если же всё таки нужно кинуть этот Exception выше - убираем комментарий со строки throw $e;

Связано с категорией: Code Нет комментариев