Getting Started
ZephyrPush is a real-time messaging service built on Cloudflare Workers that allows you to publish messages to channels and subscribe to them via WebSocket connections. It's perfect for real-time notifications, live chat, and event-driven applications.
What You'll Need
- • An API key (get one from your dashboard)
- • Basic knowledge of HTTP/WebSocket APIs
- • Any programming language that can make HTTP requests
Quick Start Steps
- 1. Sign up for a free account
- 2. Generate your API key
- 3. Choose a channel name (channels are created automatically)
- 4. Start publishing and subscribing!
Authentication
All API requests require authentication using your API key. You can generate API keys from your dashboard after signing up.
Keep Your API Key Secure
Your API key provides access to your messaging channels. Never share it publicly or commit it to version control.
Channels
Channels are named endpoints for message routing. When you publish a message to a channel, all subscribers to that channel receive the message in real-time.
Dynamic Channel Creation
Channels are created automatically when you first publish a message to them. You don't need to create channels in advance - simply start publishing and subscribing!
Channel Naming Rules
- • Channel names are case-sensitive
- • Only letters, numbers, hyphens (-), and underscores (_) are allowed
- • Maximum length: 255 characters
-
• Examples:
my-channel,notifications_2024
API Reference
/auth
Get an authentication token for WebSocket connections.
Request Body
{
"channel": "your-channel-name",
"api_key": "your-api-key"
}
Response
Success (200)
{
"token": "jwt-authentication-token"
}
Error Responses
{
"error": "Channel name contains invalid characters"
}
Invalid API key
/publish
Publish a message to a channel.
Request Body
{
"channel": "your-channel-name",
"data": {
"message": "Hello, world!",
"timestamp": 1234567890
},
"api_key": "your-api-key",
"event": "message"
}
Parameters
- channel (required): The channel name to publish to
- data (required): The message payload (any JSON object)
- api_key (required): Your API key
- event (optional): Event type ("message", "notification", "alert", "update")
Message Size Limits
- Free Tier: 1KB maximum per message
- Pro Tier: 10KB maximum per message
- Enterprise Tier: 10KB maximum per message
Upgrade to Pro for 10x larger message payloads!
Response
Success (200)
{
"success": true,
"message": "Message published successfully"
}
Error Responses
{
"error": "Missing required field: channel"
}
{
"error": "Invalid API key"
}
Access denied: You do not own this channel
{
"error": "Channel name contains invalid characters"
}
Monthly message limit exceeded. Please upgrade your plan.
{
"error": "Message size limit exceeded. Maximum allowed: 1KB, your message: 1.5KB. Upgrade to Pro for 10x larger messages."
}
/subscribe
Connect to a WebSocket for real-time message subscription.
Connection URL
wss://api.zephyrpush.com/subscribe?token=your-jwt-token
Connection Status
Successful Connection
WebSocket connection opens successfully and begins receiving messages.
Connection Errors
Missing channel
{
"error": "Channel name contains invalid characters"
}
Missing token
Access denied: You do not own this channel
Channel not found
{
"error": "Connection limit exceeded",
"limit": 500,
"current": 500,
"message": "Upgrade your subscription for higher limits"
}
Message Format
{
"event": "message",
"data": {
"message": "Hello, world!",
"timestamp": 1234567890
},
"timestamp": 1234567890,
"channel": "your-channel-name"
}
/analytics
Get analytics data for your channels.
Query Parameters
- channel (optional): Filter by specific channel
- period (optional): Time period ("day", "week", "month")
Response
Success (200)
{
"totalMessages": 1250,
"monthlyMessages": 0,
"activeChannels": 5,
"connectedUsers": 0,
"messageVolume": [0],
"timeSeries": []
}
Pro Features
Pro subscribers get additional advanced analytics including:
- Detailed channel-specific metrics
- Response time analytics
- Error rate tracking
- Geographic distribution data
Advanced analytics features are available for Pro and Enterprise subscribers.
Rate Limiting
ZephyrPush implements user-based rate limiting to ensure fair usage and protect the service. Rate limits vary by subscription tier.
Response Headers
Every API response includes rate limit headers to help you track your usage:
| Header | Description |
|---|---|
X-RateLimit-Limit |
Maximum requests allowed in the current window |
X-RateLimit-Remaining |
Number of requests remaining in the current window |
X-RateLimit-Reset |
Unix timestamp when the rate limit window resets |
X-RateLimit-Policy |
Rate limit policy in format limit;w=window_seconds |
Retry-After |
Seconds to wait before retrying (only included when rate limited) |
Example Response Headers
X-RateLimit-Limit: 120
X-RateLimit-Remaining: 115
X-RateLimit-Reset: 1732982460
X-RateLimit-Policy: 120;w=60
Rate Limits by Tier
All rate limits are per-minute windows. Higher tier subscriptions receive increased limits:
| Endpoint | Free | Pro | Enterprise |
|---|---|---|---|
/publish |
60/min | 120/min | 300/min |
/subscribe |
30/min | 60/min | 150/min |
| Polling endpoints | 120/min | 240/min | 600/min |
| Authentication endpoints | 5-10/min (IP-based, same for all tiers) | ||
Rate Limit Exceeded Response
When you exceed the rate limit, you'll receive a 429 Too Many Requests response:
HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 30
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1732982460
{
"error": "Rate limit exceeded",
"limit": 60,
"remaining": 0,
"resetAt": "2024-11-30T15:01:00.000Z",
"retryAfter": 30
}
Handling Rate Limits
Best practices for handling rate limits in your application:
JavaScript Example
async function makeApiRequest(url, options) {
const response = await fetch(url, options);
// Check if rate limited
if (response.status === 429) {
const retryAfter = parseInt(response.headers.get('Retry-After') || '60');
console.log(`Rate limited. Retrying in ${retryAfter} seconds...`);
await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
return makeApiRequest(url, options); // Retry
}
// Log remaining requests when low
const remaining = response.headers.get('X-RateLimit-Remaining');
if (remaining && parseInt(remaining) < 10) {
console.warn(`Low rate limit: ${remaining} requests remaining`);
}
return response;
}
Tip: Always check the Retry-After header when rate limited, as it tells you exactly how long to wait before your next request.
Code Examples
# Python Example - Production Optimized
import asyncio
import aiohttp
import websockets
import json
import logging
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import backoff
from aiohttp import ClientTimeout
from aiohttp_retry import RetryClient, ExponentialRetry
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ZephyrPushClient:
api_key: str
base_url: str = "https://api.zephyrpush.com"
ws_url: str = "wss://api.zephyrpush.com"
token_cache: Dict[str, Dict] = None
session: Optional[aiohttp.ClientSession] = None
def __post_init__(self):
self.token_cache = {}
async def __aenter__(self):
timeout = ClientTimeout(total=30, connect=10)
retry_options = ExponentialRetry(attempts=3, start_timeout=1.0, max_timeout=10.0)
self.session = RetryClient(
aiohttp.ClientSession(timeout=timeout, connector=aiohttp.TCPConnector(limit=100)),
retry_options=retry_options
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def _get_auth_token(self, channel: str) -> str:
"""Get cached token or fetch new one with automatic refresh"""
cache_key = f"{self.api_key}:{channel}"
now = datetime.now()
# Check cache validity (tokens expire in 1 hour)
if cache_key in self.token_cache:
cached = self.token_cache[cache_key]
if now < cached['expires']:
return cached['token']
# Fetch new token
async with self.session.post(
f"{self.base_url}/auth",
json={"channel": channel, "api_key": self.api_key},
headers={"Content-Type": "application/json"}
) as resp:
resp.raise_for_status()
data = await resp.json()
token = data['token']
# Cache token with 50min expiry (10min buffer)
self.token_cache[cache_key] = {
'token': token,
'expires': now + timedelta(minutes=50)
}
return token
@backoff.on_exception(backoff.expo, (aiohttp.ClientError, asyncio.TimeoutError), max_tries=3)
async def publish_message(self, channel: str, data: Dict[str, Any], event: str = 'message') -> bool:
"""Publish message with retry logic and proper error handling"""
try:
token = await self._get_auth_token(channel)
payload = {
'channel': channel,
'data': data,
'api_key': self.api_key,
'event': event
}
async with self.session.post(
f"{self.base_url}/publish",
json=payload,
headers={"Content-Type": "application/json"}
) as resp:
resp.raise_for_status()
logger.info(f"Message published to {channel}: {event}")
return True
except aiohttp.ClientResponseError as e:
if e.status == 429:
logger.warning(f"Rate limited for channel {channel}, backing off")
raise # Let retry decorator handle it
elif e.status >= 500:
logger.error(f"Server error publishing to {channel}: {e.status}")
raise
else:
logger.error(f"Client error publishing to {channel}: {e.status} - {e.message}")
return False
except Exception as e:
logger.error(f"Unexpected error publishing to {channel}: {e}")
return False
async def subscribe_to_channel(self, channel: str, message_handler: callable):
"""Subscribe to channel with connection management and health checks"""
while True: # Reconnection loop
try:
token = await self._get_auth_token(channel)
uri = f"{self.ws_url}/subscribe?token={token}"
async with websockets.connect(uri, extra_headers={"User-Agent": "ZephyrPush-Client/1.0"}) as ws:
logger.info(f"Connected to channel: {channel}")
# Connection health check
await ws.ping()
async for message in ws:
try:
data = json.loads(message)
await message_handler(channel, data)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON received on {channel}: {e}")
continue
except Exception as e:
logger.error(f"Error handling message on {channel}: {e}")
continue
except websockets.exceptions.ConnectionClosedError as e:
logger.warning(f"WebSocket connection closed for {channel}: {e.code} - {e.reason}")
await asyncio.sleep(5) # Reconnection delay
except Exception as e:
logger.error(f"WebSocket error for {channel}: {e}")
await asyncio.sleep(10) # Longer delay for other errors
# Usage examples with proper async context management
async def main():
async with ZephyrPushClient("your-api-key") as client:
# Publish with automatic retries and error handling
await client.publish_message('my-channel', {'message': 'Hello World!'})
await client.publish_message('my-channel', {'title': 'System Alert', 'priority': 'high'}, 'alert')
# Subscribe with reconnection logic
async def handle_message(channel, data):
event_type = data.get('event', 'message')
logger.info(f"Received {event_type} on {channel}: {data['data']}")
# Run subscription in background task
subscription_task = asyncio.create_task(
client.subscribe_to_channel('my-channel', handle_message)
)
# Publish more messages while subscribed
await client.publish_message('my-channel', {'user': 'John', 'action': 'logged in'}, 'notification')
# Keep running for demo (in real app, this would be your main event loop)
await asyncio.sleep(30)
if __name__ == "__main__":
asyncio.run(main())
# cURL Examples - Production Optimized
# Environment variables for security
API_KEY="your-api-key"
CHANNEL="my-channel"
BASE_URL="https://api.zephyrpush.com"
ANALYTICS_URL="https://analytics.zephyrpush.com"
# 1. Get authentication token with proper error handling and retries
TOKEN=$(curl -s --retry 3 --retry-delay 2 --max-time 30 \
-X POST "$BASE_URL/auth" \
-H "Content-Type: application/json" \
-H "User-Agent: ZephyrPush-Client/1.0" \
-d "{\"channel\": \"$CHANNEL\", \"api_key\": \"$API_KEY\"}" | jq -r '.token')
if [ -z "$TOKEN" ] || [ "$TOKEN" = "null" ]; then
echo "Failed to obtain authentication token"
exit 1
fi
# 2. Publish messages with different event types (with rate limiting consideration)
publish_message() {
local data="$1"
local event="${2:-message}"
curl -s --retry 3 --retry-delay 1 --retry-max-time 30 --max-time 10 \
-X POST "$BASE_URL/publish" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-H "User-Agent: ZephyrPush-Client/1.0" \
-H "X-Request-ID: $(uuidgen 2>/dev/null || echo $RANDOM)" \
-d "{
\"channel\": \"$CHANNEL\",
\"data\": $data,
\"api_key\": \"$API_KEY\",
\"event\": \"$event\"
}" | jq -e '.success // empty' > /dev/null
if [ $? -eq 0 ]; then
echo "Message published successfully: $event"
else
echo "Failed to publish message: $event"
return 1
fi
}
# Usage examples with proper error handling
publish_message '{"message": "Hello World!"}' || exit 1
publish_message '{"title": "System Alert", "priority": "high"}' "alert" || exit 1
publish_message '{"user": "John", "action": "logged in"}' "notification" || exit 1
publish_message '{"status": "updated", "timestamp": '$(date +%s)'}' "update" || exit 1
# 3. Get analytics with authentication and error handling
curl -s --retry 2 --max-time 15 \
-H "Authorization: Bearer $TOKEN" \
-H "User-Agent: ZephyrPush-Client/1.0" \
"$ANALYTICS_URL/analytics?channel=$CHANNEL" | jq .
# 4. Health check endpoint (if available)
curl -s --max-time 5 \
-H "User-Agent: ZephyrPush-Client/1.0" \
"$BASE_URL/health" || echo "Health check failed"
# 5. WebSocket connection test (using websocat or similar)
# websocat -u "wss://api.zephyrpush.com/subscribe?token=$TOKEN" --ping-interval 30
<?php
// PHP Example - Production Optimized
class ZephyrPushClient {
private string $apiKey;
private string $baseUrl;
private string $analyticsUrl;
private array $tokenCache = [];
private int $maxRetries = 3;
private float $timeout = 30.0;
private ?CurlHandle $curlHandle = null;
public function __construct(string $apiKey, string $baseUrl = 'https://api.zephyrpush.com', string $analyticsUrl = 'https://analytics.zephyrpush.com') {
$this->apiKey = $apiKey;
$this->baseUrl = $baseUrl;
$this->analyticsUrl = $analyticsUrl;
// Initialize persistent cURL handle for connection reuse
$this->curlHandle = curl_init();
curl_setopt_array($this->curlHandle, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => $this->timeout,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_USERAGENT => 'ZephyrPush-PHP-Client/1.0',
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_SSL_VERIFYPEER => true,
CURLOPT_SSL_VERIFYHOST => 2,
]);
}
public function __destruct() {
if ($this->curlHandle) {
curl_close($this->curlHandle);
}
}
private function getAuthToken(string $channel): string {
$cacheKey = $this->apiKey . ':' . $channel;
// Check token cache (50min expiry to account for clock skew)
if (isset($this->tokenCache[$cacheKey]) && $this->tokenCache[$cacheKey]['expires'] > time()) {
return $this->tokenCache[$cacheKey]['token'];
}
$payload = json_encode(['channel' => $channel, 'api_key' => $this->apiKey]);
for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
curl_setopt_array($this->curlHandle, [
CURLOPT_URL => $this->baseUrl . '/auth',
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => $payload,
]);
$response = curl_exec($this->curlHandle);
$httpCode = curl_getinfo($this->curlHandle, CURLINFO_HTTP_CODE);
if ($httpCode === 200 && $response) {
$data = json_decode($response, true);
if (isset($data['token'])) {
// Cache token for 50 minutes
$this->tokenCache[$cacheKey] = [
'token' => $data['token'],
'expires' => time() + 3000
];
return $data['token'];
}
}
// Exponential backoff for retries
if ($attempt < $this->maxRetries) {
sleep(pow(2, $attempt - 1));
}
}
throw new RuntimeException('Failed to obtain authentication token after ' . $this->maxRetries . ' attempts');
}
public function publishMessage(string $channel, array $data, string $event = 'message'): bool {
try {
$token = $this->getAuthToken($channel);
$payload = json_encode([
'channel' => $channel,
'data' => $data,
'api_key' => $this->apiKey,
'event' => $event
]);
for ($attempt = 1; $attempt <= $this->maxRetries; $attempt++) {
curl_setopt_array($this->curlHandle, [
CURLOPT_URL => $this->baseUrl . '/publish',
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => $payload,
CURLOPT_HTTPHEADER => [
'Content-Type: application/json',
'Authorization: Bearer ' . $token,
'X-Request-ID: ' . uniqid('req_', true)
]
]);
$response = curl_exec($this->curlHandle);
$httpCode = curl_getinfo($this->curlHandle, CURLINFO_HTTP_CODE);
if ($httpCode === 200) {
error_log("Message published successfully: $event to $channel");
return true;
}
if ($httpCode === 429) {
// Rate limited - exponential backoff
$delay = pow(2, $attempt - 1);
error_log("Rate limited, retrying in {$delay}s");
sleep($delay);
continue;
}
if ($httpCode >= 500 && $attempt < $this->maxRetries) {
// Server error - retry
$delay = pow(2, $attempt - 1);
error_log("Server error {$httpCode}, retrying in {$delay}s");
sleep($delay);
continue;
}
// Client error or final attempt failed
error_log("Failed to publish message: HTTP $httpCode - " . substr($response, 0, 200));
return false;
}
} catch (Exception $e) {
error_log('Error publishing message: ' . $e->getMessage());
return false;
}
return false;
}
public function getAnalytics(string $channel): ?array {
try {
$token = $this->getAuthToken($channel);
curl_setopt_array($this->curlHandle, [
CURLOPT_URL => $this->analyticsUrl . '/analytics?channel=' . urlencode($channel),
CURLOPT_HTTPGET => true,
CURLOPT_HTTPHEADER => [
'Authorization: Bearer ' . $token,
'User-Agent: ZephyrPush-PHP-Client/1.0'
]
]);
$response = curl_exec($this->curlHandle);
$httpCode = curl_getinfo($this->curlHandle, CURLINFO_HTTP_CODE);
if ($httpCode === 200 && $response) {
return json_decode($response, true);
}
error_log("Analytics request failed: HTTP $httpCode");
return null;
} catch (Exception $e) {
error_log('Error fetching analytics: ' . $e->getMessage());
return null;
}
}
}
// Usage examples with proper error handling
try {
$client = new ZephyrPushClient('your-api-key');
// Publish messages with automatic retries and error handling
$client->publishMessage('my-channel', ['message' => 'Hello World!']);
$client->publishMessage('my-channel', ['title' => 'System Alert', 'priority' => 'high'], 'alert');
$client->publishMessage('my-channel', ['user' => 'John', 'action' => 'logged in'], 'notification');
$client->publishMessage('my-channel', ['status' => 'updated', 'timestamp' => time()], 'update');
// Get analytics
$analytics = $client->getAnalytics('my-channel');
if ($analytics) {
echo 'Analytics: ' . json_encode($analytics) . PHP_EOL;
}
} catch (Exception $e) {
error_log('Application error: ' . $e->getMessage());
exit(1);
}
?>
package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/avast/retry-go"
"golang.org/x/time/rate"
)
type ZephyrPushClient struct {
apiKey string
baseURL string
analyticsURL string
httpClient *http.Client
tokenCache map[string]*TokenCacheEntry
rateLimiter *rate.Limiter
mu sync.RWMutex
}
type TokenCacheEntry struct {
Token string
Expires time.Time
}
type AuthRequest struct {
Channel string `json:"channel"`
APIKey string `json:"api_key"`
}
type PublishRequest struct {
Channel string `json:"channel"`
Data interface{} `json:"data"`
APIKey string `json:"api_key"`
Event string `json:"event,omitempty"`
}
func NewZephyrPushClient(apiKey string) *ZephyrPushClient {
// Production-grade HTTP client with connection pooling and timeouts
transport := &http.Transport{
MaxIdleConns: 100,
MaxConnsPerHost: 10,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
client := &ZephyrPushClient{
apiKey: apiKey,
baseURL: "https://api.zephyrpush.com",
analyticsURL: "https://analytics.zephyrpush.com",
httpClient: &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
},
tokenCache: make(map[string]*TokenCacheEntry),
rateLimiter: rate.NewLimiter(rate.Every(time.Second), 100), // 100 req/sec
}
return client
}
func (c *ZephyrPushClient) getAuthToken(ctx context.Context, channel string) (string, error) {
cacheKey := fmt.Sprintf("%s:%s", c.apiKey, channel)
c.mu.RLock()
if entry, exists := c.tokenCache[cacheKey]; exists && time.Now().Before(entry.Expires) {
c.mu.RUnlock()
return entry.Token, nil
}
c.mu.RUnlock()
var token string
err := retry.Do(
func() error {
return c.fetchNewToken(ctx, channel, &token)
},
retry.Attempts(3),
retry.Delay(1*time.Second),
retry.MaxDelay(10*time.Second),
retry.Context(ctx),
)
if err != nil {
return "", fmt.Errorf("failed to get auth token after retries: %w", err)
}
// Cache token with 50min expiry
c.mu.Lock()
c.tokenCache[cacheKey] = &TokenCacheEntry{
Token: token,
Expires: time.Now().Add(50 * time.Minute),
}
c.mu.Unlock()
return token, nil
}
func (c *ZephyrPushClient) fetchNewToken(ctx context.Context, channel string, token *string) error {
// Rate limiting
if err := c.rateLimiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limited: %w", err)
}
authReq := AuthRequest{Channel: channel, APIKey: c.apiKey}
jsonData, err := json.Marshal(authReq)
if err != nil {
return fmt.Errorf("failed to marshal auth request: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/auth", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "ZephyrPush-Go-Client/1.0")
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("auth request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("auth request failed with status: %d", resp.StatusCode)
}
var authResp map[string]string
if err := json.NewDecoder(resp.Body).Decode(&authResp); err != nil {
return fmt.Errorf("failed to decode auth response: %w", err)
}
if tokenVal, exists := authResp["token"]; exists {
*token = tokenVal
return nil
}
return fmt.Errorf("token not found in auth response")
}
func (c *ZephyrPushClient) PublishMessage(ctx context.Context, channel string, data interface{}, event string) error {
token, err := c.getAuthToken(ctx, channel)
if err != nil {
return fmt.Errorf("failed to get auth token: %w", err)
}
publishReq := PublishRequest{
Channel: channel,
Data: data,
APIKey: c.apiKey,
Event: event,
}
return retry.Do(
func() error {
return c.doPublishRequest(ctx, token, publishReq)
},
retry.Attempts(3),
retry.Delay(500*time.Millisecond),
retry.MaxDelay(5*time.Second),
retry.Context(ctx),
retry.RetryIf(func(err error) bool {
// Retry on rate limits and server errors
if httpErr, ok := err.(*url.Error); ok {
if resp, respOk := httpErr.Err.(*http.Response); respOk {
return resp.StatusCode == 429 || resp.StatusCode >= 500
}
}
return false
}),
)
}
func (c *ZephyrPushClient) doPublishRequest(ctx context.Context, token string, req PublishRequest) error {
// Rate limiting
if err := c.rateLimiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limited: %w", err)
}
jsonData, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("failed to marshal publish request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/publish", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create publish request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Authorization", "Bearer "+token)
httpReq.Header.Set("User-Agent", "ZephyrPush-Go-Client/1.0")
httpReq.Header.Set("X-Request-ID", fmt.Sprintf("%d", time.Now().UnixNano()))
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("publish request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("publish failed with status %d: %s", resp.StatusCode, string(body))
}
log.Printf("Message published successfully to channel %s: %s", req.Channel, req.Event)
return nil
}
func (c *ZephyrPushClient) GetAnalytics(ctx context.Context, channel string) (map[string]interface{}, error) {
token, err := c.getAuthToken(ctx, channel)
if err != nil {
return nil, fmt.Errorf("failed to get auth token: %w", err)
}
// Rate limiting
if err := c.rateLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("rate limited: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/analytics?channel=%s", c.analyticsURL, channel), nil)
if err != nil {
return nil, fmt.Errorf("failed to create analytics request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("User-Agent", "ZephyrPush-Go-Client/1.0")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("analytics request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("analytics request failed with status: %d", resp.StatusCode)
}
var analytics map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&analytics); err != nil {
return nil, fmt.Errorf("failed to decode analytics response: %w", err)
}
return analytics, nil
}
func main() {
client := NewZephyrPushClient("your-api-key")
ctx := context.Background()
// Publish different event types with proper error handling
events := []struct {
data interface{}
event string
}{
{map[string]string{"message": "Hello World!"}, "message"},
{map[string]interface{}{"title": "System Alert", "priority": "high"}, "alert"},
{map[string]interface{}{"user": "John", "action": "logged in"}, "notification"},
{map[string]interface{}{"status": "updated", "timestamp": time.Now().Unix()}, "update"},
}
for _, e := range events {
if err := client.PublishMessage(ctx, "my-channel", e.data, e.event); err != nil {
log.Printf("Failed to publish %s: %v", e.event, err)
}
}
// Get analytics
if analytics, err := client.GetAnalytics(ctx, "my-channel"); err != nil {
log.Printf("Failed to get analytics: %v", err)
} else {
log.Printf("Analytics: %+v", analytics)
}
}
// JavaScript Example - Production Optimized
class ZephyrPushClient {
constructor(apiKey, options = {}) {
this.apiKey = apiKey;
this.baseURL = options.baseURL || 'https://api.zephyrpush.com';
this.analyticsURL = options.analyticsURL || 'https://analytics.zephyrpush.com';
this.tokenCache = new Map();
this.maxRetries = options.maxRetries || 3;
this.timeout = options.timeout || 30000;
this.rateLimiter = new RateLimiter(100, 1000); // 100 requests per second
this.logger = options.logger || console;
}
async getAuthToken(channel) {
const cacheKey = `${this.apiKey}:${channel}`;
const cached = this.tokenCache.get(cacheKey);
// Check if token is still valid (50min buffer)
if (cached && Date.now() < cached.expires) {
return cached.token;
}
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
await this.rateLimiter.wait();
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);
const response = await fetch(`${this.baseURL}/auth`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'ZephyrPush-JS-Client/1.0',
'X-Request-ID': crypto.randomUUID()
},
body: JSON.stringify({ channel, api_key: this.apiKey }),
signal: controller.signal
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`Auth failed: ${response.status} ${response.statusText}`);
}
const data = await response.json();
const token = data.token;
// Cache token for 50 minutes
this.tokenCache.set(cacheKey, {
token,
expires: Date.now() + (50 * 60 * 1000)
});
return token;
} catch (error) {
const isLastAttempt = attempt === this.maxRetries;
const isRetryable = this.isRetryableError(error);
if (isLastAttempt || !isRetryable) {
throw new Error(`Failed to get auth token after ${attempt} attempts: ${error.message}`);
}
// Exponential backoff
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
this.logger.warn(`Auth attempt ${attempt} failed, retrying in ${delay}ms:`, error.message);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
isRetryableError(error) {
// Retry on network errors, timeouts, and server errors
if (error.name === 'AbortError') return true; // Timeout
if (error.message.includes('fetch')) return true; // Network error
if (error.message.includes('500') || error.message.includes('502') || error.message.includes('503')) return true;
if (error.message.includes('429')) return true; // Rate limited
return false;
}
async publishMessage(channel, data, event = 'message') {
const token = await this.getAuthToken(channel);
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
await this.rateLimiter.wait();
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);
const response = await fetch(`${this.baseURL}/publish`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
'User-Agent': 'ZephyrPush-JS-Client/1.0',
'X-Request-ID': crypto.randomUUID()
},
body: JSON.stringify({
channel,
data,
api_key: this.apiKey,
event
}),
signal: controller.signal
});
clearTimeout(timeoutId);
if (response.ok) {
this.logger.info(`Message published successfully: ${event} to ${channel}`);
return true;
}
const errorText = await response.text().catch(() => 'Unknown error');
if (response.status === 429) {
// Rate limited - exponential backoff
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
this.logger.warn(`Rate limited, retrying in ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
if (response.status >= 500 && attempt < this.maxRetries) {
// Server error - retry
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
this.logger.warn(`Server error ${response.status}, retrying in ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
// Client error or final attempt failed
throw new Error(`Publish failed: ${response.status} ${response.statusText} - ${errorText}`);
} catch (error) {
const isLastAttempt = attempt === this.maxRetries;
const isRetryable = this.isRetryableError(error);
if (isLastAttempt || !isRetryable) {
this.logger.error(`Failed to publish message after ${attempt} attempts:`, error.message);
return false;
}
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
this.logger.warn(`Publish attempt ${attempt} failed, retrying in ${delay}ms:`, error.message);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
return false;
}
async getAnalytics(channel) {
try {
const token = await this.getAuthToken(channel);
await this.rateLimiter.wait();
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);
const response = await fetch(`${this.analyticsURL}/analytics?channel=${encodeURIComponent(channel)}`, {
headers: {
'Authorization': `Bearer ${token}`,
'User-Agent': 'ZephyrPush-JS-Client/1.0'
},
signal: controller.signal
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`Analytics request failed: ${response.status}`);
}
return await response.json();
} catch (error) {
this.logger.error('Failed to get analytics:', error.message);
return null;
}
}
subscribeToChannel(channel, messageHandler, options = {}) {
const reconnectInterval = options.reconnectInterval || 5000;
const maxReconnectAttempts = options.maxReconnectAttempts || 10;
let reconnectAttempts = 0;
let ws = null;
const connect = async () => {
try {
const token = await this.getAuthToken(channel);
ws = new WebSocket(`${this.baseURL.replace('https://', 'wss://')}/subscribe?token=${token}`);
ws.onopen = () => {
this.logger.info(`Connected to channel: ${channel}`);
reconnectAttempts = 0;
// Send periodic pings to keep connection alive
const pingInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
} else {
clearInterval(pingInterval);
}
}, 30000);
};
ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
messageHandler(channel, message).catch(error => {
this.logger.error('Error in message handler:', error);
});
} catch (error) {
this.logger.error('Failed to parse WebSocket message:', error);
}
};
ws.onerror = (error) => {
this.logger.error(`WebSocket error for channel ${channel}:`, error);
};
ws.onclose = (event) => {
this.logger.warn(`WebSocket closed for channel ${channel}: ${event.code} ${event.reason}`);
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++;
const delay = Math.min(reconnectInterval * reconnectAttempts, 30000);
this.logger.info(`Reconnecting to ${channel} in ${delay}ms (attempt ${reconnectAttempts})`);
setTimeout(connect, delay);
} else {
this.logger.error(`Max reconnection attempts reached for channel ${channel}`);
}
};
} catch (error) {
this.logger.error(`Failed to connect to channel ${channel}:`, error);
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++;
const delay = Math.min(reconnectInterval * reconnectAttempts, 30000);
setTimeout(connect, delay);
}
}
};
connect();
// Return cleanup function
return () => {
if (ws) {
ws.close(1000, 'Client disconnecting');
}
};
}
}
// Rate limiter implementation
class RateLimiter {
constructor(requestsPerSecond, burstSize = requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
this.burstSize = burstSize;
this.tokens = burstSize;
this.lastRefill = Date.now();
this.refillRate = requestsPerSecond / 1000; // tokens per millisecond
}
async wait() {
const now = Date.now();
const timePassed = now - this.lastRefill;
this.tokens = Math.min(this.burstSize, this.tokens + timePassed * this.refillRate);
this.lastRefill = now;
if (this.tokens < 1) {
const waitTime = (1 - this.tokens) / this.refillRate;
await new Promise(resolve => setTimeout(resolve, waitTime));
this.tokens = 0;
} else {
this.tokens -= 1;
}
}
}
// Usage examples with proper error handling
async function main() {
const client = new ZephyrPushClient('your-api-key', {
logger: console,
maxRetries: 3,
timeout: 30000
});
try {
// Publish messages with automatic retries and error handling
await client.publishMessage('my-channel', { message: 'Hello World!' });
await client.publishMessage('my-channel', { title: 'System Alert', priority: 'high' }, 'alert');
await client.publishMessage('my-channel', { user: 'John', action: 'logged in' }, 'notification');
await client.publishMessage('my-channel', { status: 'updated', timestamp: Date.now() }, 'update');
// Get analytics
const analytics = await client.getAnalytics('my-channel');
if (analytics) {
}
// Subscribe to channel with reconnection logic
const unsubscribe = client.subscribeToChannel('my-channel', async (channel, message) => {
const eventType = message.event || 'message';
// Handle different event types
switch (eventType) {
case 'alert':
await handleAlert(message.data);
break;
case 'notification':
await handleNotification(message.data);
break;
case 'update':
await handleUpdate(message.data);
break;
default:
await handleMessage(message.data);
}
});
// Run for demo (in real app, this would be your main event loop)
await new Promise(resolve => setTimeout(resolve, 30000));
// Cleanup
unsubscribe();
} catch (error) {
process.exit(1);
}
}
// Helper functions (implement according to your needs)
// Run if in Node.js environment
if (typeof module !== 'undefined' && module.exports) {
module.exports = { ZephyrPushClient, RateLimiter };
// Uncomment to run: main();
}
// Rust Example - Production Optimized
use reqwest::blocking::{Client, ClientBuilder};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tungstenite::{connect, Message, WebSocket};
use url::Url;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::thread;
use backoff::{ExponentialBackoff, Operation};
use governor::{Quota, RateLimiter};
use nonzero_ext::nonzero;
#[derive(Clone)]
struct ZephyrPushClient {
api_key: String,
base_url: String,
analytics_url: String,
http_client: Client,
token_cache: Arc>>,
rate_limiter: Arc>,
}
#[derive(Clone)]
struct TokenCacheEntry {
token: String,
expires: Instant,
}
#[derive(Serialize)]
struct AuthRequest {
channel: String,
api_key: String,
}
#[derive(Serialize)]
struct PublishRequest {
channel: String,
data: Value,
api_key: String,
event: String,
}
#[derive(Deserialize)]
struct AuthResponse {
token: String,
}
impl ZephyrPushClient {
fn new(api_key: String) -> Result> {
let http_client = ClientBuilder::new()
.timeout(Duration::from_secs(30))
.pool_max_idle_per_host(10)
.pool_idle_timeout(Duration::from_secs(90))
.user_agent("ZephyrPush-Rust-Client/1.0")
.build()?;
// Rate limiter: 100 requests per second
let quota = Quota::per_second(nonzero!(100u32));
let rate_limiter = Arc::new(RateLimiter::direct(quota));
Ok(Self {
api_key,
base_url: "https://api.zephyrpush.com".to_string(),
analytics_url: "https://analytics.zephyrpush.com".to_string(),
http_client,
token_cache: Arc::new(Mutex::new(HashMap::new())),
rate_limiter,
})
}
fn get_auth_token(&self, channel: &str) -> Result> {
let cache_key = format!("{}:{}", self.api_key, channel);
let now = Instant::now();
// Check cache
{
let cache = self.token_cache.lock().unwrap();
if let Some(entry) = cache.get(&cache_key) {
if now < entry.expires {
return Ok(entry.token.clone());
}
}
}
// Fetch new token with retry logic
let operation = || {
self.rate_limiter.check_n(1)?;
self.fetch_auth_token(channel)
};
let backoff = ExponentialBackoff::default();
let token = operation.retry(&backoff)?;
// Cache token for 50 minutes
{
let mut cache = self.token_cache.lock().unwrap();
cache.insert(cache_key, TokenCacheEntry {
token: token.clone(),
expires: now + Duration::from_secs(3000), // 50 minutes
});
}
Ok(token)
}
fn fetch_auth_token(&self, channel: &str) -> Result>> {
let auth_req = AuthRequest {
channel: channel.to_string(),
api_key: self.api_key.clone(),
};
let response = self.http_client
.post(&format!("{}/auth", self.base_url))
.json(&auth_req)
.send()
.map_err(|e| backoff::Error::transient(e.into()))?;
if !response.status().is_success() {
return Err(backoff::Error::transient(
format!("Auth failed with status: {}", response.status()).into()
));
}
let auth_resp: AuthResponse = response.json()
.map_err(|e| backoff::Error::transient(e.into()))?;
Ok(auth_resp.token)
}
fn publish_message(&self, channel: &str, data: Value, event: &str) -> Result<(), Box> {
let token = self.get_auth_token(channel)?;
let publish_req = PublishRequest {
channel: channel.to_string(),
data,
api_key: self.api_key.clone(),
event: event.to_string(),
};
let operation = || {
self.rate_limiter.check_n(1)?;
self.do_publish(&token, &publish_req)
};
let backoff = ExponentialBackoff::default();
operation.retry(&backoff)?;
log::info!("Message published successfully: {} to {}", event, channel);
Ok(())
}
fn do_publish(&self, token: &str, req: &PublishRequest) -> Result<(), backoff::Error>> {
let response = self.http_client
.post(&format!("{}/publish", self.base_url))
.bearer_auth(token)
.header("X-Request-ID", format!("req-{}", chrono::Utc::now().timestamp_nanos()))
.json(req)
.send()
.map_err(|e| backoff::Error::transient(e.into()))?;
match response.status() {
reqwest::StatusCode::OK => Ok(()),
reqwest::StatusCode::TOO_MANY_REQUESTS => {
Err(backoff::Error::transient("Rate limited".into()))
},
status if status.is_server_error() => {
Err(backoff::Error::transient(format!("Server error: {}", status).into()))
},
status => {
Err(backoff::Error::permanent(format!("Client error: {}", status).into()))
}
}
}
fn get_analytics(&self, channel: &str) -> Result> {
let token = self.get_auth_token(channel)?;
self.rate_limiter.check_n(1)?;
let response = self.http_client
.get(&format!("{}/analytics?channel={}", self.analytics_url, urlencoding::encode(channel)))
.bearer_auth(token)
.send()?;
if !response.status().is_success() {
return Err(format!("Analytics request failed: {}", response.status()).into());
}
let analytics: Value = response.json()?;
Ok(analytics)
}
fn subscribe_to_channel(&self, channel: &str, message_handler: F) -> Result<(), Box>
where
F: Fn(&str, &Value) -> Result<(), Box> + Send + 'static,
{
let channel = channel.to_string();
let client = self.clone();
thread::spawn(move || {
let mut reconnect_attempts = 0;
let max_reconnect_attempts = 10;
loop {
match client.connect_and_listen(&channel, &message_handler) {
Ok(_) => break, // Normal disconnection
Err(e) => {
log::error!("WebSocket error for channel {}: {}", channel, e);
reconnect_attempts += 1;
if reconnect_attempts >= max_reconnect_attempts {
log::error!("Max reconnection attempts reached for channel {}", channel);
break;
}
let delay = std::cmp::min(5000 * reconnect_attempts, 30000);
log::info!("Reconnecting to {} in {}ms (attempt {})", channel, delay, reconnect_attempts);
thread::sleep(Duration::from_millis(delay as u64));
}
}
}
});
Ok(())
}
fn connect_and_listen(&self, channel: &str, message_handler: &F) -> Result<(), Box>
where
F: Fn(&str, &Value) -> Result<(), Box>,
{
let token = self.get_auth_token(channel)?;
let ws_url = format!("wss://api.zephyrpush.com/subscribe?token={}", token);
let (mut socket, _) = connect(Url::parse(&ws_url)?)?;
log::info!("Connected to channel: {}", channel);
// Send periodic pings to keep connection alive
let ping_interval = Duration::from_secs(30);
let mut last_ping = Instant::now();
loop {
if last_ping.elapsed() >= ping_interval {
socket.write_message(Message::Ping(vec![]))?;
last_ping = Instant::now();
}
match socket.read_message()? {
Message::Text(text) => {
match serde_json::from_str::(&text) {
Ok(message) => {
if let Err(e) = message_handler(channel, &message) {
log::error!("Error in message handler for {}: {}", channel, e);
}
},
Err(e) => {
log::error!("Failed to parse message on {}: {}", channel, e);
}
}
},
Message::Close(_) => {
log::info!("WebSocket closed for channel: {}", channel);
break;
},
Message::Ping(payload) => {
socket.write_message(Message::Pong(payload))?;
},
_ => {}
}
}
Ok(())
}
}
fn handle_alert(data: &Value) {
println!("Alert: {:?}", data);
}
fn handle_notification(data: &Value) {
println!("Notification: {:?}", data);
}
fn handle_update(data: &Value) {
println!("Update: {:?}", data);
}
fn handle_message(data: &Value) {
println!("Message: {:?}", data);
}
fn main() -> Result<(), Box> {
env_logger::init();
let client = ZephyrPushClient::new("your-api-key".to_string())?;
// Publish messages with automatic retries and error handling
client.publish_message("my-channel", json!({"message": "Hello World!"}), "message")?;
client.publish_message("my-channel", json!({"title": "System Alert", "priority": "high"}), "alert")?;
client.publish_message("my-channel", json!({"user": "John", "action": "logged in"}), "notification")?;
client.publish_message("my-channel", json!({"status": "updated", "timestamp": chrono::Utc::now().timestamp()}), "update")?;
// Get analytics
match client.get_analytics("my-channel") {
Ok(analytics) => println!("Analytics: {:?}", analytics),
Err(e) => log::error!("Failed to get analytics: {}", e),
}
// Subscribe to channel with reconnection logic
client.subscribe_to_channel("my-channel", |channel, message| {
let event_type = message.get("event")
.and_then(|e| e.as_str())
.unwrap_or("message");
println!("Received {} on {}: {:?}", event_type, channel, message["data"]);
match event_type {
"alert" => handle_alert(&message["data"]),
"notification" => handle_notification(&message["data"]),
"update" => handle_update(&message["data"]),
_ => handle_message(&message["data"]),
}
Ok(())
})?;
// Keep main thread alive
loop {
thread::sleep(Duration::from_secs(1));
}
}
Frequently Asked Questions
What is ZephyrPush?
ZephyrPush is a real-time messaging service built on Cloudflare Workers that allows you to publish messages to channels and subscribe to them via WebSocket connections. It's perfect for real-time notifications, live chat, and event-driven applications.
How do I get started?
1. Sign up for a free account
2. Generate an API key from your dashboard
3. Use the provided code examples to publish messages or subscribe to channels
4. Test your integration using the WebSocket connection
5. Monitor usage and performance from the Dashboard
What are the rate limits?
The free tier allows up to 100,000 messages per month. Each API key is rate-limited to prevent abuse. If you need higher limits, please contact support for enterprise options.
How do channels work?
Channels are named endpoints for message routing. When you publish a message to a channel, all subscribers to that channel receive the message in real-time. Channels are created automatically when you first publish to them - you don't need to create them in advance. Channel names are case-sensitive and can contain letters, numbers, hyphens, and underscores.
Is my data secure?
Yes! All connections use JWT authentication, and messages are transmitted over secure WebSocket connections (WSS). API keys are required for all operations and should be kept secret.
Can I use multiple channels?
Absolutely! You can publish to multiple channels simultaneously and subscribe to multiple channels from a single WebSocket connection. This is useful for broadcasting messages to different user groups or implementing complex notification systems.
What programming languages are supported?
ZephyrPush works with any language that can make HTTP requests and establish WebSocket connections. We provide code examples for JavaScript, Python, cURL, PHP, Go and Rust. The REST API is language-agnostic.
How do I monitor my usage?
The Dashboard provides real-time analytics including total messages, active channels, connected users, and monthly usage. You can view message volume over time and monitor system health.
What happens if I exceed the free tier?
Once you reach 100,000 messages in a month, publishing will be rate-limited. You'll still be able to receive messages, but new publishes may be delayed. Upgrade to a paid plan for higher limits.
How do I subscribe to multiple channels?
To subscribe to multiple channels, create separate WebSocket connections for each channel. Each connection requires its own authentication token.
JavaScript Example:
// Subscribe to multiple channels
async function subscribeToChannels(channels, apiKey) {
const connections = {};
for (const channel of channels) {
// Get token for each channel
const authResponse = await fetch('https://api.zephyrpush.com/auth', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ channel, api_key: apiKey })
});
const { token } = await authResponse.json();
// Create WebSocket for each channel
const ws = new WebSocket(`wss://api.zephyrpush.com/subscribe?token=${token}`);
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
};
connections[channel] = ws;
}
return connections;
}
// Usage
const connections = await subscribeToChannels(['notifications', 'chat', 'updates'], 'your-api-key');
How do I get support?
For technical issues, check the WebSocket connection and review the code examples. For account or billing questions, contact our support team. Enterprise customers get priority support and dedicated account management.
Support & Contact
Technical Support
Need help with integration or troubleshooting? Our technical support team is here to help.
We typically respond within 24 hours
Resources
Check out our comprehensive resources for detailed guides and API references.
Before Contacting Support
- Check that your API key is correct and not expired
- Verify your channel name follows the naming rules
- Ensure you're using the correct WebSocket URL and protocol
- Check the browser console for JavaScript errors