How to fix "Encountered an error (InternalServerError) from host runtime." when using Test/Run on a Function App timer trigger function
Jinwon Ahn
0
Reputation points
This is my function app code (function_app.py):
import azure.functions as func
import logging
import json
import os
import sys
import subprocess
import asyncio
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
# Set up logging immediately: configure the root logger at INFO before any of
# the guarded imports below run, since each of them logs its outcome.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by the import guards below.
logger = logging.getLogger(__name__)
# Add the project root (the directory containing this file) to the front of
# sys.path. Presumably this lets the project-local modules imported below
# (azure_settings, weekly_brief_enhanced, async_weekly_brief) resolve - confirm
# against the deployment layout.
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
# Optional Azure SDK dependencies (Blob Storage, Key Vault, Identity).
# AZURE_IMPORTS_AVAILABLE records whether they could be imported so that code
# needing them can check the flag instead of failing at module load.
try:
    from azure.storage.blob import BlobServiceClient
    from azure.keyvault.secrets import SecretClient
    from azure.identity import DefaultAzureCredential
except ImportError as e:
    logger.warning(f"Azure SDK imports not available: {e} - blob storage features will be limited")
    AZURE_IMPORTS_AVAILABLE = False
else:
    # Only reached when all three imports succeeded.
    AZURE_IMPORTS_AVAILABLE = True
    logger.info("Azure SDK imports loaded successfully")
# Import the environment-validation helpers from azure_settings. If the module
# cannot be imported, define inert stand-ins for EVERY imported name so the
# rest of this file can still load and report a degraded state.
try:
    from azure_settings import (
        get_environment_status,
        is_environment_ready,
        assert_environment_ready,
        setup_azure_environment,
        ENV_VALIDATION_SUCCESS,
    )
    logger.info("Azure settings imported successfully")
except ImportError as e:
    logger.error(f"Failed to import azure_settings: {e}")

    def get_environment_status():
        """Fallback: report the environment as not validated."""
        return {"validated": False, "error": "azure_settings import failed"}

    def is_environment_ready():
        """Fallback: the environment is never ready without azure_settings."""
        return False

    def assert_environment_ready(*args, **kwargs):
        """Fallback: always raise, mirroring is_environment_ready() == False.

        Previously this name had no fallback, so any reference to it after a
        failed import raised NameError instead of a meaningful error.
        NOTE(review): the real function's signature and exception type are not
        visible here - confirm against azure_settings.
        """
        raise RuntimeError("azure_settings import failed")

    def setup_azure_environment():
        """Fallback: nothing to set up."""
        pass

    ENV_VALIDATION_SUCCESS = False
# Synchronous automation class; bound to None when its module is unavailable.
try:
    from weekly_brief_enhanced import WeeklyBriefAutomation
except ImportError as e:
    logger.error(f"Failed to import WeeklyBriefAutomation: {e}")
    WeeklyBriefAutomation = None
else:
    logger.info("WeeklyBriefAutomation imported successfully")
# Async automation class plus the helper used to drive coroutines from the
# synchronous handlers. On import failure the class is bound to None and a
# minimal asyncio-based runner is substituted.
try:
    from async_weekly_brief import AsyncWeeklyBriefAutomation, run_async_operation
except ImportError as e:
    logger.error(f"Failed to import AsyncWeeklyBriefAutomation: {e}")
    AsyncWeeklyBriefAutomation = None

    def run_async_operation(coro):
        """Fallback runner: execute *coro* to completion via asyncio.run."""
        return asyncio.run(coro)
else:
    logger.info("AsyncWeeklyBriefAutomation imported successfully")
# Function app instance; the @app.route and @app.timer_trigger decorators
# below register their handlers on it.
app = func.FunctionApp()
@app.route(route="runtime-test", auth_level=func.AuthLevel.ANONYMOUS)
def runtime_test(req: func.HttpRequest) -> func.HttpResponse:
    """Confirm that the Function App can serve an HTTP request.

    Responds 200 with a JSON body containing a UTC timestamp, whether the
    Azure SDK imports succeeded, and the Python version. If building that
    response raises, the error is logged and a 500 JSON body is returned.
    """
    try:
        payload = {
            "status": "success",
            "message": "Function App runtime is working",
            "timestamp": datetime.utcnow().isoformat(),
            "azure_imports": AZURE_IMPORTS_AVAILABLE,
            "python_version": sys.version,
        }
        body = json.dumps(payload, indent=2)
        return func.HttpResponse(body, status_code=200, mimetype="application/json")
    except Exception as e:
        logger.error(f"Runtime test failed: {e}")
        failure = {
            "status": "error",
            "message": f"Runtime test failed: {str(e)}",
            "timestamp": datetime.utcnow().isoformat(),
        }
        body = json.dumps(failure, indent=2)
        return func.HttpResponse(body, status_code=500, mimetype="application/json")
def upload_to_blob_storage(file_path: str, blob_name: str) -> Optional[str]:
    """Upload a local file to Azure Blob Storage and return the blob URL.

    The storage account is selected from the environment:
    AZURE_STORAGE_CONNECTION_STRING is preferred; otherwise
    AZURE_STORAGE_ACCOUNT_URL is used with DefaultAzureCredential. The
    container name comes from STORAGE_CONTAINER_NAME (default "reports").

    Args:
        file_path: Path of the local file to upload.
        blob_name: Name of the destination blob inside the container.

    Returns:
        The URL of the uploaded blob, or None when the Azure SDK is not
        available, no storage setting is configured, or the upload fails.
        Failures are logged rather than raised.
    """
    if not AZURE_IMPORTS_AVAILABLE:
        logging.warning("Azure SDK not available - cannot upload to blob storage")
        return None
    try:
        connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
        account_url = os.getenv("AZURE_STORAGE_ACCOUNT_URL")  # e.g., https://<account>.blob.core.windows.net
        container_name = os.getenv("STORAGE_CONTAINER_NAME", "reports")
        if connection_string:
            blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        elif account_url:
            credential = DefaultAzureCredential()
            blob_service_client = BlobServiceClient(account_url=account_url, credential=credential)
        else:
            logging.error("Neither AZURE_STORAGE_CONNECTION_STRING nor AZURE_STORAGE_ACCOUNT_URL is set.")
            return None
        # Create the container if it doesn't exist. This stays best-effort
        # (the usual failure is "already exists"), but the reason is logged so
        # that authorization or network problems are not silently discarded.
        try:
            blob_service_client.create_container(container_name)
        except Exception as e:
            logging.debug(f"Container '{container_name}' was not created (it may already exist): {e}")
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        return blob_client.url
    except Exception as e:
        logging.error(f"Failed to upload to blob storage: {e}")
        return None
def create_error_response(message: str, details: str = None, status_code: int = 500, function_name: str = None) -> func.HttpResponse:
    """Build a JSON error HttpResponse and log the error.

    Args:
        message: Short description of the failure.
        details: Optional extra detail; included in the body and log when truthy.
        status_code: HTTP status code of the response.
        function_name: Name of the calling function; "unknown" when omitted.

    Returns:
        An application/json HttpResponse whose body carries the message, a UTC
        timestamp, the environment-validation flag and, when validation did
        not succeed, the output of get_environment_status().
    """
    caller = function_name or "unknown"
    payload = {
        "status": "error",
        "message": message,
        "timestamp": datetime.utcnow().isoformat(),
        "environment_validated": ENV_VALIDATION_SUCCESS,
        "function_name": caller,
    }
    if details:
        payload["details"] = details
    if not ENV_VALIDATION_SUCCESS:
        payload["environment_issues"] = get_environment_status()

    # Log line carries the function context, with details appended when given.
    log_parts = [f"[{caller}] Error: {message}"]
    if details:
        log_parts.append(f" - Details: {details}")
    logging.error("".join(log_parts))

    body = json.dumps(payload, indent=2)
    return func.HttpResponse(body, status_code=status_code, mimetype="application/json")
@app.route(route="health", auth_level=func.AuthLevel.ANONYMOUS)
def health_check(req: func.HttpRequest) -> func.HttpResponse:
    """
    Health endpoint driven by get_environment_status().

    Responds 200 with status "healthy" when the status dict's "validated"
    entry is truthy, otherwise 503 with status "degraded". Any exception is
    logged and turned into a 500 via create_error_response.
    """
    try:
        env_status = get_environment_status()
        validated = env_status["validated"]
        if validated:
            health, status_code = "healthy", 200
        else:
            health, status_code = "degraded", 503
        response_data = {
            "status": health,
            "timestamp": datetime.utcnow().isoformat(),
            "environment_validation": env_status,
            "function_app": "running",
            "python_version": sys.version,
            "working_directory": str(Path.cwd()),
            "validation_method": "import_time_fail_fast",
        }
        body = json.dumps(response_data, indent=2)
        return func.HttpResponse(body, status_code=status_code, mimetype="application/json")
    except Exception as e:
        logging.error(f"Health check failed: {e}")
        return create_error_response("Health check failed", str(e), 500, "health_check")
@app.route(route="weekly_manual_run", auth_level=func.AuthLevel.ANONYMOUS)
def weekly_manual_run(req: func.HttpRequest) -> func.HttpResponse:
    """
    Manual HTTP trigger for the weekly automation.

    Query parameters:
        countries: optional comma-separated list of country keys. When given,
            only those countries are processed and a per-country result map
            is returned; otherwise the full automation runs.

    Responses:
        200 - processing finished (per-country errors are reported inside
              "results" in countries mode).
        500 - the full automation reported status "error", or an unexpected
              exception occurred.
        503 - environment setup/validation failed, or the async automation
              module could not be imported.

    NOTE(review): this route is anonymous and starts the automation - confirm
    that the auth level is intended.
    """
    function_name = "weekly_manual_run"
    logging.info('Manual weekly run trigger executed (async version)')
    try:
        # Setup environment following working pattern
        try:
            setup_azure_environment()
            logging.info("Environment setup completed successfully")
        except Exception as e:
            logging.error(f"Environment setup failed: {e}")
            return create_error_response(
                "Environment setup failed",
                str(e),
                503,
                function_name,
            )
        # Check environment readiness using fail-fast pattern
        if not is_environment_ready():
            return create_error_response(
                "Environment validation failed",
                "Required configuration is missing or invalid - check logs for details",
                503,
                function_name,
            )
        # The class is None when async_weekly_brief failed to import; report
        # that directly instead of "'NoneType' object is not callable".
        if AsyncWeeklyBriefAutomation is None:
            return create_error_response(
                "Automation module unavailable",
                "async_weekly_brief could not be imported - check the deployment package and startup logs",
                503,
                function_name,
            )

        countries_param = req.params.get('countries')
        async_automation = AsyncWeeklyBriefAutomation()

        if countries_param:
            # Process specific countries asynchronously
            countries = [c.strip().lower() for c in countries_param.split(',')]
            logging.info(f'Processing specific countries async: {countries}')

            async def process_countries():
                results = {}
                for country in countries:
                    if country not in async_automation.countries:
                        results[country] = {"status": "error", "error": "Country not supported"}
                        continue
                    try:
                        results[country] = await async_automation.generate_country_report_async(country)
                    except Exception as e:
                        # One failing country must not abort the others.
                        logging.error(f'Error processing {country}: {e}')
                        results[country] = {"status": "error", "error": str(e)}
                return results

            results = run_async_operation(process_countries())
            return func.HttpResponse(
                json.dumps({
                    "status": "completed",
                    "mode": "manual_countries",
                    "countries_requested": countries,
                    "results": results,
                    "timestamp": datetime.utcnow().isoformat()
                }),
                status_code=200,
                mimetype="application/json"
            )

        # Run full automation asynchronously
        async def run_full_automation():
            try:
                return await async_automation.run_full_automation_async()
            except Exception as e:
                logging.error(f'Error in full automation: {e}')
                return {
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                }

        result = run_async_operation(run_full_automation())

        # Try to upload report to blob storage if available
        blob_url = None
        if result.get("status") == "success" and result.get("report_path"):
            blob_name = f"reports/weekly_brief_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
            blob_url = upload_to_blob_storage(result["report_path"], blob_name)
            if blob_url:
                logging.info(f"Report uploaded to blob storage: {blob_url}")
                result["blob_url"] = blob_url

        failed = result.get("status") == "error"
        response_body = {
            "status": result.get("status", "success"),
            "mode": "async",
            "message": "Async weekly automation completed",
            "timestamp": result.get("timestamp", datetime.utcnow().isoformat()),
            "countries_processed": result.get("countries_processed", len(async_automation.countries)),
            "duration": result.get("duration", 0),
            "processing_stats": result.get("processing_stats", {}),
            "blob_url": blob_url
        }
        # Surface the failure reason; it used to be dropped from the response.
        if result.get("error"):
            response_body["error"] = result["error"]
        return func.HttpResponse(
            json.dumps(response_body),
            # A failed run must not look like a success to HTTP callers.
            status_code=500 if failed else 200,
            mimetype="application/json"
        )
    except Exception as e:
        logging.error(f'Error in manual weekly run: {e}')
        return create_error_response(
            "Manual weekly run failed",
            str(e),
            500,
            function_name,
        )
@app.route(route="process_weekly_country_news", auth_level=func.AuthLevel.ANONYMOUS)
def process_weekly_country_news(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP endpoint that runs the full weekly automation for all countries.

    Responses:
        200 - the automation finished without reporting status "error".
        500 - the automation reported status "error", or an unexpected
              exception occurred.
        503 - environment setup/validation failed, or the async automation
              module could not be imported.

    NOTE(review): this route is anonymous and starts the automation - confirm
    that the auth level is intended.
    """
    function_name = "process_weekly_country_news"
    logging.info('Processing weekly country news - scheduled execution')
    try:
        # Setup environment following working pattern
        try:
            setup_azure_environment()
            logging.info("Environment setup completed successfully")
        except Exception as e:
            logging.error(f"Environment setup failed: {e}")
            return create_error_response(
                "Environment setup failed",
                str(e),
                503,
                function_name,
            )
        # Check environment readiness using fail-fast pattern
        if not is_environment_ready():
            return create_error_response(
                "Environment validation failed",
                "Required configuration is missing or invalid - check logs for details",
                503,
                function_name,
            )
        # The class is None when async_weekly_brief failed to import; report
        # that directly instead of "'NoneType' object is not callable".
        if AsyncWeeklyBriefAutomation is None:
            return create_error_response(
                "Automation module unavailable",
                "async_weekly_brief could not be imported - check the deployment package and startup logs",
                503,
                function_name,
            )

        async_automation = AsyncWeeklyBriefAutomation()

        async def run_automation():
            try:
                return await async_automation.run_full_automation_async()
            except Exception as e:
                logging.error(f'Error in automation: {e}')
                return {
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                }

        result = run_async_operation(run_automation())

        failed = result.get("status") == "error"
        response_body = {
            "status": result.get("status", "success"),
            "message": "Weekly country news processing completed",
            "timestamp": result.get("timestamp", datetime.utcnow().isoformat()),
            "countries_processed": result.get("countries_processed", 0),
            "duration": result.get("duration", 0),
            "processing_stats": result.get("processing_stats", {})
        }
        # Surface the failure reason; it used to be dropped from the response.
        if result.get("error"):
            response_body["error"] = result["error"]
        return func.HttpResponse(
            json.dumps(response_body),
            # A failed run must not look like a success to HTTP callers.
            status_code=500 if failed else 200,
            mimetype="application/json"
        )
    except Exception as e:
        logging.error(f'Error in weekly country news processing: {e}')
        return create_error_response(
            "Weekly country news processing failed",
            str(e),
            500,
            function_name,
        )
@app.timer_trigger(schedule="0 30 13 * * 1",
                   arg_name="myTimer",
                   run_on_startup=False,
                   use_monitor=False)
def weekly_country_news_timer(myTimer: func.TimerRequest) -> None:
    """
    Timer-triggered function that runs the weekly intelligence brief automation.

    The NCRONTAB schedule "0 30 13 * * 1" fires every Monday at 13:30, which
    is 1:30 PM UTC (8:30 AM EST) when the app runs in the default UTC time zone.

    Failures are logged and never re-raised, so a failed run still completes
    the invocation; check the logs to see whether the automation succeeded.
    """
    # utcnow() already returns a naive datetime, so no tzinfo stripping is needed.
    utc_timestamp = datetime.utcnow().isoformat()
    if myTimer.past_due:
        logging.info('The timer is past due!')
    logging.info('Weekly country news timer trigger executed at %s (enhanced version)', utc_timestamp)
    try:
        # Setup environment following working pattern
        try:
            setup_azure_environment()
            logging.info("Environment setup completed successfully")
        except Exception as e:
            logging.error(f"Environment setup failed: {e}")
            logging.error("Timer function failed: Environment setup failed - required configuration missing")
            return
        # Check environment readiness using fail-fast pattern
        if not is_environment_ready():
            logging.error("Timer function failed: Environment validation failed - required configuration missing")
            return
        # The class is None when async_weekly_brief failed to import; say so
        # explicitly instead of failing with "'NoneType' object is not callable".
        if AsyncWeeklyBriefAutomation is None:
            logging.error("Timer function failed: async_weekly_brief could not be imported - automation unavailable")
            return

        logging.info('Initializing AsyncWeeklyBriefAutomation...')
        async_automation = AsyncWeeklyBriefAutomation()
        logging.info('Starting async weekly automation process...')

        async def run_timer_automation():
            try:
                return await async_automation.run_full_automation_async()
            except Exception as e:
                logging.error(f'Error in timer automation: {e}')
                return {
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                }

        # Execute the async operation
        result = run_async_operation(run_timer_automation())
        if result.get("status") == "success":
            logging.info('Timer-triggered weekly automation completed successfully')
        else:
            logging.error(f'Timer-triggered automation failed: {result}')
    except Exception as e:
        # Deliberately not re-raised; logging.exception keeps the traceback so
        # the failure can still be diagnosed from the logs.
        logging.exception(f'Critical error in timer function: {e}')
@app.route(route="test_email", auth_level=func.AuthLevel.ANONYMOUS)
def test_email(req: func.HttpRequest) -> func.HttpResponse:
    """
    Test endpoint that exercises only the email path of the automation.

    Responses:
        200 - the email test ran and did not report status "error".
        500 - the email test reported status "error", or an unexpected
              exception occurred.
        503 - environment setup/validation failed, or the async automation
              module could not be imported.

    NOTE(review): this route is anonymous and triggers an email test - confirm
    that the auth level is intended.
    """
    function_name = "test_email"
    logging.info('Email test endpoint called (async version)')
    try:
        # Setup environment following working pattern
        try:
            setup_azure_environment()
            logging.info("Environment setup completed successfully")
        except Exception as e:
            logging.error(f"Environment setup failed: {e}")
            return create_error_response(
                "Environment setup failed",
                str(e),
                503,
                function_name,
            )
        # Check environment readiness using fail-fast pattern
        if not is_environment_ready():
            return create_error_response(
                "Environment validation failed",
                "Required configuration is missing or invalid - check logs for details",
                503,
                function_name,
            )
        # The class is None when async_weekly_brief failed to import; report
        # that directly instead of "'NoneType' object is not callable".
        if AsyncWeeklyBriefAutomation is None:
            return create_error_response(
                "Automation module unavailable",
                "async_weekly_brief could not be imported - check the deployment package and startup logs",
                503,
                function_name,
            )

        async_automation = AsyncWeeklyBriefAutomation()
        logging.info('Using AsyncWeeklyBriefAutomation for email test')

        async def run_email_test():
            try:
                return await async_automation.test_email_async()
            except Exception as e:
                logging.error(f'Error in email test: {e}')
                return {
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                }

        result = run_async_operation(run_email_test())

        failed = result.get("status") == "error"
        response_body = {
            "status": result.get("status", "unknown"),
            "mode": "async",
            "message": result.get("message", "Email test completed"),
            "timestamp": result.get("timestamp", datetime.utcnow().isoformat()),
            "details": result.get("details", {})
        }
        # Surface the failure reason; it used to be dropped from the response.
        if result.get("error"):
            response_body["error"] = result["error"]
        return func.HttpResponse(
            json.dumps(response_body),
            # A failed test must not look like a success to HTTP callers.
            status_code=500 if failed else 200,
            mimetype="application/json"
        )
    except Exception as e:
        logging.error(f'Error in email test: {e}')
        return create_error_response(
            "Email test failed",
            str(e),
            500,
            function_name,
        )
@app.route(route="gpt-health", auth_level=func.AuthLevel.ANONYMOUS)
def gpt_health(req: func.HttpRequest) -> func.HttpResponse:
    """Check OpenAI connectivity by sending one tiny chat completion.

    Works with both generations of the openai package: the client-based API
    (openai>=1.0, detected via ``openai.OpenAI``) and the legacy module-level
    ``openai.ChatCompletion`` API, which was removed in 1.0.

    Responses:
        200 - the completion succeeded ("status": "healthy").
        503 - the API key is missing or the request failed ("unhealthy").
        500 - environment setup or the openai import itself failed.

    NOTE(review): this route is anonymous and makes a real API request on
    every call - confirm that the auth level is intended.
    """
    try:
        logging.info('GPT health check endpoint called')
        # Setup environment first, before the API key is read from the environment.
        setup_azure_environment()
        import openai

        api_key = os.getenv("OPENAI_API_KEY")
        model = "gpt-3.5-turbo"
        request_kwargs = {
            "model": model,
            "messages": [{"role": "user", "content": "Say 'GPT health check successful' in exactly those words."}],
            "max_tokens": 20,
            "temperature": 0,
        }
        try:
            if not api_key:
                # Fail with a clear reason instead of an opaque auth error.
                raise RuntimeError("OPENAI_API_KEY is not set")
            if hasattr(openai, "OpenAI"):
                # openai>=1.0: requests go through a client instance.
                client = openai.OpenAI(api_key=api_key)
                response = client.chat.completions.create(**request_kwargs)
            else:
                # Legacy openai<1.0 module-level API.
                openai.api_key = api_key
                response = openai.ChatCompletion.create(**request_kwargs)
            content = response.choices[0].message.content or ""
            result = {
                "status": "healthy",
                "timestamp": datetime.utcnow().isoformat(),
                "openai_response": content.strip(),
                "model_used": model,
                "test_successful": True
            }
        except Exception as e:
            result = {
                "status": "unhealthy",
                "timestamp": datetime.utcnow().isoformat(),
                "error": str(e),
                "test_successful": False
            }
        return func.HttpResponse(
            json.dumps(result, indent=2),
            status_code=200 if result["test_successful"] else 503,
            mimetype="application/json"
        )
    except Exception as e:
        logging.error(f"GPT health check failed: {e}")
        return create_error_response(
            "GPT health check failed",
            str(e),
            500,
            "gpt_health"
        )
@app.route(route="env-test", auth_level=func.AuthLevel.ANONYMOUS)
def env_test(req: func.HttpRequest) -> func.HttpResponse:
    """Report which required settings are present and whether setup succeeds.

    For each expected environment variable the response states whether it is
    set and how long it is. Values are fully masked: this route is anonymous,
    so no part of a secret may be echoed back (the previous version returned
    the first and last four characters of each value).

    Responds 200 with the test report, or 500 if building the report fails.
    """
    try:
        logging.info('Environment test endpoint called')
        test_results = {
            "timestamp": datetime.utcnow().isoformat(),
            "service": "weekly-country-news-func",
            "environment_test": {}
        }
        # Check key environment variables (without exposing values)
        env_vars_to_check = [
            "OPENAI_API_KEY", "SERP_API_KEY", "SENDER_EMAIL",
            "RECIPIENT_EMAILS", "AZURE_COMMUNICATION_CONNECTION_STRING", "KEY_VAULT_URL"
        ]
        for var in env_vars_to_check:
            value = os.getenv(var)
            test_results["environment_test"][var] = {
                "present": bool(value),
                "length": len(value) if value else 0,
                # Same key as before, but never any characters of the value.
                "masked_value": "***" if value else None
            }
        # Run environment setup and record whether it raised.
        try:
            setup_azure_environment()
            test_results["key_vault_test"] = {
                "status": "success",
                "message": "Key Vault access successful"
            }
        except Exception as e:
            test_results["key_vault_test"] = {
                "status": "error",
                "message": str(e)
            }
        # Test environment validation
        test_results["environment_validation"] = get_environment_status()
        return func.HttpResponse(
            json.dumps(test_results, indent=2),
            status_code=200,
            mimetype="application/json"
        )
    except Exception as e:
        logging.error(f"Environment test failed: {e}")
        return create_error_response(
            "Environment test failed",
            str(e),
            500,
            "env_test"
        )
@app.route(route="debug_info", auth_level=func.AuthLevel.ANONYMOUS)
def debug_info(req: func.HttpRequest) -> func.HttpResponse:
    """
    Debug endpoint that returns system and deployment information as JSON.

    The body contains the environment-validation status, platform and Python
    version, the working directory, the first five sys.path entries, the
    *.py files in the working directory, and whether requirements.txt and
    host.json exist there.

    Responds 200 with that document, or 500 via create_error_response.

    NOTE(review): this route is anonymous and reveals paths and file names -
    confirm that the auth level is intended.
    """
    try:
        # platform is only needed here; sys is already imported at module level.
        import platform

        debug_data = {
            "status": "debug_info",
            "timestamp": datetime.utcnow().isoformat(),
            "environment_validation": get_environment_status(),
            "system_info": {
                "platform": platform.platform(),
                "python_version": sys.version,
                "working_directory": str(Path.cwd()),
                "sys_path": sys.path[:5]  # First 5 paths only
            },
            "file_system": {
                "project_files": [str(f) for f in Path.cwd().glob("*.py")],
                "requirements_exist": Path("requirements.txt").exists(),
                "host_json_exist": Path("host.json").exists()
            },
            "validation_method": "import_time_fail_fast"
        }
        return func.HttpResponse(
            json.dumps(debug_data, indent=2),
            status_code=200,
            mimetype="application/json"
        )
    except Exception as e:
        logging.error(f'Debug info failed: {e}')
        # Pass the status code and function name explicitly, as the other
        # diagnostic endpoints do, so the log line is not tagged "[unknown]".
        return create_error_response(
            "Debug info failed",
            str(e),
            500,
            "debug_info"
        )
Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
Sign in to answer