Python SDK Reference
The official Python SDK for the SubscribeFlow API. Fully async, fully typed.
Installation
Install the package with pip, or declare it as a dependency in pyproject.toml (uv, Poetry, PDM).
Client initialization
Use the client as an async context manager so that the HTTP connection is closed automatically when the block exits:
import asyncio
from subscribeflow import SubscribeFlowClient
async def main():
async with SubscribeFlowClient(
api_key="sf_live_...",
base_url="https://api.subscribeflow.net", # default
timeout=30.0, # seconds, default: 30
) as client:
subscribers = await client.subscribers.list(limit=10)
asyncio.run(main())
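You can also manage the lifecycle manually. In that case, always call close() when you are done (for example in a finally block) so the HTTP connection is released; the calls below must run inside an async function: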
client = SubscribeFlowClient(api_key="sf_live_...")
try:
subscriber = await client.subscribers.get("subscriber-id")
finally:
await client.close()
Resources
subscribers
create(email, tags=None, metadata=None)
Create a new subscriber.
subscriber = await client.subscribers.create(
email="alice@example.com",
tags=["newsletter"],
metadata={"source": "website"},
)
get(subscriber_id)
Get a subscriber by ID.
get_by_email(email)
Get a subscriber by email address.
list(limit=50, cursor=None, status=None)
List subscribers with cursor-based pagination.
result = await client.subscribers.list(limit=50, status="active")
for subscriber in result:
print(subscriber.email)
update(subscriber_id, **kwargs)
Update subscriber fields (email, status, metadata).
delete(subscriber_id)
Permanently delete a subscriber and all associated data.
add_tags(subscriber_id, tags)
Add tags to a subscriber.
remove_tag(subscriber_id, tag_slug)
Remove a single tag from a subscriber.
generate_token(subscriber_id)
Generate a preference center token.
token_response = await client.subscribers.generate_token("subscriber-id")
print(token_response.token)
tags
create(name, description=None, category=None, is_public=True)
Create a new tag.
tag = await client.tags.create(
name="Product Updates",
description="New features and improvements",
category="product",
is_public=True,
)
get(tag_id)
Get a tag by ID.
get_by_name(name)
Get a tag by name.
list(category=None)
List all tags, optionally filtered by category.
update(tag_id, **kwargs)
Update tag fields (name, description, category, is_public).
delete(tag_id)
Delete a tag and remove all subscriber associations.
templates
create(name, subject, mjml_content, category=None)
Create an email template.
template = await client.templates.create(
name="Welcome Email",
subject="Welcome to {{company}}!",
mjml_content="<mjml><mj-body>...</mj-body></mjml>",
category="transactional",
)
get(template_id)
Get a template by ID or slug.
list(category=None)
List templates, optionally filtered by category.
update(template_id, **kwargs)
Update template fields.
delete(template_id)
Delete a template.
preview(template_id, variables=None)
Render a template with variables and return the HTML output.
preview = await client.templates.preview(
"template-id",
variables={"company": "Acme Inc"},
)
print(preview.html)
campaigns
create(name, template_id, tag_filter=None)
Create a campaign draft.
campaign = await client.campaigns.create(
name="March Newsletter",
template_id="template-uuid",
tag_filter={"include_tags": ["newsletter"], "match": "any"},
)
get(campaign_id)
Get a campaign by ID.
list(status=None)
List campaigns, optionally filtered by status.
update(campaign_id, **kwargs)
Update campaign fields.
send(campaign_id)
Send a campaign to all matching subscribers.
result = await client.campaigns.send("campaign-id")
print(f"Sending to {result.total_recipients} recipients")
cancel(campaign_id)
Cancel a running campaign.
duplicate(campaign_id)
Duplicate a campaign as a new draft.
retry(campaign_id)
Retry a failed campaign.
count_recipients(include_tags=None, match=None)
Preview how many subscribers match a tag filter.
emails
send(template_slug, to, variables=None, idempotency_key=None)
Send a transactional email to a single recipient.
result = await client.emails.send(
template_slug="welcome-email",
to="alice@example.com",
variables={"company": "Acme Inc"},
idempotency_key="welcome-alice-2024",
)
webhooks
create(url, events, description=None)
Create a webhook endpoint. The returned webhook object includes the signing secret in its secret attribute.
webhook = await client.webhooks.create(
url="https://your-app.com/webhooks/subscribeflow",
events=["subscriber.created", "tag.subscribed"],
)
print(f"Secret: {webhook.secret}")
get(webhook_id)
Get a webhook by ID.
list()
List all webhook endpoints.
update(webhook_id, **kwargs)
Update webhook fields (url, events, description, is_active).
delete(webhook_id)
Delete a webhook endpoint.
test(webhook_id)
Send a test event to a webhook endpoint.
list_deliveries(webhook_id)
List delivery history for a webhook.
retry_delivery(webhook_id, delivery_id)
Retry a failed delivery.
get_stats(webhook_id)
Get delivery statistics for a webhook.
regenerate_secret(webhook_id)
Rotate the signing secret. The returned webhook object carries the new secret in its secret attribute.
new_webhook = await client.webhooks.regenerate_secret("webhook-id")
print(f"New secret: {new_webhook.secret}")
triggers
create(event_type, template_id, description=None)
Create an event-based email trigger.
trigger = await client.triggers.create(
event_type="subscriber.created",
template_id="welcome-template-uuid",
description="Send welcome email on signup",
)
get(trigger_id)
Get a trigger by ID.
list()
List all triggers.
update(trigger_id, **kwargs)
Update trigger fields.
delete(trigger_id)
Delete a trigger.
billing
get_subscription()
Get the current billing subscription.
create_checkout_session(plan)
Create a Stripe Checkout session for plan upgrade.
session = await client.billing.create_checkout_session("starter")
print(f"Checkout URL: {session.url}")
create_portal_session()
Create a Stripe Billing Portal session.
sync_subscription()
Synchronize the subscription state with Stripe.
preference_center
Access preference center operations using a subscriber token.
get_preferences()
Get the subscriber's preferences and available tags.
subscribe_tag(tag_id)
Subscribe to a tag.
unsubscribe_tag(tag_id)
Unsubscribe from a tag.
export_data()
Export all subscriber data as JSON (GDPR Art. 20, right to data portability).
delete_account()
Permanently delete the subscriber account (GDPR Art. 17, right to erasure).
Error handling
All errors extend SubscribeFlowError:
from subscribeflow import (
SubscribeFlowError, # Base error (has .status, .detail, .type)
AuthenticationError, # 401 -- invalid or missing API key
NotFoundError, # 404 -- resource not found
ValidationError, # 422 -- request validation failed
RateLimitError, # 429 -- rate limit exceeded
LimitExceededError, # 402 -- plan limit exceeded
)
Example:
from subscribeflow import NotFoundError, ValidationError, LimitExceededError
try:
subscriber = await client.subscribers.get("non-existent-id")
except NotFoundError as e:
print(f"Not found: {e.detail}")
except ValidationError as e:
print(f"Validation failed: {e.detail}")
for error in e.errors:
print(f" - {error['loc']}: {error['msg']}")
except LimitExceededError as e:
print(f"Plan limit exceeded: {e.detail}")
except SubscribeFlowError as e:
print(f"API error ({e.status}): {e.detail}")
MCP Server
The Python SDK includes an MCP server for Claude Desktop and Claude Code integration. See the MCP integration guide for setup instructions.