tanszek:oktatas:iss_t:rabbitmq

Differences

The differences between the selected revision and the current version are shown below.

tanszek:oktatas:iss_t:rabbitmq [2023/05/08 08:27]
knehez
tanszek:oktatas:iss_t:rabbitmq [2023/06/27 12:38] (current)
knehez
Line 13:
  
 <code>
-docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
+docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine
 </code>
  
Line 59:
         sender.close_connection()
 </code>
 +
 +Let's create the //quality_message_consumer.py// file, which counts the incoming quality messages and publishes statistics after every 10 messages:
 +
 +(Do not forget to create it in another terminal, run //pip install pika//, and set the proper IP address in //pika.ConnectionParameters()//.)
 +
 +<code python>
 +import pika
 +
 +class QualityConsumer:
 +    def __init__(self):
 +        # Replace 10.x.y.z with the IP address of the RabbitMQ host
 +        self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z'))
 +        self.channel = self.connection.channel()
 +        self.channel.queue_declare(queue='qualityQueue')
 +        self.channel.queue_declare(queue='qualityStatistics')
 +        self.message_count = {'GOOD': 0, 'EXCELLENT': 0, 'WRONG': 0}
 +
 +    def start_consuming(self):
 +        def callback(ch, method, properties, body):
 +            quality = body.decode()
 +            if quality not in self.message_count:
 +                # Skip unexpected payloads instead of failing with KeyError
 +                print(f'Ignored unknown quality: {quality}')
 +                return
 +            self.message_count[quality] += 1
 +            print(f'Received quality: {quality}')
 +            if self.is_batch_completed():
 +                self.send_statistics()
 +                self.reset_message_count()
 +
 +        self.channel.basic_consume(queue='qualityQueue', on_message_callback=callback, auto_ack=True)
 +        self.channel.start_consuming()
 +
 +    def send_statistics(self):
 +        # Publish one statistics message per quality seen in the batch
 +        for quality, count in self.message_count.items():
 +            if count > 0:
 +                message = f'{count} {quality} messages have been processed'
 +                self.channel.basic_publish(exchange='', routing_key='qualityStatistics', body=message)
 +                print(f'Sent statistics: {message}')
 +
 +    def reset_message_count(self):
 +        for quality in self.message_count:
 +            self.message_count[quality] = 0
 +
 +    def is_batch_completed(self):
 +        # A batch is complete after 10 counted messages
 +        return sum(self.message_count.values()) >= 10
 +
 +    def close_connection(self):
 +        self.connection.close()
 +
 +if __name__ == '__main__':
 +    consumer = QualityConsumer()
 +    try:
 +        consumer.start_consuming()
 +    except KeyboardInterrupt:
 +        consumer.close_connection()
 +</​code>​
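
The batching behaviour of the consumer above can be tried without a running broker. The following is only a minimal sketch: `BatchCounter` and `simulate` are hypothetical names introduced for illustration, the statistics wording is shortened, and no RabbitMQ calls are made. It mirrors the count, check, send, reset cycle with a batch size of 10.

```python
from collections import Counter

BATCH_SIZE = 10  # same threshold as is_batch_completed() in the consumer


class BatchCounter:
    """Hypothetical broker-free stand-in for the consumer's counting logic."""

    def __init__(self, batch_size=BATCH_SIZE):
        self.batch_size = batch_size
        self.counts = Counter()

    def add(self, quality):
        """Count one message; return statistics lines once a batch is full."""
        self.counts[quality] += 1
        if sum(self.counts.values()) < self.batch_size:
            return []
        stats = [f'{count} {name} messages processed'
                 for name, count in self.counts.items()]
        self.counts.clear()  # start the next batch from zero
        return stats


def simulate(qualities):
    """Feed a list of quality labels through the counter, collect full batches."""
    counter = BatchCounter()
    batches = []
    for quality in qualities:
        stats = counter.add(quality)
        if stats:
            batches.append(stats)
    return batches


if __name__ == '__main__':
    sample = ['GOOD'] * 6 + ['WRONG'] * 3 + ['EXCELLENT'] + ['GOOD'] * 4
    for batch in simulate(sample):
        print(batch)
```

With this sample, one batch is reported after the tenth message; the remaining four messages stay pending until six more arrive, which is also how the real consumer behaves.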
 +
 +The third component prints the statistics. Open another terminal and create //statistics_consumer.py//:
 +
 +<code python>
 +import pika
 +
 +# RabbitMQ settings: replace 10.x.y.z with the IP address of the RabbitMQ host
 +connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z'))
 +channel = connection.channel()
 +
 +channel.queue_declare(queue='qualityStatistics')
 +
 +def callback(ch, method, properties, body):
 +    message = body.decode()
 +    print(message)
 +    # Acknowledge manually, because basic_consume is called without auto_ack
 +    ch.basic_ack(delivery_tag=method.delivery_tag)
 +
 +channel.basic_consume(queue='qualityStatistics', on_message_callback=callback)
 +
 +print('Waiting for quality statistics...')
 +channel.start_consuming()
 +</​code>​
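
The statistics consumer only prints each message as text. If the numbers are needed for further processing, the message could be parsed back into a quality label and a count. The sketch below is an assumption layered on the message format produced by the quality consumer (`<count> <quality> messages ... been processed`); `parse_statistic` and `STAT_PATTERN` are hypothetical names, not part of pika or of the original scripts.

```python
import re

# Accepts both "has" and "have", so the wording of the sender may vary.
STAT_PATTERN = re.compile(r'^(\d+) (\w+) messages (?:has|have) been processed$')


def parse_statistic(message):
    """Return (quality, count) for a statistics line, or None if it does not match."""
    match = STAT_PATTERN.match(message.strip())
    if match is None:
        return None
    count, quality = match.groups()
    return quality, int(count)


if __name__ == '__main__':
    print(parse_statistic('7 GOOD messages has been processed'))
```

Inside the consumer's `callback`, such a function could be applied to `body.decode()` before acknowledging the message.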
 +
tanszek/oktatas/iss_t/rabbitmq.1683534443.txt.gz · Last modified: 2023/05/08 08:27 by knehez