We’ll run a task to gather information about suppliers of blast furnaces. We’ll ask Twin to find, on Alibaba, a product that matches our specification and to return the URL of its product page.
import requests
import time
# API key sent in the "x-api-key" header of every request below; replace the
# placeholder with your own key.
twin_key = "<your api key>"
# URL that receives the POST request starting a browsing task.
browse_endpoint = "https://api.twin.so/browse"
# Prefix of the per-task status URL; the task id is appended to it.
# Uses HTTPS like browse_endpoint so the API key header is never sent in
# clear text.
task_endpoint = "https://api.twin.so/task/"
def wait_for_tasks(task_ids, poll_interval=1, request_timeout=30):
    """Poll Twin tasks until every one of them reports a terminal status.

    Each polling round sends one GET request per unfinished task to
    ``task_endpoint`` (with ``limit=1``). While a task is still running, the
    action of the first returned step is printed whenever it differs from the
    action last printed for that task.

    Args:
        task_ids: Iterable of task ids. Duplicates are polled only once. A
            one-shot iterator is accepted as well.
        poll_interval: Seconds to sleep between two polling rounds.
        request_timeout: Seconds to wait for each HTTP response before
            ``requests`` gives up.

    Returns:
        A list with the ``"output"`` field of every task whose status became
        ``"COMPLETED"`` or ``"FAILED"``, in the order the tasks were seen to
        finish (not necessarily the order of ``task_ids``).

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeouts.
    """
    terminal_statuses = {"COMPLETED", "FAILED"}

    # Materialise the ids exactly once so that generators are not exhausted
    # before polling starts; dict.fromkeys drops duplicates but keeps order.
    pending = list(dict.fromkeys(task_ids))
    latest_actions = dict.fromkeys(pending)
    outputs = []

    while pending:
        still_running = []
        for task_id in pending:
            response = requests.get(
                url=f"{task_endpoint}{task_id}",
                params={"limit": 1},
                headers={"x-api-key": twin_key},
                timeout=request_timeout,
            )
            # Fail with a clear HTTP error instead of a KeyError on the
            # JSON body of an error response.
            response.raise_for_status()
            data = response.json()

            if data["status"] in terminal_statuses:
                outputs.append(data["output"])
                continue

            still_running.append(task_id)
            steps = data["steps"]
            action = steps[0]["action"] if steps else None
            # Only report progress when the latest action actually changed.
            if action and action != latest_actions[task_id]:
                latest_actions[task_id] = action
                print(f"Task {task_id}: {action}")

        pending = still_running
        # NOTE(review): the original listing lost its indentation; the pause
        # is assumed to happen once per polling round. It is skipped when
        # nothing is left to poll so the result is returned immediately.
        if pending:
            time.sleep(poll_interval)

    return outputs
def launch_alibaba_task(product_description, request_timeout=30):
    """Start a Twin browsing task that looks for a product on Alibaba.

    Sends a POST request to ``browse_endpoint`` asking Twin to start from the
    Alibaba home page, find the product matching ``product_description`` and
    return the URL of its product page.

    Args:
        product_description: Free-text specification of the wanted product;
            it is appended to the goal sent to Twin.
        request_timeout: Seconds to wait for the HTTP response before
            ``requests`` gives up.

    Returns:
        The value of the ``"taskId"`` field of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeouts.
    """
    response = requests.post(
        url=browse_endpoint,
        headers={"x-api-key": twin_key},
        json={
            "goal": f"Find on Alibaba the product according to our specification, and return the URL of the product page: {product_description}",
            "startUrl": "https://www.alibaba.com/",
            "outputType": "url",
        },
        timeout=request_timeout,
    )
    # Surface authentication/server errors as HTTP errors rather than as a
    # KeyError on the missing "taskId" field.
    response.raise_for_status()
    return response.json()["taskId"]
# Plain-English specification of the product Twin should look for.
description = "an electric scooter that goes at least 40km with a single charge"

# Start the browsing task, then block until it reaches a terminal status.
task_id = launch_alibaba_task(product_description=description)

# wait_for_tasks returns a list of outputs, one per task that finished.
product_page = wait_for_tasks(task_ids=[task_id])
print(product_page)