Taskinity Flow Example

Flow Definition


flow EmailProcessing:
    description: "Email Processing Flow"
    fetch_emails -> classify_emails
    classify_emails -> process_urgent_emails
    classify_emails -> process_regular_emails
    process_urgent_emails -> send_responses
    process_regular_emails -> send_responses
  

Python Implementation


from taskinity import task, run_flow_from_dsl

@task(name="Fetch Emails")
def fetch_emails(server, username, password):
    """Return the list of emails to process.

    This is a stub: ``server``, ``username`` and ``password`` are accepted
    but not used yet, and a fixed two-item sample list is returned.
    """
    # Implementation
    sample_inbox = ["Email 1", "Email 2"]
    return sample_inbox

@task(name="Classify Emails")
def classify_emails(emails):
    """Split emails into urgent and regular ones.

    An email counts as urgent when it contains the substring ``"URGENT"``
    (case-sensitive); every other email is regular.

    Args:
        emails: Iterable of email strings. It is traversed exactly once, so
            one-shot iterables such as generators are classified correctly.

    Returns:
        A dict with the keys ``"urgent_emails"`` and ``"regular_emails"``,
        each holding a list of the matching emails in their original order.
    """
    urgent = []
    regular = []
    # Single pass: iterating twice would leave the second group empty
    # whenever ``emails`` is an iterator that is exhausted by the first pass.
    for email in emails:
        if "URGENT" in email:
            urgent.append(email)
        else:
            regular.append(email)
    return {"urgent_emails": urgent, "regular_emails": regular}

@task(name="Process Urgent Emails")
def process_urgent_emails(urgent_emails):
    """Build one response per urgent email.

    The email contents are not inspected: the returned list simply holds
    the same fixed response text once for every item in ``urgent_emails``.
    """
    # Implementation
    responses = []
    for _ in urgent_emails:
        responses.append("Response to urgent email")
    return responses

@task(name="Process Regular Emails")
def process_regular_emails(regular_emails):
    """Build one response per regular email.

    The email contents are not inspected: the returned list simply holds
    the same fixed response text once for every item in ``regular_emails``.
    """
    # Implementation
    responses = []
    for _ in regular_emails:
        responses.append("Response to regular email")
    return responses

@task(name="Send Responses")
def send_responses(process_urgent_emails_result, process_regular_emails_result):
    """Report how many responses were handled.

    This placeholder does not deliver anything; it only counts the items
    in both response collections.

    Returns:
        A dict ``{"sent": <total>}`` where the total is the combined
        length of the two arguments.
    """
    # Implementation
    urgent_count = len(process_urgent_emails_result)
    regular_count = len(process_regular_emails_result)
    return {"sent": urgent_count + regular_count}

# Flow definition as DSL text; the names on either side of "->" are the
# task functions defined above.
flow_dsl = """
flow EmailProcessing:
    description: "Email Processing Flow"
    fetch_emails -> classify_emails
    classify_emails -> process_urgent_emails
    classify_emails -> process_regular_emails
    process_urgent_emails -> send_responses
    process_regular_emails -> send_responses
"""

# Execute the flow. The input keys match the parameter names of
# fetch_emails; the values look like sample settings, so replace them with
# real connection details before use.
results = run_flow_from_dsl(
    flow_dsl,
    dict(
        server="imap.example.com",
        username="user@example.com",
        password="password123",
    ),
)