Azure Event Hub - amqp:connection:forced - The connection was closed by container because it did not have any active links in the past 60000 milliseconds

Augusto Piva 55 Reputation points
2025-07-29T20:27:09.4266667+00:00

Hi,

I'm having trouble with the Azure Event Hubs SDK for Python.

I'm running an Event Hubs producer client in buffered mode that streams data like this:

import asyncio
import json
from datetime import datetime

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


async def on_error(events, partition_id, error):
    logger.error(f"Error sending {len(events)} events for partition key {partition_id}: {error}")


async def on_success(events, partition_id):
    pass


async def producer(queue: asyncio.Queue):
    global last_time_message_enqueued

    producer_client: EventHubProducerClient = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STRING,
        buffered_mode=True,
        max_buffer_length=1000,
        max_wait_time=3,
        on_error=on_error,
        on_success=on_success,
    )

    async with producer_client:
        logger.info("Producer client starting")
        while True:
            try:
                event = await queue.get()
                if event is None:
                    queue.task_done()
                    logger.info("Empty event")
                    continue
                try:
                    await asyncio.wait_for(
                        producer_client.send_event(
                            EventData(json.dumps(event)),
                            partition_key=str(event["zoneId"]),
                        ),
                        timeout=10,
                    )
                    last_time_message_enqueued = datetime.now().strftime("%H:%M:%S")
                    queue.task_done()

                except asyncio.TimeoutError:
                    logger.error("⚠️ Send_event timed out")
                    queue.task_done()
                    continue

                except Exception as e:
                    logger.error(f"Error sending events: {str(e)}")
                    queue.task_done()

            except Exception as e:
                logger.error(f"Error while processing events: {str(e)}")
                queue.task_done()

This Event Hubs client runs asynchronously inside a FastAPI application.

As you can see, the client receives its data from an internal asyncio.Queue, which it consumes with:

event = await queue.get()

The problem is that we sometimes get this log from the client:

2025-05-23T01:37:27.4619244Z azure.eventhub._pyamqp.aio._connection_async - WARNING - Connection closed with error: [b'amqp:connection:forced', b"The connection was closed by container '36fd96aacb604aa0a6e009718d105cdf_G13' because it did not have any active links in the past 60000 milliseconds.

At first we found that producer_client.send_event was getting stuck:

await producer_client.send_event(EventData(json.dumps(event)),  partition_key=str(event["zoneId"]) )

Then we added a timeout to see whether the client could unstick itself on the next iteration, like this:

await asyncio.wait_for(
    producer_client.send_event(
        EventData(json.dumps(event)),
        partition_key=str(event["zoneId"]),
    ),
    timeout=10,
)

But then we started getting the "Send_event timed out" message over and over, until Azure disconnected us with amqp:connection:forced.

We've not hit any quota limits. We're aware of that.

We also understand that the producer is always fed with events, and that there is always a message to process in the asyncio.Queue it consumes from.

This issue does not happen every day, but it disrupts our flow of messages to the event hub: when the unexpected disconnection for having no active links occurs, it causes gaps that can last 1 minute or 5 minutes.

Is there anything in particular we should try to sort this issue out?
Are there any logs on the server that we should look at?

Thanks in advance

Azure Event Hubs

Accepted answer
  1. Chandra Boorla 15,425 Reputation points Microsoft External Staff Moderator
    2025-07-30T07:22:37.3733333+00:00

    Hi Augusto Piva

    Thank you for the detailed explanation, this is very helpful in understanding the issue you're facing.

    The error message you're seeing "amqp:connection:forced - The connection was closed by container ... because it did not have any active links in the past 60000 milliseconds" usually indicates that Azure Event Hubs closed the AMQP connection due to inactivity (i.e., no send or receive operations within 60 seconds). Even though your application appears to be continuously feeding the producer via the asyncio.Queue, this can still occur under a few conditions, especially when using buffered_mode=True.

    Here are some key considerations and recommendations that may help address the issue:

    Considerations:

    Buffered Mode Behavior - In buffered mode, events are internally batched and sent when either:

    • max_buffer_length is reached, or
    • max_wait_time is exceeded.

    If the queue feeds data slowly or irregularly, the buffered client may not send anything for over 60 seconds, leading Azure to close the connection as idle.
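To make the two flush triggers concrete, here is a toy model in plain Python. It is not the SDK's implementation, and the names flush_times and longest_gap are made up for this illustration. It only computes when a buffer would flush for a given series of event arrival times, assuming a flush happens either when the buffer is full or when the oldest buffered event has waited max_wait_time seconds.

```python
def flush_times(arrivals, max_buffer_length, max_wait_time):
    """Return the times (in seconds) at which a toy buffer would flush.

    arrivals: sorted list of event arrival times in seconds.
    This is a simplified model, not the azure-eventhub implementation.
    """
    flushes = []
    buffered = 0
    first_at = None  # arrival time of the oldest event still in the buffer
    for t in arrivals:
        # Timer-based flush: the oldest buffered event has waited long enough.
        if buffered and t - first_at >= max_wait_time:
            flushes.append(first_at + max_wait_time)
            buffered, first_at = 0, None
        if buffered == 0:
            first_at = t
        buffered += 1
        # Size-based flush: the buffer is full.
        if buffered >= max_buffer_length:
            flushes.append(t)
            buffered, first_at = 0, None
    if buffered:
        # Whatever is left is flushed by the timer.
        flushes.append(first_at + max_wait_time)
    return flushes


def longest_gap(flushes):
    """Longest interval between two consecutive flushes."""
    return max((b - a for a, b in zip(flushes, flushes[1:])), default=0)


# Three events, then nothing for almost 100 seconds, then one more event.
times = flush_times([0, 1, 2, 100], max_buffer_length=1000, max_wait_time=3)
print(times, longest_gap(times))
```

Under this model, with max_wait_time=3 the buffer by itself never holds an event for more than 3 seconds, so a gap longer than 60 seconds requires either a pause in arrivals or a send that is not completing. Logging the time between on_success callbacks in the real application would show which of the two is happening.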

    Use of asyncio.wait_for() - Wrapping each send_event() call in asyncio.wait_for(..., timeout=10) may prevent the SDK from fully handling retries or AMQP-level recoveries, especially if timeouts repeatedly occur.
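One reason the timeout wrapper can interfere: when asyncio.wait_for times out, it cancels the coroutine it was waiting on. The sketch below shows this with a stand-in coroutine (slow_send is hypothetical and only sleeps; it does not call the SDK), so it illustrates the asyncio behavior rather than what the Event Hubs client does internally when cancelled.

```python
import asyncio


async def slow_send(log):
    """Stand-in for a send that is still in progress when the timeout fires."""
    try:
        await asyncio.sleep(1)
        log.append("sent")
    except asyncio.CancelledError:
        # wait_for cancels the inner coroutine on timeout.
        log.append("cancelled mid-send")
        raise


async def demo():
    log = []
    try:
        await asyncio.wait_for(slow_send(log), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("timeout")
    return log


print(asyncio.run(demo()))
```

The inner coroutine is interrupted at whatever await it had reached, so any retry or recovery it was in the middle of does not get to finish.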

    EventHubClient May Get Stuck Internally - If a send_event() hangs or gets into an unexpected state due to timeouts or network interruptions, the connection may become unhealthy over time, especially in long-running services like FastAPI apps.

    Recommendations:

    Temporarily test with buffered_mode=False - This can help confirm if the buffered batching is delaying your sends and causing idle timeouts.

    Add a keep-alive heartbeat - As a workaround, you can implement a background coroutine that sends a small dummy event (e.g., { "type": "ping" }) every 30 seconds if no real events are sent.
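A minimal sketch of such a heartbeat, assuming you track the time of the last real send yourself. The class IdleHeartbeat and its parameters are hypothetical names for this example, and send_ping is any coroutine function you supply; in the producer above it could wrap producer_client.send_event(EventData(json.dumps({"type": "ping"}))). Downstream consumers would need to ignore these ping events.

```python
import asyncio
import time


class IdleHeartbeat:
    """Calls send_ping whenever no send has been recorded for idle_seconds."""

    def __init__(self, send_ping, idle_seconds=30.0, check_every=1.0):
        self._send_ping = send_ping        # coroutine function taking no arguments
        self._idle_seconds = idle_seconds
        self._check_every = check_every
        self._last_send = time.monotonic()

    def mark_sent(self):
        """Call this after every successful real send."""
        self._last_send = time.monotonic()

    async def run(self):
        """Background loop; cancel the task to stop it."""
        while True:
            await asyncio.sleep(self._check_every)
            if time.monotonic() - self._last_send >= self._idle_seconds:
                await self._send_ping()
                self.mark_sent()
```

To use it, start asyncio.create_task(heartbeat.run()) next to the producer loop, call heartbeat.mark_sent() after each successful send_event, and cancel the task on shutdown. Note that if send_event itself is the call that hangs, the ping will hang as well, so this only helps with genuinely idle periods.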

    Avoid wrapping every send_event() with a timeout - Consider removing asyncio.wait_for around send_event() and rely on the built-in retry mechanism. If you still need timeout logic, implement it with care and reinitialize the client if you catch persistent errors.
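A rough sketch of the "reinitialize on persistent errors" idea. ResettingSender, make_client and max_failures are hypothetical names for this example; make_client is assumed to be a function that returns a fresh client (for instance, one that calls EventHubProducerClient.from_connection_string with your settings). The sketch only relies on the client having async send_event and close methods.

```python
import asyncio


class ResettingSender:
    """Recreates the client after max_failures consecutive send errors."""

    def __init__(self, make_client, max_failures=3):
        self._make_client = make_client
        self._max_failures = max_failures
        self._client = None
        self._failures = 0

    async def send(self, event, **kwargs):
        if self._client is None:
            self._client = self._make_client()
        try:
            await self._client.send_event(event, **kwargs)
            self._failures = 0  # any success resets the counter
        except Exception:
            self._failures += 1
            if self._failures >= self._max_failures:
                await self._reset()
            raise  # let the caller log or requeue the event

    async def _reset(self):
        client, self._client, self._failures = self._client, None, 0
        try:
            await client.close()
        except Exception:
            pass  # the old connection may already be unusable
```

In buffered mode, send_event may return before the event is actually delivered, with delivery failures reported to on_error instead, so in that mode the same counter would more likely need to be driven from the on_error callback. Treat the above as a starting point, not a drop-in fix.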

    Enable detailed logging - You can enable debug logging to get more insights from the SDK and AMQP layer:

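    import logging
    logging.basicConfig(level=logging.DEBUG)
    # The log in the question comes from azure.eventhub._pyamqp, so target the
    # "azure.eventhub" logger; "uamqp" only applies to the legacy uamqp transport.
    logging.getLogger("azure.eventhub").setLevel(logging.DEBUG)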
    

    Enable Diagnostic Logs in Azure Monitor - You can enable Event Hubs diagnostics to Log Analytics or Storage to track disconnections, send failures, and other metrics on the server side.

    I hope this information helps. Please do let us know if you have any further queries.

    Kindly consider upvoting the comment if the information provided is helpful. This can assist other community members in resolving similar issues.

    Thank you.

    1 person found this answer helpful.
