import { EventEmitter } from 'events';
/**
 * Options accepted by `ProductionTranscriptionStream`.
 *
 * Only the first three fields are required; the stream fills in a default for
 * every optional field that is omitted.
 */
interface StreamConfig {
  /** Account identifier; sent as the `X-Account-Id` header and the `account_id` query parameter. */
  accountId: string;

  /** API key; sent as the `X-API-Key` header when requesting a stream token. */
  apiKey: string;

  /** Audio sample rate, sent as the `sample_rate` query parameter (presumably Hz — confirm against the API docs). */
  sampleRate: number;

  /** Value for the `language` query parameter. Defaults to `'en'`. */
  language?: string;

  /** How many reconnection attempts are allowed after an unexpected drop. Defaults to `5`. */
  maxReconnectAttempts?: number;

  /** Backoff delay for the first reconnection attempt; doubled for each later attempt. Defaults to `1000`. */
  baseReconnectDelayMs?: number;

  /** Ceiling for the exponential backoff delay, applied before jitter is added. Defaults to `30000`. */
  maxReconnectDelayMs?: number;

  /** Maximum number of audio chunks held while not connected; the oldest are dropped first. Defaults to `100`. */
  audioBufferMaxSize?: number;
}
/** Payload of the `'transcription'` event emitted for every server message that carries text. */
interface TranscriptionSegment {
  /** Zero-based slot of this segment in the transcript; only advances after a final result. */
  index: number;

  /** Text reported by the server for this segment. */
  text: string;

  /** `true` once the server marks the segment as final; interim results reuse the same `index`. */
  isFinal: boolean;
}
/**
 * Lifecycle of the stream's WebSocket:
 * - `'disconnected'`: initial state and the state after a shutdown; audio passed to `sendAudio` is dropped.
 * - `'connecting'`: `connect()` is in progress; audio is buffered.
 * - `'connected'`: the socket is open; audio is sent immediately.
 * - `'reconnecting'`: an unexpected drop is being recovered; audio is buffered.
 */
type ConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting';
/**
 * Streams audio to the transcription WebSocket endpoint and re-emits results as events.
 *
 * Events:
 * - `'stateChange'` (state) — emitted once per actual state transition.
 * - `'connected'` — the socket opened (initially or after a reconnect).
 * - `'transcription'` (TranscriptionSegment) — a server message carrying text.
 * - `'reconnecting'` ({ attempt, maxAttempts, delayMs }) — before each backoff wait.
 * - `'disconnected'` ({ code, reason, reconnectFailed? }) — an established connection
 *   ended for good; `reconnectFailed` is set when every reconnection attempt failed.
 *   A connection that never opened is reported by `connect()` rejecting instead.
 * - `'bufferOverflow'` — one buffered chunk was discarded to respect the size limit.
 * - `'bufferFlush'` ({ count }) — buffered chunks are about to be sent.
 * - `'error'` (error) — socket errors, server-reported errors, unparseable messages and
 *   failed reconnection attempts. As with any EventEmitter, emitting `'error'` without a
 *   listener throws, so callers should always register one.
 */
class ProductionTranscriptionStream extends EventEmitter {
  private static readonly TOKEN_URL =
    'https://api.sully.ai/v1/audio/transcriptions/stream/token';
  private static readonly STREAM_URL =
    'wss://api.sully.ai/v1/audio/transcriptions/stream';
  private static readonly CONNECT_TIMEOUT_MS = 10000;
  // Keeps the argument count of String.fromCharCode well below engine limits.
  private static readonly BASE64_CHUNK_SIZE = 0x8000;

  private ws: WebSocket | null = null;
  private state: ConnectionState = 'disconnected';
  private reconnectAttempt = 0;
  private audioBuffer: string[] = [];
  private segments: string[] = [];
  private currentSegmentIndex = 0;
  private token: string | null = null;
  // Doubles as the identity of the current session: replaced by every connect(),
  // aborted by disconnect().
  private abortController: AbortController | null = null;
  private readonly config: Required<StreamConfig>;

  /**
   * @param config Connection settings. Optional fields that are omitted or explicitly
   *   `undefined` fall back to their defaults.
   */
  constructor(config: StreamConfig) {
    super();
    // Field-by-field `??` (rather than spreading `config` over the defaults) so an
    // explicit `undefined` cannot overwrite a default.
    this.config = {
      accountId: config.accountId,
      apiKey: config.apiKey,
      sampleRate: config.sampleRate,
      language: config.language ?? 'en',
      maxReconnectAttempts: config.maxReconnectAttempts ?? 5,
      baseReconnectDelayMs: config.baseReconnectDelayMs ?? 1000,
      maxReconnectDelayMs: config.maxReconnectDelayMs ?? 30000,
      audioBufferMaxSize: config.audioBufferMaxSize ?? 100, // Buffer up to 100 audio chunks
    };
  }

  /**
   * Fetches a fresh token and opens the WebSocket.
   *
   * @param signal Optional signal that cancels the attempt while it is in progress.
   *   Aborting after the promise has settled has no effect.
   * @returns Resolves once the socket is open.
   * @throws Error if the signal is (or becomes) aborted, if the stream is not currently
   *   disconnected, if the token request fails, or if the socket closes or times out
   *   before opening.
   */
  async connect(signal?: AbortSignal): Promise<void> {
    if (signal?.aborted) {
      throw new Error('Connection aborted');
    }
    if (this.state !== 'disconnected') {
      // A second socket would orphan the first one together with its handlers.
      throw new Error(`Cannot connect while ${this.state}`);
    }

    const controller = new AbortController();
    this.abortController = controller;
    const forwardAbort = (): void => {
      // Ignore the signal if a newer connect() has taken over in the meantime.
      if (this.abortController === controller) {
        this.disconnect();
      }
    };
    signal?.addEventListener('abort', forwardAbort, { once: true });

    this.setState('connecting');
    try {
      const token = await this.fetchToken();
      if (controller.signal.aborted) {
        throw new Error('Connection aborted');
      }
      this.token = token;
      await this.establishConnection(token);
    } catch (error) {
      if (this.abortController === controller) {
        this.setState('disconnected');
      }
      throw signal?.aborted ? new Error('Connection aborted') : error;
    } finally {
      signal?.removeEventListener('abort', forwardAbort);
    }
  }

  /**
   * Requests a short-lived stream token using the configured credentials.
   *
   * @throws Error on a non-2xx response or when the body carries no token string.
   */
  private async fetchToken(): Promise<string> {
    const response = await fetch(ProductionTranscriptionStream.TOKEN_URL, {
      method: 'POST',
      headers: {
        'X-API-Key': this.config.apiKey,
        'X-Account-Id': this.config.accountId,
      },
      signal: this.abortController?.signal,
    });
    if (!response.ok) {
      throw new Error(`Token fetch failed: ${response.status}`);
    }
    const payload: unknown = await response.json();
    const token = (payload as { data?: { token?: unknown } } | null)?.data?.token;
    if (typeof token !== 'string' || token === '') {
      throw new Error('Token fetch failed: response did not contain a token');
    }
    return token;
  }

  /**
   * Opens a socket with the given token and wires up its handlers.
   *
   * The returned promise always settles: it resolves on `open` and rejects when the
   * socket closes or the timeout elapses before that. Closes that happen after `open`
   * are routed to the disconnect/reconnect logic instead.
   */
  private establishConnection(token: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const params = new URLSearchParams({
        sample_rate: this.config.sampleRate.toString(),
        account_id: this.config.accountId,
        api_token: token,
      });
      if (this.config.language) {
        params.set('language', this.config.language);
      }

      const socket = new WebSocket(
        `${ProductionTranscriptionStream.STREAM_URL}?${params}`
      );
      this.ws = socket;
      let opened = false;

      const connectionTimeout = setTimeout(() => {
        if (this.ws === socket) {
          this.ws = null;
        }
        socket.close();
        reject(new Error('Connection timeout'));
      }, ProductionTranscriptionStream.CONNECT_TIMEOUT_MS);

      socket.onopen = () => {
        opened = true;
        clearTimeout(connectionTimeout);
        this.reconnectAttempt = 0;
        this.setState('connected');
        this.emit('connected');
        // Flush buffered audio
        this.flushAudioBuffer();
        resolve();
      };

      socket.onmessage = (event) => {
        this.handleMessage(event.data);
      };

      socket.onerror = (error) => {
        // A close event follows every socket error; settling is left to onclose.
        this.emit('error', error);
      };

      socket.onclose = (event) => {
        clearTimeout(connectionTimeout);
        // `this.ws` no longer points here when disconnect() or the timeout released the
        // socket; in that case the instance state may already belong to a newer session.
        const isCurrent = this.ws === socket;
        if (isCurrent) {
          this.ws = null;
        }
        if (!opened) {
          // No-op if the timeout already rejected.
          reject(new Error(`Connection closed: ${event.code}`));
          return;
        }
        if (!isCurrent) {
          // Closed through disconnect(): state was updated there, only report the closure.
          this.emit('disconnected', { code: event.code, reason: event.reason });
          return;
        }
        this.handleDisconnect(event);
      };
    });
  }

  /**
   * Interprets one server message: reports `error` payloads, records transcription
   * text and emits it as a `'transcription'` event.
   */
  private handleMessage(data: string): void {
    let message: Record<string, unknown>;
    try {
      const parsed: unknown = JSON.parse(data);
      if (typeof parsed !== 'object' || parsed === null) {
        throw new Error('message is not a JSON object');
      }
      message = parsed as Record<string, unknown>;
    } catch {
      this.emit('error', new Error(`Failed to parse message: ${data}`));
      return;
    }

    // Dispatch happens outside the try block so that an exception thrown by a listener
    // is not misreported as a parse failure.
    if (message.error) {
      const detail =
        typeof message.error === 'string'
          ? message.error
          : JSON.stringify(message.error);
      this.emit('error', new Error(detail));
      return;
    }

    if (typeof message.text === 'string') {
      const segment: TranscriptionSegment = {
        index: this.currentSegmentIndex,
        text: message.text,
        isFinal: Boolean(message.isFinal),
      };
      this.segments[segment.index] = segment.text;
      // Advance before notifying so a throwing listener cannot leave the index stale.
      if (segment.isFinal) {
        this.currentSegmentIndex++;
      }
      this.emit('transcription', segment);
    }
  }

  /**
   * Handles the close of an established, still-current socket: a normal closure
   * (code 1000) ends the session, anything else starts reconnecting if attempts remain.
   */
  private handleDisconnect(event: CloseEvent): void {
    if (event.code === 1000) {
      this.finishDisconnect({ code: event.code, reason: event.reason });
      return;
    }
    if (this.reconnectAttempt < this.config.maxReconnectAttempts) {
      // Fire-and-forget: the loop reports its outcome through events.
      void this.attemptReconnect(event);
      return;
    }
    this.finishDisconnect({
      code: event.code,
      reason: event.reason,
      reconnectFailed: true,
    });
  }

  /**
   * Retries with exponential backoff until a socket opens, the attempts are exhausted,
   * or the session is cancelled by disconnect() or a newer connect().
   *
   * @param event The close event that triggered reconnection; its code and reason are
   *   reported if every attempt fails.
   */
  private async attemptReconnect(event: CloseEvent): Promise<void> {
    const session = this.abortController;
    const cancelled = (): boolean =>
      this.state !== 'reconnecting' ||
      this.abortController !== session ||
      session?.signal.aborted === true;

    this.setState('reconnecting');
    while (this.reconnectAttempt < this.config.maxReconnectAttempts) {
      const delay = this.calculateBackoff();
      this.emit('reconnecting', {
        attempt: this.reconnectAttempt + 1,
        maxAttempts: this.config.maxReconnectAttempts,
        delayMs: delay,
      });
      await this.sleep(delay);
      if (cancelled()) return;

      this.reconnectAttempt++;
      try {
        // Get fresh token for reconnection
        const token = await this.fetchToken();
        if (cancelled()) return;
        this.token = token;
        await this.establishConnection(token);
        return; // onopen has already switched the state to 'connected'
      } catch (error) {
        if (cancelled()) return;
        this.emit('error', error);
        // Fall through to the next attempt.
      }
    }

    this.finishDisconnect({
      code: event.code,
      reason: event.reason,
      reconnectFailed: true,
    });
  }

  /** Ends the session: drops audio buffered for it and reports the closure. */
  private finishDisconnect(details: {
    code: number;
    reason: string;
    reconnectFailed?: true;
  }): void {
    this.audioBuffer = [];
    this.setState('disconnected');
    this.emit('disconnected', details);
  }

  /** Updates the state and emits `'stateChange'`, but only on an actual transition. */
  private setState(next: ConnectionState): void {
    if (this.state === next) return;
    this.state = next;
    this.emit('stateChange', next);
  }

  /**
   * Delay before the next attempt: base delay doubled per attempt made so far, capped
   * at the configured maximum, plus up to 25% random jitter on top of the capped value.
   */
  private calculateBackoff(): number {
    const exponentialDelay =
      this.config.baseReconnectDelayMs * 2 ** this.reconnectAttempt;
    const cappedDelay = Math.min(
      exponentialDelay,
      this.config.maxReconnectDelayMs
    );
    const jitter = cappedDelay * Math.random() * 0.25;
    return Math.floor(cappedDelay + jitter);
  }

  /**
   * Sends one audio chunk, base64-encoded inside a JSON `{ audio }` message.
   *
   * While connecting or reconnecting the chunk is buffered instead; while disconnected
   * it is dropped.
   */
  sendAudio(audioData: ArrayBuffer | Uint8Array): void {
    const socket = this.ws;
    if (
      this.state === 'connected' &&
      socket !== null &&
      socket.readyState === WebSocket.OPEN
    ) {
      // Send immediately if connected
      socket.send(JSON.stringify({ audio: this.arrayBufferToBase64(audioData) }));
    } else if (this.state === 'reconnecting' || this.state === 'connecting') {
      // Buffer audio during reconnection
      this.bufferAudio(this.arrayBufferToBase64(audioData));
    }
    // Otherwise the chunk is dropped — and, deliberately, never encoded.
  }

  /** Appends a chunk to the buffer, discarding the oldest chunks beyond the size limit. */
  private bufferAudio(base64Audio: string): void {
    this.audioBuffer.push(base64Audio);
    // Prevent unbounded buffer growth
    while (this.audioBuffer.length > this.config.audioBufferMaxSize) {
      this.audioBuffer.shift();
      this.emit('bufferOverflow');
    }
  }

  /**
   * Sends every buffered chunk in order. If the socket stops being open part-way
   * through, the unsent chunks stay buffered rather than being discarded.
   */
  private flushAudioBuffer(): void {
    if (this.audioBuffer.length === 0) return;

    // Detach the buffer first so chunks buffered by listeners are not sent out of order.
    const pending = this.audioBuffer;
    this.audioBuffer = [];
    this.emit('bufferFlush', { count: pending.length });

    for (let i = 0; i < pending.length; i++) {
      const socket = this.ws;
      if (!socket || socket.readyState !== WebSocket.OPEN) {
        this.audioBuffer = pending.slice(i).concat(this.audioBuffer);
        return;
      }
      socket.send(JSON.stringify({ audio: pending[i] }));
    }
  }

  /** Base64-encodes raw bytes, converting in fixed-size chunks. */
  private arrayBufferToBase64(buffer: ArrayBuffer | Uint8Array): string {
    const bytes = buffer instanceof Uint8Array ? buffer : new Uint8Array(buffer);
    const chunkSize = ProductionTranscriptionStream.BASE64_CHUNK_SIZE;
    const parts: string[] = [];
    for (let offset = 0; offset < bytes.length; offset += chunkSize) {
      parts.push(
        String.fromCharCode(...bytes.subarray(offset, offset + chunkSize))
      );
    }
    return btoa(parts.join(''));
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /** Returns the text of all segments received so far, joined by single spaces. */
  getTranscript(): string {
    return this.segments.join(' ');
  }

  /** Returns the current connection state. */
  getState(): ConnectionState {
    return this.state;
  }

  /**
   * Shuts the stream down: cancels any in-flight token request or pending reconnection,
   * closes the socket with code 1000 and discards buffered audio.
   *
   * `'stateChange'` is emitted synchronously; `'disconnected'` follows once an
   * established socket reports its closure.
   */
  disconnect(): void {
    const socket = this.ws;
    // Release the socket before closing it so its onclose handler cannot touch the
    // state of a connection started afterwards.
    this.ws = null;
    this.audioBuffer = [];
    this.abortController?.abort();
    socket?.close(1000, 'Client disconnect');
    this.setState('disconnected');
  }
}
// Usage example: credentials come from the environment, every event is logged to
// the console, and the stream is connected before any audio is sent.
const stream = new ProductionTranscriptionStream({
  accountId: process.env.SULLY_ACCOUNT_ID!,
  apiKey: process.env.SULLY_API_KEY!,
  sampleRate: 16000,
  language: 'en',
});

// `on` returns the emitter, so the listeners can be registered as one chain.
stream
  .on('stateChange', (state) => {
    console.log(`Connection state: ${state}`);
  })
  .on('transcription', (segment) => {
    console.log(
      segment.isFinal ? `[Final] ${segment.text}` : `[Interim] ${segment.text}`
    );
  })
  .on('reconnecting', ({ attempt, maxAttempts, delayMs }) => {
    console.log(`Reconnecting (${attempt}/${maxAttempts}) in ${delayMs}ms`);
  })
  .on('error', (error) => {
    console.error('Stream error:', error);
  });

await stream.connect();

// Send audio from microphone, file, etc.
// stream.sendAudio(audioChunk);

// When done
// stream.disconnect();