A kiválasztott változat és az aktuális verzió közötti különbségek a következők.
Előző változat mindkét oldalon Előző változat Következő változat | Előző változat | ||
tanszek:oktatas:iss_t:messaging_systems [2023/04/24 19:05] knehez [MQTT example] |
tanszek:oktatas:iss_t:messaging_systems [2023/05/14 15:23] (aktuális) knehez [RabbitMQ example] |
||
---|---|---|---|
Sor 53: | Sor 53: | ||
This code sets up a callback function that will be called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether to automatically acknowledge the message after it has been processed. Finally, the `start_consuming` method starts consuming messages from the queue. | This code sets up a callback function that will be called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether to automatically acknowledge the message after it has been processed. Finally, the `start_consuming` method starts consuming messages from the queue. | ||
- | ==== Type of "Exchange" in RabbitMQ ==== | + | ===== Type of "Exchange" in RabbitMQ ===== |
An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message's routing key and decides which queue(s) the message should be sent to. | An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message's routing key and decides which queue(s) the message should be sent to. | ||
Sor 107: | Sor 107: | ||
Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | ||
- | ==== MQTT example ==== | + | ===== MQTT example ===== |
Clone repository into docker playground: | Clone repository into docker playground: | ||
Sor 186: | Sor 186: | ||
* CMD ["python", "-u", "consumer.py"]: This line specifies the command to run when the container is started. In this case, it runs the consumer.py script using the Python interpreter (python). The -u flag is used to enable unbuffered output, which ensures that log messages are immediately visible in the console. | * CMD ["python", "-u", "consumer.py"]: This line specifies the command to run when the container is started. In this case, it runs the consumer.py script using the Python interpreter (python). The -u flag is used to enable unbuffered output, which ensures that log messages are immediately visible in the console. | ||
+ | **consumer.py** | ||
+ | <code python> | ||
+ | import paho.mqtt.client as mqtt | ||
+ | |||
+ | broker = "mqtt" | ||
+ | port = 1883 | ||
+ | |||
+ | timelive = 60 | ||
+ | |||
+ | def on_connect(client, userdata, flags, rc): | ||
+ | print("Connected with result code "+str(rc)) | ||
+ | client.subscribe("/data") | ||
+ | |||
+ | |||
+ | def on_message(client, userdata, msg): | ||
+ | print(msg.payload.decode()) | ||
+ | |||
+ | client = mqtt.Client() | ||
+ | client.connect(broker, port, timelive) | ||
+ | client.on_connect = on_connect | ||
+ | client.on_message = on_message | ||
+ | client.loop_forever() | ||
+ | </code> | ||
+ | |||
+ | **producer.py** | ||
+ | |||
+ | <code python> | ||
+ | # simulator device 1 for mqtt message publishing | ||
+ | import paho.mqtt.client as paho | ||
+ | import time | ||
+ | import random | ||
+ | |||
+ | broker = "mqtt" | ||
+ | port = 1883 | ||
+ | |||
+ | def on_publish(client, userdata, result): | ||
+ | print("Device 1 : Data published.") | ||
+ | |||
+ | client = paho.Client("admin") | ||
+ | client.on_publish = on_publish | ||
+ | client.connect(broker, port) | ||
+ | |||
+ | for i in range(20): | ||
+ | d = random.randint(1, 5) | ||
+ | |||
+ | # telemetry to send | ||
+ | message = "Device 1 : Data " + str(i) | ||
+ | |||
+ | time.sleep(d) | ||
+ | |||
+ | # publish message | ||
+ | ret = client.publish("/data", message) | ||
+ | |||
+ | print("Stopped...") | ||
+ | </code> | ||