Azure Service Bus (Producer-Consumer SDK)

PY
S
Python

Azure Service Bus (Producer-Consumer SDK)

1# Install the Azure SDK for Service Bus
2# The '-q' flag is for a quiet installation
3!pip install -q azure-servicebus
4
5# Replace with your own connection string and queue name
6CONNECTION_STRING = "Endpoint=sb://*****.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=***********"
7QUEUE_NAME = "myqueue"
8
# PRODUCER
# Sends one JSON-encoded message to QUEUE_NAME using the synchronous client.
import json

from azure.servicebus import ServiceBusClient, ServiceBusMessage

# 1. Define the Python object (dictionary)
message_object = {
    "type": "greeting",
    "message": "hello world 138",
}

# 2. Stringify the object to a JSON formatted string
stringified_message = json.dumps(message_object)

# 3. Send the stringified object
# Create a ServiceBusClient using the connection string; the `with` blocks
# close the client and the sender even if sending raises.
with ServiceBusClient.from_connection_string(
    conn_str=CONNECTION_STRING,
    logging_enable=True,
) as client:
    # Get a Queue Sender object to send messages to the queue
    with client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
        # Create a single message with the JSON string as its body and tag it
        # as JSON so receivers can tell how to decode the payload.
        single_message = ServiceBusMessage(
            stringified_message,
            content_type="application/json",
        )

        # Send the message
        sender.send_messages(single_message)

        print("Sent a single message with a stringified JSON object:")
        print(stringified_message)
36
37
# CONSUMER
import asyncio
import json

# NOTE: this rebinds the name `ServiceBusClient` to the asynchronous client,
# replacing the synchronous one imported by the producer section.
from azure.servicebus.aio import ServiceBusClient
42
async def _abandon_quietly(receiver, msg) -> None:
    """Abandon *msg*, reporting instead of raising if the settlement fails."""
    try:
        await receiver.abandon_message(msg)
    except Exception as settle_error:
        # Abandoning can itself fail (e.g. after completing the message
        # already failed); one such message must not abort the whole batch.
        print(f"Could not abandon message: {settle_error}")


async def consume_messages_async(
    *, max_message_count: int = 2, max_wait_time: float = 10
) -> None:
    """Receive one batch of messages from the queue and settle each of them.

    Async version for better notebook compatibility. Each message body is
    printed, parsed as JSON and completed; a message that cannot be parsed or
    processed is abandoned instead.

    Args:
        max_message_count: Upper bound on the number of messages requested
            in the single receive call.
        max_wait_time: Value passed to the receive call as its wait limit.
    """
    # Imported locally under an explicit alias so this function always uses
    # the asynchronous client, whichever `ServiceBusClient` is bound at
    # module level when it is called.
    from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient

    async with AsyncServiceBusClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        logging_enable=True,
    ) as servicebus_client:

        # Get the Queue Receiver object for the queue
        receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)

        async with receiver:
            print("Starting async message processing...")

            # Receive up to `max_message_count` messages at once
            received_msgs = await receiver.receive_messages(
                max_wait_time=max_wait_time,
                max_message_count=max_message_count,
            )

            print(f"Received {len(received_msgs)} messages:")

            for msg in received_msgs:
                try:
                    message_body = str(msg)
                    print(f"Raw message: {message_body}")

                    # Parse JSON
                    message_object = json.loads(message_body)
                    print(f"Processing: {message_object}")

                    # Complete the message so that it's removed from the queue
                    await receiver.complete_message(msg)
                    print("Message completed successfully")

                except json.JSONDecodeError as e:
                    print(f"Error parsing JSON: {e}")
                    # NOTE(review): a body that is not valid JSON will fail the
                    # same way if it is delivered again; consider
                    # dead-lettering such messages instead of abandoning them.
                    await _abandon_quietly(receiver, msg)
                except Exception as e:
                    print(f"Error processing message: {e}")
                    await _abandon_quietly(receiver, msg)
79
80# Run the async consumer
81await consume_messages_async()
82

Created on 9/3/2025