We’ll run a task to get some information about suppliers of blast furnaces. We’ll ask Twin to find on Alibaba the product according to our specification.

Access the cookbook in Github

import requests
import time

twin_key = "<your api key>"
browse_endpoint = "https://api.twin.so/browse"
task_endpoint = "http://api.twin.so/task/"

def wait_for_tasks(task_ids):
    outputs = []
    latest_actions = {task_id: None for task_id in task_ids}
    remaining_tasks = set(task_ids)

    while remaining_tasks:
        for task_id in list(remaining_tasks):
            response = requests.get(url=task_endpoint + task_id + "?limit=1", headers={"x-api-key": twin_key})
            data = response.json()
            
            if data["status"] in {"COMPLETED", "FAILED"}:
                outputs.append(data["output"])
                remaining_tasks.remove(task_id)
            elif data["steps"] and data["steps"][0]["action"] and latest_actions[task_id] != data["steps"][0]["action"]:
                latest_actions[task_id] = data["steps"][0]["action"]
                print(f"Task {task_id}: {latest_actions[task_id]}")
        
        time.sleep(1)
    
    return outputs
def launch_alibaba_task(product_description):
    response = requests.post(
        url=browse_endpoint,
        headers={"x-api-key": twin_key},
        json={
            "goal": f"Find on Alibaba the product according to our specification, and return the URL of the product page: {product_description}",
            "startUrl": "https://www.alibaba.com/",
            "outputType": "url",
        },
    )
    
    return response.json()["taskId"]

description = "an electric scooter that goes at least 40km with a single charge"
task_id = launch_alibaba_task(description)
product_page = wait_for_tasks([task_id])

print(product_page)