你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

触发机器学习管道

适用于:适用于 Python 的 Azure 机器学习 SDK v1

重要说明

本文提供有关使用 Azure 机器学习 SDK v1 的信息。 SDK v1 自 2025 年 3 月 31 日起弃用。 对它的支持将于 2026 年 6 月 30 日结束。 可以在该日期之前安装和使用 SDK v1。

建议在 2026 年 6 月 30 日之前过渡到 SDK v2。 有关 SDK v2 的详细信息,请参阅 什么是 Azure 机器学习 CLI 和 Python SDK v2? 以及 SDK v2 参考

本文介绍如何以编程方式计划管道,使其在 Azure 上运行。 你可以基于运行时间或文件系统更改来创建计划。 可以使用基于时间的计划来完成例行任务,例如监视数据偏移。 可以使用基于更改的计划来响应不规则或不可预知的更改,例如正在上传的新数据或正在编辑的旧数据。

了解如何创建计划后,你将了解如何检索和停用它们。 最后,你将了解如何使用其他 Azure 服务、Azure 逻辑应用和 Azure 数据工厂来运行管道。 逻辑应用可实现更复杂的触发逻辑或行为。 使用 Azure 数据工厂管道,你可以将机器学习管道作为更大的数据业务流程管道的一部分进行调用。

先决条件

获取所需值

若要计划管道,需要引用工作区、已发布管道的标识符以及要在其中创建计划的试验的名称。 可以使用以下代码获取这些值:

import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.core.experiment import Experiment

ws = Workspace.from_config()

experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in  published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

experiment_name = "MyExperiment" 
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" 

创建计划

若要定期运行管道,请创建计划。 Schedule 与管道、试验和触发器关联。 触发器可以是 ScheduleRecurrence(用于定义两次作业之间的等待时间),也可以是数据存储路径(用于指定要监视更改的目录)。 不管哪种情况,都需要管道标识符,以及要在其中创建计划的试验的名称。

在 Python 文件的顶部,导入 ScheduleScheduleRecurrence 类:


from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

创建基于时间的计划

构造函数ScheduleRecurrence的必需参数frequency必须设置为以下字符串之一:"Minute""Hour""Day""Week""Month"。 它还需要一个整数 interval 参数,该参数指定开始时间之间应经过多少 frequency 个单位。 可选参数使你可以更具体地了解开始时间,如 ScheduleRecurrence 文档中所述。

创建每 15 分钟开始一次作业的 Schedule

recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
                            description="Based on time",
                            pipeline_id=pipeline_id, 
                            experiment_name=experiment_name, 
                            recurrence=recurrence)

创建基于更改的计划

由文件更改触发的管道比基于时间的计划更有效。 如果要在文件更改前执行操作,或者要在将新文件添加到数据目录时执行操作,可以预处理该文件。 可以监视对数据存储的任何更改,也可以监视数据存储中特定目录中的更改。 如果监视特定目录,该目录的子目录中的更改不会触发作业。

注意

基于更改的计划仅支持监视 Azure Blob 存储。

若要创建文件响应式 Schedule,需在对 datastore 的调用中设置 参数。 若要监视文件夹,请设置 path_on_datastore 参数。

polling_interval 参数使你可以指定数据存储检查更改的频率(以分钟为单位)。

如果管道是使用 DataPathPipelineParameter 构造的,则可通过设置 data_path_parameter_name 参数将该变量设置为已更改文件的名称。

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                            pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, data_path_parameter_name="input_data")

用于创建计划的可选参数

除了前面讨论的参数外,还可以将参数status设置为"Disabled"创建非活动计划。 此功能使 continue_on_step_failure 能够传递一个布尔值,以覆盖管道的默认失败行为。

查看计划的管道

在浏览器中,转到 Azure 机器学习工作室。 在左窗格中,选择“终结点”图标。 在 “终结点 ”窗格中,选择 “实时终结点”。 将转到在工作区中发布的管道的列表。

显示“终结点”窗格的屏幕截图。

在此页上,可以看到有关工作区中所有管道的摘要信息:名称、说明、状态等。 可以通过选择管道的名称来获取详细信息。 在生成的页面上,还可以获取有关单个作业的信息。

停用管道

如果你的 Pipeline 已发布但尚未计划,则可使用以下代码禁用它:

pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()

如果管道已计划,则需先取消计划。 从门户中或通过运行以下代码检索日程表的标识符:

ss = Schedule.list(ws)
for s in ss:
    print(s)

获得要禁用的计划的 schedule_id 后,请运行以下代码:

def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)

如果此后再次运行 Schedule.list(ws),应会得到一个空列表。

将逻辑应用用于复杂触发器

可以使用 逻辑应用创建更复杂的触发器规则或行为。

若要使用逻辑应用触发机器学习管道,需要已发布机器学习管道的 REST 终结点。 创建并发布管道。 然后,使用管道 ID 查找 PublishedPipeline 的 REST 终结点:

# You can find the pipeline ID in Azure Machine Learning studio

published_pipeline = PublishedPipeline.get(ws, id="<pipeline-id-here>")
published_pipeline.endpoint 

在 Azure 中创建逻辑应用

现在创建 一个逻辑应用。 预配逻辑应用后,按照以下步骤为管道配置触发器:

  1. 创建系统分配的托管标识 ,以授予应用对 Azure 机器学习工作区的访问权限。

  2. 在左窗格中,选择“开发工具”部分中的“逻辑应用模板”。

  3. 选择 空白工作流 模板:

    显示空白逻辑应用模板按钮的屏幕截图。

  4. 在设计器中,选择“ 添加触发器”。

  5. “添加触发器”窗格中,搜索Blob。 选择“添加或修改 blob 时(仅属性)”触发器。

    显示如何将触发器添加到逻辑应用的屏幕截图。

  6. 在“创建连接”窗格中,提供要监视 blob 添加或修改情况的 Blob 存储帐户的连接信息,然后选择“新建”。 选择要监视的容器。

    选择适合你的 间隔频率 值。

    注意

    此触发器将监视所选容器,但不监视子文件夹。

  7. 添加一个 HTTP 操作,当 Blob 发生更改或检测到新 Blob 时运行。 选择触发器下的加号(+),然后选择 添加动作

  8. “添加操作” 窗格中,选择 HTTP 操作。 如果没看到它,可以搜索一下。

    展示如何添加 HTTP 动作的屏幕截图。

  9. 在生成的窗格中,选择 “HTTP”。

    使用以下设置来配置操作:

    设置 价值
    URI 已发布管道的终结点。 请参阅 先决条件
    方法 POST
    身份验证类型 (在 “高级设置”下) 托管标识
  10. 安排你的时间表,以设置你拥有的任何 DataPath PipelineParameters 的值。

    {
      "DataPathAssignments": {
        "input_datapath": {
          "DataStoreName": "<datastore-name>",
          "RelativePath": "@{triggerBody()?['Name']}" 
        }
      },
      "ExperimentName": "MyRestPipeline",
      "ParameterAssignments": {
        "input_string": "sample_string3"
      },
      "RunSource": "SDK"
    }
    

    将已添加到工作区的 DataStoreName 用作先决条件

    显示 HTTP 设置的屏幕截图。

  11. 选择“保存”

重要说明

如果使用 Azure 基于角色的访问控制(Azure RBAC)来管理对管道的访问, 请为管道方案(训练或评分)设置权限

从 Azure 数据工厂管道调用机器学习管道

在 Azure 数据工厂管道中,“机器学习执行管道”活动运行 Azure 机器学习管道。 可以在菜单中 的“机器学习 ”下的“Azure 数据工厂创作”页上找到此活动:

显示 Azure 数据工厂创作环境中的机器学习管道活动的屏幕截图。

后续步骤

在本文中,你使用了适用于 Python 的 Azure 机器学习 SDK 以两种不同的方式计划管道。 一个计划是根据已用时钟时间触发的。 另一个计划是在指定的 Datastore 上或该存储的目录中修改了文件的情况下触发的。 你了解了如何使用门户来检查管道和单次作业。 你了解了如何禁用计划以使管道停止运行。 最后,你创建了一个 Azure 逻辑应用来触发管道。

以下文章提供更多信息: