
apflow

Distributed task orchestration for the AI-native era

About

apflow is a high-performance distributed task orchestration framework that scales from a single process to massive multi-node clusters. It provides a unified execution interface for 12+ built-in executors (HTTP, SSH, Docker, gRPC, MCP, LLM Agents) with automatic leader election, lease-based task ownership, and horizontal scaling. The framework includes a real-time GraphQL API with WebSocket subscriptions for live task tracking, a pluggable protocol registry (A2A, MCP, GraphQL), and flexible storage options (DuckDB for local, PostgreSQL for distributed). Built for the AI-native era, it seamlessly integrates with CrewAI and LLM-based task tree generation.

Features

Distributed Cluster: Automatic leader election and horizontal multi-node scaling
Unified Execution: 12+ built-in executors (REST, SSH, Docker, gRPC, MCP, etc.)
GraphQL API: Queries, mutations, and real-time WebSocket subscriptions
Protocol Abstraction: Unified registry for A2A, MCP, and GraphQL protocols
Task Tree Orchestration: Complex dependency management and priority execution
Lease-based Ownership: Fail-safe task ownership with automatic expiration
Multi-mode Deployment: Zero-config Standalone (DuckDB) or Distributed (PostgreSQL)
Agent Integration: Native support for CrewAI and LLM-based task tree generation
Event Bus: Inter-node communication for task status and cluster lifecycle events
Built-in Scheduling: Advanced cron-based and interval task scheduling

Workflow Examples

Visualize how tasks are organized in trees and how dependencies control execution order

Sequential Pipeline with Task Tree

Demonstrates both task tree organization (parent-child) and execution dependencies. The tree organizes tasks hierarchically, while dependencies control when tasks execute.

Dependency: controls execution order
Tree: parent-child organization
Each node shows its input schema

Under the RootPipeline root node, three tasks run in sequence:

Fetch Data (http_request_executor): fetch data from an API; inputs: url, method, headers
Process Data (command_executor): process the fetched data; inputs: operation, data
Save Results (command_executor): save the processed results; inputs: destination, data
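The execution order implied by the dependencies above can be sketched with Python's standard-library topological sorter. This only models the ordering concept; apflow's own scheduler is not shown here, and the task names are taken from the diagram.

```python
from graphlib import TopologicalSorter

# Dependencies from the pipeline: Process depends on Fetch,
# Save depends on Process. Keys map each task to its predecessors.
deps = {
    "fetch_data": set(),
    "process_data": {"fetch_data"},
    "save_results": {"process_data"},
}

order = list(TopologicalSorter(deps).static_order())
print(order)  # ['fetch_data', 'process_data', 'save_results']
```

A task becomes runnable only once all of its predecessors have completed, which is how the dependency edges (rather than the parent-child tree) determine when each node executes.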

Install

pip install apflow[standard]

Quick Start

import asyncio

from apflow import TaskManager, create_session
from apflow.core.builders import TaskBuilder


async def main():
    # Initialize task manager
    db = create_session()
    task_manager = TaskManager(db)

    # Use TaskBuilder for clean task creation and execution
    result = await (
        TaskBuilder(task_manager, "rest_executor")
        .with_name("fetch_data")
        .with_input("url", "https://api.example.com/data")
        .with_input("method", "GET")
        .execute()
    )
    print(f"Result: {result.result}")


asyncio.run(main())

Sub-projects

apflow-webapp

A modern web application for managing and executing tasks with apflow. Built with Next.js and Mantine.

Related Products

apcore

The schema-driven module development framework that apflow is built on.

apcore-mcp

Automatic MCP Server & OpenAI Tools bridge for any apcore-based project.