//Differences between revision 2023/04/24 18:48 and the current revision 2023/05/14 15:23 of **tanszek:oktatas:iss_t:messaging_systems** (both by knehez).//
This code sets up a callback function that will be called every time a message is received from the 'my_queue' queue. The `auto_ack` parameter specifies whether to automatically acknowledge the message after it has been processed. Finally, the `start_consuming` method starts consuming messages from the queue.
===== Type of "Exchange" in RabbitMQ =====
An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message's routing key and decides which queue(s) the message should be sent to.
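This routing step can be sketched in a few lines of plain Python — a table of bindings consulted with the message's routing key. This is only an illustration of the idea (the binding and queue names are made up); a real exchange lives inside the broker:

<code python>
# Minimal sketch of routing by routing key: deliver a message to every
# queue whose binding key equals the message's routing key.
# (Illustrative only -- not RabbitMQ's actual implementation.)

bindings = {
    "error": ["error_logs"],
    "info": ["all_logs"],
    "warning": ["all_logs", "warning_logs"],
}

def route(routing_key, message):
    # queues with no matching binding simply do not receive the message
    matched = bindings.get(routing_key, [])
    return {queue: message for queue in matched}

print(route("warning", "disk almost full"))
# {'all_logs': 'disk almost full', 'warning_logs': 'disk almost full'}
</code>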
A fanout exchange routes messages to all queues that are bound to it, regardless of the routing key of the message. It is useful for broadcasting messages to multiple queues or multiple consumers.
  * **For example**, a notification system may send messages to a fanout exchange when a new event is created. Each queue bound to the exchange would represent a different user, and all users should receive the notification. This way, the application can broadcast the message to all connected queues.
[Notification System] → [Fanout Exchange] → [Queue: User A]
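The broadcast above can be sketched the same way: the routing key is ignored, and every bound queue receives its own copy of the message (the queue names here are hypothetical):

<code python>
# Sketch of fanout routing: the routing key plays no role, every queue
# bound to the exchange gets a copy. (Illustrative only.)

bound_queues = ["queue_user_a", "queue_user_b", "queue_user_c"]

def fanout(message, routing_key=None):
    # routing_key is accepted but deliberately unused
    return {queue: message for queue in bound_queues}

deliveries = fanout("new event created")
print(sorted(deliveries))
# ['queue_user_a', 'queue_user_b', 'queue_user_c']
</code>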
Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements.

===== MQTT example =====

Clone the repository into the Docker playground:

<code bash>
git clone https://github.com/knehez/isi.git
cd isi/mqtt-python
docker-compose up
</code>

The **docker-compose.yml** file defines a multi-container application with three services: mqtt, consumer, and producer.

The mqtt service is an instance of the //toke/mosquitto// Docker image, a popular open-source MQTT broker. The service is configured to restart automatically unless it is explicitly stopped by the user. Three volumes are defined for the service: "/mosquitto/conf", "/mosquitto/data", and "/mosquitto/log". They persist the configuration, data, and log files of the MQTT broker, respectively.

The consumer and producer services are custom-built Docker images, defined using the build key. The context key specifies the build context, in this case the current directory (.), and the dockerfile key specifies the Dockerfile to use for the build. A volume is also defined for each service that maps the current directory to the "/app" directory in the container.

Finally, the depends_on key specifies that both the consumer and producer services depend on the mqtt service, so the mqtt service is started before the other services and is available for them to use.

**docker-compose.yml**

<code yml>
version: '3.7'
services:
  mqtt:
    image: toke/mosquitto
    restart: unless-stopped
    volumes:
      - ./conf:/mosquitto/conf
      - ./data:/mosquitto/data
      - ./log:/mosquitto/log

  consumer:
    build:
      context: .
      dockerfile: Dockerfile-consumer
    volumes:
      - .:/app
    depends_on:
      - mqtt

  producer:
    build:
      context: .
      dockerfile: Dockerfile-producer
    volumes:
      - .:/app
    depends_on:
      - mqtt
</code>

**Dockerfile-consumer**

This Dockerfile defines a simple containerized Python application that acts as a consumer for the message broker. The dependencies are installed in the container, and the consumer code is copied into the container's /app directory. When the container is started, the consumer.py script is executed to consume messages from the broker.

<code>
FROM python:3.9-slim-buster

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY consumer.py .

CMD ["python", "-u", "consumer.py"]
</code>

Here is a breakdown of the different parts of the Dockerfile:

  * **FROM python:3.9-slim-buster**: specifies the base image for the Docker image. In this case, the image is based on Python 3.9 running on a slimmed-down version of the Debian Buster Linux distribution.
  * **WORKDIR /app**: sets the working directory of the container to /app. This is where the consumer code and other related files will be located.
  * **COPY requirements.txt .**: copies the requirements.txt file from the current directory on the host machine to the /app directory in the container. The requirements.txt file lists the dependencies the consumer needs to run.
  * **RUN pip install --no-cache-dir -r requirements.txt**: installs the dependencies listed in requirements.txt using pip. The --no-cache-dir option ensures that pip does not cache the downloaded packages, which helps keep the Docker image small.
  * **COPY consumer.py .**: copies the consumer.py file from the current directory on the host machine to the /app directory in the container. This is the main code file of the consumer.
  * **CMD ["python", "-u", "consumer.py"]**: specifies the command to run when the container is started. In this case, it runs the consumer.py script with the Python interpreter. The -u flag enables unbuffered output, so log messages are immediately visible in the console.

**consumer.py**

<code python>
import paho.mqtt.client as mqtt

broker = "mqtt"   # hostname of the mqtt service in docker-compose
port = 1883

keepalive = 60    # maximum period in seconds between communications with the broker

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # subscribing here means the subscription is renewed after a reconnect
    client.subscribe("/data")

def on_message(client, userdata, msg):
    print(msg.payload.decode())

client = mqtt.Client()
# register the callbacks before connecting so no event is missed
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port, keepalive)
client.loop_forever()
</code>
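The consumer above subscribes to the "/data" topic. For context on how the broker decides which subscribers receive a message, the standard MQTT topic-filter matching rules — `+` matches exactly one level, `#` (as the last element) matches all remaining levels — can be sketched in plain Python. This illustrates the rules from the MQTT specification, not Mosquitto's implementation:

<code python>
# Sketch of MQTT topic-filter matching:
#   '+' matches exactly one topic level,
#   '#' (only valid as the last element) matches any number of levels.

def topic_matches(filter_, topic):
    f_parts = filter_.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True          # multi-level wildcard swallows the rest
        if i >= len(t_parts):
            return False         # topic ran out of levels
        if f != "+" and f != t_parts[i]:
            return False         # literal level must match exactly
    return len(f_parts) == len(t_parts)

print(topic_matches("/data", "/data"))            # True
print(topic_matches("/+", "/data"))               # True
print(topic_matches("sensors/#", "sensors/1/t"))  # True
print(topic_matches("/data", "/data/raw"))        # False
</code>

paho-mqtt ships a ready-made helper for the same check, ''paho.mqtt.client.topic_matches_sub(sub, topic)'', which should be preferred in real code.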

**producer.py**

<code python>
# simulated device 1 publishing MQTT messages
import paho.mqtt.client as paho
import time
import random

broker = "mqtt"
port = 1883

def on_publish(client, userdata, result):
    print("Device 1 : Data published.")

client = paho.Client("admin")
client.on_publish = on_publish
client.connect(broker, port)
# run the network loop in a background thread so callbacks are delivered
client.loop_start()

for i in range(20):
    d = random.randint(1, 5)

    # telemetry to send
    message = "Device 1 : Data " + str(i)

    time.sleep(d)

    # publish the message to the /data topic
    client.publish("/data", message)

print("Stopped...")
client.loop_stop()
client.disconnect()
</code>