Felhasználói eszközök

Eszközök a webhelyen


tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2

Különbségek

A kiválasztott változat és az aktuális verzió közötti különbségek a következők.

Összehasonlító nézet linkje

Előző változat mindkét oldalon Előző változat
Következő változat
Előző változat
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/05/08 06:49]
knehez
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/06/30 11:32] (aktuális)
knehez
Sor 4: Sor 4:
  
   * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a '​qualityQueue'​ üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. ​   * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a '​qualityQueue'​ üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. ​
-  * **Készítsen egy komponenst** amely a '​GOOD',​ '​EXCELLENT'​ és a '​WRONG'​ üzeneteket leolvassa a qualityQueue sorról. Minden 10 megkapott üzenet után a '​qualityStatistics'​ sorra küld egy üzenetet, ​ami azt jelzi, hogy 10 (adott minőségű) üzenetet ​feldolgoztott+  * **Készítsen egy komponenst** amely a '​GOOD',​ '​EXCELLENT'​ és a '​WRONG'​ üzeneteket leolvassa a qualityQueue sorról ​és gyűjti. Minden 10 megkapott ​azonos ​üzenet után a '​qualityStatistics'​ sorra küld egy üzenetet, ​amiben ​azt jelzi, hogy 10 (adott minőségű) üzenetet ​feldolgozott
   * **Készítsen egy második klienst**, ami a '​qualityStatistics'​ sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 '​WRONG'​ messages has been processed'​   * **Készítsen egy második klienst**, ami a '​qualityStatistics'​ sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 '​WRONG'​ messages has been processed'​
  
-A fenti feladatot a docker.iit.uni-miskolc.hu-n oldjuk meg.+A fenti feladatot a http://docker.iit.uni-miskolc.hu ​keretrendszerben ​oldjuk meg.
  
 +=== RabbitMQ indítása docker-ben ===
  
 +A feladat megoldásához több instance-t (konzolt) érdemes indítani. Az első konzol fogja a rabbitMQ szervert indítani. Adjunk hozzá egy konzolt (node 1) és futtassuk a következő parancsot:
  
 +<​code>​
 +docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:​management-alpine
 +</​code>​
 +
 +A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben és a feldolgozóban.
 +
 +Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot:
 +
 +<​code>​
 +pip install pika
 +</​code>​
 +
 +Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja.
 +
 +Hozzuk létre a quality_message_sender.py-t:​
 +
 +Használjuk a megfelelő IP-t a //​__init__(self)://​ ben
 +
 +<code python>
 +import pika
 +import random
 +import time
 +
 +class QualitySender:​
 +    def __init__(self):​
 +        self.connection = pika.BlockingConnection(pika.ConnectionParameters('​10.x.y.z'​))
 +        self.channel = self.connection.channel()
 +        self.channel.queue_declare(queue='​qualityQueue'​)
 +
 +    def start_sending(self):​
 +        qualities = ['​GOOD',​ '​EXCELLENT',​ '​WRONG'​]
 +        while True:
 +            quality = random.choice(qualities)
 +            self.channel.basic_publish(exchange='',​ routing_key='​qualityQueue',​ body=quality)
 +            print(f'​Sent quality: {quality}'​)
 +            time.sleep(1)
 +
 +    def close_connection(self):​
 +        self.connection.close()
 +
 +if __name__ == '​__main__':​
 +    sender = QualitySender()
 +    try:
 +        sender.start_sending()
 +    except KeyboardInterrupt:​
 +        sender.close_connection()
 +
 +</​code>​
 +
 +A második komponenshez indítsunk egy új konzolt:
 +
 +A //​__init__(self)://​ konstruktorban állítsuk be a rabbitMQ szerver IP címét
 +
 +<code python>
 +import pika
 +
 +class QualityConsumer:​
 +    def __init__(self):​
 +        self.connection = pika.BlockingConnection(pika.ConnectionParameters('​localhost'​))
 +        self.channel = self.connection.channel()
 +        self.channel.queue_declare(queue='​qualityQueue'​)
 +        self.channel.queue_declare(queue='​qualityStatistics'​)
 +        self.message_count = {'​GOOD':​ 0, '​EXCELLENT':​ 0, '​WRONG':​ 0}
 +
 +    def start_consuming(self):​
 +        def callback(ch,​ method, properties, body):
 +            quality = body.decode()
 +            self.message_count[quality] += 1
 +            print(f'​Received quality: {quality}'​)
 +            if self.is_batch_completed():​
 +                self.send_statistics()
 +                self.reset_message_count()
 +
 +        self.channel.basic_consume(queue='​qualityQueue',​ on_message_callback=callback,​ auto_ack=True)
 +        self.channel.start_consuming()
 +
 +    def send_statistics(self):​
 +        for quality, count in self.message_count.items():​
 +            if count > 0:
 +                message = f'​{count} {quality} messages has been processed'​
 +                self.channel.basic_publish(exchange='',​ routing_key='​qualityStatistics',​ body=message)
 +                print(f'​Sent statistics: {message}'​)
 +
 +    def reset_message_count(self):​
 +        for quality in self.message_count:​
 +            self.message_count[quality] = 0
 +
 +    def is_batch_completed(self):​
 +        return sum(self.message_count.values()) >= 10
 +
 +    def close_connection(self):​
 +        self.connection.close()
 +
 +if __name__ == '​__main__':​
 +    consumer = QualityConsumer()
 +    try:
 +        consumer.start_consuming()
 +    except KeyboardInterrupt:​
 +        consumer.close_connection()
 +
 +</​code>​
 +
 +Készítsük el a statisztika kiírását egy új konzolban:
 +
 +<code python>
 +import pika
 +
 +# RabbitMQ settings
 +connection = pika.BlockingConnection(pika.ConnectionParameters('​10.x.y.z'​))
 +channel = connection.channel()
 +
 +channel.queue_declare(queue='​qualityStatistics'​)
 +
 +def callback(ch,​ method, properties, body):
 +    message = body.decode()
 +    print(f'​{message}'​)
 +    ch.basic_ack(delivery_tag=method.delivery_tag)
 +
 +channel.basic_consume(queue='​qualityStatistics',​ on_message_callback=callback)
 +
 +print('​Waiting for quality statistics...'​)
 +channel.start_consuming()
 +</​code>​
 +
 +
 +**Feladat:​**
 +
 +A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit.
  
  
tanszek/oktatas/informacios_rendszerek_integralasa/uezenetsorok-rabbitmq_2.1683528566.txt.gz · Utolsó módosítás: 2023/05/08 06:49 szerkesztette: knehez