Worxflow
Sdks

Python SDK

Le SDK Python officiel pour Worxflow vous permet d'exécuter des workflows de maniÚre programmatique à partir de vos applications Python en utilisant le SDK Python officiel.

Le SDK Python prend en charge Python 3.8+ avec support d'exécution asynchrone, limitation automatique du débit avec backoff exponentiel, et suivi d'utilisation.

Installation

Installez le SDK en utilisant pip :

pip install simstudio-sdk

Démarrage rapide

Voici un exemple simple pour commencer :

from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://worxflow.ai"  # optional, defaults to https://worxflow.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

Référence de l'API

SimStudioClient

Constructeur

SimStudioClient(api_key: str, base_url: str = "https://worxflow.ai")

ParamĂštres :

  • api_key (str) : Votre clĂ© API Sim
  • base_url (str, facultatif) : URL de base pour l'API Sim

Méthodes

execute_workflow()

Exécuter un workflow avec des données d'entrée facultatives.

result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

ParamĂštres :

  • workflow_id (str) : L'identifiant du workflow Ă  exĂ©cuter
  • input_data (dict, facultatif) : DonnĂ©es d'entrĂ©e Ă  transmettre au workflow
  • timeout (float, facultatif) : DĂ©lai d'expiration en secondes (par dĂ©faut : 30.0)
  • stream (bool, facultatif) : Activer les rĂ©ponses en streaming (par dĂ©faut : False)
  • selected_outputs (list[str], facultatif) : Sorties de blocs Ă  diffuser au format blockName.attribute (par exemple, ["agent1.content"])
  • async_execution (bool, facultatif) : ExĂ©cuter de maniĂšre asynchrone (par dĂ©faut : False)

Retourne : WorkflowExecutionResult | AsyncExecutionResult

Lorsque async_execution=True, retourne immédiatement un identifiant de tùche pour l'interrogation. Sinon, attend la fin de l'exécution.

get_workflow_status()

Obtenir le statut d'un workflow (statut de déploiement, etc.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

ParamĂštres :

  • workflow_id (str) : L'identifiant du workflow

Retourne : WorkflowStatus

validate_workflow()

Valider qu'un workflow est prĂȘt pour l'exĂ©cution.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

ParamĂštres :

  • workflow_id (str) : L'identifiant du workflow

Retourne : bool

get_job_status()

Obtenir le statut d'une exécution de tùche asynchrone.

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Output:", status["output"])

ParamĂštres :

  • task_id (str) : L'identifiant de tĂąche retournĂ© par l'exĂ©cution asynchrone

Retourne : Dict[str, Any]

Champs de réponse :

  • success (bool) : Si la requĂȘte a rĂ©ussi
  • taskId (str) : L'identifiant de la tĂąche
  • status (str) : L'un des Ă©tats suivants : 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict) : Contient startedAt, completedAt, et duration
  • output (any, facultatif) : La sortie du workflow (une fois terminĂ©)
  • error (any, facultatif) : DĂ©tails de l'erreur (en cas d'Ă©chec)
  • estimatedDuration (int, facultatif) : DurĂ©e estimĂ©e en millisecondes (lors du traitement/mise en file d'attente)
execute_with_retry()

Exécuter un workflow avec réessai automatique en cas d'erreurs de limitation de débit, en utilisant un backoff exponentiel.

result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

ParamĂštres :

  • workflow_id (str) : L'identifiant du workflow Ă  exĂ©cuter
  • input_data (dict, facultatif) : DonnĂ©es d'entrĂ©e Ă  transmettre au workflow
  • timeout (float, facultatif) : DĂ©lai d'expiration en secondes
  • stream (bool, facultatif) : Activer les rĂ©ponses en streaming
  • selected_outputs (list, facultatif) : Sorties de blocs Ă  diffuser
  • async_execution (bool, facultatif) : ExĂ©cuter de maniĂšre asynchrone
  • max_retries (int, facultatif) : Nombre maximum de tentatives (par dĂ©faut : 3)
  • initial_delay (float, facultatif) : DĂ©lai initial en secondes (par dĂ©faut : 1.0)
  • max_delay (float, facultatif) : DĂ©lai maximum en secondes (par dĂ©faut : 30.0)
  • backoff_multiplier (float, facultatif) : Multiplicateur de backoff (par dĂ©faut : 2.0)

Retourne : WorkflowExecutionResult | AsyncExecutionResult

La logique de nouvelle tentative utilise un backoff exponentiel (1s → 2s → 4s → 8s...) avec une variation alĂ©atoire de ±25% pour Ă©viter l'effet de horde. Si l'API fournit un en-tĂȘte retry-after, celui-ci sera utilisĂ© Ă  la place.

get_rate_limit_info()

Obtenir les informations actuelles sur les limites de débit à partir de la derniÚre réponse de l'API.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

Retourne : RateLimitInfo | None

get_usage_limits()

Obtenir les limites d'utilisation actuelles et les informations de quota pour votre compte.

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

Retourne : UsageLimits

Structure de la réponse :

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
set_api_key()

Mettre à jour la clé API.

client.set_api_key("new-api-key")
set_base_url()

Mettre Ă  jour l'URL de base.

client.set_base_url("https://my-custom-domain.com")
close()

Fermer la session HTTP sous-jacente.

client.close()

Classes de données

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    is_published: bool = False
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

SimStudioError

class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Codes d'erreur courants :

  • UNAUTHORIZED : ClĂ© API invalide
  • TIMEOUT : DĂ©lai d'attente de la requĂȘte dĂ©passĂ©
  • RATE_LIMIT_EXCEEDED : Limite de dĂ©bit dĂ©passĂ©e
  • USAGE_LIMIT_EXCEEDED : Limite d'utilisation dĂ©passĂ©e
  • EXECUTION_ERROR : Échec de l'exĂ©cution du workflow

Exemples

Exécution basique d'un workflow

Configurez le SimStudioClient avec votre clé API.

VĂ©rifiez si le workflow est dĂ©ployĂ© et prĂȘt pour l'exĂ©cution.

Lancez le workflow avec vos données d'entrée.

Traitez le résultat de l'exécution et gérez les éventuelles erreurs.

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

Gestion des erreurs

Gérez différents types d'erreurs qui peuvent survenir pendant l'exécution du workflow :

from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

Utilisation du gestionnaire de contexte

Utilisez le client comme gestionnaire de contexte pour gérer automatiquement le nettoyage des ressources :

from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

Exécution de workflows par lots

Exécutez plusieurs workflows efficacement :

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Exécution asynchrone de workflow

Exécutez des workflows de maniÚre asynchrone pour les tùches de longue durée :

import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

Limitation de débit et nouvelle tentative

Gérez les limites de débit automatiquement avec un retrait exponentiel :

import os
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

Surveillance de l'utilisation

Surveillez l'utilisation et les limites de votre compte :

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

Exécution de workflow en streaming

Exécutez des workflows avec des réponses en streaming en temps réel :

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

La réponse en streaming suit le format Server-Sent Events (SSE) :

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Exemple de streaming avec Flask :

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://worxflow.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Configuration de l'environnement

Configurez le client en utilisant des variables d'environnement :

import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://worxflow.ai")
)
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://worxflow.ai")
)

Obtention de votre clé API

Accédez à Sim et connectez-vous à votre compte.

Accédez au workflow que vous souhaitez exécuter par programmation.

Cliquez sur "Déployer" pour déployer votre workflow s'il n'a pas encore été déployé.

Pendant le processus de déploiement, sélectionnez ou créez une clé API.

Copiez la clé API pour l'utiliser dans votre application Python.

Gardez votre clé API sécurisée et ne la soumettez jamais au contrÎle de version. Utilisez des variables d'environnement ou une gestion de configuration sécurisée.

Prérequis

  • Python 3.8+
  • requests >= 2.25.0

Licence

Apache-2.0

Python SDK