Hi Augusto Piva
Thank you for the detailed explanation, this is very helpful in understanding the issue you're facing.
The error message you're seeing "amqp:connection:forced - The connection was closed by container ... because it did not have any active links in the past 60000 milliseconds"
usually indicates that Azure Event Hubs closed the AMQP connection due to inactivity (i.e., no send or receive operations within 60 seconds). Even though your application appears to be continuously feeding the producer via the asyncio.Queue
, this can still occur under a few conditions, especially when using buffered_mode=True
.
Here are some key considerations and recommendations that may help address the issue:
Considerations:
Buffered Mode Behavior - In buffered mode, events are internally batched and sent when either:
-
max_buffer_length
is reached, or -
max_wait_time
is exceeded.
If the queue feeds data slowly or irregularly, the buffered client may not send anything for over 60 seconds, leading Azure to close the connection as idle.
Use of asyncio.wait_for()
- Wrapping each send_event()
call in asyncio.wait_for(..., timeout=10)
may prevent the SDK from fully handling retries or AMQP-level recoveries, especially if timeouts repeatedly occur.
EventHubClient May Get Stuck Internally - If a send_event()
hangs or gets into an unexpected state due to timeouts or network interruptions, the connection may become unhealthy over time, especially in long-running services like FastAPI apps.
Considerations:
Temporarily test with buffered_mode=False
- This can help confirm if the buffered batching is delaying your sends and causing idle timeouts.
Add a keep-alive heartbeat - As a workaround, you can implement a background coroutine that sends a small dummy event (e.g., { "type": "ping" }
) every 30 seconds if no real events are sent.
Avoid wrapping every send_event()
with a timeout - Consider removing asyncio.wait_for
around send_event()
and rely on the built-in retry mechanism. If you still need timeout logic, implement it with care and reinitialize the client if you catch persistent errors.
Enable detailed logging - You can enable debug logging to get more insights from the SDK and AMQP layer:
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("uamqp").setLevel(logging.DEBUG)
Enable Diagnostic Logs in Azure Monitor - You can enable Event Hubs diagnostics to Log Analytics or Storage to track disconnections, send failures, and other metrics on the server side.
I hope this information helps. Please do let us know if you have any further queries.
Kindly consider upvoting the comment if the information provided is helpful. This can assist other community members in resolving similar issues.
Thank you.