APPLIES TO: Foundry (classic) portal. This article doesn't apply to the new Foundry portal.
Learn more about the new portal.
Note
Links in this article might open content in the new Microsoft Foundry documentation rather than the Foundry (classic) documentation you're viewing now.
Use this article to learn how to work with the computer use feature in Azure OpenAI. Computer use is a specialized AI tool that uses a dedicated model to perform tasks by interacting with computer systems and applications through their UIs. With computer use, you can create an agent that can handle complex tasks and make decisions based on on-screen content by interpreting visual elements.
Computer use provides:
- Autonomous navigation: for example, opening applications, clicking buttons, completing forms, and navigating multi-page workflows.
- Dynamic adaptation: interpreting UI changes and adjusting actions accordingly.
- Cross-application task execution: operating across web-based and desktop applications.
- Natural language interface: users describe a task in plain language, and the computer use model determines the correct UI interactions to perform.
Request access
To access the gpt-5.4 model, registration is required, and access is granted based on Microsoft's eligibility criteria. Customers who have access to other limited access models still need to request access for this model.
Request access: gpt-5.4 limited access model application
Once you're granted access, you need to create a deployment for the model.
Send an API call to the computer use model using the Responses API
The computer use tool is accessed through the Responses API. The tool operates in a continuous loop that sends actions such as typing text or performing a click. Your code executes these actions on a computer and sends screenshots of the outcome to the model.
In this way, your code simulates the actions of a human using a computer interface, while the model uses the screenshots to understand the state of the environment and suggest the actions to take next.
The following example shows a basic API call.
To send the request, you need to install the following Python packages.
pip install openai
pip install azure-identity
import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import OpenAI
import json

token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://ai.azure.com/.default")

client = OpenAI(
    base_url="https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/",
    api_key=token_provider,
)

response = client.responses.create(
    model="gpt-5.4",  # set this to your model deployment name
    tools=[{"type": "computer"}],
    input=[
        {
            "role": "user",
            "content": "Check the latest AI news on bing.com."
        }
    ],
)

print(json.dumps(response.model_dump(), indent=2))
Output
{
  "id": "resp_068b0022b159a6710069b0d44d97848195911e2ff69ff500fa",
  "created_at": 1773196365.0,
  "error": null,
  "incomplete_details": null,
  "instructions": null,
  "metadata": {},
  "model": "gpt-5.4",
  "object": "response",
  "output": [
    {
      "id": "msg_068b0022b159a6710069b0d44ede1881959e2d1deefe9f8676",
      "content": [
        {
          "annotations": [],
          "text": "I\u2019ll open Bing, look for current AI news, and summarize the latest headlines I find.",
          "type": "output_text",
          "logprobs": []
        }
      ],
      "role": "assistant",
      "status": "completed",
      "type": "message",
      "phase": "commentary"
    },
    {
      "id": "cu_068b0022b159a6710069b0d45008448195980f77beaa9cec83",
      "action": null,
      "call_id": "call_4y94crSZe0elpGhdiiwjLpa0",
      "pending_safety_checks": null,
      "status": "completed",
      "type": "computer_call",
      "actions": [
        {
          "type": "screenshot"
        }
      ]
    }
  ],
  "parallel_tool_calls": true,
  "temperature": 1.0,
  "tool_choice": "auto",
  "tools": [
    {
      "name": null,
      "parameters": null,
      "strict": null,
      "type": "computer",
      "description": null
    }
  ],
  "top_p": 0.98,
  "background": false,
  "conversation": null,
  "max_output_tokens": null,
  "max_tool_calls": null,
  "previous_response_id": null,
  "prompt": null,
  "prompt_cache_key": null,
  "reasoning": {
    "effort": "none",
    "generate_summary": null,
    "summary": null
  },
  "safety_identifier": null,
  "service_tier": "default",
  "status": "completed",
  "text": {
    "format": {
      "type": "text"
    },
    "verbosity": "medium"
  },
  "top_logprobs": 0,
  "truncation": "disabled",
  "usage": {
    "input_tokens": 820,
    "input_tokens_details": {
      "cached_tokens": 0
    },
    "output_tokens": 40,
    "output_tokens_details": {
      "reasoning_tokens": 16
    },
    "total_tokens": 860
  },
  "user": null,
  "completed_at": 1773196368,
  "content_filters": [
    {Removed from example output}
  ],
  "frequency_penalty": 0.0,
  "presence_penalty": 0.0,
  "prompt_cache_retention": null,
  "store": true
}
After the initial API request is sent, you run a loop in your application code that performs the specified actions and sends a screenshot with each turn so the model can evaluate the updated state of the environment.
# response.output is the previous response from the model
computer_calls = [item for item in response.output if item.type == "computer_call"]
if not computer_calls:
    print("No computer call found. Output from model:")
    for item in response.output:
        print(item)

computer_call = computer_calls[0]
last_call_id = computer_call.call_id
actions = computer_call.actions  # actions is now a batched array

# Your application would now perform each action in the actions[] array, in order,
# and create a screenshot of the updated state of the environment before sending another response

response_2 = client.responses.create(
    model="gpt-5.4",
    previous_response_id=response.id,
    tools=[{"type": "computer"}],
    input=[
        {
            "call_id": last_call_id,
            "type": "computer_call_output",
            "output": {
                "type": "computer_screenshot",
                # Image should be in base64
                "image_url": f"data:image/png;base64,{<base64_string>}",
                "detail": "original"
            }
        }
    ],
)
Understand the computer use integration
When you use the computer use tool, you typically perform the following steps to integrate it into your application.

1. Send a request to the model that includes a call to the computer use tool. You can also include a screenshot of the initial state of the environment in the first API request.
2. Receive a response from the model. If the response has an actions array, those items contain the suggested actions to make progress toward the specified goal. For example, an action might be screenshot so the model can assess the current state with an updated screenshot, or click with X/Y coordinates indicating where the mouse should be moved.
3. Execute the action using your application code on a computer or browser environment.
4. After executing the action, capture the updated state of the environment as a screenshot.
5. Send a new request with the updated state as a computer_call_output, and repeat this loop until the model stops requesting actions or you decide to stop.
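The steps above can be sketched end to end. The following is a minimal illustration, not the full implementation shown later in this article: run_actions is a hypothetical stand-in for your action executor, and StubClient/StubResponse fake the API client so the skeleton runs without a live endpoint.

```python
import base64

def run_actions(actions):
    """Hypothetical stand-in: execute each action in your environment,
    then return a screenshot of the updated state as raw PNG bytes."""
    return b"\x89PNG placeholder bytes"

class StubResponse:
    """Minimal shape of a Responses API response for this sketch."""
    def __init__(self, id, output):
        self.id = id
        self.output = output

class StubClient:
    """Stands in for the real client; always replies with no further actions."""
    class responses:
        @staticmethod
        def create(**kwargs):
            return StubResponse("resp_2", [])

def agent_loop(client, model, response, max_turns=5):
    """Run the act/screenshot/respond cycle until the model stops requesting actions."""
    for _ in range(max_turns):
        calls = [item for item in response.output if item["type"] == "computer_call"]
        if not calls:
            break  # no more actions requested: return control to the user
        call = calls[0]
        screenshot = base64.b64encode(run_actions(call["actions"])).decode("utf-8")
        response = client.responses.create(
            model=model,
            previous_response_id=response.id,
            tools=[{"type": "computer"}],
            input=[{
                "type": "computer_call_output",
                "call_id": call["call_id"],
                "output": {
                    "type": "computer_screenshot",
                    "image_url": f"data:image/png;base64,{screenshot}",
                    "detail": "original",
                },
            }],
        )
    return response

# First turn: the model asked for a screenshot action
first = StubResponse("resp_1", [{
    "type": "computer_call",
    "call_id": "call_1",
    "actions": [{"type": "screenshot"}],
}])
final = agent_loop(StubClient(), "gpt-5.4", first)
```

In real code, the response's output items are SDK objects with attributes rather than plain dicts, but the control flow is the same.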
Handle conversation history
You can use the previous_response_id parameter to link the current request to the previous response. Using this parameter is recommended if you don't want to manage the conversation history yourself.
If you don't use this parameter, you should make sure to include all the items returned in the response output of the previous request in your input array. This includes reasoning items if present.
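When managing history manually, the next request's input must carry forward every item from the previous response's output (including reasoning items) before appending the new tool result. A sketch, using simplified plain-dict items; build_next_input is an illustrative helper, not part of the API:

```python
def build_next_input(prev_output, call_id, screenshot_b64):
    """Carry forward every item from the previous response's output,
    then append the new tool result for the pending computer call."""
    next_input = list(prev_output)  # includes reasoning items, if present
    next_input.append({
        "type": "computer_call_output",
        "call_id": call_id,
        "output": {
            "type": "computer_screenshot",
            "image_url": f"data:image/png;base64,{screenshot_b64}",
            "detail": "original",
        },
    })
    return next_input

# Previous turn: a reasoning item plus the computer call being answered
prev_output = [
    {"type": "reasoning", "id": "rs_1", "summary": []},
    {"type": "computer_call", "call_id": "call_1", "actions": [{"type": "screenshot"}]},
]
next_input = build_next_input(prev_output, "call_1", "<base64-screenshot>")
```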
Safety checks
The API has safety checks to help protect against prompt injection and model mistakes. These checks include:
- Malicious instruction detection: the system evaluates the screenshot image and checks whether it contains adversarial content that might change the model's behavior.
- Irrelevant domain detection: the system evaluates the current domain and checks whether it's considered relevant given the conversation history.
- Sensitive domain detection: the system checks the current domain and raises a warning when it detects the user is on a sensitive domain.
If one or more of these checks is triggered, a safety check is raised when the model returns the next computer_call, with the pending_safety_checks parameter.
"output": [
    {
        "type": "reasoning",
        "id": "rs_67cb...",
        "summary": [
            {
                "type": "summary_text",
                "text": "Exploring 'File' menu option."
            }
        ]
    },
    {
        "type": "computer_call",
        "id": "cu_67cb...",
        "call_id": "call_nEJ...",
        "actions": [
            {
                "type": "click",
                "button": "left",
                "x": 135,
                "y": 193
            }
        ],
        "pending_safety_checks": [
            {
                "id": "cu_sc_67cb...",
                "code": "malicious_instructions",
                "message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
            }
        ],
        "status": "completed"
    }
]
In the next request, you need to pass those safety checks back as acknowledged_safety_checks in order to proceed.
"input": [
    {
        "type": "computer_call_output",
        "call_id": "<call_id>",
        "acknowledged_safety_checks": [
            {
                "id": "<safety_check_id>",
                "code": "malicious_instructions",
                "message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions. Please acknowledge this warning if you'd like to proceed."
            }
        ],
        "output": {
            "type": "computer_screenshot",
            "image_url": "<image_url>"
        }
    }
],
Safety check handling flow
In all cases where pending_safety_checks are returned, actions should be handed over to the end user to confirm the model's behavior and accuracy.

- malicious_instructions and irrelevant_domain: end users should review model actions and confirm that the model is behaving as intended.
- sensitive_domain: ensure an end user is actively monitoring the model's actions on these sites. The exact implementation of this "watch mode" can vary by application, but one possible example is collecting user impression data on the site to make sure there's active end user engagement with the application.
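In application code, acknowledging pending checks typically means copying the pending_safety_checks entries into the acknowledged_safety_checks field of the next computer_call_output after a human has approved them. A minimal sketch, assuming the checks are available as plain dicts (the SDK returns them as objects with id/code/message attributes); acknowledge_checks is an illustrative helper:

```python
def acknowledge_checks(pending_checks, call_id, screenshot_b64):
    """Build a computer_call_output that acknowledges every pending safety check."""
    return {
        "type": "computer_call_output",
        "call_id": call_id,
        "acknowledged_safety_checks": [
            {"id": c["id"], "code": c["code"], "message": c["message"]}
            for c in pending_checks
        ],
        "output": {
            "type": "computer_screenshot",
            "image_url": f"data:image/png;base64,{screenshot_b64}",
        },
    }

pending = [{
    "id": "cu_sc_67cb",
    "code": "malicious_instructions",
    "message": "We've detected instructions that may cause your application to perform malicious or unauthorized actions.",
}]
item = acknowledge_checks(pending, "call_nEJ", "<base64-screenshot>")
```

Only send the acknowledgements after the end user has explicitly approved them.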
Playwright integration
In this section, we provide a simple example script that integrates Azure OpenAI's gpt-5.4 model with Playwright to automate basic browser interactions. Combining the model with Playwright allows the model to see the browser screen, make decisions, and perform actions like clicking, typing, and navigating websites. You should exercise caution when running this example code. This code is designed to run locally but should only be executed in a test environment. Use a human to confirm decisions, and don't give the model access to sensitive data.
First, you need to install the Python library for Playwright.
pip install playwright
Once the package is installed, you also need to run
playwright install
Imports and configuration
First, we import the necessary libraries and define our configuration parameters. Since we're working with asyncio, we'll execute this code outside of a Jupyter notebook. We'll walk through the code first in chunks and then demonstrate how to use it.
import os
import asyncio
import base64
from openai import OpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from playwright.async_api import async_playwright, TimeoutError
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://ai.azure.com/.default"
)
# Configuration
BASE_URL = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/"
MODEL = "gpt-5.4" # Set to model deployment name
DISPLAY_WIDTH = 1440
DISPLAY_HEIGHT = 900
ITERATIONS = 5 # Max number of iterations before returning control to human supervisor
Note
We recommend using a display resolution of 1440x900 or 1600x900 to optimize the click accuracy of the computer use model.
Key mapping for browser interactions
Next, we set up mappings for the special keys that the model might need to pass to Playwright. Ultimately the model is never performing actions itself; it passes representations of commands, and you have to provide the final integration layer that can take those commands and execute them in your chosen environment.
This isn't an exhaustive list of all possible key mappings. You can expand this list as needed. This dictionary is specific to integrating the model with Playwright. If you're integrating the model with an alternate library to provide API access to your operating system's keyboard/mouse, you would need to provide a mapping specific to that library.
# Key mapping for special keys in Playwright
# Supports multiple common spellings for each key (case-insensitive)
KEY_MAPPING = {
    "/": "Slash", "\\": "Backslash",
    "alt": "Alt", "option": "Alt",
    "arrowdown": "ArrowDown", "down": "ArrowDown",
    "arrowleft": "ArrowLeft", "left": "ArrowLeft",
    "arrowright": "ArrowRight", "right": "ArrowRight",
    "arrowup": "ArrowUp", "up": "ArrowUp",
    "backspace": "Backspace",
    "ctrl": "Control", "control": "Control",
    "cmd": "Meta", "command": "Meta", "meta": "Meta", "win": "Meta", "super": "Meta",
    "delete": "Delete",
    "enter": "Enter", "return": "Return",
    "esc": "Escape", "escape": "Escape",
    "shift": "Shift",
    "space": " ",
    "tab": "Tab",
    "pagedown": "PageDown", "pageup": "PageUp",
    "home": "Home", "end": "End",
    "insert": "Insert",
    "f1": "F1", "f2": "F2", "f3": "F3", "f4": "F4",
    "f5": "F5", "f6": "F6", "f7": "F7", "f8": "F8",
    "f9": "F9", "f10": "F10", "f11": "F11", "f12": "F12"
}
This dictionary converts key names to the format expected by Playwright's keyboard API. Each key supports multiple common spellings (for example, both ctrl and control map to Control).
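For example, with an abbreviated copy of the mapping above, mixed-case or alternate spellings normalize to Playwright's key names, and unmapped keys pass through unchanged:

```python
# Abbreviated subset of the KEY_MAPPING dictionary for illustration
KEY_MAPPING = {
    "ctrl": "Control", "control": "Control",
    "cmd": "Meta", "command": "Meta",
    "esc": "Escape", "escape": "Escape",
}

def normalize_keys(keys):
    """Map model-reported key names to Playwright key names (case-insensitive);
    unknown keys are passed through as-is."""
    return [KEY_MAPPING.get(key.lower(), key) for key in keys]

print(normalize_keys(["CTRL", "c"]))  # ['Control', 'c']
print(normalize_keys(["Esc"]))        # ['Escape']
```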
Coordinate validation
To make sure any mouse actions that are passed from the model stay within the browser window boundaries, we'll add the following utility function:
def validate_coordinates(x, y):
    """Ensure coordinates are within display bounds."""
    return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))
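For example, with the 1440x900 display configured earlier, out-of-range coordinates are clamped to the viewport edges while in-range coordinates pass through unchanged:

```python
DISPLAY_WIDTH = 1440
DISPLAY_HEIGHT = 900

def validate_coordinates(x, y):
    """Ensure coordinates are within display bounds."""
    return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))

print(validate_coordinates(2000, -50))  # (1440, 0) - clamped to the right and top edges
print(validate_coordinates(700, 450))   # (700, 450) - already in bounds
```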
Action handling
The core of our browser automation is the action handler, which processes various types of user interactions and converts them into actions within the browser.
Actions in the actions[] array are returned as plain dictionaries.
async def handle_action(page, action):
    """Handle different action types from the model."""
    action_type = action.get("type")

    if action_type == "click":
        button = action.get("button", "left")
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: click at ({x}, {y}) with button '{button}'")
        if button == "back":
            await page.go_back()
        elif button == "forward":
            await page.go_forward()
        elif button == "wheel":
            await page.mouse.wheel(x, y)
        else:
            button_type = {"left": "left", "right": "right", "middle": "middle"}.get(button, "left")
            await page.mouse.click(x, y, button=button_type)
            try:
                await page.wait_for_load_state("domcontentloaded", timeout=3000)
            except TimeoutError:
                pass

    elif action_type == "double_click":
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: double click at ({x}, {y})")
        await page.mouse.dblclick(x, y)

    elif action_type == "drag":
        path = action.get("path", [])
        if len(path) < 2:
            print("\tAction: drag requires at least 2 points. Skipping.")
            return
        start = path[0]
        sx, sy = validate_coordinates(start.get("x", 0), start.get("y", 0))
        print(f"\tAction: drag from ({sx}, {sy}) through {len(path) - 1} points")
        await page.mouse.move(sx, sy)
        await page.mouse.down()
        for point in path[1:]:
            px, py = validate_coordinates(point.get("x", 0), point.get("y", 0))
            await page.mouse.move(px, py)
        await page.mouse.up()

    elif action_type == "move":
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: move to ({x}, {y})")
        await page.mouse.move(x, y)

    elif action_type == "scroll":
        scroll_x = action.get("scroll_x", 0)
        scroll_y = action.get("scroll_y", 0)
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: scroll at ({x}, {y}) with offsets ({scroll_x}, {scroll_y})")
        await page.mouse.move(x, y)
        await page.evaluate(f"window.scrollBy({{left: {scroll_x}, top: {scroll_y}, behavior: 'smooth'}});")

    elif action_type == "keypress":
        keys = action.get("keys", [])
        print(f"\tAction: keypress {keys}")
        mapped_keys = [KEY_MAPPING.get(key.lower(), key) for key in keys]
        if len(mapped_keys) > 1:
            # For key combinations (like Ctrl+C)
            for key in mapped_keys:
                await page.keyboard.down(key)
            await asyncio.sleep(0.1)
            for key in reversed(mapped_keys):
                await page.keyboard.up(key)
        else:
            for key in mapped_keys:
                await page.keyboard.press(key)

    elif action_type == "type":
        text = action.get("text", "")
        print(f"\tAction: type text: {text}")
        await page.keyboard.type(text, delay=20)

    elif action_type == "wait":
        ms = action.get("ms", 1000)
        print(f"\tAction: wait {ms}ms")
        await asyncio.sleep(ms / 1000)

    elif action_type == "screenshot":
        print("\tAction: screenshot")

    else:
        print(f"\tUnrecognized action: {action_type}")
Screenshot capture
To give the model the ability to see what it's interacting with, we need a way to capture screenshots. For this code, we use Playwright to capture the screenshots, and we limit the view to just the content in the browser window. The screenshots don't include the URL bar or other aspects of the browser GUI. If you need the model to see outside the main browser window, you could augment the model by creating your own screenshot function.
# Cache of the most recent successful screenshot, used when capture fails
last_successful_screenshot = None

async def take_screenshot(page):
    """Take a screenshot and return base64 encoding with caching for failures."""
    global last_successful_screenshot
    try:
        screenshot_bytes = await page.screenshot(full_page=False)
        last_successful_screenshot = base64.b64encode(screenshot_bytes).decode("utf-8")
        return last_successful_screenshot
    except Exception as e:
        print(f"Screenshot failed: {e}")
        print("Using cached screenshot from previous successful capture")
        if last_successful_screenshot:
            return last_successful_screenshot
This function captures the current browser state as an image and returns it as a base64-encoded string, ready to be sent to the model. We constantly do this in a loop after each step, which allows the model to see if the command it attempted to execute was successful or not and to adjust based on the contents of the screenshot. We could let the model decide if it needs to take a screenshot, but for simplicity we force a screenshot for each iteration.
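The encoding step itself can be isolated: raw PNG bytes become a base64 string, which is then wrapped in a data URL for the image_url field. A small sketch (to_data_url is an illustrative helper, not part of the script above):

```python
import base64

def to_data_url(png_bytes):
    """Encode raw PNG bytes as the data URL format used for computer_screenshot output."""
    b64 = base64.b64encode(png_bytes).decode("utf-8")
    return f"data:image/png;base64,{b64}"

url = to_data_url(b"abc")
print(url)  # data:image/png;base64,YWJj
```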
Model response processing
This function processes the model's response and executes the requested actions:
async def process_model_response(client, response, page, max_iterations=ITERATIONS):
    """Process the model's response and execute actions."""
    for iteration in range(max_iterations):
        if not response.output:
            print("No output from model.")
            break

        response_id = response.id
        print(f"\nIteration {iteration + 1} - Response ID: {response_id}\n")

        # Print text responses and reasoning
        for item in response.output:
            if item.type == "text":
                print(f"\nModel message: {item.text}\n")
            if item.type == "reasoning" and item.summary:
                print("=== Model Reasoning ===")
                for summary in item.summary:
                    if hasattr(summary, 'text') and summary.text.strip():
                        print(summary.text)
                print("=====================\n")

        # Extract computer calls
        computer_calls = [item for item in response.output if item.type == "computer_call"]
        if not computer_calls:
            print("No computer call found in response. Reverting control to human operator")
            break

        computer_call = computer_calls[0]
        call_id = computer_call.call_id
        actions = computer_call.actions  # actions is a batched array of dicts

        # Handle safety checks
        acknowledged_checks = []
        if computer_call.pending_safety_checks:
            pending_checks = computer_call.pending_safety_checks
            print("\nSafety checks required:")
            for check in pending_checks:
                print(f"- {check.code}: {check.message}")
            if input("\nDo you want to proceed? (y/n): ").lower() != 'y':
                print("Operation cancelled by user.")
                break
            acknowledged_checks = pending_checks

        # Execute all actions in the batch, in order
        try:
            await page.bring_to_front()
            for action in actions:
                await handle_action(page, action)
                # Check if a new page was created after a click action
                if action.get("type") == "click":
                    await asyncio.sleep(1.5)
                    all_pages = page.context.pages
                    if len(all_pages) > 1:
                        newest_page = all_pages[-1]
                        if newest_page != page and newest_page.url not in ["about:blank", ""]:
                            print(f"\tSwitching to new tab: {newest_page.url}")
                            page = newest_page
                elif action.get("type") != "wait":
                    await asyncio.sleep(0.5)
        except Exception as e:
            print(f"Error handling action: {e}")
            import traceback
            traceback.print_exc()

        # Take a screenshot after the actions
        screenshot_base64 = await take_screenshot(page)
        print("\tNew screenshot taken")

        # Prepare input for the next request
        input_content = [{
            "type": "computer_call_output",
            "call_id": call_id,
            "output": {
                "type": "computer_screenshot",
                "image_url": f"data:image/png;base64,{screenshot_base64}",
                "detail": "original"
            }
        }]

        # Add acknowledged safety checks if any
        if acknowledged_checks:
            input_content[0]["acknowledged_safety_checks"] = [
                {"id": check.id, "code": check.code, "message": check.message}
                for check in acknowledged_checks
            ]

        # Send the screenshot back for the next step
        try:
            response = client.responses.create(
                model=MODEL,
                previous_response_id=response_id,
                tools=[{"type": "computer"}],
                input=input_content,
            )
            print("\tModel processing screenshot")
        except Exception as e:
            print(f"Error in API call: {e}")
            import traceback
            traceback.print_exc()
            break

    if iteration >= max_iterations - 1:
        print("Reached maximum number of iterations. Stopping.")
In this section of the code we:
- Extract and display text and reasoning from the model.
- Process computer action calls.
- Handle potential safety checks that require user confirmation.
- Execute the requested actions (batched in an array of dictionaries).
- Capture a new screenshot.
- Send the updated state back to the model and define the computer tool.
- Repeat this process for multiple iterations.
Main function
The main function coordinates the entire process:
async def main():
    # Initialize OpenAI client
    client = OpenAI(
        base_url=BASE_URL,
        api_key=token_provider,
    )

    # Initialize Playwright
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(
            headless=False,
            args=[f"--window-size={DISPLAY_WIDTH},{DISPLAY_HEIGHT}", "--disable-extensions"]
        )

        context = await browser.new_context(
            viewport={"width": DISPLAY_WIDTH, "height": DISPLAY_HEIGHT},
            accept_downloads=True
        )

        page = await context.new_page()

        # Navigate to starting page
        await page.goto("https://www.bing.com", wait_until="domcontentloaded")
        print("Browser initialized to Bing.com")

        # Main interaction loop
        try:
            while True:
                print("\n" + "="*50)
                user_input = input("Enter a task to perform (or 'exit' to quit): ")
                if user_input.lower() in ('exit', 'quit'):
                    break
                if not user_input.strip():
                    continue

                # Take initial screenshot
                screenshot_base64 = await take_screenshot(page)
                print("\nTake initial screenshot")

                # Initial request to the model
                response = client.responses.create(
                    model=MODEL,
                    tools=[{"type": "computer"}],
                    instructions="You are an AI agent with the ability to control a browser. You can control the keyboard and mouse. You take a screenshot after each action to check if your action was successful. Once you have completed the requested task you should stop running and pass back control to your human operator.",
                    input=[{
                        "role": "user",
                        "content": [{
                            "type": "input_text",
                            "text": user_input
                        }, {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_base64}",
                            "detail": "original"
                        }]
                    }],
                    reasoning={"summary": "concise"},
                )
                print("\nSending model initial screenshot and instructions")

                # Process model actions
                await process_model_response(client, response, page)

        except Exception as e:
            print(f"An error occurred: {e}")
            import traceback
            traceback.print_exc()

        finally:
            # Close browser
            await context.close()
            await browser.close()
            print("Browser closed.")

if __name__ == "__main__":
    asyncio.run(main())
The main function:
- Initializes the OpenAI client.
- Sets up the Playwright browser.
- Starts at Bing.com.
- Enters a loop to accept user tasks.
- Captures the initial state.
- Sends the task and screenshot to the model.
- Processes the model's response.
- Repeats until the user exits.
- Ensures the browser is properly closed.
Complete script
Caution
This code is experimental and for demonstration purposes only. It's only intended to illustrate the basic flow between the Responses API and the gpt-5.4 model. While you can execute this code on your local computer, we strongly recommend running it on a low-privilege virtual machine with no access to sensitive data. This code is for basic testing purposes only.
import os
import asyncio
import base64
from openai import OpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from playwright.async_api import async_playwright, TimeoutError

token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://ai.azure.com/.default"
)

# Configuration
BASE_URL = "https://YOUR-RESOURCE-NAME.openai.azure.com/openai/v1/"
MODEL = "gpt-5.4"
DISPLAY_WIDTH = 1440
DISPLAY_HEIGHT = 900
ITERATIONS = 5  # Max number of iterations before forcing the model to return control to the human supervisor

# Key mapping for special keys in Playwright
# Supports multiple common spellings for each key (case-insensitive)
KEY_MAPPING = {
    "/": "Slash", "\\": "Backslash",
    "alt": "Alt", "option": "Alt",
    "arrowdown": "ArrowDown", "down": "ArrowDown",
    "arrowleft": "ArrowLeft", "left": "ArrowLeft",
    "arrowright": "ArrowRight", "right": "ArrowRight",
    "arrowup": "ArrowUp", "up": "ArrowUp",
    "backspace": "Backspace",
    "ctrl": "Control", "control": "Control",
    "cmd": "Meta", "command": "Meta", "meta": "Meta", "win": "Meta", "super": "Meta",
    "delete": "Delete",
    "enter": "Enter", "return": "Return",
    "esc": "Escape", "escape": "Escape",
    "shift": "Shift",
    "space": " ",
    "tab": "Tab",
    "pagedown": "PageDown", "pageup": "PageUp",
    "home": "Home", "end": "End",
    "insert": "Insert",
    "f1": "F1", "f2": "F2", "f3": "F3", "f4": "F4",
    "f5": "F5", "f6": "F6", "f7": "F7", "f8": "F8",
    "f9": "F9", "f10": "F10", "f11": "F11", "f12": "F12"
}

# Cache of the most recent successful screenshot, used when capture fails
last_successful_screenshot = None

def validate_coordinates(x, y):
    """Ensure coordinates are within display bounds."""
    return max(0, min(x, DISPLAY_WIDTH)), max(0, min(y, DISPLAY_HEIGHT))

async def handle_action(page, action):
    """Handle different action types from the model."""
    action_type = action.get("type")

    if action_type == "click":
        button = action.get("button", "left")
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: click at ({x}, {y}) with button '{button}'")
        if button == "back":
            await page.go_back()
        elif button == "forward":
            await page.go_forward()
        elif button == "wheel":
            await page.mouse.wheel(x, y)
        else:
            button_type = {"left": "left", "right": "right", "middle": "middle"}.get(button, "left")
            await page.mouse.click(x, y, button=button_type)
            try:
                await page.wait_for_load_state("domcontentloaded", timeout=3000)
            except TimeoutError:
                pass

    elif action_type == "double_click":
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: double click at ({x}, {y})")
        await page.mouse.dblclick(x, y)

    elif action_type == "drag":
        path = action.get("path", [])
        if len(path) < 2:
            print("\tAction: drag requires at least 2 points. Skipping.")
            return
        start = path[0]
        sx, sy = validate_coordinates(start.get("x", 0), start.get("y", 0))
        print(f"\tAction: drag from ({sx}, {sy}) through {len(path) - 1} points")
        await page.mouse.move(sx, sy)
        await page.mouse.down()
        for point in path[1:]:
            px, py = validate_coordinates(point.get("x", 0), point.get("y", 0))
            await page.mouse.move(px, py)
        await page.mouse.up()

    elif action_type == "move":
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: move to ({x}, {y})")
        await page.mouse.move(x, y)

    elif action_type == "scroll":
        scroll_x = action.get("scroll_x", 0)
        scroll_y = action.get("scroll_y", 0)
        x, y = validate_coordinates(action.get("x"), action.get("y"))
        print(f"\tAction: scroll at ({x}, {y}) with offsets ({scroll_x}, {scroll_y})")
        await page.mouse.move(x, y)
        await page.evaluate(f"window.scrollBy({{left: {scroll_x}, top: {scroll_y}, behavior: 'smooth'}});")

    elif action_type == "keypress":
        keys = action.get("keys", [])
        print(f"\tAction: keypress {keys}")
        mapped_keys = [KEY_MAPPING.get(key.lower(), key) for key in keys]
        if len(mapped_keys) > 1:
            # For key combinations (like Ctrl+C)
            for key in mapped_keys:
                await page.keyboard.down(key)
            await asyncio.sleep(0.1)
            for key in reversed(mapped_keys):
                await page.keyboard.up(key)
        else:
            for key in mapped_keys:
                await page.keyboard.press(key)

    elif action_type == "type":
        text = action.get("text", "")
        print(f"\tAction: type text: {text}")
        await page.keyboard.type(text, delay=20)

    elif action_type == "wait":
        ms = action.get("ms", 1000)
        print(f"\tAction: wait {ms}ms")
        await asyncio.sleep(ms / 1000)

    elif action_type == "screenshot":
        print("\tAction: screenshot")

    else:
        print(f"\tUnrecognized action: {action_type}")

async def take_screenshot(page):
    """Take a screenshot and return base64 encoding with caching for failures."""
    global last_successful_screenshot
    try:
        screenshot_bytes = await page.screenshot(full_page=False)
        last_successful_screenshot = base64.b64encode(screenshot_bytes).decode("utf-8")
        return last_successful_screenshot
    except Exception as e:
        print(f"Screenshot failed: {e}")
        if last_successful_screenshot:
            return last_successful_screenshot

async def process_model_response(client, response, page, max_iterations=ITERATIONS):
    """Process the model's response and execute actions."""
    for iteration in range(max_iterations):
        if not response.output:
            print("No output from model.")
            break

        response_id = response.id
        print(f"\nIteration {iteration + 1} - Response ID: {response_id}\n")

        # Print text responses and reasoning
        for item in response.output:
            # Handle text output
            if item.type == "text":
                print(f"\nModel message: {item.text}\n")
            if item.type == "reasoning" and item.summary:
                print("=== Model Reasoning ===")
                for summary in item.summary:
                    if hasattr(summary, 'text') and summary.text.strip():
                        print(summary.text)
                print("=====================\n")

        # Extract computer calls
        computer_calls = [item for item in response.output if item.type == "computer_call"]
        if not computer_calls:
            print("No computer call found in response. Reverting control to human supervisor")
            break

        computer_call = computer_calls[0]
        call_id = computer_call.call_id
        actions = computer_call.actions  # actions is a batched array of dicts

        # Handle safety checks
        acknowledged_checks = []
        if computer_call.pending_safety_checks:
            pending_checks = computer_call.pending_safety_checks
            print("\nSafety checks required:")
            for check in pending_checks:
                print(f"- {check.code}: {check.message}")
            if input("\nDo you want to proceed? (y/n): ").lower() != 'y':
                print("Operation cancelled by user.")
                break
            acknowledged_checks = pending_checks

        # Execute all actions in the batch, in order
        try:
            await page.bring_to_front()
            for action in actions:
                await handle_action(page, action)
                # Check if a new page was created after a click action
                if action.get("type") == "click":
                    await asyncio.sleep(1.5)
                    all_pages = page.context.pages
                    if len(all_pages) > 1:
                        newest_page = all_pages[-1]
                        if newest_page != page and newest_page.url not in ["about:blank", ""]:
                            print(f"\tSwitching to new tab: {newest_page.url}")
                            page = newest_page
                elif action.get("type") != "wait":
                    await asyncio.sleep(0.5)
        except Exception as e:
            print(f"Error handling action: {e}")
            import traceback
            traceback.print_exc()

        # Take a screenshot after the actions
        screenshot_base64 = await take_screenshot(page)
        print("\tNew screenshot taken")

        # Prepare input for the next request
        input_content = [{
            "type": "computer_call_output",
            "call_id": call_id,
            "output": {
                "type": "computer_screenshot",
                "image_url": f"data:image/png;base64,{screenshot_base64}",
                "detail": "original"
            }
        }]

        # Add acknowledged safety checks if any
        if acknowledged_checks:
            input_content[0]["acknowledged_safety_checks"] = [
                {"id": check.id, "code": check.code, "message": check.message}
                for check in acknowledged_checks
            ]

        # Send the screenshot back for the next step
        try:
            response = client.responses.create(
                model=MODEL,
                previous_response_id=response_id,
                tools=[{"type": "computer"}],
                input=input_content,
            )
            print("\tModel processing screenshot")
        except Exception as e:
            print(f"Error in API call: {e}")
            import traceback
            traceback.print_exc()
            break

    if iteration >= max_iterations - 1:
        print("Reached maximum number of iterations. Stopping.")

async def main():
    # Initialize OpenAI client
    client = OpenAI(
        base_url=BASE_URL,
        api_key=token_provider
    )

    # Initialize Playwright
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(
            headless=False,
            args=[f"--window-size={DISPLAY_WIDTH},{DISPLAY_HEIGHT}", "--disable-extensions"]
        )

        context = await browser.new_context(
            viewport={"width": DISPLAY_WIDTH, "height": DISPLAY_HEIGHT},
            accept_downloads=True
        )

        page = await context.new_page()

        # Navigate to starting page
        await page.goto("https://www.bing.com", wait_until="domcontentloaded")
        print("Browser initialized to Bing.com")

        # Main interaction loop
        try:
            while True:
                print("\n" + "="*50)
                user_input = input("Enter a task to perform (or 'exit' to quit): ")
                if user_input.lower() in ('exit', 'quit'):
                    break
                if not user_input.strip():
                    continue

                # Take initial screenshot
                screenshot_base64 = await take_screenshot(page)
                print("\nTake initial screenshot")

                # Initial request to the model
                response = client.responses.create(
                    model=MODEL,
                    tools=[{"type": "computer"}],
                    instructions="You are an AI agent with the ability to control a browser. You can control the keyboard and mouse. You take a screenshot after each action to check if your action was successful. Once you have completed the requested task you should stop running and pass back control to your human supervisor.",
                    input=[{
                        "role": "user",
                        "content": [{
                            "type": "input_text",
                            "text": user_input
                        }, {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_base64}",
                            "detail": "original"
                        }]
                    }],
                    reasoning={"summary": "concise"},
                )
                print("\nSending model initial screenshot and instructions")

                # Process model actions
                await process_model_response(client, response, page)

        except Exception as e:
            print(f"An error occurred: {e}")
            import traceback
            traceback.print_exc()

        finally:
            # Close browser
            await context.close()
            await browser.close()
            print("Browser closed.")

if __name__ == "__main__":
    asyncio.run(main())