We’ll use Twin to find new patents filed in our industry over the past month. This is a task that we could run periodically and then send a notification over email, Slack, etc., to stay updated on the latest advancements in the field. As an example, we’ll use cryptography.
from datetime import datetime, timedelta
import time
import requests
# API key sent in the "x-api-key" header of every request; replace the placeholder.
twin_key = "<your api key>"

# Endpoint that receives the POST starting a browsing task.
browse_endpoint = "https://api.twin.so/browse"
# Base URL a task id is appended to when polling a task.
# Uses HTTPS like browse_endpoint so the API key is never sent in cleartext.
task_endpoint = "https://api.twin.so/task/"

# Start URLs; one browsing task is launched per website.
patent_websites = [
    "https://uspto.gov",
    "https://epo.org",
    "https://patents.google.com",
]
Helpers
Let’s define some helpers to get the date from which we want to search, and to wait for the tasks to be completed.
# Bounds of the search window as YYYY-MM-DD text: the current local date and
# the date 30 days before it.
today = f"{datetime.now():%Y-%m-%d}"
monthago = f"{datetime.now() - timedelta(days=30):%Y-%m-%d}"
def wait_for_tasks(task_ids, poll_interval=1, request_timeout=30):
    """Poll the task endpoint until every given task has finished.

    Each polling pass requests the state of every unfinished task. While a
    task is still running, its most recent action is printed whenever it
    differs from the last one printed for that task.

    Args:
        task_ids: Iterable of task id strings. Duplicates are polled once.
        poll_interval: Seconds to sleep between polling passes.
        request_timeout: Seconds to wait for each HTTP response.

    Returns:
        A list with one entry per distinct task, in the order the tasks were
        observed to finish (which need not match the order of ``task_ids``).
        An entry is ``None`` when the response has no ``"output"`` field.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    outputs = []
    # dict.fromkeys removes duplicates while keeping the caller's order, so
    # polling and progress messages are deterministic.
    pending = list(dict.fromkeys(task_ids))
    latest_actions = dict.fromkeys(pending)

    while pending:
        still_running = []
        for task_id in pending:
            response = requests.get(
                url=task_endpoint + task_id,
                # presumably limits the returned steps to the newest one;
                # verify against the API documentation
                params={"limit": 1},
                headers={"x-api-key": twin_key},
                timeout=request_timeout,
            )
            # Fail with a clear HTTP error instead of a KeyError further down.
            response.raise_for_status()
            data = response.json()

            if data["status"] in {"COMPLETED", "FAILED"}:
                # A finished task without an "output" field yields None
                # rather than raising.
                outputs.append(data.get("output"))
                continue

            still_running.append(task_id)
            steps = data.get("steps")
            action = steps[0].get("action") if steps else None
            # Only report progress when the newest action actually changed.
            if action and action != latest_actions[task_id]:
                latest_actions[task_id] = action
                print(f"Task {task_id}: {action}")

        pending = still_running
        # Skip the pause once everything is done.
        if pending:
            time.sleep(poll_interval)
    return outputs
Launch Twin
Let’s now launch Twin to search for patents
def twin_search_patents(topic="cryptography", since=None, websites=None, request_timeout=30):
    """Start one browsing task per patent website and collect the task ids.

    Args:
        topic: Subject inserted into the goal text sent with each task.
        since: Earliest date inserted into the goal text. Defaults to the
            module-level ``monthago``.
        websites: Iterable of start URLs. Defaults to the module-level
            ``patent_websites``.
        request_timeout: Seconds to wait for each HTTP response.

    Returns:
        A list holding the ``"taskId"`` value from each response, in the
        order of ``websites``.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
    """
    # Resolve defaults at call time so later changes to the module-level
    # values are picked up.
    if since is None:
        since = monthago
    if websites is None:
        websites = patent_websites

    goal = f"Find the latest patents filed about {topic} since {since}"

    task_ids = []
    for website in websites:
        response = requests.post(
            url=browse_endpoint,
            headers={"x-api-key": twin_key},
            json={"goal": goal, "startUrl": website},
            timeout=request_timeout,
        )
        # Surface API errors here rather than as a KeyError on "taskId".
        response.raise_for_status()
        task_ids.append(response.json()["taskId"])
    return task_ids
# Start one browsing task per patent website and show the ids that came back.
task_ids = twin_search_patents()
print(task_ids)

# Wait until every task has finished, then print each output followed by
# blank-line spacing.
patents = wait_for_tasks(task_ids)
for patent in patents:
    print(patent, "\n\n", sep="\n")