실시간 데이터 처리/RabbitMQ

RabbitMQ Exchange Fanout

BUST 2017. 7. 4. 20:54

RabbitMQ Exchange Fanout

image

channel.exchange_declare(exchange='logs',
                         type='fanout')
  • exchange를 선언(declare), type은 fanout으로 한다.
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
  • 기존 Queue에 메세지를 보낸 것과 달리 Publish에서는 exchange를 대상으로 publish를 하면 된다.
result = channel.queue_declare()
channel.queue_bind(exchange='logs',
                   queue=result.method.queue)
  • 임시로 Queue를 생성하여 exchnage에 bind 할수 있다.
  • 기존의 코드는 Simple Queue와 동일
rabbitmqctl list_bindings
  • binding list를 가지고 오는 명령어

전체 코드

  • send.py
#!/usr/bin/env python 
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

-receive.py

#!/usr/bin/env python 
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()


'실시간 데이터 처리 > RabbitMQ' 카테고리의 다른 글

RabbitMQ Mirrored Queue  (0) 2018.06.29
RabbitMQ Simple Queue  (0) 2017.07.03
RabbitMQ  (0) 2017.07.03