Class AsyncConsumer
Defined in File rmq.py
Inheritance Relationships
Base Type
public object
Derived Type
public rmq.AsyncFanOutConsumer (Class AsyncFanOutConsumer)
Class Documentation
- rmq.AsyncConsumer : public object
Asynchronous RMQ consumer. AsyncConsumer handles unexpected events in the interaction with RabbitMQ, such as channel and connection closures. AsyncConsumer can receive messages but cannot send them.
Subclassed by rmq.AsyncFanOutConsumer
Public Functions
- __init__(self, str host, int port, str vhost, str user, str password, str cert, str queue, int prefetch_count=1, Optional[Callable] on_message_cb=None, Optional[Callable] on_close_cb=None, Optional[logging.Logger] logger=None)
Create a new instance of the consumer class from the RabbitMQ connection settings.
:param str host: The RabbitMQ host
:param int port: The RabbitMQ port
:param str vhost: The RabbitMQ virtual host
:param str user: The user name
:param str password: The password
:param str cert: The TLS certificate
:param str queue: The queue to listen to
:param int prefetch_count: Defines consumer throughput; should be set relative to the available resources and the number of messages expected
:param Callable on_message_cb: Called each time pika receives a message
:param Callable on_close_cb: Called when pika closes the connection
:param logging.Logger logger: Optional logger
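To illustrate the on_message_cb parameter, the sketch below shows one possible callback. The callback signature is not documented on this page, so the four arguments (mirroring on_message) are an assumption, as are the name handle_message and the JSON payload format.

```python
import json
import logging

logger = logging.getLogger("rmq.example")


def handle_message(channel, method_frame, properties, body):
    """Hypothetical on_message_cb: decode a JSON payload and return it.

    The argument list is assumed to mirror AsyncConsumer.on_message.
    """
    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as err:
        # Malformed input is logged and dropped rather than raised.
        logger.warning("Dropping malformed message: %s", err)
        return None
    logger.info("Received a %s payload", type(payload).__name__)
    return payload
```

Such a function would presumably be passed as on_message_cb=handle_message when constructing the consumer.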
- __enter__(self)
- __exit__(self, exc_type, exc_val, exc_tb)
- connection_params(self)
Create the pika connection parameters, including the TLS settings, needed to connect to RabbitMQ.
:rtype: pika.ConnectionParameters
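A minimal sketch of how TLS connection parameters can be assembled with pika, assuming pika 1.x is installed. The helper names make_tls_context and make_connection_params are hypothetical, and the actual body of connection_params is not shown on this page.

```python
import ssl

try:
    import pika  # third-party; only needed by make_connection_params
except ImportError:
    pika = None


def make_tls_context(cafile=None):
    """Build a client-side TLS context, optionally trusting a specific CA file."""
    context = ssl.create_default_context(cafile=cafile)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


def make_connection_params(host, port, vhost, user, password, cafile=None):
    """Hypothetical helper: combine credentials and TLS options for pika."""
    if pika is None:
        raise RuntimeError("pika is not installed")
    return pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=vhost,
        credentials=pika.PlainCredentials(user, password),
        ssl_options=pika.SSLOptions(make_tls_context(cafile), server_hostname=host),
    )
```

The TLS context is built separately so that it can be inspected or tested without a running broker.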
- connect(self)
This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.
:rtype: pika.SelectConnection
- close_connection(self)
- on_connection_open(self, connection)
This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in case it is needed, although it is not used here.
:param pika.SelectConnection connection: The connection
- on_connection_open_error(self, _unused_connection, err)
This method is called by pika if the connection to RabbitMQ can't be established.
:param pika.SelectConnection _unused_connection: The connection
:param Exception err: The error
- on_connection_closed(self, _unused_connection, reason)
This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Because the closure is unexpected, the consumer will reconnect to RabbitMQ.
:param pika.connection.Connection _unused_connection: The closed connection object
:param Exception reason: Exception representing the reason for the loss of connection
- reconnect(self)
Invoked if the connection can't be opened or is closed. Indicates that a reconnect is necessary, then stops the ioloop.
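The "flag, then stop the ioloop" pattern described above can be sketched with a stand-in loop. This is a toy model under stated assumptions: FakeIOLoop, ReconnectingRunner, and the outer loop in run are all hypothetical, and the page does not say whether the real run method retries on its own.

```python
class FakeIOLoop:
    """Stand-in for pika's ioloop: runs queued callbacks until stopped."""

    def __init__(self):
        self._callbacks = []
        self._running = False

    def schedule(self, callback):
        self._callbacks.append(callback)

    def start(self):
        self._running = True
        while self._running and self._callbacks:
            self._callbacks.pop(0)()

    def stop(self):
        self._running = False


class ReconnectingRunner:
    """Toy consumer whose connection fails a fixed number of times."""

    def __init__(self, failures_before_success):
        self.should_reconnect = False
        self.attempts = 0
        self._failures_left = failures_before_success
        self._ioloop = None

    def reconnect(self):
        # Flag that a reconnect is necessary, then stop the ioloop.
        self.should_reconnect = True
        self._ioloop.stop()

    def _connect(self):
        self.attempts += 1
        self._ioloop = FakeIOLoop()
        if self._failures_left > 0:
            self._failures_left -= 1
            # Simulate pika reporting a failed or closed connection.
            self._ioloop.schedule(self.reconnect)

    def run(self):
        # Assumed outer loop: reconnect for as long as the flag is set.
        while True:
            self.should_reconnect = False
            self._connect()
            self._ioloop.start()
            if not self.should_reconnect:
                break
```

Stopping the ioloop hands control back to the caller of start, which is what lets an outer loop decide whether to connect again.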
- open_channel(self)
Open a new channel with RabbitMQ by issuing the Channel.Open RPC command. When RabbitMQ responds that the channel is open, the on_channel_open callback will be invoked by pika.
- on_channel_open(self, channel)
This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it. Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
- add_on_channel_close_callback(self)
This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.
- on_channel_closed(self, channel, reason)
Invoked by pika when RabbitMQ unexpectedly closes the channel. Channels are usually closed if you attempt to do something that violates the protocol, such as redeclaring an exchange or queue with different parameters. In this case, we'll close the connection to shut down the object.
:param pika.channel.Channel channel: The closed channel
:param Exception reason: Why the channel was closed
- setup_queue(self, queue_name)
Set up the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_queue_declareok method will be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare
- on_queue_declareok(self, _unused_frame, userdata)
Method invoked by pika when the Queue.Declare RPC call made in setup_queue has completed. In this method we will bind the queue and exchange together with the routing key by issuing the Queue.Bind RPC command. When this command is complete, the on_bindok method will be invoked by pika.
:param pika.frame.Method _unused_frame: The Queue.DeclareOk frame
:param str|unicode userdata: Extra user data (queue name)
- set_qos(self)
This method sets the consumer prefetch, which limits how many unacknowledged messages RabbitMQ delivers at a time (one by default). With a prefetch of one, the consumer must acknowledge each message before RabbitMQ will deliver another. You should experiment with different prefetch values to achieve the desired performance.
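The effect of the prefetch value can be shown with a small simulation. PrefetchWindow is a hypothetical toy model of the broker side, not part of rmq.py or pika. It only demonstrates that delivery pauses once the number of unacknowledged messages reaches the prefetch count.

```python
from collections import deque


class PrefetchWindow:
    """Toy model of a broker honouring a consumer prefetch limit."""

    def __init__(self, prefetch_count, messages):
        self.prefetch_count = prefetch_count
        self._pending = deque(messages)
        self.unacked = {}  # delivery_tag -> message
        self._next_tag = 1

    def deliver(self):
        """Deliver messages until the unacked window is full."""
        delivered = []
        while self._pending and len(self.unacked) < self.prefetch_count:
            tag = self._next_tag
            self._next_tag += 1
            message = self._pending.popleft()
            self.unacked[tag] = message
            delivered.append((tag, message))
        return delivered

    def ack(self, delivery_tag):
        """Acknowledge one message, freeing a slot in the window."""
        del self.unacked[delivery_tag]


window = PrefetchWindow(prefetch_count=1, messages=["a", "b", "c"])
first = window.deliver()    # one message is delivered
blocked = window.deliver()  # nothing more until the first is acknowledged
window.ack(first[0][0])
second = window.deliver()   # the next message is released
```

A larger prefetch_count lets more messages be in flight at once, at the cost of more work being held by a single consumer.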
- on_basic_qos_ok(self, _unused_frame)
Invoked by pika when the Basic.Qos method has completed. At this point we will start consuming messages by calling start_consuming, which will invoke the needed RPC commands to start the process.
:param pika.frame.Method _unused_frame: The Basic.QosOk response frame
- start_consuming(self)
This method sets up the consumer by first calling add_on_cancel_callback so that the object is notified if RabbitMQ cancels the consumer. It then issues the Basic.Consume RPC command which returns the consumer tag that is used to uniquely identify the consumer with RabbitMQ. We keep the value to use it when we want to cancel consuming. The on_message method is passed in as a callback pika will invoke when a message is fully received.
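The callback chain from setup_queue to start_consuming can be sketched against a stand-in channel. FakeChannel and SetupChain are hypothetical. The fake mimics only the keyword arguments of pika's queue_declare, basic_qos, and basic_consume that are used here, and invokes each callback immediately rather than asynchronously. The sketch also skips the exchange binding step mentioned in the on_queue_declareok description, since no corresponding method is listed on this page.

```python
import functools


class FakeChannel:
    """Stand-in for a pika channel: each RPC immediately invokes its callback."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, queue, callback=None):
        self.calls.append(("Queue.Declare", queue))
        if callback:
            callback("Queue.DeclareOk")

    def basic_qos(self, prefetch_count=0, callback=None):
        self.calls.append(("Basic.Qos", prefetch_count))
        if callback:
            callback("Basic.QosOk")

    def basic_consume(self, queue, on_message_callback):
        self.calls.append(("Basic.Consume", queue))
        return "ctag-1"  # the consumer tag identifying this consumer


class SetupChain:
    """Toy version of the declare -> qos -> consume sequence."""

    def __init__(self, channel, queue, prefetch_count=1):
        self._channel = channel
        self._queue = queue
        self._prefetch_count = prefetch_count
        self.consumer_tag = None

    def setup_queue(self, queue_name):
        # functools.partial carries the queue name through as userdata.
        cb = functools.partial(self.on_queue_declareok, userdata=queue_name)
        self._channel.queue_declare(queue=queue_name, callback=cb)

    def on_queue_declareok(self, _unused_frame, userdata):
        self.set_qos()

    def set_qos(self):
        self._channel.basic_qos(
            prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok
        )

    def on_basic_qos_ok(self, _unused_frame):
        self.start_consuming()

    def start_consuming(self):
        # Keep the consumer tag so that consuming can be cancelled later.
        self.consumer_tag = self._channel.basic_consume(self._queue, self.on_message)

    def on_message(self, _unused_channel, method_frame, properties, body):
        pass


channel = FakeChannel()
chain = SetupChain(channel, "jobs", prefetch_count=4)
chain.setup_queue("jobs")
```

Each step only issues its RPC and returns. The next step runs when the broker's reply arrives, which is why the logic is spread across several on_* methods.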
- add_on_cancel_callback(self)
Add a callback that will be invoked if RabbitMQ cancels the consumer for some reason. If RabbitMQ does cancel the consumer, on_consumer_cancelled will be invoked by pika.
- on_consumer_cancelled(self, method_frame)
Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.
:param pika.frame.Method method_frame: The Basic.Cancel frame
- on_message(self, _unused_channel, method_frame, properties, body)
Invoked by pika when a message is delivered from RabbitMQ. The channel is passed for your convenience. The method_frame object that is passed in carries the exchange, routing key, delivery tag, and a redelivered flag for the message. The properties argument is an instance of BasicProperties holding the message properties, and the body is the message that was sent.
:param pika.channel.Channel _unused_channel: The channel object
:param pika.spec.Basic.Deliver method_frame: The Basic.Deliver method frame
:param pika.spec.BasicProperties properties: The message properties
:param bytes body: The message body
- acknowledge_message(self, delivery_tag)
Acknowledge the message delivery from RabbitMQ by sending a Basic.Ack RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
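The relationship between on_message and acknowledge_message can be sketched as follows. RecordingChannel and MessageHandler are hypothetical stand-ins. Whether the real class acknowledges before or after calling the user callback is not stated on this page, so acknowledging afterwards is an assumption.

```python
from types import SimpleNamespace


class RecordingChannel:
    """Stand-in for a pika channel that records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class MessageHandler:
    """Toy consumer: forward each message to a callback, then acknowledge it."""

    def __init__(self, channel, on_message_cb=None):
        self._channel = channel
        self._on_message_cb = on_message_cb

    def on_message(self, _unused_channel, method_frame, properties, body):
        if self._on_message_cb is not None:
            self._on_message_cb(_unused_channel, method_frame, properties, body)
        # Assumed ordering: acknowledge only after the callback has returned.
        self.acknowledge_message(method_frame.delivery_tag)

    def acknowledge_message(self, delivery_tag):
        self._channel.basic_ack(delivery_tag)


received = []
channel = RecordingChannel()
handler = MessageHandler(channel, lambda ch, method, props, body: received.append(body))
handler.on_message(channel, SimpleNamespace(delivery_tag=7), None, b"hi")
```

With this ordering, an exception raised by the callback leaves the message unacknowledged, so RabbitMQ can redeliver it once the channel closes.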
- stop_consuming(self)
Tell RabbitMQ that you would like to stop consuming by sending the Basic.Cancel RPC command.
- on_cancelok(self, _unused_frame, userdata)
This method is invoked by pika when RabbitMQ acknowledges the cancellation of a consumer. At this point we will close the channel. This will invoke the on_channel_closed method once the channel has been closed, which will in turn close the connection.
:param pika.frame.Method _unused_frame: The Basic.CancelOk frame
:param str|unicode userdata: Extra user data (consumer tag)
- close_channel(self)
Call to close the channel with RabbitMQ cleanly by issuing the Channel.Close RPC command.
- run(self)
Run the consumer by connecting to RabbitMQ and then starting the IOLoop, which blocks and allows the SelectConnection to operate.
- stop(self)
Cleanly shut down the connection to RabbitMQ by stopping the consumer. When RabbitMQ confirms the cancellation, on_cancelok will be invoked by pika, which will then close the channel and the connection.
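The shutdown order described by stop, on_cancelok, and on_channel_closed can be traced with a toy class. ShutdownTrace is hypothetical and calls each step synchronously. With a real broker, each reply would arrive asynchronously through pika.

```python
class ShutdownTrace:
    """Toy model that records the order of the shutdown steps."""

    def __init__(self):
        self.events = []
        self.closing = False

    def stop(self):
        self.closing = True
        self.events.append("stop")
        self.stop_consuming()

    def stop_consuming(self):
        self.events.append("Basic.Cancel")
        self.on_cancelok()  # a real broker would reply asynchronously

    def on_cancelok(self):
        self.events.append("Basic.CancelOk")
        self.close_channel()

    def close_channel(self):
        self.events.append("Channel.Close")
        self.on_channel_closed()

    def on_channel_closed(self):
        self.events.append("channel closed")
        self.close_connection()

    def close_connection(self):
        self.events.append("connection closed")


trace = ShutdownTrace()
trace.stop()
```

After trace.stop() returns, trace.events lists the steps in the order they ran, from the initial stop call to the connection being closed.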