WebSocket Code Examples

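Drop-in consumers for every language we get questions about. The production-oriented examples (Node.js production reconnect, Python websockets, Go) cover the full lifecycle: connect, parse connected / initial_state / odds_update, and exponential-backoff reconnect. The remaining snippets are minimal starting points that connect and parse frames; add reconnect and clean-shutdown handling before running them in production.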

Convention. All examples assume PARLAY_API_KEY is set in the environment. Sport is hardcoded to basketball_nba; swap for any value from GET /v1/sports.

Node.js (ws) — minimal

// stream.mjs  ·  npm install ws
import WebSocket from "ws";

const KEY = process.env.PARLAY_API_KEY;
if (!KEY) {
  // Fail fast: otherwise the literal string "undefined" is sent as the key.
  console.error("PARLAY_API_KEY is not set");
  process.exit(1);
}
// Percent-encode the key so reserved characters cannot corrupt the query string.
const URL = `wss://parlay-api.com/v1/ws/odds/basketball_nba?apiKey=${encodeURIComponent(KEY)}`;

const ws = new WebSocket(URL);
ws.on("message", (buf) => {
  const f = JSON.parse(buf.toString());
  if (f.type === "odds_update") {
    // NOTE(review): the * 1000 assumes f.timestamp is epoch seconds — confirm.
    console.log(`${f.count} rows @ ${new Date(f.timestamp * 1000).toISOString()}`);
  }
});
// An "error" event with no listener is thrown by EventEmitter and kills the
// process, so even the minimal example needs one.
ws.on("error", (err) => console.error("error", err.message));
ws.on("close", (c) => console.log("closed", c));

Node.js — production reconnect

Exponential backoff, watchdog timer, key in header instead of URL.

import WebSocket from "ws";

// API key, read from the environment once at startup.
const KEY  = process.env.PARLAY_API_KEY;
// Sport segment interpolated into the stream URL below.
const SPORT = "basketball_nba";
const URL  = `wss://parlay-api.com/v1/ws/odds/${SPORT}`;

// Delay before the next reconnect, in ms; connect() resets it on "open" and
// doubles it (capped at 30 s) after every "close".
let backoffMs    = 1000;
// Date.now() of the most recent frame; the watchdog in connect() compares against it.
let lastFrameMs  = Date.now();
// Handle of the watchdog interval created in connect()'s "open" handler.
let watchdog;

/**
 * Open the odds stream and keep it alive.
 *
 * Authenticates with the X-API-Key header, logs `connected` / `initial_state`
 * frames and passes every row of an `odds_update` frame to handleRow().
 * A watchdog checks every 15 s and drops the socket when no frame has arrived
 * for 90 s. On close it exits the process for close codes 1008 / 4001 / 4003
 * and otherwise schedules a reconnect after `backoffMs`, doubling that delay
 * up to 30 s.
 */
function connect() {
  const ws = new WebSocket(URL, { headers: { "X-API-Key": KEY } });

  ws.on("open", () => {
    console.log("[open]");
    backoffMs = 1000;
    lastFrameMs = Date.now();
    // Never leave an earlier interval running alongside the new one.
    clearInterval(watchdog);
    watchdog = setInterval(() => {
      if (Date.now() - lastFrameMs > 90_000 && ws.readyState === WebSocket.OPEN) {
        console.warn("[watchdog] no frame in 90s — reconnecting");
        // The peer is presumed gone, so skip the closing handshake that
        // close() would wait for and tear the socket down immediately.
        ws.terminate();
      }
    }, 15_000);
  });

  ws.on("message", (buf) => {
    lastFrameMs = Date.now();
    let f;
    try {
      f = JSON.parse(buf.toString());
    } catch (err) {
      // One malformed frame must not take the whole consumer down.
      console.error("[parse]", err.message);
      return;
    }
    if (f.type === "connected") {
      console.log(`[connected] tier=${f.tier} push_mode=${f.push_mode}`);
    } else if (f.type === "initial_state") {
      console.log(`[snapshot] ${f.count} rows`);
    } else if (f.type === "odds_update") {
      for (const row of f.data) handleRow(row);
    } else if (f.type === "heartbeat") {
      /* ignore — receiving it already refreshed lastFrameMs above */
    }
  });

  ws.on("close", (code, reason) => {
    clearInterval(watchdog);
    console.log(`[close] ${code} ${reason}`);
    // Don't retry on auth / tier / missing-key
    if ([1008, 4001, 4003].includes(code)) {
      console.error("Fatal — fix key, exiting");
      process.exit(1);
    }
    setTimeout(connect, backoffMs);
    backoffMs = Math.min(backoffMs * 2, 30_000);
  });

  // Reconnection is driven by the "close" handler above; this only logs.
  ws.on("error", (err) => console.error("[error]", err.message));
}

/**
 * Print one odds row as a fixed-width line: bookmaker (12), market key (10),
 * outcome (30, blank when missing) and the American price with an explicit
 * "+" on positive values.
 *
 * @param {object} row - A row taken from an `odds_update` frame.
 */
function handleRow(row) {
  const { bookmaker, market_key: marketKey, outcome, price_american: price } = row;
  const sign = price > 0 ? "+" : "";
  const columns = [
    bookmaker.padEnd(12),
    marketKey.padEnd(10),
    (outcome || "").padEnd(30),
    `${sign}${price}`,
  ];
  console.log(columns.join(" "));
}

// Open the first connection; later reconnects are scheduled from connect()'s "close" handler.
connect();
// Exit with status 0 on Ctrl-C.
process.on("SIGINT", () => process.exit(0));

Python (websockets)

# stream.py  ·  pip install websockets
import asyncio, json, os, sys, websockets

# API key; os.environ[...] raises KeyError at startup when PARLAY_API_KEY is unset.
KEY   = os.environ["PARLAY_API_KEY"]
SPORT = "basketball_nba"
URL   = f"wss://parlay-api.com/v1/ws/odds/{SPORT}"

# Close codes after which consume() gives up instead of reconnecting.
FATAL = {1008, 4001, 4003}

async def consume():
    """Stream odds frames forever, reconnecting with exponential backoff.

    Prints one line for each ``connected`` and ``initial_state`` frame and one
    line per row of every ``odds_update`` frame. Returns only when the
    connection is closed with a code in ``FATAL``; any other close or error
    sleeps for the current backoff (1 s, doubling to a 30 s cap, reset after
    each successful connect) and then reconnects.
    """
    backoff = 1
    while True:
        try:
            async with websockets.connect(
                URL,
                additional_headers={"X-API-Key": KEY},
                ping_interval=20,
                ping_timeout=20,
                close_timeout=5,
            ) as ws:
                backoff = 1
                async for raw in ws:
                    frame = json.loads(raw)
                    if frame["type"] == "connected":
                        print(f"[connected] {frame['tier']} {frame['push_mode']}")
                    elif frame["type"] == "initial_state":
                        print(f"[snapshot] {frame['count']} rows")
                    elif frame["type"] == "odds_update":
                        for row in frame["data"]:
                            # `or ""` also covers an explicit JSON null, which
                            # .get()'s default does not; None cannot be
                            # formatted with `:<30` and would otherwise drop
                            # the connection via the blanket except below.
                            outcome = row.get("outcome") or ""
                            # NOTE(review): `:+d` assumes price_american is an
                            # int — confirm the feed never sends floats.
                            print(f"  {row['bookmaker']:<12} "
                                  f"{row['market_key']:<10} "
                                  f"{outcome:<30} "
                                  f"{row['price_american']:+d}")
        except websockets.ConnectionClosed as e:
            if e.code in FATAL:
                print(f"fatal close {e.code}: {e.reason}", file=sys.stderr)
                return
            print(f"closed {e.code}: {e.reason} — retrying in {backoff}s")
        except Exception as e:
            print(f"error: {e} — retrying in {backoff}s")
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, 30)

try:
    asyncio.run(consume())
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the stream; exit without a traceback.
    pass

Python (aiohttp)

# stream_aio.py  ·  pip install aiohttp
import aiohttp, asyncio, json, os

# API key; os.environ[...] raises KeyError at startup when PARLAY_API_KEY is unset.
KEY   = os.environ["PARLAY_API_KEY"]
SPORT = "basketball_nba"
URL   = f"wss://parlay-api.com/v1/ws/odds/{SPORT}"

async def main():
    """Connect once with the API key header and print each frame's type and count.

    Text frames are decoded as JSON; an ERROR message ends the loop. There is
    no reconnect: the coroutine returns when the stream ends.
    """
    auth_headers = {"X-API-Key": KEY}
    async with aiohttp.ClientSession(headers=auth_headers) as session, \
            session.ws_connect(URL, heartbeat=20) as stream:
        async for message in stream:
            if message.type == aiohttp.WSMsgType.ERROR:
                break
            if message.type != aiohttp.WSMsgType.TEXT:
                continue
            payload = json.loads(message.data)
            print(payload["type"], payload.get("count", ""))

# Run main() to completion on a fresh event loop.
asyncio.run(main())

Vanilla browser JS

Browsers can't set custom headers, so we use the query-param auth form. Don't ship your key in a browser bundle — proxy through your own backend.

const KEY = "pk_live_xxxx";        // ⚠️ keep this server-side
const SPORT = "basketball_nba";
// Deliberately not named `URL`: a top-level `const URL` in a classic script
// shadows the browser's global URL constructor for every script on the page.
// The key is percent-encoded so reserved characters cannot corrupt the query.
const STREAM_URL = `wss://parlay-api.com/v1/ws/odds/${SPORT}?apiKey=${encodeURIComponent(KEY)}`;

const ws = new WebSocket(STREAM_URL);

ws.onopen    = ()   => console.log("[open]");
ws.onclose   = (e)  => console.log("[close]", e.code, e.reason);
ws.onerror   = (e)  => console.error("[error]", e);
// Log one line per row for odds updates; dump every other frame as-is.
ws.onmessage = (ev) => {
  const f = JSON.parse(ev.data);
  if (f.type === "odds_update") {
    f.data.forEach(row => {
      console.log(`${row.bookmaker} ${row.outcome} ${row.price_american}`);
    });
  } else {
    console.log(f.type, f);
  }
};

React hook

Drop into any component. Resilient to component remount and React 18 strict-mode double-effect.

// useOddsStream.ts
import { useEffect, useState, useRef } from "react";

// One odds row as held in the hook's `rows` state.
// event_id, bookmaker, market_key and outcome together form the identity that
// mergeRows() uses to decide whether an incoming row replaces an existing one.
type Row = {
  event_id: string; bookmaker: string; market_key: string;
  outcome: string;  price_american: number; last_update: string;
};

/**
 * Subscribe a component to the odds stream for one sport.
 *
 * Opens a WebSocket whenever `sport` or `apiKey` changes (nothing is opened
 * while `apiKey` is empty) and closes it on cleanup.
 *
 * @param sport  Sport key interpolated into the stream path.
 * @param apiKey Key sent as the `apiKey` query parameter.
 * @returns `rows` (snapshot with later updates merged in), `tier` from the
 *          `connected` frame, and `ws`, the socket captured at render time.
 *          `ws` comes from a ref, so it only updates on the next re-render.
 */
export function useOddsStream(sport: string, apiKey: string) {
  const [rows, setRows]   = useState<Row[]>([]);
  const [tier, setTier]   = useState<string | null>(null);
  const wsRef             = useRef<WebSocket | null>(null);

  useEffect(() => {
    if (!apiKey) return;
    // Encode both parts so reserved characters cannot corrupt the URL.
    const url = `wss://parlay-api.com/v1/ws/odds/${encodeURIComponent(sport)}?apiKey=${encodeURIComponent(apiKey)}`;
    const ws  = new WebSocket(url);
    wsRef.current = ws;
    // Drop state belonging to the previous sport/key until the new snapshot arrives.
    setRows([]);
    setTier(null);

    ws.onmessage = (ev) => {
      let f: any;
      try {
        f = JSON.parse(ev.data);
      } catch (err) {
        console.error("[useOddsStream] unparseable frame", err);
        return;
      }
      if (f.type === "connected")           setTier(f.tier);
      else if (f.type === "initial_state")  setRows(f.data);
      else if (f.type === "odds_update")    setRows(prev => mergeRows(prev, f.data));
    };

    return () => {
      // Detach first: a closing socket can still deliver frames, and those
      // must not write into state owned by the next effect run.
      ws.onmessage = null;
      if (wsRef.current === ws) wsRef.current = null;
      ws.close();
    };
  }, [sport, apiKey]);

  return { rows, tier, ws: wsRef.current };
}

/**
 * Merge incoming rows into the previous list.
 *
 * Rows are identified by (event_id, bookmaker, market_key, outcome). An
 * incoming row with a known identity replaces the old row in place; unknown
 * identities are appended in arrival order.
 */
function mergeRows(prev: Row[], incoming: Row[]) {
  const identity = ({ event_id, bookmaker, market_key, outcome }: Row) =>
    `${event_id}|${bookmaker}|${market_key}|${outcome}`;

  const byIdentity = new Map<string, Row>();
  prev.forEach((row) => byIdentity.set(identity(row), row));
  incoming.forEach((row) => byIdentity.set(identity(row), row));

  return Array.from(byIdentity.values());
}

Go (gorilla/websocket)

// main.go  ·  go get github.com/gorilla/websocket
package main

import (
  "encoding/json"
  "fmt"
  "log"
  "net/http"
  "os"
  "time"
  "github.com/gorilla/websocket"
)

// Frame is the JSON envelope that main decodes from every WebSocket message.
// Data keeps each row as raw JSON, so rows are only decoded if a consumer
// needs them.
type Frame struct {
  Type      string            `json:"type"`
  SportKey  string            `json:"sport_key"`
  Tier      string            `json:"tier"`
  PushMode  string            `json:"push_mode"`
  Timestamp int64             `json:"timestamp"`
  Count     int               `json:"count"`
  Data      []json.RawMessage `json:"data"`
}

func main() {
  key   := os.Getenv("PARLAY_API_KEY")
  url   := "wss://parlay-api.com/v1/ws/odds/basketball_nba"
  hdr   := http.Header{"X-API-Key": {key}}

  for backoff := time.Second; ; backoff = minDur(backoff*2, 30*time.Second) {
    c, _, err := websocket.DefaultDialer.Dial(url, hdr)
    if err != nil {
      log.Printf("dial: %v — retrying in %v", err, backoff)
      time.Sleep(backoff)
      continue
    }
    backoff = time.Second
    for {
      var f Frame
      if err := c.ReadJSON(&f); err != nil {
        log.Printf("read: %v", err)
        c.Close()
        break
      }
      fmt.Printf("[%s] count=%d\n", f.Type, f.Count)
    }
  }
}

func minDur(a, b time.Duration) time.Duration { if a < b { return a }; return b }

Ruby (faye-websocket)

# stream.rb  ·  gem install faye-websocket eventmachine
require "eventmachine"
require "faye/websocket"
require "json"

# Run the reactor, open the stream with the key as a query parameter, print a
# line for `connected` and `odds_update` frames, and stop the reactor once the
# socket closes.
EM.run do
  api_key  = ENV.fetch("PARLAY_API_KEY")
  endpoint = "wss://parlay-api.com/v1/ws/odds/basketball_nba?apiKey=#{api_key}"
  socket   = Faye::WebSocket::Client.new(endpoint)

  socket.on(:message) do |event|
    frame = JSON.parse(event.data)
    if frame["type"] == "connected"
      puts "tier=#{frame['tier']}"
    elsif frame["type"] == "odds_update"
      puts "updates: #{frame['count']}"
    end
  end

  socket.on(:close) { |_event| EM.stop }
end

Java (Java-WebSocket)

// build.gradle:   implementation 'org.java-websocket:Java-WebSocket:1.5.6'
// or Maven:       org.java-websocket:Java-WebSocket:1.5.6
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.util.Map;

public class ParlayStream extends WebSocketClient {
  public ParlayStream(URI uri, String apiKey) {
    super(uri, Map.of("X-API-Key", apiKey));
  }
  public void onOpen(ServerHandshake h)            { System.out.println("[open]"); }
  public void onMessage(String msg)                { System.out.println(msg.substring(0, Math.min(140, msg.length()))); }
  public void onClose(int code, String r, boolean remote) { System.out.println("close " + code); }
  public void onError(Exception ex)                { ex.printStackTrace(); }

  public static void main(String[] args) throws Exception {
    String key  = System.getenv("PARLAY_API_KEY");
    URI    uri  = new URI("wss://parlay-api.com/v1/ws/odds/basketball_nba");
    new ParlayStream(uri, key).connectBlocking();
  }
}

PHP (Ratchet/Pawl)

<?php
// composer require ratchet/pawl
// (Kept below the opening tag: text before it is sent to output verbatim.)
require __DIR__ . "/vendor/autoload.php";

$key = getenv("PARLAY_API_KEY");
if ($key === false || $key === "") {
  fwrite(STDERR, "PARLAY_API_KEY is not set\n");
  exit(1);
}
// Percent-encode the key so reserved characters cannot corrupt the query string.
$url = "wss://parlay-api.com/v1/ws/odds/basketball_nba?apiKey=" . rawurlencode($key);

$loop    = React\EventLoop\Loop::get();
$connect = new Ratchet\Client\Connector($loop);

$connect($url)->then(
  function($conn) {
    $conn->on("message", function($msg) {
      $f = json_decode((string) $msg);
      // json_decode returns null for malformed input; skip such frames.
      if (!is_object($f) || !isset($f->type)) {
        return;
      }
      if ($f->type === "odds_update") {
        echo "{$f->count} updates\n";
      }
    });
    $conn->on("close", function($code = null) { echo "closed $code\n"; });
  },
  function($e) {
    // Without a rejection handler a failed connect is silently discarded.
    fwrite(STDERR, "connect failed: {$e->getMessage()}\n");
  }
);

$loop->run();

C# (.NET ClientWebSocket)

using System;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Connects once with the X-API-Key header and prints the "type" of every
/// frame until the server sends a close frame or the socket leaves the Open state.
/// </summary>
class Program {
  static async Task Main() {
    var key = Environment.GetEnvironmentVariable("PARLAY_API_KEY")
      ?? throw new InvalidOperationException("PARLAY_API_KEY is not set");
    using var ws = new ClientWebSocket();
    ws.Options.SetRequestHeader("X-API-Key", key);
    await ws.ConnectAsync(
      new Uri("wss://parlay-api.com/v1/ws/odds/basketball_nba"),
      CancellationToken.None);

    var buf = new byte[64 * 1024];
    // A single WebSocket message can span several ReceiveAsync calls when it
    // is larger than the buffer; collect the pieces until EndOfMessage.
    using var message = new System.IO.MemoryStream();
    while (ws.State == WebSocketState.Open) {
      var r = await ws.ReceiveAsync(new ArraySegment<byte>(buf), CancellationToken.None);
      if (r.MessageType == WebSocketMessageType.Close) break;
      message.Write(buf, 0, r.Count);
      if (!r.EndOfMessage) continue;

      var json = Encoding.UTF8.GetString(message.GetBuffer(), 0, (int)message.Length);
      message.SetLength(0);
      using (var doc = JsonDocument.Parse(json)) {
        Console.WriteLine(doc.RootElement.GetProperty("type").GetString());
      }
    }
  }
}

Elixir (websockex)

# mix.exs:  {:websockex, "~> 0.4"}, {:jason, "~> 1.4"}   (Jason decodes the frames below)
defmodule ParlayStream do
  @moduledoc """
  WebSockex client for the NBA odds stream. Prints the row count of every
  `odds_update` text frame and ignores all other text frames.
  """
  use WebSockex

  @endpoint "wss://parlay-api.com/v1/ws/odds/basketball_nba"

  @doc "Starts the client, passing `key` as the `apiKey` query parameter."
  def start_link(key) do
    WebSockex.start_link("#{@endpoint}?apiKey=#{key}", __MODULE__, %{})
  end

  def handle_frame({:text, msg}, state) do
    msg
    |> Jason.decode!()
    |> report()

    {:ok, state}
  end

  # Print the update count for odds_update frames; everything else is a no-op.
  defp report(%{"type" => "odds_update", "count" => n}), do: IO.puts("#{n} updates")
  defp report(_other), do: :ok
end

Rust (tokio-tungstenite)

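// Cargo.toml
// tokio = { version = "1", features = ["full"] }
// tokio-tungstenite = { version = "0.21", features = ["native-tls"] }  // a TLS feature is required for wss:// URLs
// futures-util = "0.3"
// serde_json = "1"
// anyhow = "1"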

use futures_util::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::client::IntoClientRequest};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let key = std::env::var("PARLAY_API_KEY")?;
    let mut req = "wss://parlay-api.com/v1/ws/odds/basketball_nba".into_client_request()?;
    req.headers_mut().insert("X-API-Key", key.parse()?);

    let (mut ws, _) = connect_async(req).await?;
    while let Some(msg) = ws.next().await {
        let txt = msg?.into_text()?;
        let v: serde_json::Value = serde_json::from_str(&txt)?;
        println!("{}", v["type"]);
    }
    Ok(())
}

Discord bot — alert on +EV

Post an alert to a Discord channel when a price at any one book moves 30 cents or more in either direction. Build on the Node example by replacing its handleRow with the version below.

// Discord webhook URL stored in env (Server Settings → Integrations → Webhooks → Copy)
const DISCORD_WEBHOOK = process.env.DISCORD_WEBHOOK;
const THRESHOLD_CENTS = 30;
const lastPrice = new Map();  // (event|book|market|outcome) → previous price

/**
 * Map an American price onto a continuous "cents" scale.
 *
 * American odds skip the open interval (-100, +100), so -105 and +105 are
 * 10 cents apart, not 210. Shifting each side toward zero by 100 makes plain
 * subtraction give the real distance.
 *
 * @param {number} price - American odds, e.g. -110 or +150.
 * @returns {number} Position on the continuous scale (-100 and +100 both map to 0).
 */
function toCents(price) {
  if (price >= 100) return price - 100;
  if (price <= -100) return price + 100;
  return price;
}

/**
 * Remember the latest price per (event, book, market, outcome) and post a
 * Discord message when it moves by THRESHOLD_CENTS or more in either direction.
 *
 * @param {object} row - A row taken from an `odds_update` frame.
 */
function handleRow(row) {
  const key  = `${row.event_id}|${row.bookmaker}|${row.market_key}|${row.outcome}`;
  const prev = lastPrice.get(key);
  const now  = row.price_american;
  if (prev != null) {
    // Positive drop = the price got shorter; negative = it drifted out.
    const drop = toCents(prev) - toCents(now);
    if (Math.abs(drop) >= THRESHOLD_CENTS) {
      fetch(DISCORD_WEBHOOK, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          content: `${row.bookmaker} **${row.outcome}** ${row.market_key} `
                 + `moved ${prev > 0 ? "+" : ""}${prev} → ${now > 0 ? "+" : ""}${now} `
                 + `(${drop > 0 ? "-" : "+"}${Math.abs(drop)})`
        }),
      })
        .then((res) => {
          if (!res.ok) console.error(`[discord] webhook returned ${res.status}`);
        })
        // A failed alert must not become an unhandled rejection that kills the stream.
        .catch((err) => console.error("[discord]", err.message));
    }
  }
  lastPrice.set(key, now);
}

See the full edge-alert pattern for the Pinnacle-anchored version that filters by EV percentage rather than absolute cent movement.

Fan out to Redis pubsub

If multiple downstream services want the same stream, hold one WebSocket per sport and re-publish to Redis. Subscribers consume from Redis, not from our endpoint, so you stay within your tier's concurrent-conn cap.

// fanout.mjs  ·  npm install ws ioredis
import WebSocket from "ws";
import Redis     from "ioredis";

const redis = new Redis(process.env.REDIS_URL);
const ws    = new WebSocket(
  `wss://parlay-api.com/v1/ws/odds/basketball_nba`,
  { headers: { "X-API-Key": process.env.PARLAY_API_KEY } });

// Re-publish every row of an odds update on a per-event Redis channel.
ws.on("message", (buf) => {
  const f = JSON.parse(buf.toString());
  if (f.type === "odds_update") {
    for (const row of f.data) {
      // publish() returns a promise; catch so a Redis outage is logged
      // instead of surfacing as an unhandled rejection.
      redis.publish(`odds:${row.event_id}`, JSON.stringify(row))
        .catch((err) => console.error("[redis] publish failed:", err.message));
    }
  }
});
// An "error" event with no listener is thrown by EventEmitter and kills the process.
ws.on("error", (err) => console.error("[ws] error:", err.message));
ws.on("close", (code) => console.log("[ws] closed", code));

Bridge to Kafka

Same pattern, Kafka producer instead of Redis. One topic per sport works well; partition by event_id for ordered per-event consumption.

// bridge.mjs  ·  npm install ws kafkajs
import WebSocket  from "ws";
import { Kafka } from "kafkajs";

const kafka    = new Kafka({ brokers: ["broker:9092"] });
const producer = kafka.producer();
await producer.connect();

const ws = new WebSocket(
  `wss://parlay-api.com/v1/ws/odds/basketball_nba`,
  { headers: { "X-API-Key": process.env.PARLAY_API_KEY } });

// Forward every odds update as one batch, keyed by event_id so that rows for
// the same event share a key.
ws.on("message", async (buf) => {
  // EventEmitter ignores the promise an async listener returns, so failures
  // have to be caught here or they become unhandled rejections.
  try {
    const f = JSON.parse(buf.toString());
    if (f.type !== "odds_update") return;
    await producer.send({
      topic: "odds.basketball_nba",
      messages: f.data.map(r => ({
        key: r.event_id,
        value: JSON.stringify(r),
      })),
    });
  } catch (err) {
    console.error("[bridge] frame dropped:", err.message);
  }
});
// An "error" event with no listener is thrown by EventEmitter and kills the process.
ws.on("error", (err) => console.error("[ws] error:", err.message));
ws.on("close", (code) => console.log("[ws] closed", code));