
Author AI agents in code

This page shows how to author an AI agent in Python using Mosaic AI Agent Framework and popular agent authoring libraries like LangGraph and OpenAI.

Requirements

Tip

Databricks recommends installing the latest version of the MLflow Python client when developing agents.

To author and deploy agents using the approach on this page, install the following:

  • databricks-agents 1.2.0 or above
  • mlflow 3.1.3 or above
  • Python 3.10 or above.
    • Use serverless compute or Databricks Runtime 13.3 LTS or above to meet this requirement.
%pip install -U -qqqq databricks-agents mlflow

Databricks also recommends installing Databricks AI Bridge integration packages to author agents. These integration packages provide a shared layer of APIs that interact with Databricks AI features, such as Databricks AI/BI Genie and Vector Search, across agent authoring frameworks and SDKs.

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

OpenAI

%pip install -U -qqqq databricks-openai

Pure Python agents

%pip install -U -qqqq databricks-ai-bridge

Use ResponsesAgent to author agents

Databricks recommends the MLflow ResponsesAgent interface to author production-grade agents. With ResponsesAgent, you can use any third-party framework to author an agent and then integrate it with Databricks AI features for robust logging, tracing, evaluation, deployment, and monitoring capabilities.

The ResponsesAgent schema is compatible with the OpenAI Responses schema. To learn more about OpenAI Responses, see OpenAI: Responses vs. ChatCompletion.

ResponsesAgent easily wraps existing agents for Databricks compatibility.

ResponsesAgent provides the following benefits:

  • Advanced agent capabilities

    • Multi-agent support
    • Streaming output: Stream the output in smaller chunks.
    • Comprehensive tool-calling message history: Return multiple messages, including intermediate tool-calling messages, for improved quality and conversation management.
    • Tool-calling confirmation support
    • Long-running tool support
  • Streamlined development, deployment, and monitoring

    • Author agents using any framework: Wrap any existing agent using the ResponsesAgent interface to get out-of-the-box compatibility with AI Playground, Agent Evaluation, and Agent Monitoring.
    • Typed authoring interfaces: Write agent code using typed Python classes, benefiting from IDE and notebook autocomplete.
    • Automatic signature inference: MLflow automatically infers ResponsesAgent signatures when logging an agent, simplifying registration and deployment. See Infer Model Signature during logging.
    • Automatic tracing: MLflow automatically traces your predict and predict_stream functions, aggregating streamed responses for easier evaluation and display.
    • AI Gateway-enhanced inference tables: AI Gateway inference tables are automatically enabled for deployed agents, providing access to detailed request log metadata.

To learn how to create a ResponsesAgent, see the examples in the following section and MLflow documentation - ResponsesAgent for Model Serving.

ResponsesAgent examples

The following notebooks show how to author streaming and non-streaming ResponsesAgent using popular libraries. To learn how to expand the capabilities of these agents, see AI agent tools.

LangGraph

LangGraph tool-calling agent

Get notebook

OpenAI

OpenAI tool-calling agent using Databricks-hosted models

Get notebook

OpenAI tool-calling agent using OpenAI-hosted models

Get notebook

Multi-agent example

To learn how to create a multi-agent system, see Use Genie in multi-agent systems.

What if I already have an agent?

If you already have an agent built with LangChain, LangGraph, or a similar framework, you don’t need to rewrite your agent to use it on Databricks. Instead, just wrap your existing agent with the MLflow ResponsesAgent interface:

  1. Write a Python wrapper class that inherits from mlflow.pyfunc.ResponsesAgent.

    Inside the wrapper class, store the existing agent as an attribute, for example self.agent = your_existing_agent.

  2. The ResponsesAgent class requires implementing a predict method that returns a ResponsesAgentResponse to handle non-streaming requests. The following is an example of the request input and the ResponsesAgentResponse schema:

    import uuid
    # input as a dict
    {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
    
    # output example
    ResponsesAgentResponse(
        output=[
            {
                "type": "message",
                "id": str(uuid.uuid4()),
                "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
                "role": "assistant",
            },
        ]
    )
    
  3. In the predict function, convert the incoming messages from ResponsesAgentRequest into the format the agent expects. After the agent generates a response, convert its output to a ResponsesAgentResponse object.

See the following code examples to see how to convert existing agents to ResponsesAgent:

Basic conversion

For non-streaming agents, convert inputs and outputs in the predict function.

from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)


class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)

        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = self.create_text_output_item(text=agent_response, id=str(uuid4()))

        # Return the response
        return ResponsesAgentResponse(output=[output_item])
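
The wrapper above leaves prep_msgs_for_llm for you to write. The following is a minimal sketch of one possible version, assuming the existing agent expects chat-style dictionaries with role and content keys. It is shown as a standalone function rather than a method, and the rule that items without a role or content (such as tool calls) are skipped is an assumption made for illustration, not documented behavior.

```python
from typing import Any


def prep_msgs_for_llm(items: list[dict[str, Any]]) -> list[dict[str, str]]:
    """Convert Responses-style input items into chat-style messages (illustrative)."""
    messages = []
    for item in items:
        role = item.get("role")
        content = item.get("content")
        if role is None or content is None:
            # Assumption: items without role/content (for example tool calls) are skipped
            continue
        if isinstance(content, list):
            # Join the text of typed parts such as {"type": "output_text", "text": "..."}
            content = "".join(
                part.get("text", "") for part in content if isinstance(part, dict)
            )
        messages.append({"role": role, "content": content})
    return messages


example_input = [
    {"role": "user", "content": "What is MLflow?"},
    {
        "type": "message",
        "role": "assistant",
        "content": [{"type": "output_text", "text": "An open-source ML platform."}],
    },
    {"type": "function_call", "name": "lookup", "arguments": "{}"},
]
chat_messages = prep_msgs_for_llm(example_input)
```

An agent that needs tool-call history would have to map those items explicitly instead of dropping them.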

Streaming with code re-use

For streaming agents, implement the conversion logic once in predict_stream and have predict reuse it, which avoids duplicating the code that converts messages:

from typing import Generator
from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk

        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )

For complete examples, see ResponsesAgent examples.

Streaming responses

Streaming allows agents to send responses in real-time chunks instead of waiting for the complete response. To implement streaming with ResponsesAgent, emit a series of delta events followed by a final completion event:

  1. Emit delta events: Send multiple output_text.delta events with the same item_id to stream text chunks in real-time.
  2. Finish with a done event: Send a final response.output_item.done event that uses the same item_id as the delta events and contains the complete final output text.

Each delta event streams a chunk of text to the client. The final done event contains the complete response text and signals Databricks to do the following:

  • Trace your agent's output with MLflow tracing
  • Aggregate streamed responses in AI Gateway inference tables
  • Show the complete output in the AI Playground UI
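
The event sequence described above can be sketched with plain dictionaries and no framework code. The function name stream_text is hypothetical, and the full delta type string response.output_text.delta is an assumption; only response.output_item.done is named on this page.

```python
from typing import Iterable, Iterator
from uuid import uuid4


def stream_text(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield one delta event per chunk, then a single done event (illustrative shapes)."""
    item_id = str(uuid4())
    parts = []
    for chunk in chunks:
        parts.append(chunk)
        # Assumed event type string for text deltas
        yield {"type": "response.output_text.delta", "item_id": item_id, "delta": chunk}
    # The done event reuses item_id and carries the complete text
    yield {
        "type": "response.output_item.done",
        "item": {
            "type": "message",
            "id": item_id,
            "role": "assistant",
            "content": [{"type": "output_text", "text": "".join(parts)}],
        },
    }


events = list(stream_text(["Hello", ", ", "world"]))
delta_ids = {e["item_id"] for e in events if e["type"] == "response.output_text.delta"}
final_item = events[-1]["item"]
```

Every delta shares one item_id, and the final item carries that same identifier together with the aggregated text.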

Streaming error propagation

Mosaic AI propagates any errors encountered while streaming with the last token under databricks_output.error. It is up to the calling client to properly handle and surface this error.

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": "BAD_REQUEST",
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}
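
On the client side, one way to surface such an error is to inspect each parsed chunk as it arrives. This is a minimal sketch that assumes the chunks have already been decoded into dictionaries shaped like the payload above. StreamingAgentError and collect_stream are hypothetical names, and the delta values are placeholders.

```python
class StreamingAgentError(RuntimeError):
    """Hypothetical exception raised when a streamed chunk carries an error."""


def collect_stream(chunks):
    """Collect already-parsed stream chunks, raising if any chunk reports an error."""
    collected = []
    for chunk in chunks:
        error = (chunk.get("databricks_output") or {}).get("error")
        if error:
            raise StreamingAgentError(f"{error.get('error_code')}: {error.get('message')}")
        collected.append(chunk)
    return collected


failing_stream = [
    {"delta": "partial text"},
    {
        "delta": "",
        "databricks_output": {
            "error": {
                "error_code": "BAD_REQUEST",
                "message": "TimeoutException: Tool XYZ failed to execute.",
            }
        },
    },
]

try:
    collect_stream(failing_stream)
    surfaced = None
except StreamingAgentError as exc:
    surfaced = str(exc)
```

Raising a dedicated exception keeps partial output from being mistaken for a complete answer.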

Advanced features

Custom inputs and outputs

Some scenarios might require additional agent inputs, such as client_type and session_id, or outputs like retrieval source links that should not be included in the chat history for future interactions.

For these scenarios, MLflow ResponsesAgent natively supports the fields custom_inputs and custom_outputs. You can access the custom inputs via request.custom_inputs in all examples linked above in ResponsesAgent Examples.
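
As a rough illustration of where these fields sit, the sketch below uses plain dictionaries in place of the MLflow request and response classes. The handle_request function and the source_links key are hypothetical. The sketch assumes custom_inputs travels alongside input in the request and custom_outputs alongside output in the response.

```python
def handle_request(request: dict) -> dict:
    """Illustrative handler: reads custom_inputs and returns custom_outputs."""
    custom_inputs = request.get("custom_inputs") or {}
    session_id = custom_inputs.get("session_id", "unknown")
    last_user_text = request["input"][-1]["content"]
    return {
        "output": [
            {
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": f"Echo: {last_user_text}"}],
            }
        ],
        # Extra data that stays out of the chat history
        "custom_outputs": {"session_id": session_id, "source_links": []},
    }


request = {
    "input": [{"role": "user", "content": "hi"}],
    "custom_inputs": {"client_type": "web", "session_id": "abc-123"},
}
response = handle_request(request)
```

Because custom_outputs is separate from output, the client can use it without replaying it to the agent on the next turn.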

Warning

The Agent Evaluation review app does not support rendering traces for agents with additional input fields.

See the following notebooks to learn how to set custom inputs and outputs.

Provide custom_inputs in the AI Playground and review app

If your agent accepts additional inputs using the custom_inputs field, you can manually provide these inputs in both the AI Playground and the review app.

  1. In either the AI Playground or the Agent Review App, select the gear icon.

  2. Enable custom_inputs.

  3. Provide a JSON object that matches your agent's defined input schema.

Specify custom retriever schemas

AI agents commonly use retrievers to find and query unstructured data from vector search indices. For example retriever tools, see Build and trace retriever tools for unstructured data.

Trace these retrievers within your agent with MLflow RETRIEVER spans to enable Databricks product features, including:

  • Automatically displaying links to retrieved source documents in the AI Playground UI
  • Automatically running retrieval groundedness and relevance judges in Agent Evaluation

Note

Databricks recommends using retriever tools provided by Databricks AI Bridge packages like databricks_langchain.VectorSearchRetrieverTool and databricks_openai.VectorSearchRetrieverTool because they already conform to the MLflow retriever schema. See Locally develop Vector Search retriever tools with AI Bridge.

If your agent includes retriever spans with a custom schema, call mlflow.models.set_retriever_schema when you define your agent in code. This maps your retriever's output columns to MLflow's expected fields (primary_key, text_column, doc_uri).

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)
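
Before relying on a custom schema, it can help to check that the retriever's documents actually contain the columns you named. The helper below is a hypothetical, framework-free sketch, not part of MLflow. It reuses the column names from the example above.

```python
def missing_columns(docs, primary_key, text_column, doc_uri, other_columns=()):
    """Return, per document index, the expected columns that are absent."""
    expected = [primary_key, text_column, doc_uri, *other_columns]
    problems = {}
    for index, doc in enumerate(docs):
        absent = [col for col in expected if col not in doc]
        if absent:
            problems[index] = absent
    return problems


docs = [
    {
        "document_id": "a1",
        "chunk_text": "MLflow is an open-source platform...",
        "doc_uri": "https://mlflow.org/docs/latest/index.html",
        "title": "MLflow",
    },
    # This document lacks doc_uri and title, so it is reported
    {"document_id": "b2", "chunk_text": "Autologging automatically logs..."},
]
report = missing_columns(
    docs,
    primary_key="document_id",
    text_column="chunk_text",
    doc_uri="doc_uri",
    other_columns=["title"],
)
```

An empty report means every document carries the columns that the schema call refers to.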

Note

The doc_uri column is especially important when evaluating the retriever's performance. doc_uri is the main identifier for documents returned by the retriever, allowing you to compare them against ground truth evaluation sets. See Evaluation sets (MLflow 2).

Deployment considerations

Prepare for Databricks Model Serving

Databricks deploys ResponsesAgents in a distributed environment on Databricks Model Serving. This means that during a multi-turn conversation, the same serving replica might not handle all requests. Pay attention to the following implications for managing agent state:

  • Avoid local caching: When deploying a ResponsesAgent, don't assume the same replica handles all requests in a multi-turn conversation. Reconstruct internal state from the dictionary-based ResponsesAgentRequest on each turn.

  • Thread-safe state: Design agent state to be thread-safe, preventing conflicts in multi-threaded environments.

  • Initialize state in the predict function: Initialize state each time the predict function is called, not during ResponsesAgent initialization. Storing state at the ResponsesAgent level could leak information between conversations and cause conflicts because a single ResponsesAgent replica could handle requests from multiple conversations.
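
The guidance above can be sketched with a small class that keeps no per-conversation state. StatelessAgent is an illustrative stand-in, not an MLflow class, and the request is assumed to be a plain dictionary that carries the full message history on each turn.

```python
class StatelessAgent:
    """Illustrative agent that rebuilds conversation state on every call."""

    def __init__(self, system_prompt: str):
        # Only conversation-independent settings live on the instance
        self.system_prompt = system_prompt

    def predict(self, request: dict) -> dict:
        # State is local to this call, so replicas and threads cannot share it
        history = [{"role": "system", "content": self.system_prompt}]
        history.extend(request["input"])
        user_turns = sum(1 for message in history if message["role"] == "user")
        return {
            "reply": f"You have sent {user_turns} message(s).",
            "history_length": len(history),
        }


agent = StatelessAgent(system_prompt="Be brief.")
first = agent.predict({"input": [{"role": "user", "content": "hi"}]})
second = agent.predict(
    {
        "input": [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello"},
            {"role": "user", "content": "how are you?"},
        ]
    }
)
other_conversation = agent.predict({"input": [{"role": "user", "content": "new chat"}]})
```

Because nothing from an earlier call is stored on the instance, a request from a different conversation sees only its own history.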

Parametrize code for deployment across environments

Parametrize agent code to reuse the same agent code across different environments.

Parameters are key-value pairs that you define in a Python dictionary or a .yaml file.

To configure the code, create a ModelConfig using either a Python dictionary or a .yaml file. ModelConfig is a set of key-value parameters that allows for flexible configuration management. For example, you can use a dictionary during development and then convert it to a .yaml file for production deployment and CI/CD.

For details about ModelConfig, see the MLflow documentation.

An example ModelConfig is shown below:

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

In your agent code, you can reference a default (development) configuration from the .yaml file or dictionary:

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get("model_serving_endpoint")

Then, when logging your agent, pass the model_config parameter to log_model to specify a custom set of parameters to use when loading the logged agent. See MLflow documentation - ModelConfig.
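
As a sketch of how agent code might consume such parameters, the example below fills the prompt template from a configuration dictionary. A plain dict stands in for the loaded configuration (for example, the result of model_config.to_dict()), and render_prompt is a hypothetical helper.

```python
config = {
    "prompt_template": "You are a hello world bot. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}


def render_prompt(config: dict, **values: str) -> str:
    """Fill the configured template, failing early if a declared variable is missing."""
    missing = [name for name in config["prompt_template_input_vars"] if name not in values]
    if missing:
        raise ValueError(f"Missing prompt variables: {missing}")
    return config["prompt_template"].format(**values)


prompt = render_prompt(config, question="What is YAML?")
```

Keeping the template and its variable names in configuration lets the same agent code run with different prompts in each environment.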

Use synchronous code or callback patterns

To ensure stability and compatibility, use synchronous code or callback-based patterns in your agent implementation.

Azure Databricks automatically manages asynchronous communication to provide optimal concurrency and performance when you deploy an agent. Introducing custom event loops or async frameworks might lead to errors like RuntimeError: This event loop is already running and cause unpredictable behavior.

Azure Databricks recommends avoiding asynchronous programming, such as using asyncio or creating custom event loops, when developing agents.
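
A synchronous generator combined with a callback covers many cases that might otherwise tempt you toward asyncio. The sketch below is illustrative only. stream_reply and its on_chunk callback are hypothetical names.

```python
from typing import Callable, Iterator


def stream_reply(chunks: list[str], on_chunk: Callable[[str], None]) -> Iterator[str]:
    """Synchronous generator that notifies a callback, then yields each chunk in order."""
    for chunk in chunks:
        on_chunk(chunk)  # Callback hook, for example for logging or metrics
        yield chunk


seen: list[str] = []
reply = "".join(stream_reply(["sync ", "code ", "only"], on_chunk=seen.append))
```

No event loop is created here, so the code cannot conflict with the one that the serving environment manages.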

Next steps