The differences between the selected revision and the current version are shown below.
| Previous revision on both sides Previous revision Next revision | Previous revision | ||
|
tanszek:oktatas:iss_t:messaging_systems [2023/04/24 19:05] knehez [MQTT example] |
tanszek:oktatas:iss_t:messaging_systems [2023/05/14 15:23] (current) knehez [RabbitMQ example] |
||
|---|---|---|---|
| Line 53: | Line 53: | ||
| This code sets up a callback function that is called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether a message is acknowledged automatically as soon as it is delivered to the consumer, instead of being acknowledged explicitly by the callback after processing. Finally, the `start_consuming` method starts consuming messages from the queue. | This code sets up a callback function that is called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether a message is acknowledged automatically as soon as it is delivered to the consumer, instead of being acknowledged explicitly by the callback after processing. Finally, the `start_consuming` method starts consuming messages from the queue. | ||
| - | ==== Type of "Exchange" in RabbitMQ ==== | + | ===== Type of "Exchange" in RabbitMQ ===== |
| An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues. A producer never sends a message directly to a queue; it sends the message to an exchange. The exchange then uses its type, its bindings, and the message's routing key to decide which queue(s) the message should be sent to. | An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues. A producer never sends a message directly to a queue; it sends the message to an exchange. The exchange then uses its type, its bindings, and the message's routing key to decide which queue(s) the message should be sent to. | ||
| Line 107: | Line 107: | ||
| Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | ||
| - | ==== MQTT example ==== | + | ===== MQTT example ===== |
| Clone the repository into the Docker playground: | Clone the repository into the Docker playground: | ||
| Line 186: | Line 186: | ||
| * CMD ["python", "-u", "consumer.py"]: This line specifies the command to run when the container is started. In this case, it runs the consumer.py script using the Python interpreter (python). The -u flag is used to enable unbuffered output, which ensures that log messages are immediately visible in the console. | * CMD ["python", "-u", "consumer.py"]: This line specifies the command to run when the container is started. In this case, it runs the consumer.py script using the Python interpreter (python). The -u flag is used to enable unbuffered output, which ensures that log messages are immediately visible in the console. | ||
| + | **consumer.py** | ||
| + | <code python> | ||
| + | import paho.mqtt.client as mqtt | ||
| + | |||
| + | broker = "mqtt"  # broker hostname | ||
| + | port = 1883 | ||
| + | |||
| + | keepalive = 60  # keepalive interval in seconds | ||
| + | |||
| + | def on_connect(client, userdata, flags, rc): | ||
| + |     print("Connected with result code " + str(rc)) | ||
| + |     # subscribing here renews the subscription after a reconnect | ||
| + |     client.subscribe("/data") | ||
| + | |||
| + | |||
| + | def on_message(client, userdata, msg): | ||
| + |     # the payload arrives as bytes; decode it before printing | ||
| + |     print(msg.payload.decode()) | ||
| + | |||
| + | client = mqtt.Client() | ||
| + | # register the callbacks before connecting | ||
| + | client.on_connect = on_connect | ||
| + | client.on_message = on_message | ||
| + | client.connect(broker, port, keepalive) | ||
| + | client.loop_forever() | ||
| + | </code> | ||
| + | |||
| + | |||
| + | **producer.py** | ||
| + | |||
| + | <code python> | ||
| + | # simulated device 1: publishes MQTT messages | ||
| + | import paho.mqtt.client as paho | ||
| + | import time | ||
| + | import random | ||
| + | |||
| + | broker = "mqtt"  # broker hostname | ||
| + | port = 1883 | ||
| + | |||
| + | def on_publish(client, userdata, result): | ||
| + |     print("Device 1 : Data published.") | ||
| + | |||
| + | client = paho.Client("admin")  # "admin" is the client id (paho-mqtt 1.x signature) | ||
| + | client.on_publish = on_publish | ||
| + | client.connect(broker, port) | ||
| + | client.loop_start()  # run the network loop in a background thread | ||
| + | |||
| + | for i in range(20): | ||
| + |     # random delay of 1 to 5 seconds between messages | ||
| + |     d = random.randint(1, 5) | ||
| + | |||
| + |     # telemetry to send | ||
| + |     message = "Device 1 : Data " + str(i) | ||
| + | |||
| + |     time.sleep(d) | ||
| + | |||
| + |     # publish message | ||
| + |     ret = client.publish("/data", message) | ||
| + | |||
| + | # close the connection cleanly and stop the background loop | ||
| + | client.disconnect() | ||
| + | client.loop_stop() | ||
| + | print("Stopped...") | ||
| + | </code> | ||
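
The message flow between producer.py and consumer.py can be tried without a running broker. The following is only a minimal sketch: `InMemoryBroker` is a hypothetical stand-in written for this illustration, not part of paho-mqtt, and it assumes exact topic matching (no MQTT wildcards, QoS levels, or network transport). It shows how a message published on `/data` reaches every callback subscribed to that topic.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Callback = Callable[[str, bytes], None]


class InMemoryBroker:
    """Hypothetical in-memory stand-in for an MQTT broker (not a paho-mqtt class)."""

    def __init__(self) -> None:
        # topic name -> callbacks registered for exactly that topic
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: str) -> int:
        """Deliver payload to every subscriber of topic; return the delivery count."""
        data = payload.encode()  # payloads travel as bytes, as in MQTT
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, data)
        return len(callbacks)


received: List[str] = []


def on_message(topic: str, payload: bytes) -> None:
    # mirrors on_message in consumer.py: decode the bytes payload
    received.append(payload.decode())


broker = InMemoryBroker()
broker.subscribe("/data", on_message)

# mirrors the loop in producer.py, without the random delay
for i in range(3):
    broker.publish("/data", "Device 1 : Data " + str(i))

print(received)
```

With a real broker the same roles are split across processes: the consumer registers interest in a topic, and the producer only needs to know the topic name, never the consumer's address.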