==== Összetettebb példa ====
Egy minőségbiztosító rendszer mérőgépének 3 állapotát küldjük egy 'qualityQueue' nevű üzenetsorra. Készítsen egy több komponensből álló alkalmazást, amely 2 kliensen keresztül kommunikál az üzenetsorral az alábbi módon:
* Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a 'qualityQueue' üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként.
* **Készítsen egy komponenst** amely a 'GOOD', 'EXCELLENT' és a 'WRONG' üzeneteket leolvassa a qualityQueue sorról és gyűjti. Minden 10 megkapott azonos üzenet után a 'qualityStatistics' sorra küld egy üzenetet, amiben azt jelzi, hogy 10 (adott minőségű) üzenetet feldolgozott.
* **Készítsen egy második klienst**, ami a 'qualityStatistics' sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 'WRONG' messages has been processed'
A fenti feladatot a http://docker.iit.uni-miskolc.hu keretrendszerben oldjuk meg.
=== RabbitMQ indítása docker-ben ===
A feladat megoldásához több instance-t (konzolt) érdemes indítani. Az első konzol fogja a rabbitMQ szervert indítani. Adjunk hozzá egy konzolt (node 1) és futtassuk a következő parancsot:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine
A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben és a feldolgozóban.
Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot:
pip install pika
Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja.
Hozzuk létre a quality_message_sender.py-t:
Használjuk a megfelelő IP-t a //__init__(self):// ben
import pika
import random
import time
class QualitySender:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='qualityQueue')
def start_sending(self):
qualities = ['GOOD', 'EXCELLENT', 'WRONG']
while True:
quality = random.choice(qualities)
self.channel.basic_publish(exchange='', routing_key='qualityQueue', body=quality)
print(f'Sent quality: {quality}')
time.sleep(1)
def close_connection(self):
self.connection.close()
if __name__ == '__main__':
sender = QualitySender()
try:
sender.start_sending()
except KeyboardInterrupt:
sender.close_connection()
A második komponenshez indítsunk egy új konzolt:
A //__init__(self):// konstruktorban állítsuk be a rabbitMQ szerver IP címét
import pika
class QualityConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='qualityQueue')
self.channel.queue_declare(queue='qualityStatistics')
self.message_count = {'GOOD': 0, 'EXCELLENT': 0, 'WRONG': 0}
def start_consuming(self):
def callback(ch, method, properties, body):
quality = body.decode()
self.message_count[quality] += 1
print(f'Received quality: {quality}')
if self.is_batch_completed():
self.send_statistics()
self.reset_message_count()
self.channel.basic_consume(queue='qualityQueue', on_message_callback=callback, auto_ack=True)
self.channel.start_consuming()
def send_statistics(self):
for quality, count in self.message_count.items():
if count > 0:
message = f'{count} {quality} messages has been processed'
self.channel.basic_publish(exchange='', routing_key='qualityStatistics', body=message)
print(f'Sent statistics: {message}')
def reset_message_count(self):
for quality in self.message_count:
self.message_count[quality] = 0
def is_batch_completed(self):
return sum(self.message_count.values()) >= 10
def close_connection(self):
self.connection.close()
if __name__ == '__main__':
consumer = QualityConsumer()
try:
consumer.start_consuming()
except KeyboardInterrupt:
consumer.close_connection()
Készítsük el a statisztika kiírását egy új konzolban:
import pika
# RabbitMQ settings
connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z'))
channel = connection.channel()
channel.queue_declare(queue='qualityStatistics')
def callback(ch, method, properties, body):
message = body.decode()
print(f'{message}')
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='qualityStatistics', on_message_callback=callback)
print('Waiting for quality statistics...')
channel.start_consuming()
**Feladat:**
A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit.