A kiválasztott változat és az aktuális verzió közötti különbségek a következők.
| Előző változat mindkét oldalon Előző változat Következő változat | Előző változat | ||
|
tanszek:oktatas:iss_t:messaging_systems [2023/04/24 19:00] knehez [MQTT example] |
tanszek:oktatas:iss_t:messaging_systems [2023/05/14 15:23] (aktuális) knehez [RabbitMQ example] |
||
|---|---|---|---|
| Sor 53: | Sor 53: | ||
| This code sets up a callback function that will be called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether to automatically acknowledge the message after it has been processed. Finally, the `start_consuming` method starts consuming messages from the queue. | This code sets up a callback function that will be called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether to automatically acknowledge the message after it has been processed. Finally, the `start_consuming` method starts consuming messages from the queue. | ||
| - | ==== Type of "Exchange" in RabbitMQ ==== | + | ===== Type of "Exchange" in RabbitMQ ===== |
| An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message's routing key and decides which queue(s) the message should be sent to. | An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message's routing key and decides which queue(s) the message should be sent to. | ||
| Sor 107: | Sor 107: | ||
| Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | ||
| - | ==== MQTT example ==== | + | ===== MQTT example ===== |
| Clone repository into docker playground: | Clone repository into docker playground: | ||
| git clone https://github.com/knehez/isi.git | git clone https://github.com/knehez/isi.git | ||
| + | cd isis/mqtt-python | ||
| + | docker-compose up | ||
| docker-compose.yml defines a multi-container application with three services: mqtt, consumer, and producer. | docker-compose.yml defines a multi-container application with three services: mqtt, consumer, and producer. | ||
| Sor 149: | Sor 151: | ||
| volumes: | volumes: | ||
| - .:/app | - .:/app | ||
| - | depen | + | depends_on: |
| + | - mqtt | ||
| </code> | </code> | ||
| + | |||
| + | **Dockerfile-consumer** | ||
| + | |||
| + | This Dockerfile defines a simple containerized Python application that can be used as a consumer for a message broker. The dependencies are installed in the container, and the consumer code is copied into the container's /app directory. When the container is started, the consumer.py script is executed to consume messages from the broker. | ||
| + | |||
| + | <code yml> | ||
| + | FROM python:3.9-slim-buster | ||
| + | |||
| + | WORKDIR /app | ||
| + | |||
| + | COPY requirements.txt . | ||
| + | RUN pip install --no-cache-dir -r requirements.txt | ||
| + | |||
| + | COPY consumer.py . | ||
| + | |||
| + | CMD ["python", "-u", "consumer.py"] | ||
| + | </code> | ||
| + | |||
| + | Here is a breakdown of the different parts of the Dockerfile: | ||
| + | |||
| + | * FROM python:3.9-slim-buster: This line specifies the base image for the Docker image. In this case, the image is based on Python 3.9 running on a slimmed-down version of the Debian Buster Linux distribution. | ||
| + | |||
| + | * WORKDIR /app: This line sets the working directory for the container to /app. This is where the consumer code and other related files will be located. | ||
| + | |||
| + | * COPY requirements.txt .: This line copies the requirements.txt file from the current directory on the host machine to the /app directory in the container. The requirements.txt file lists the dependencies that the consumer requires to run. | ||
| + | |||
| + | * RUN pip install --no-cache-dir -r requirements.txt: This line installs the dependencies listed in the requirements.txt file using pip. The --no-cache-dir option is used to ensure that pip does not cache the downloaded packages, which can help to reduce the size of the Docker image. | ||
| + | |||
| + | * COPY consumer.py .: This line copies the consumer.py file from the current directory on the host machine to the /app directory in the container. This is the main code file for the consumer. | ||
| + | |||
| + | * CMD ["python", "-u", "consumer.py"]: This line specifies the command to run when the container is started. In this case, it runs the consumer.py script using the Python interpreter (python). The -u flag is used to enable unbuffered output, which ensures that log messages are immediately visible in the console. | ||
| + | |||
| + | **consumer.py** | ||
| + | |||
| + | <code python> | ||
| + | import paho.mqtt.client as mqtt | ||
| + | |||
| + | broker = "mqtt" | ||
| + | port = 1883 | ||
| + | |||
| + | timelive = 60 | ||
| + | |||
| + | def on_connect(client, userdata, flags, rc): | ||
| + | print("Connected with result code "+str(rc)) | ||
| + | client.subscribe("/data") | ||
| + | |||
| + | |||
| + | def on_message(client, userdata, msg): | ||
| + | print(msg.payload.decode()) | ||
| + | |||
| + | client = mqtt.Client() | ||
| + | client.connect(broker, port, timelive) | ||
| + | client.on_connect = on_connect | ||
| + | client.on_message = on_message | ||
| + | client.loop_forever() | ||
| + | </code> | ||
| + | |||
| + | **producer.py** | ||
| + | |||
| + | <code python> | ||
| + | # simulator device 1 for mqtt message publishing | ||
| + | import paho.mqtt.client as paho | ||
| + | import time | ||
| + | import random | ||
| + | |||
| + | broker = "mqtt" | ||
| + | port = 1883 | ||
| + | |||
| + | def on_publish(client, userdata, result): | ||
| + | print("Device 1 : Data published.") | ||
| + | |||
| + | client = paho.Client("admin") | ||
| + | client.on_publish = on_publish | ||
| + | client.connect(broker, port) | ||
| + | |||
| + | for i in range(20): | ||
| + | d = random.randint(1, 5) | ||
| + | |||
| + | # telemetry to send | ||
| + | message = "Device 1 : Data " + str(i) | ||
| + | |||
| + | time.sleep(d) | ||
| + | |||
| + | # publish message | ||
| + | ret = client.publish("/data", message) | ||
| + | |||
| + | print("Stopped...") | ||
| + | </code> | ||
| + | |||
| + | |||