1# Install the Azure SDK for Service Bus
2# The '-q' flag is for a quiet installation
3!pip install -q azure-servicebus
4
# Connection settings shared by the producer and the consumer below.
# Each value may be supplied through an environment variable so the real
# secret does not have to be pasted into (and saved with) this notebook.
# When a variable is absent the original placeholder is used, and it must
# then be replaced with your own connection string / queue name.
import os

CONNECTION_STRING = os.environ.get(
    "SERVICEBUS_CONNECTION_STRING",
    "Endpoint=sb://*****.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=***********",
)
QUEUE_NAME = os.environ.get("SERVICEBUS_QUEUE_NAME", "myqueue")
8
9# PRODUCER
10import json
11from azure.servicebus import ServiceBusClient, ServiceBusMessage
12
13
# Payload to publish: a plain dictionary that is serialised to JSON text,
# because the message body is sent as a string.
message_object = {"type": "greeting", "message": "hello world 138"}
stringified_message = json.dumps(message_object)

# Open the client and a sender for the queue in a single `with` statement;
# both are closed again (sender first, then client) when the block exits.
with ServiceBusClient.from_connection_string(
    conn_str=CONNECTION_STRING, logging_enable=True
) as client, client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
    # Wrap the JSON text in one message and hand it to the queue.
    single_message = ServiceBusMessage(stringified_message)
    sender.send_messages(single_message)

    # Echo what was sent: the banner line, then the JSON text.
    print(
        "Sent a single message with a stringified JSON object:",
        stringified_message,
        sep="\n",
    )
36
37
38# CONSUMER
39import json
40import asyncio
41from azure.servicebus.aio import ServiceBusClient
42
async def _abandon_message_safely(receiver, msg) -> None:
    """Abandon *msg* on *receiver*, reporting a failure instead of raising.

    Abandoning is itself a request to the service and may fail. Letting that
    error escape from an ``except`` handler would abort the processing of the
    remaining messages in the batch, so it is printed and swallowed here.
    """
    try:
        await receiver.abandon_message(msg)
    except Exception as e:  # best-effort cleanup step; report and carry on
        print(f"Could not abandon message: {e}")


async def consume_messages_async(
    *, max_message_count: int = 2, max_wait_time: float = 10
) -> None:
    """Receive one batch from the queue, parse each body as JSON and settle it.

    Async version for better notebook compatibility. Reads the module-level
    ``CONNECTION_STRING`` and ``QUEUE_NAME``. A message whose body parses as
    JSON is completed; a message that fails to parse, or whose processing
    raises, is abandoned. Progress and errors are printed.

    Args:
        max_message_count: Upper bound on the number of messages requested in
            the single receive call (default 2, as before).
        max_wait_time: Value passed to the receive call as ``max_wait_time``
            (default 10, as before).
    """
    # Imported locally under an alias: this file also binds the name
    # ``ServiceBusClient`` to the synchronous client (producer section), and
    # the ``async with`` blocks below require the aio client regardless of
    # which import ran last.
    from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient

    async with AsyncServiceBusClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        logging_enable=True,
    ) as servicebus_client:
        # Get the Queue Receiver object for the queue
        receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)

        async with receiver:
            print("Starting async message processing...")

            # Receive up to ``max_message_count`` messages at once
            received_msgs = await receiver.receive_messages(
                max_wait_time=max_wait_time,
                max_message_count=max_message_count,
            )

            print(f"Received {len(received_msgs)} messages:")

            for msg in received_msgs:
                try:
                    message_body = str(msg)
                    print(f"Raw message: {message_body}")

                    # Parse JSON
                    message_object = json.loads(message_body)
                    print(f"Processing: {message_object}")

                    # Complete the message so that it's removed from the queue
                    await receiver.complete_message(msg)
                    print("Message completed successfully")

                except json.JSONDecodeError as e:
                    print(f"Error parsing JSON: {e}")
                    await _abandon_message_safely(receiver, msg)
                except Exception as e:
                    # Also reached when completing the message fails; the
                    # guarded abandon keeps the loop going in that case.
                    print(f"Error processing message: {e}")
                    await _abandon_message_safely(receiver, msg)
79
# Run the async consumer.
# A bare top-level `await` is only accepted by environments that already run
# an event loop for the cell, such as IPython/Jupyter. When executed as a
# plain script this line would have to become
# `asyncio.run(consume_messages_async())`.
await consume_messages_async()
82
# Created on 9/3/2025