Zum Inhalt

Python SDK-Referenz

Das offizielle Python-SDK für die SubscribeFlow API. Vollständig asynchron, vollständig typisiert.

Installation

pip install subscribeflow

Oder in pyproject.toml (uv, Poetry, PDM):

[project.dependencies]
subscribeflow = ">=0.1.0"

Client-Initialisierung

Der Client verwendet einen async Context Manager für die HTTP-Verbindung:

import asyncio
from subscribeflow import SubscribeFlowClient

async def main():
    async with SubscribeFlowClient(
        api_key="sf_live_...",
        base_url="https://api.subscribeflow.net",  # Standard
        timeout=30.0,                               # Sekunden, Standard: 30
    ) as client:
        subscribers = await client.subscribers.list(limit=10)

asyncio.run(main())

Sie können den Lebenszyklus auch manuell verwalten:

client = SubscribeFlowClient(api_key="sf_live_...")
try:
    subscriber = await client.subscribers.get("subscriber-id")
finally:
    await client.close()

Ressourcen

subscribers

create(email, tags=None, metadata=None)

Neuen Subscriber erstellen.

subscriber = await client.subscribers.create(
    email="alice@example.com",
    tags=["newsletter"],
    metadata={"source": "website"},
)

get(subscriber_id)

Subscriber per ID abrufen.

subscriber = await client.subscribers.get("subscriber-id")

get_by_email(email)

Subscriber per E-Mail-Adresse abrufen.

subscriber = await client.subscribers.get_by_email("alice@example.com")

list(limit=50, cursor=None, status=None)

Subscriber mit Cursor-basierter Pagination auflisten.

result = await client.subscribers.list(limit=50, status="active")
for subscriber in result:
    print(subscriber.email)

update(subscriber_id, **kwargs)

Subscriber-Felder aktualisieren (E-Mail, Status, Metadaten).

updated = await client.subscribers.update(
    "subscriber-id",
    metadata={"plan": "professional"},
)

delete(subscriber_id)

Subscriber und alle zugehörigen Daten dauerhaft löschen.

await client.subscribers.delete("subscriber-id")

add_tags(subscriber_id, tags)

Tags zu einem Subscriber hinzufügen.

await client.subscribers.add_tags("subscriber-id", tags=["beta-testers"])

remove_tag(subscriber_id, tag_slug)

Einzelnen Tag von einem Subscriber entfernen.

await client.subscribers.remove_tag("subscriber-id", "beta-testers")

generate_token(subscriber_id)

Preference-Center-Token generieren.

token_response = await client.subscribers.generate_token("subscriber-id")
print(token_response.token)

tags

create(name, description=None, category=None, is_public=True)

Neuen Tag erstellen.

tag = await client.tags.create(
    name="Product Updates",
    description="Neue Features und Verbesserungen",
    category="product",
    is_public=True,
)

get(tag_id)

Tag per ID abrufen.

tag = await client.tags.get("tag-id")

get_by_name(name)

Tag per Name abrufen.

tag = await client.tags.get_by_name("Product Updates")

list(category=None)

Alle Tags auflisten, optional nach Kategorie filtern.

tags = await client.tags.list(category="product")

update(tag_id, **kwargs)

Tag-Felder aktualisieren (Name, Beschreibung, Kategorie, is_public).

await client.tags.update("tag-id", description="Aktualisierte Beschreibung")

delete(tag_id)

Tag löschen und alle Subscriber-Zuordnungen entfernen.

await client.tags.delete("tag-id")

templates

create(name, subject, mjml_content, category=None)

E-Mail-Template erstellen.

template = await client.templates.create(
    name="Welcome Email",
    subject="Willkommen bei {{company}}!",
    mjml_content="<mjml><mj-body>...</mj-body></mjml>",
    category="transactional",
)

get(template_id)

Template per ID oder Slug abrufen.

template = await client.templates.get("template-id")

list(category=None)

Templates auflisten, optional nach Kategorie filtern.

templates = await client.templates.list(category="transactional")

update(template_id, **kwargs)

Template-Felder aktualisieren.

await client.templates.update("template-id", subject="Neuer Betreff")

delete(template_id)

Template löschen.

await client.templates.delete("template-id")

preview(template_id, variables=None)

Template mit Variablen rendern und HTML-Ausgabe zurückgeben.

preview = await client.templates.preview(
    "template-id",
    variables={"company": "Acme Inc"},
)
print(preview.html)

campaigns

create(name, template_id, tag_filter=None)

Campaign-Entwurf erstellen.

campaign = await client.campaigns.create(
    name="Maerz-Newsletter",
    template_id="template-uuid",
    tag_filter={"include_tags": ["newsletter"], "match": "any"},
)

get(campaign_id)

Campaign per ID abrufen.

campaign = await client.campaigns.get("campaign-id")

list(status=None)

Campaigns auflisten, optional nach Status filtern.

campaigns = await client.campaigns.list(status="draft")

update(campaign_id, **kwargs)

Campaign-Felder aktualisieren.

await client.campaigns.update("campaign-id", name="Aktualisierter Name")

send(campaign_id)

Campaign an alle passenden Subscriber senden.

result = await client.campaigns.send("campaign-id")
print(f"Versand an {result.total_recipients} Empfaenger")

cancel(campaign_id)

Laufende Campaign abbrechen.

await client.campaigns.cancel("campaign-id")

duplicate(campaign_id)

Campaign als neuen Entwurf duplizieren.

new_campaign = await client.campaigns.duplicate("campaign-id")

retry(campaign_id)

Fehlgeschlagene Campaign erneut versuchen.

await client.campaigns.retry("campaign-id")

count_recipients(include_tags=None, match=None)

Vorschau der Empfängeranzahl für einen Tag-Filter.

count = await client.campaigns.count_recipients(include_tags=["newsletter"])

emails

send(template_slug, to, variables=None, idempotency_key=None)

Transaktionale E-Mail an einen einzelnen Empfänger senden.

result = await client.emails.send(
    template_slug="welcome-email",
    to="alice@example.com",
    variables={"company": "Acme Inc"},
    idempotency_key="welcome-alice-2024",
)

webhooks

create(url, events, description=None)

Webhook-Endpoint erstellen. Gibt das Signing Secret zurück.

webhook = await client.webhooks.create(
    url="https://your-app.com/webhooks/subscribeflow",
    events=["subscriber.created", "tag.subscribed"],
)
print(f"Secret: {webhook.secret}")

get(webhook_id)

Webhook per ID abrufen.

webhook = await client.webhooks.get("webhook-id")

list()

Alle Webhook-Endpoints auflisten.

webhooks = await client.webhooks.list()

update(webhook_id, **kwargs)

Webhook-Felder aktualisieren (URL, Events, Beschreibung, is_active).

await client.webhooks.update("webhook-id", events=["subscriber.created"])

delete(webhook_id)

Webhook-Endpoint löschen.

await client.webhooks.delete("webhook-id")

test(webhook_id)

Testereignis an einen Webhook-Endpoint senden.

result = await client.webhooks.test("webhook-id")
print(f"Erfolgreich: {result.success}")

list_deliveries(webhook_id)

Zustellungsverlauf eines Webhooks auflisten.

deliveries = await client.webhooks.list_deliveries("webhook-id")

retry_delivery(webhook_id, delivery_id)

Fehlgeschlagene Zustellung wiederholen.

await client.webhooks.retry_delivery("webhook-id", "delivery-id")

get_stats(webhook_id)

Zustellungsstatistiken eines Webhooks abrufen.

stats = await client.webhooks.get_stats("webhook-id")
print(f"Erfolgsrate: {stats.success_rate}%")

regenerate_secret(webhook_id)

Signing Secret rotieren.

new_webhook = await client.webhooks.regenerate_secret("webhook-id")
print(f"Neues Secret: {new_webhook.secret}")

triggers

create(event_type, template_id, description=None)

Ereignisbasierten E-Mail-Trigger erstellen.

trigger = await client.triggers.create(
    event_type="subscriber.created",
    template_id="welcome-template-uuid",
    description="Willkommens-E-Mail bei Registrierung senden",
)

get(trigger_id)

Trigger per ID abrufen.

trigger = await client.triggers.get("trigger-id")

list()

Alle Triggers auflisten.

triggers = await client.triggers.list()

update(trigger_id, **kwargs)

Trigger-Felder aktualisieren.

await client.triggers.update("trigger-id", is_active=False)

delete(trigger_id)

Trigger löschen.

await client.triggers.delete("trigger-id")

billing

get_subscription()

Aktuelles Billing-Abonnement abrufen.

subscription = await client.billing.get_subscription()

create_checkout_session(plan)

Stripe-Checkout-Session für Plan-Upgrade erstellen.

session = await client.billing.create_checkout_session("starter")
print(f"Checkout-URL: {session.url}")

create_portal_session()

Stripe-Billing-Portal-Session erstellen.

session = await client.billing.create_portal_session()
print(f"Portal-URL: {session.url}")

sync_subscription()

Abonnement-Status mit Stripe synchronisieren.

await client.billing.sync_subscription()

preference_center

Preference-Center-Operationen mit einem Subscriber-Token ausführen.

pref_center = client.preference_center(token)

get_preferences()

Präferenzen und verfügbare Tags des Subscribers abrufen.

info = await pref_center.get_preferences()

subscribe_tag(tag_id)

Tag abonnieren.

await pref_center.subscribe_tag("tag-id")

unsubscribe_tag(tag_id)

Tag abbestellen.

await pref_center.unsubscribe_tag("tag-id")

export_data()

Alle Subscriber-Daten als JSON exportieren (DSGVO Art. 20).

export = await pref_center.export_data()

delete_account()

Subscriber-Konto dauerhaft löschen (DSGVO Art. 17).

await pref_center.delete_account()

Fehlerbehandlung

Alle Fehler erweitern SubscribeFlowError:

from subscribeflow import (
    SubscribeFlowError,      # Basis-Fehler (hat .status, .detail, .type)
    AuthenticationError,      # 401 -- ungueltiger oder fehlender API-Key
    NotFoundError,            # 404 -- Ressource nicht gefunden
    ValidationError,          # 422 -- Validierung der Anfrage fehlgeschlagen
    RateLimitError,           # 429 -- Rate Limit überschritten
    LimitExceededError,       # 402 -- Plan-Limit überschritten
)

Beispiel:

from subscribeflow import NotFoundError, ValidationError, LimitExceededError

try:
    subscriber = await client.subscribers.get("non-existent-id")
except NotFoundError as e:
    print(f"Nicht gefunden: {e.detail}")
except ValidationError as e:
    print(f"Validierung fehlgeschlagen: {e.detail}")
    for error in e.errors:
        print(f"  - {error['loc']}: {error['msg']}")
except LimitExceededError as e:
    print(f"Plan-Limit überschritten: {e.detail}")
except SubscribeFlowError as e:
    print(f"API-Fehler ({e.status}): {e.detail}")

MCP Server

Das Python-SDK enthält einen MCP-Server für die Integration mit Claude Desktop und Claude Code. Siehe den MCP-Integrationsleitfaden für Einrichtungsanweisungen.

{
  "mcpServers": {
    "subscribeflow": {
      "command": "uv",
      "args": [
        "--directory", "/pfad/zu/subscribe-flow/sdk/python",
        "run", "subscribeflow-mcp"
      ],
      "env": {
        "SUBSCRIBEFLOW_API_KEY": "sf_live_..."
      }
    }
  }
}