A kiválasztott változat és az aktuális verzió közötti különbségek a következők.
Előző változat mindkét oldalon Előző változat Következő változat | Előző változat | ||
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/05/08 07:03] knehez |
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/06/30 11:32] (aktuális) knehez |
||
---|---|---|---|
Sor 4: | Sor 4: | ||
* Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a 'qualityQueue' üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. | * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a 'qualityQueue' üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. | ||
- | * **Készítsen egy komponenst** amely a 'GOOD', 'EXCELLENT' és a 'WRONG' üzeneteket leolvassa a qualityQueue sorról. Minden 10 megkapott üzenet után a 'qualityStatistics' sorra küld egy üzenetet, ami azt jelzi, hogy 10 (adott minőségű) üzenetet feldolgoztott. | + | * **Készítsen egy komponenst** amely a 'GOOD', 'EXCELLENT' és a 'WRONG' üzeneteket leolvassa a qualityQueue sorról és gyűjti. Minden 10 megkapott azonos üzenet után a 'qualityStatistics' sorra küld egy üzenetet, amiben azt jelzi, hogy 10 (adott minőségű) üzenetet feldolgozott. |
* **Készítsen egy második klienst**, ami a 'qualityStatistics' sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 'WRONG' messages has been processed' | * **Készítsen egy második klienst**, ami a 'qualityStatistics' sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 'WRONG' messages has been processed' | ||
- | A fenti feladatot a http://docker.iit.uni-miskolc.hu-n keretrendszerben oldjuk meg. | + | A fenti feladatot a http://docker.iit.uni-miskolc.hu keretrendszerben oldjuk meg. |
=== RabbitMQ indítása docker-ben === | === RabbitMQ indítása docker-ben === | ||
Sor 14: | Sor 14: | ||
<code> | <code> | ||
- | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management | + | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine |
</code> | </code> | ||
- | A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben. | + | A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben és a feldolgozóban. |
Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: | Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: | ||
Sor 23: | Sor 23: | ||
<code> | <code> | ||
pip install pika | pip install pika | ||
- | </> | + | </code> |
Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. | Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. | ||
Hozzuk létre a quality_message_sender.py-t: | Hozzuk létre a quality_message_sender.py-t: | ||
+ | |||
+ | Használjuk a megfelelő IP-t a //__init__(self):// ben | ||
<code python> | <code python> | ||
Sor 34: | Sor 36: | ||
import time | import time | ||
- | qualities = ['EXCELLENT', 'GOOD', 'WRONG'] | + | class QualitySender: |
+ | def __init__(self): | ||
+ | self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) | ||
+ | self.channel = self.connection.channel() | ||
+ | self.channel.queue_declare(queue='qualityQueue') | ||
- | # RabbitMQ settings | + | def start_sending(self): |
- | connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) | + | qualities = ['GOOD', 'EXCELLENT', 'WRONG'] |
- | channel = connection.channel() | + | while True: |
+ | quality = random.choice(qualities) | ||
+ | self.channel.basic_publish(exchange='', routing_key='qualityQueue', body=quality) | ||
+ | print(f'Sent quality: {quality}') | ||
+ | time.sleep(1) | ||
- | channel.queue_declare(queue='qualityQueue') | + | def close_connection(self): |
+ | self.connection.close() | ||
- | while True: | + | if __name__ == '__main__': |
- | quality = random.choice(qualities) | + | sender = QualitySender() |
- | message = f'{quality}' | + | try: |
- | channel.basic_publish(exchange='', routing_key='qualityQueue', body=message) | + | sender.start_sending() |
- | print(f'Sent message: {message}') | + | except KeyboardInterrupt: |
- | time.sleep(1) | + | sender.close_connection() |
- | + | ||
- | connection.close() | + | |
</code> | </code> | ||
- | Állítsuk be a megfelelő IP címet a //pika.ConnectionParameters()// függvénynél. | + | A második komponenshez indítsunk egy új konzolt: |
- | Indítsunk egy másik terminált, futtassuk a //'pip install pika'// parancsot. Majd hozzuk létre a //quality_consumer.py// állományt az alábbi kóddal. | + | A //__init__(self):// konstruktorban állítsuk be a rabbitMQ szerver IP címét |
- | A //pika.ConnectionParameters()//-t értelem szerűen állítsuk be. | + | <code python> |
+ | import pika | ||
+ | |||
+ | class QualityConsumer: | ||
+ | def __init__(self): | ||
+ | self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) | ||
+ | self.channel = self.connection.channel() | ||
+ | self.channel.queue_declare(queue='qualityQueue') | ||
+ | self.channel.queue_declare(queue='qualityStatistics') | ||
+ | self.message_count = {'GOOD': 0, 'EXCELLENT': 0, 'WRONG': 0} | ||
+ | |||
+ | def start_consuming(self): | ||
+ | def callback(ch, method, properties, body): | ||
+ | quality = body.decode() | ||
+ | self.message_count[quality] += 1 | ||
+ | print(f'Received quality: {quality}') | ||
+ | if self.is_batch_completed(): | ||
+ | self.send_statistics() | ||
+ | self.reset_message_count() | ||
+ | |||
+ | self.channel.basic_consume(queue='qualityQueue', on_message_callback=callback, auto_ack=True) | ||
+ | self.channel.start_consuming() | ||
+ | |||
+ | def send_statistics(self): | ||
+ | for quality, count in self.message_count.items(): | ||
+ | if count > 0: | ||
+ | message = f'{count} {quality} messages has been processed' | ||
+ | self.channel.basic_publish(exchange='', routing_key='qualityStatistics', body=message) | ||
+ | print(f'Sent statistics: {message}') | ||
+ | |||
+ | def reset_message_count(self): | ||
+ | for quality in self.message_count: | ||
+ | self.message_count[quality] = 0 | ||
+ | |||
+ | def is_batch_completed(self): | ||
+ | return sum(self.message_count.values()) >= 10 | ||
+ | |||
+ | def close_connection(self): | ||
+ | self.connection.close() | ||
+ | |||
+ | if __name__ == '__main__': | ||
+ | consumer = QualityConsumer() | ||
+ | try: | ||
+ | consumer.start_consuming() | ||
+ | except KeyboardInterrupt: | ||
+ | consumer.close_connection() | ||
+ | |||
+ | </code> | ||
+ | |||
+ | Készítsük el a statisztika kiírását egy új konzolban: | ||
<code python> | <code python> | ||
Sor 69: | Sor 127: | ||
def callback(ch, method, properties, body): | def callback(ch, method, properties, body): | ||
- | quality = body.decode() | + | message = body.decode() |
- | print(f'{quality} messages has been processed') | + | print(f'{message}') |
ch.basic_ack(delivery_tag=method.delivery_tag) | ch.basic_ack(delivery_tag=method.delivery_tag) | ||
Sor 77: | Sor 135: | ||
print('Waiting for quality statistics...') | print('Waiting for quality statistics...') | ||
channel.start_consuming() | channel.start_consuming() | ||
- | |||
</code> | </code> | ||
+ | **Feladat:** | ||
+ | A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit. | ||