Felhasználói eszközök

Eszközök a webhelyen


tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2

Különbségek

A kiválasztott változat és az aktuális verzió közötti különbségek a következők.

Összehasonlító nézet linkje

Előző változat mindkét oldalon Előző változat
Következő változat
Előző változat
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/05/08 07:03]
knehez
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/06/30 11:32] (aktuális)
knehez
Sor 4: Sor 4:
  
   * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a '​qualityQueue'​ üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. ​   * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a '​qualityQueue'​ üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. ​
-  * **Készítsen egy komponenst** amely a '​GOOD',​ '​EXCELLENT'​ és a '​WRONG'​ üzeneteket leolvassa a qualityQueue sorról. Minden 10 megkapott üzenet után a '​qualityStatistics'​ sorra küld egy üzenetet, ​ami azt jelzi, hogy 10 (adott minőségű) üzenetet ​feldolgoztott+  * **Készítsen egy komponenst** amely a '​GOOD',​ '​EXCELLENT'​ és a '​WRONG'​ üzeneteket leolvassa a qualityQueue sorról ​és gyűjti. Minden 10 megkapott ​azonos ​üzenet után a '​qualityStatistics'​ sorra küld egy üzenetet, ​amiben ​azt jelzi, hogy 10 (adott minőségű) üzenetet ​feldolgozott
   * **Készítsen egy második klienst**, ami a '​qualityStatistics'​ sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 '​WRONG'​ messages has been processed'​   * **Készítsen egy második klienst**, ami a '​qualityStatistics'​ sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 '​WRONG'​ messages has been processed'​
  
-A fenti feladatot a http://​docker.iit.uni-miskolc.hu-n keretrendszerben oldjuk meg.+A fenti feladatot a http://​docker.iit.uni-miskolc.hu keretrendszerben oldjuk meg.
  
 === RabbitMQ indítása docker-ben === === RabbitMQ indítása docker-ben ===
Sor 14: Sor 14:
  
 <​code>​ <​code>​
-docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management+docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:​management-alpine
 </​code>​ </​code>​
  
-A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben.+A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben ​és a feldolgozóban.
  
 Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot:
Sor 23: Sor 23:
 <​code>​ <​code>​
 pip install pika pip install pika
-</>+</code>
  
 Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja.
  
 Hozzuk létre a quality_message_sender.py-t:​ Hozzuk létre a quality_message_sender.py-t:​
 +
 +Használjuk a megfelelő IP-t a //​__init__(self)://​ ben
  
 <code python> <code python>
Sor 34: Sor 36:
 import time import time
  
-qualities ​['EXCELLENT', '​GOOD', ​'WRONG']+class QualitySender:​ 
 +    def __init__(self):​ 
 +        self.connection ​pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) 
 +        self.channel = self.connection.channel() 
 +        self.channel.queue_declare(queue='qualityQueue')
  
-# RabbitMQ settings +    def start_sending(self):​ 
-connection ​pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) +        qualities = ['​GOOD',​ '​EXCELLENT',​ '​WRONG'​] 
-channel = connection.channel()+        while True: 
 +            ​quality ​random.choice(qualities) 
 +            self.channel.basic_publish(exchange=''​, routing_key='​qualityQueue',​ body=quality) 
 +            print(f'​Sent quality: {quality}'​
 +            time.sleep(1)
  
-channel.queue_declare(queue='​qualityQueue'​)+    def close_connection(self):​ 
 +        self.connection.close()
  
-while True: +if __name__ ​== '__main__': 
-    quality ​random.choice(qualities) +    ​sender = QualitySender() 
-    message ​f'{quality}+    ​try
-    ​channel.basic_publish(exchange='',​ routing_key='​qualityQueue',​ body=message+        ​sender.start_sending() 
-    ​print(f'​Sent message{message}'​) +    ​except KeyboardInterrupt:​ 
-    time.sleep(1+        ​sender.close_connection()
- +
-connection.close()+
  
 </​code>​ </​code>​
  
-Állítsuk be a megfelelő IP címet a //​pika.ConnectionParameters()//​ függvénynél.+A második komponenshez indítsunk egy új konzolt:
  
-Indítsunk egy másik terminált, futtassuk a //'pip install pika'// parancsot. Majd hozzuk létre ​//​quality_consumer.py//​ állományt az alábbi kóddal.+//__init__(self):​// konstruktorban állítsuk be rabbitMQ szerver IP címét
  
-A //pika.ConnectionParameters()//-t értelem szerűen állítsuk be.+<code python>​ 
 +import pika 
 + 
 +class QualityConsumer:​ 
 +    def __init__(self):​ 
 +        self.connection = pika.BlockingConnection(pika.ConnectionParameters('​localhost'​)
 +        self.channel = self.connection.channel() 
 +        self.channel.queue_declare(queue='​qualityQueue'​) 
 +        self.channel.queue_declare(queue='​qualityStatistics'​) 
 +        self.message_count = {'​GOOD':​ 0, '​EXCELLENT':​ 0, '​WRONG':​ 0} 
 + 
 +    def start_consuming(self):​ 
 +        def callback(ch,​ method, properties, body): 
 +            quality = body.decode() 
 +            self.message_count[quality] += 1 
 +            print(f'​Received quality: {quality}'​) 
 +            if self.is_batch_completed():​ 
 +                self.send_statistics() 
 +                self.reset_message_count() 
 + 
 +        self.channel.basic_consume(queue='​qualityQueue',​ on_message_callback=callback,​ auto_ack=True) 
 +        self.channel.start_consuming() 
 + 
 +    def send_statistics(self):​ 
 +        for quality, count in self.message_count.items():​ 
 +            if count > 0: 
 +                message = f'​{count} {quality} messages has been processed'​ 
 +                self.channel.basic_publish(exchange='',​ routing_key='​qualityStatistics',​ body=message) 
 +                print(f'​Sent statistics: {message}'​) 
 + 
 +    def reset_message_count(self):​ 
 +        for quality in self.message_count:​ 
 +            self.message_count[quality] = 0 
 + 
 +    def is_batch_completed(self):​ 
 +        return sum(self.message_count.values()) >= 10 
 + 
 +    def close_connection(self):​ 
 +        self.connection.close() 
 + 
 +if __name__ == '​__main__':​ 
 +    consumer = QualityConsumer() 
 +    try: 
 +        consumer.start_consuming() 
 +    except KeyboardInterrupt:​ 
 +        consumer.close_connection() 
 + 
 +</​code>​ 
 + 
 +Készítsük el a statisztika kiírását egy új konzolban:
  
 <code python> <code python>
Sor 69: Sor 127:
  
 def callback(ch,​ method, properties, body): def callback(ch,​ method, properties, body):
-    ​quality ​= body.decode() +    ​message ​= body.decode() 
-    print(f'​{qualitymessages has been processed')+    print(f'​{message}')
     ch.basic_ack(delivery_tag=method.delivery_tag)     ch.basic_ack(delivery_tag=method.delivery_tag)
  
Sor 77: Sor 135:
 print('​Waiting for quality statistics...'​) print('​Waiting for quality statistics...'​)
 channel.start_consuming() channel.start_consuming()
- 
 </​code>​ </​code>​
  
  
 +**Feladat:​**
  
 +A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit.
  
  
tanszek/oktatas/informacios_rendszerek_integralasa/uezenetsorok-rabbitmq_2.1683529435.txt.gz · Utolsó módosítás: 2023/05/08 07:03 szerkesztette: knehez