Python SDK-Referenz
Das offizielle Python-SDK für die SubscribeFlow API. Vollständig asynchron, vollständig typisiert.
Installation
Oder in pyproject.toml (uv, Poetry, PDM):
Client-Initialisierung
Der Client verwendet einen async Context Manager für die HTTP-Verbindung:
import asyncio
from subscribeflow import SubscribeFlowClient
async def main():
async with SubscribeFlowClient(
api_key="sf_live_...",
base_url="https://api.subscribeflow.net", # Standard
timeout=30.0, # Sekunden, Standard: 30
) as client:
subscribers = await client.subscribers.list(limit=10)
asyncio.run(main())
Sie können den Lebenszyklus auch manuell verwalten:
client = SubscribeFlowClient(api_key="sf_live_...")
try:
subscriber = await client.subscribers.get("subscriber-id")
finally:
await client.close()
Ressourcen
subscribers
create(email, tags=None, metadata=None)
Neuen Subscriber erstellen.
subscriber = await client.subscribers.create(
email="alice@example.com",
tags=["newsletter"],
metadata={"source": "website"},
)
get(subscriber_id)
Subscriber per ID abrufen.
get_by_email(email)
Subscriber per E-Mail-Adresse abrufen.
list(limit=50, cursor=None, status=None)
Subscriber mit Cursor-basierter Pagination auflisten.
result = await client.subscribers.list(limit=50, status="active")
for subscriber in result:
print(subscriber.email)
update(subscriber_id, **kwargs)
Subscriber-Felder aktualisieren (E-Mail, Status, Metadaten).
delete(subscriber_id)
Subscriber und alle zugehörigen Daten dauerhaft löschen.
add_tags(subscriber_id, tags)
Tags zu einem Subscriber hinzufügen.
remove_tag(subscriber_id, tag_slug)
Einzelnen Tag von einem Subscriber entfernen.
generate_token(subscriber_id)
Preference-Center-Token generieren.
token_response = await client.subscribers.generate_token("subscriber-id")
print(token_response.token)
tags
create(name, description=None, category=None, is_public=True)
Neuen Tag erstellen.
tag = await client.tags.create(
name="Product Updates",
description="Neue Features und Verbesserungen",
category="product",
is_public=True,
)
get(tag_id)
Tag per ID abrufen.
get_by_name(name)
Tag per Name abrufen.
list(category=None)
Alle Tags auflisten, optional nach Kategorie filtern.
update(tag_id, **kwargs)
Tag-Felder aktualisieren (Name, Beschreibung, Kategorie, is_public).
delete(tag_id)
Tag löschen und alle Subscriber-Zuordnungen entfernen.
templates
create(name, subject, mjml_content, category=None)
E-Mail-Template erstellen.
template = await client.templates.create(
name="Welcome Email",
subject="Willkommen bei {{company}}!",
mjml_content="<mjml><mj-body>...</mj-body></mjml>",
category="transactional",
)
get(template_id)
Template per ID oder Slug abrufen.
list(category=None)
Templates auflisten, optional nach Kategorie filtern.
update(template_id, **kwargs)
Template-Felder aktualisieren.
delete(template_id)
Template löschen.
preview(template_id, variables=None)
Template mit Variablen rendern und HTML-Ausgabe zurückgeben.
preview = await client.templates.preview(
"template-id",
variables={"company": "Acme Inc"},
)
print(preview.html)
campaigns
create(name, template_id, tag_filter=None)
Campaign-Entwurf erstellen.
campaign = await client.campaigns.create(
name="Maerz-Newsletter",
template_id="template-uuid",
tag_filter={"include_tags": ["newsletter"], "match": "any"},
)
get(campaign_id)
Campaign per ID abrufen.
list(status=None)
Campaigns auflisten, optional nach Status filtern.
update(campaign_id, **kwargs)
Campaign-Felder aktualisieren.
send(campaign_id)
Campaign an alle passenden Subscriber senden.
result = await client.campaigns.send("campaign-id")
print(f"Versand an {result.total_recipients} Empfaenger")
cancel(campaign_id)
Laufende Campaign abbrechen.
duplicate(campaign_id)
Campaign als neuen Entwurf duplizieren.
retry(campaign_id)
Fehlgeschlagene Campaign erneut versuchen.
count_recipients(include_tags=None, match=None)
Vorschau der Empfängeranzahl für einen Tag-Filter.
emails
send(template_slug, to, variables=None, idempotency_key=None)
Transaktionale E-Mail an einen einzelnen Empfänger senden.
result = await client.emails.send(
template_slug="welcome-email",
to="alice@example.com",
variables={"company": "Acme Inc"},
idempotency_key="welcome-alice-2024",
)
webhooks
create(url, events, description=None)
Webhook-Endpoint erstellen. Gibt das Signing Secret zurück.
webhook = await client.webhooks.create(
url="https://your-app.com/webhooks/subscribeflow",
events=["subscriber.created", "tag.subscribed"],
)
print(f"Secret: {webhook.secret}")
get(webhook_id)
Webhook per ID abrufen.
list()
Alle Webhook-Endpoints auflisten.
update(webhook_id, **kwargs)
Webhook-Felder aktualisieren (URL, Events, Beschreibung, is_active).
delete(webhook_id)
Webhook-Endpoint löschen.
test(webhook_id)
Testereignis an einen Webhook-Endpoint senden.
list_deliveries(webhook_id)
Zustellungsverlauf eines Webhooks auflisten.
retry_delivery(webhook_id, delivery_id)
Fehlgeschlagene Zustellung wiederholen.
get_stats(webhook_id)
Zustellungsstatistiken eines Webhooks abrufen.
regenerate_secret(webhook_id)
Signing Secret rotieren.
new_webhook = await client.webhooks.regenerate_secret("webhook-id")
print(f"Neues Secret: {new_webhook.secret}")
triggers
create(event_type, template_id, description=None)
Ereignisbasierten E-Mail-Trigger erstellen.
trigger = await client.triggers.create(
event_type="subscriber.created",
template_id="welcome-template-uuid",
description="Willkommens-E-Mail bei Registrierung senden",
)
get(trigger_id)
Trigger per ID abrufen.
list()
Alle Triggers auflisten.
update(trigger_id, **kwargs)
Trigger-Felder aktualisieren.
delete(trigger_id)
Trigger löschen.
billing
get_subscription()
Aktuelles Billing-Abonnement abrufen.
create_checkout_session(plan)
Stripe-Checkout-Session für Plan-Upgrade erstellen.
session = await client.billing.create_checkout_session("starter")
print(f"Checkout-URL: {session.url}")
create_portal_session()
Stripe-Billing-Portal-Session erstellen.
sync_subscription()
Abonnement-Status mit Stripe synchronisieren.
preference_center
Preference-Center-Operationen mit einem Subscriber-Token ausführen.
get_preferences()
Präferenzen und verfügbare Tags des Subscribers abrufen.
subscribe_tag(tag_id)
Tag abonnieren.
unsubscribe_tag(tag_id)
Tag abbestellen.
export_data()
Alle Subscriber-Daten als JSON exportieren (DSGVO Art. 20).
delete_account()
Subscriber-Konto dauerhaft löschen (DSGVO Art. 17).
Fehlerbehandlung
Alle Fehler erweitern SubscribeFlowError:
from subscribeflow import (
SubscribeFlowError, # Basis-Fehler (hat .status, .detail, .type)
AuthenticationError, # 401 -- ungueltiger oder fehlender API-Key
NotFoundError, # 404 -- Ressource nicht gefunden
ValidationError, # 422 -- Validierung der Anfrage fehlgeschlagen
RateLimitError, # 429 -- Rate Limit überschritten
LimitExceededError, # 402 -- Plan-Limit überschritten
)
Beispiel:
from subscribeflow import NotFoundError, ValidationError, LimitExceededError
try:
subscriber = await client.subscribers.get("non-existent-id")
except NotFoundError as e:
print(f"Nicht gefunden: {e.detail}")
except ValidationError as e:
print(f"Validierung fehlgeschlagen: {e.detail}")
for error in e.errors:
print(f" - {error['loc']}: {error['msg']}")
except LimitExceededError as e:
print(f"Plan-Limit überschritten: {e.detail}")
except SubscribeFlowError as e:
print(f"API-Fehler ({e.status}): {e.detail}")
MCP Server
Das Python-SDK enthält einen MCP-Server für die Integration mit Claude Desktop und Claude Code. Siehe den MCP-Integrationsleitfaden für Einrichtungsanweisungen.