import { EventTarget } from "./event-target";
import { messageFrom, Message, TypeMessage } from "./message";
import { RPCError } from "./rpc-error";
import SockJS from 'sockjs-client';

/** Event / message type used for generic messages and watched for RPC responses. */
const EventTypeMessage = "message";

/** Reconnection backoff base; the delay is restored to this value after a successful connection. */
const INITIAL_RECONNECTION_DELAY_MS = 250;

/** Upper bound (before jitter) applied to the exponential reconnection backoff. */
const MAX_RECONNECTION_DELAY_MS = 30000;

/** Settle callbacks of an RPC call whose response has not been received yet. */
interface PendingRPC {
  resolve: (result: unknown) => void;
  reject: (reason?: unknown) => void;
}

/** Handle resolved by `Client.upload()` to observe and control a running upload. */
export interface UploadHandle {
  /** Optional callback invoked with `(loaded, total)` on upload progress events. */
  onProgress: ((loaded: number, total: number) => void) | null;
  /** Aborts the underlying request. */
  abort: () => void;
  /** Promise of the JSON-decoded response body; rejects on error, abort or invalid JSON. */
  result: () => Promise<unknown>;
}

/**
 * SockJS-backed client that dispatches incoming messages as events,
 * queues outgoing messages while disconnected, supports JSON-RPC 2.0
 * calls and optionally reconnects with an exponential backoff.
 */
export class Client extends EventTarget {
  _conn: any;
  _rpcID: number;
  _pendingRPC: Record<string, PendingRPC>;
  _queue: Message[];
  _reconnectionDelay: number;
  _autoReconnect: boolean;
  debug: boolean;
  /** Token used by the most recent `connect()` call; reused for automatic reconnections. */
  _token: string;
  /** Handle of the scheduled reconnection attempt, if any. */
  _reconnectionTimer: ReturnType<typeof setTimeout> | null;

  /**
   * @param autoReconnect When true, a failed or closed connection schedules a new attempt.
   */
  constructor(autoReconnect = true) {
    super();
    this._conn = null;
    this._onConnectionClose = this._onConnectionClose.bind(this);
    this._onConnectionMessage = this._onConnectionMessage.bind(this);
    this._handleRPCResponse = this._handleRPCResponse.bind(this);
    this._rpcID = 0;
    this._pendingRPC = {};
    this._queue = [];
    this._reconnectionDelay = INITIAL_RECONNECTION_DELAY_MS;
    this._autoReconnect = autoReconnect;
    this._token = "";
    this._reconnectionTimer = null;
    this.debug = false;

    this.connect = this.connect.bind(this);
    this.disconnect = this.disconnect.bind(this);
    this.rpc = this.rpc.bind(this);
    this.send = this.send.bind(this);
    this.upload = this.upload.bind(this);

    this.addEventListener(EventTypeMessage, this._handleRPCResponse);
  }

  /**
   * Opens a new connection to `/edge/sock` on the current document host.
   *
   * @param token Value passed as the `token` query parameter. It is inserted
   *   verbatim, so callers must provide a URL-safe value.
   * @returns A promise resolved with this client once the connection is open,
   *   or rejected with the error/close event if it fails before opening.
   */
  connect(token = ""): Promise<this> {
    // Remember the token so automatic reconnections authenticate the same way.
    this._token = token;
    // An explicit attempt supersedes any scheduled one.
    this._cancelReconnection();

    return new Promise((resolve, reject) => {
      const url = `//${document.location.host}/edge/sock?token=${token}`;
      this._log("opening connection to", url);
      const conn: any = new SockJS(url);

      const resetHandlers = () => {
        conn.removeEventListener('open', onOpen);
        conn.removeEventListener('close', onError);
        conn.removeEventListener('error', onError);
      };

      const onOpen = () => {
        this._log('client connected');
        resetHandlers();
        // Close a previously established connection, otherwise its later
        // 'close' event would tear down the connection installed below.
        this._cleanupConnection();
        conn.onclose = this._onConnectionClose;
        conn.onmessage = this._onConnectionMessage;
        this._conn = conn;
        // A successful connection ends the current backoff sequence.
        this._reconnectionDelay = INITIAL_RECONNECTION_DELAY_MS;
        this._sendQueued();
        // Dispatch asynchronously so the caller's promise handlers are set up first.
        setTimeout(() => {
          this._dispatchConnect();
        }, 0);
        resolve(this);
      };

      const onError = (evt: unknown) => {
        resetHandlers();
        this._scheduleReconnection();
        reject(evt);
      };

      conn.addEventListener('open', onOpen);
      conn.addEventListener('error', onError);
      conn.addEventListener('close', onError);
    });
  }

  /** Closes the current connection (if any) and cancels any scheduled reconnection. */
  disconnect(): void {
    this._cancelReconnection();
    this._cleanupConnection();
  }

  /** Decodes a raw connection message and re-dispatches it as an event named after its type. */
  _onConnectionMessage(evt: { data: string }): void {
    const rawMessage = JSON.parse(evt.data);
    const message = messageFrom(rawMessage);
    const event = new CustomEvent(message.getType(), {
      cancelable: true,
      detail: message.getPayload()
    });
    this.dispatchEvent(event);
  }

  /**
   * Settles the pending RPC matching a JSON-RPC 2.0 response carried by a
   * "message" event. Events whose detail is not a JSON-RPC 2.0 payload with
   * an `id` are left untouched for other listeners.
   */
  _handleRPCResponse(evt: Event): void {
    const detail = (evt as CustomEvent).detail;
    // `detail` is null for events created without one; destructuring it would throw.
    if (detail === null || typeof detail !== 'object') return;

    const { jsonrpc, id, error, result } = detail;
    if (jsonrpc !== '2.0' || id === undefined) return;

    // Prevent additional handlers to catch this event
    evt.stopImmediatePropagation();

    const pending = this._pendingRPC[id];
    if (!pending) return;
    delete this._pendingRPC[id];

    if (error) {
      pending.reject(new RPCError(error.code, error.message, error.data));
      return;
    }
    pending.resolve(result);
  }

  /** Handles the close of the established connection: notifies, cleans up and reschedules. */
  _onConnectionClose(evt?: unknown): void {
    this._log('client disconnected');
    this._dispatchDisconnect();
    this._cleanupConnection();
    this._scheduleReconnection();
  }

  /** Dispatches a `disconnect` event on this client. */
  _dispatchDisconnect(): void {
    const event = new CustomEvent('disconnect');
    this.dispatchEvent(event);
  }

  /** Dispatches a `connect` event on this client. */
  _dispatchConnect(): void {
    const event = new CustomEvent('connect');
    this.dispatchEvent(event);
  }

  /**
   * Schedules a reconnection attempt when auto-reconnect is enabled. The delay
   * doubles on each call (plus sub-millisecond jitter) up to
   * `MAX_RECONNECTION_DELAY_MS`, and replaces any previously scheduled attempt.
   */
  _scheduleReconnection(): void {
    if (!this._autoReconnect) return;

    this._reconnectionDelay =
      Math.min(this._reconnectionDelay * 2, MAX_RECONNECTION_DELAY_MS) + Math.random();
    this._log('client will try to reconnect in %dms', this._reconnectionDelay);

    this._cancelReconnection();
    this._reconnectionTimer = setTimeout(() => {
      this._reconnectionTimer = null;
      // A failed attempt already reschedules itself from connect()'s error
      // handler; the catch only avoids an unhandled promise rejection.
      this.connect(this._token).catch(() => undefined);
    }, this._reconnectionDelay);
  }

  /** Cancels the scheduled reconnection attempt, if any. */
  _cancelReconnection(): void {
    if (this._reconnectionTimer === null) return;
    clearTimeout(this._reconnectionTimer);
    this._reconnectionTimer = null;
  }

  /** Detaches all handlers from the current connection, closes it and forgets it. */
  _cleanupConnection(): void {
    if (!this._conn) return;
    this._conn.onopen = null;
    this._conn.onerror = null;
    this._conn.onclose = null;
    this._conn.onmessage = null;
    this._conn.close();
    this._conn = null;
  }

  /**
   * Serializes and sends a message over the current connection.
   *
   * @returns `false` when there is no connection, `true` once handed to it.
   */
  _send(message: unknown): boolean {
    if (!this._conn) return false;
    this._log('sending message', message);
    this._conn.send(JSON.stringify(message));
    return true;
  }

  /** Flushes queued messages in order, stopping at the first one that cannot be sent. */
  _sendQueued(): void {
    this._log("sending queued messages", this._queue.length);
    let msg = this._queue.shift();
    while (msg) {
      if (!this._send(msg)) {
        // Put the message back so it is retried on the next flush instead of being lost.
        this._queue.unshift(msg);
        return;
      }
      msg = this._queue.shift();
    }
  }

  /** Forwards its arguments to `console.log` when `debug` is enabled. */
  _log(...args: unknown[]): void {
    if (!this.debug) return;
    console.log(...args);
  }

  /** Sends the message now (after flushing the queue) when connected, queues it otherwise. */
  _sendOrQueue(msg: Message): void {
    if (this.isConnected()) {
      this._sendQueued();
      this._send(msg);
    } else {
      this._log('queuing message', msg);
      this._queue.push(msg);
    }
  }

  /** Sends `data` as a "message"-typed message, queuing it while disconnected. */
  send(data: unknown): void {
    const msg = new Message(EventTypeMessage, data);
    this._sendOrQueue(msg);
  }

  /**
   * Issues a JSON-RPC 2.0 call.
   *
   * @param method Remote method name.
   * @param params Parameters forwarded as-is in the request.
   * @returns A promise resolved with the response `result`, or rejected with
   *   an `RPCError` when the response carries an `error`.
   */
  rpc(method: string, params?: unknown): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const id = this._rpcID++;
      const rpc = new Message(TypeMessage, {
        jsonrpc: '2.0',
        id, method, params
      });
      this._sendOrQueue(rpc);
      this._pendingRPC[id.toString()] = { resolve, reject };
    });
  }

  /** @returns Whether a connection is currently established. */
  isConnected(): boolean {
    return this._conn !== null;
  }

  /**
   * Starts a multipart POST upload to `/edge/api/v1/upload`.
   *
   * @param blob Content sent as the `file` form field.
   * @param metadata Optional value sent JSON-encoded as the `metadata` form field.
   * @returns A promise resolved, as soon as the request is sent, with a handle
   *   exposing progress, abort and the final result. It rejects if `metadata`
   *   cannot be serialized.
   */
  upload(blob: string | Blob, metadata: any): Promise<UploadHandle> {
    return new Promise((resolve, reject) => {
      const formData = new FormData();
      formData.set("file", blob);

      if (metadata) {
        try {
          formData.set("metadata", JSON.stringify(metadata));
        } catch (err) {
          reject(err);
          return;
        }
      }

      const xhr = new XMLHttpRequest();

      // Attach the completion handlers before sending so the outcome is
      // captured even if `result()` is called late or several times.
      const completion = new Promise<unknown>((resolveResult, rejectResult) => {
        xhr.onload = () => {
          let data;
          try {
            data = JSON.parse(xhr.responseText);
          } catch (err) {
            rejectResult(err);
            return;
          }
          resolveResult(data);
        };
        xhr.onerror = rejectResult;
        xhr.onabort = rejectResult;
      });
      // Callers may never ask for the result; avoid an unhandled rejection then.
      completion.catch(() => undefined);

      const handle: UploadHandle = {
        onProgress: null,
        abort: () => xhr.abort(),
        result: () => completion
      };

      xhr.upload.onprogress = evt => {
        if (typeof handle.onProgress !== 'function') return;
        handle.onProgress(evt.loaded, evt.total);
      };

      xhr.open('POST', `/edge/api/v1/upload`);
      xhr.send(formData);

      resolve(handle);
    });
  }

  /** Builds the download URL of a blob stored in the given bucket. */
  blobUrl(bucket: string, blobId: string): string {
    return `/edge/api/v1/download/${bucket}/${blobId}`;
  }
}