name: social-media-api-integration description: Integrate with social media platform APIs for automated posting, scheduling, analytics retrieval, and content syndication. Covers OAuth flows, rate limiting, and multi-platform strategies. Triggers on social media API integration, automated posting, or platform API requests. license: MIT complexity: intermediate time_to_learn: 30min tags:
- social-media
- api-integration
- automation
- oauth
- content-syndication governance_phases: [build] organ_affinity: [organ-vii] triggers: [user-asks-about-social-media-api, context:social-posting, context:content-syndication, context:social-automation] complements: [oauth-flow-architect, content-distribution, essay-publishing-pipeline, webhook-integration-patterns]
Social Media API Integration
Build reliable integrations with social platform APIs for automated content distribution.
Platform API Overview
| Platform | API Style | Auth | Rate Limits | Key Constraint |
|---|---|---|---|---|
| Bluesky | AT Protocol | App password / OAuth | 3000/5min | Decentralized, open protocol |
| Mastodon | REST | OAuth 2.0 | 300/5min per IP | Instance-specific endpoints |
| REST | OAuth 2.0 | 100/day posts | Strict content policies | |
| Dev.to | REST | API Key | 30/30s | Article-focused |
| Medium | REST | OAuth 2.0 + Bearer | 100/day | Import API only |
| RSS | Pull-based | None | N/A | Read-only syndication |
Authentication Patterns
OAuth 2.0 Flow (LinkedIn, Mastodon)
from authlib.integrations.httpx_client import AsyncOAuth2Client
class SocialOAuth:
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
self.client = AsyncOAuth2Client(
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
)
def get_auth_url(self, scope: str) -> str:
url, state = self.client.create_authorization_url(
"https://platform.example.com/oauth/authorize",
scope=scope,
)
return url
async def exchange_code(self, code: str) -> dict:
token = await self.client.fetch_token( # allow-secret
"https://platform.example.com/oauth/token",
code=code,
)
return token
API Key Authentication (Dev.to, Bluesky)
import httpx
class DevToClient:
def __init__(self, api_key: str): # allow-secret
self.client = httpx.AsyncClient(
base_url="https://dev.to/api",
headers={"api-key": api_key, "Accept": "application/json"},
)
async def create_article(self, title: str, body: str, tags: list[str], published: bool = False):
return await self.client.post("/articles", json={
"article": {
"title": title,
"body_markdown": body,
"tags": tags,
"published": published,
}
})
Multi-Platform Posting
Content Adapter Pattern
from dataclasses import dataclass
from abc import ABC, abstractmethod
@dataclass
class Post:
title: str
body: str
url: str | None = None
tags: list[str] | None = None
image_url: str | None = None
class PlatformAdapter(ABC):
@abstractmethod
async def publish(self, post: Post) -> str:
"""Publish and return the post URL."""
@abstractmethod
def format_content(self, post: Post) -> dict:
"""Format post for platform constraints."""
class BlueskyAdapter(PlatformAdapter):
async def publish(self, post: Post) -> str:
content = self.format_content(post)
# AT Protocol posting
response = await self.client.post(
"com.atproto.repo.createRecord",
json=content,
)
return response["uri"]
def format_content(self, post: Post) -> dict:
text = post.body[:300] # 300 char limit
if post.url:
text = f"{text}\n\n{post.url}"
return {"collection": "app.bsky.feed.post", "record": {"text": text}}
class MastodonAdapter(PlatformAdapter):
async def publish(self, post: Post) -> str:
content = self.format_content(post)
response = await self.client.post("/api/v1/statuses", json=content)
return response.json()["url"]
def format_content(self, post: Post) -> dict:
text = post.body[:500] # 500 char default
if post.url:
text = f"{text}\n\n{post.url}"
return {"status": text, "visibility": "public"}
Distribution Orchestrator
class ContentDistributor:
def __init__(self, adapters: dict[str, PlatformAdapter]):
self.adapters = adapters
async def distribute(self, post: Post, platforms: list[str] | None = None) -> dict[str, str]:
targets = platforms or list(self.adapters.keys())
results = {}
for platform in targets:
adapter = self.adapters[platform]
try:
url = await adapter.publish(post)
results[platform] = url
except Exception as e:
results[platform] = f"ERROR: {e}"
return results
Rate Limiting
Client-Side Rate Limiter
import asyncio
import time
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self.requests: list[float] = []
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = time.time()
self.requests = [t for t in self.requests if now - t < self.window]
if len(self.requests) >= self.max_requests:
wait_time = self.requests[0] + self.window - now
await asyncio.sleep(wait_time)
self.requests.append(time.time())
Retry on Rate Limit
async def api_call_with_retry(func, *args, max_retries: int = 3):
for attempt in range(max_retries):
try:
return await func(*args)
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 60))
await asyncio.sleep(retry_after)
else:
raise
raise Exception("Max retries exceeded")
Scheduling
from datetime import datetime, timedelta
class PostScheduler:
def __init__(self, distributor: ContentDistributor):
self.distributor = distributor
self.queue: list[tuple[datetime, Post, list[str]]] = []
def schedule(self, post: Post, platforms: list[str], publish_at: datetime):
self.queue.append((publish_at, post, platforms))
self.queue.sort(key=lambda x: x[0])
async def run(self):
while self.queue:
publish_at, post, platforms = self.queue[0]
now = datetime.now()
if now >= publish_at:
self.queue.pop(0)
await self.distributor.distribute(post, platforms)
else:
await asyncio.sleep((publish_at - now).total_seconds())
Analytics Retrieval
@dataclass
class PostMetrics:
views: int
likes: int
shares: int
comments: int
clicks: int
async def aggregate_metrics(post_urls: dict[str, str]) -> dict[str, PostMetrics]:
metrics = {}
for platform, url in post_urls.items():
adapter = adapters[platform]
metrics[platform] = await adapter.get_metrics(url)
return metrics
Anti-Patterns
- Posting identical content everywhere — Adapt format and tone per platform
- Ignoring rate limits — Always implement client-side rate limiting
- Storing tokens in code — Use environment variables or secret managers
- No error handling on post failure — Queue for retry, log failures
- Synchronous multi-platform posting — Use async/parallel posting with individual error handling
- Hardcoded platform URLs — Instance URLs vary (Mastodon), use configuration