Taskinity Flow Example
Flow Definition
flow EmailProcessing:
    description: "Email Processing Flow"
    fetch_emails -> classify_emails
    classify_emails -> process_urgent_emails
    classify_emails -> process_regular_emails
    process_urgent_emails -> send_responses
    process_regular_emails -> send_responses
Python Implementation
from taskinity import task, run_flow_from_dsl
@task(name="Fetch Emails")
def fetch_emails(server, username, password):
    """Return the batch of emails for the flow to process.

    Placeholder implementation: ``server``, ``username`` and ``password``
    are accepted but not used, and a fixed two-item list is returned.
    """
    # Stub data standing in for a real mailbox fetch.
    inbox = ["Email 1", "Email 2"]
    return inbox
@task(name="Classify Emails")
def classify_emails(emails):
    """Split emails into urgent and regular groups.

    An email is urgent when it contains the substring ``"URGENT"``
    (case-sensitive); every other email is regular.

    Args:
        emails: Iterable of email strings. It is consumed in a single pass,
            so one-shot iterators such as generators are classified
            correctly instead of leaving the regular group empty.

    Returns:
        A dict with the keys ``"urgent_emails"`` and ``"regular_emails"``,
        each holding a list that preserves the input order.
    """
    urgent, regular = [], []
    for email in emails:
        # One membership test per email decides which bucket it lands in.
        bucket = urgent if "URGENT" in email else regular
        bucket.append(email)
    return {"urgent_emails": urgent, "regular_emails": regular}
@task(name="Process Urgent Emails")
def process_urgent_emails(urgent_emails):
    """Produce one canned reply per urgent email.

    Placeholder implementation: the content of each email is ignored; only
    the number of emails determines the length of the returned list.
    """
    reply = "Response to urgent email"
    return [reply for _email in urgent_emails]
@task(name="Process Regular Emails")
def process_regular_emails(regular_emails):
    """Produce one canned reply per regular email.

    Placeholder implementation: the content of each email is ignored; only
    the number of emails determines the length of the returned list.
    """
    reply = "Response to regular email"
    return [reply for _email in regular_emails]
@task(name="Send Responses")
def send_responses(process_urgent_emails_result, process_regular_emails_result):
    """Report how many responses were handled.

    Placeholder implementation: nothing is delivered here; the two response
    collections are only counted.

    Returns:
        A dict with the single key ``"sent"`` holding the combined count.
    """
    urgent_count = len(process_urgent_emails_result)
    regular_count = len(process_regular_emails_result)
    return {"sent": urgent_count + regular_count}
# Define flow using DSL. Written as adjacent string literals so that every
# line break in the DSL text is explicit.
flow_dsl = (
    "\n"
    "flow EmailProcessing:\n"
    'description: "Email Processing Flow"\n'
    "fetch_emails -> classify_emails\n"
    "classify_emails -> process_urgent_emails\n"
    "classify_emails -> process_regular_emails\n"
    "process_urgent_emails -> send_responses\n"
    "process_regular_emails -> send_responses\n"
)
# Run the flow, passing the DSL text together with the initial input values.
# NOTE(review): the credentials below are example placeholders; presumably
# they are matched to fetch_emails' parameters by name — confirm against the
# Taskinity documentation.
results = run_flow_from_dsl(
    flow_dsl,
    dict(
        server="imap.example.com",
        username="user@example.com",
        password="password123",
    ),
)