"""

"""

"""
websocket - WebSocket client library for Python

Copyright (C) 2010 Hiroki Ohtani(liris)

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 2.1 of the License, or (at your option) any later version.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA  02110-1301  USA

"""
import selectors
import sys
import threading
import time
import traceback
from ._abnf import ABNF
from ._core import WebSocket, getdefaulttimeout
from ._exceptions import *
from . import _logging


__all__ = ["WebSocketApp"]


class Dispatcher:
    """
    Default read loop for plain (non-SSL) sockets.

    Waits for the application's socket to become readable, then invokes
    the supplied callbacks until the application stops running.
    """

    def __init__(self, app, ping_timeout):
        self.app = app
        # Used as the select() timeout for each wait on the socket.
        self.ping_timeout = ping_timeout

    def read(self, sock, read_callback, check_callback):
        """
        Run the read loop while ``self.app.keep_running`` is true.

        Parameters
        ----------
        sock:
            unused here; the socket is taken from ``self.app.sock.sock``
            on every iteration.
        read_callback: callable
            called with no arguments when the socket is readable.
            A falsy return value ends the loop.
        check_callback: callable
            called with no arguments after every wait (readable or
            timed out) unless the loop was just ended by read_callback.
        """
        while self.app.keep_running:
            # The context manager guarantees the selector is closed even
            # when select() raises; closing it before the callbacks run
            # also covers the early ``break`` and callback exceptions,
            # which previously leaked one selector per occurrence.
            with selectors.DefaultSelector() as sel:
                sel.register(self.app.sock.sock, selectors.EVENT_READ)
                ready = sel.select(self.ping_timeout)

            if ready:
                if not read_callback():
                    break
            check_callback()


class SSLDispatcher:
    """
    Read loop for SSL sockets.

    Same contract as Dispatcher, but data already buffered inside the
    SSL object (``sock.pending()``) is treated as readable without
    waiting on the selector.
    """

    def __init__(self, app, ping_timeout):
        self.app = app
        # Used as the select() timeout for each wait on the socket.
        self.ping_timeout = ping_timeout

    def read(self, sock, read_callback, check_callback):
        """
        Run the read loop while ``self.app.keep_running`` is true.

        Parameters
        ----------
        sock:
            unused here; the socket is taken from ``self.app.sock.sock``.
        read_callback: callable
            called with no arguments when data is available.
            A falsy return value ends the loop.
        check_callback: callable
            called with no arguments after every wait unless the loop
            was just ended by read_callback.
        """
        while self.app.keep_running:
            r = self.select()
            if r:
                if not read_callback():
                    break
            check_callback()

    def select(self):
        """
        Wait until the SSL socket has data to read.

        Returns
        -------
        A truthy value when data is available: a one-element list holding
        the socket if data is already pending, otherwise the first item
        of the first ready entry reported by the selector.
        None if the wait timed out.
        """
        sock = self.app.sock.sock
        if sock.pending():
            return [sock,]

        # Close the selector on every path, including when register()
        # or select() raises.
        with selectors.DefaultSelector() as sel:
            sel.register(sock, selectors.EVENT_READ)
            r = sel.select(self.ping_timeout)

        if len(r) > 0:
            return r[0][0]
        return None


class WebSocketApp(object):
    """
    Higher level of APIs are provided.
    The interface is like JavaScript WebSocket object.
    """

    def __init__(self, url, header=None,
                 on_open=None, on_message=None, on_error=None,
                 on_close=None, on_ping=None, on_pong=None,
                 on_cont_message=None,
                 keep_running=True, get_mask_key=None, cookie=None,
                 subprotocols=None,
                 on_data=None):
        """
        WebSocketApp initialization

        Parameters
        ----------
        url: <type>
            websocket url.
        header: list or dict
            custom header for websocket handshake.
            Defaults to an empty list when None.
        on_open: <type>
            callable object which is called at opening websocket.
            this function has one argument. The argument is this class object.
        on_message: <type>
            callable object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the received data; text frames are decoded
            from utf-8 before being passed.
        on_error: <type>
            callable object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: <type>
            callable object which is called when closed the connection.
            on_close has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is the close status code, or None.
            The 3rd argument is the close reason, or None.
        on_ping: <type>
            callable object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the ping frame.
        on_pong: <type>
            callable object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the pong frame.
        on_cont_message: <type>
            callback object which is called when receive continued
            frame data.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is continue flag. if 0, the data continue
            to next frame data
        on_data: <type>
            callback object which is called when a message received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 argument.
            The 1st argument is this class object.
            The 2nd argument is utf-8 string which we get from the server.
            The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
            The 4th argument is continue flag. if 0, the data continue
        keep_running: <type>
            this parameter is obsolete and ignored.
        get_mask_key: func
            a callable to produce new mask keys,
            see the WebSocket.set_mask_key's docstring for more information
        cookie: str
            cookie value.
        subprotocols: <type>
            array of available sub protocols. default is None.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # The keep_running argument is deliberately ignored; the flag is
        # only raised by run_forever().
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock = None
        self.last_ping_tm = 0
        self.last_pong_tm = 0
        self.subprotocols = subprotocols

    def send(self, data, opcode=ABNF.OPCODE_TEXT):
        """
        send message

        Parameters
        ----------
        data: <type>
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: <type>
            Operation code of data. default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            if there is no socket, or the underlying send returned 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException(
                "Connection is already closed.")

    def close(self, **kwargs):
        """
        Close websocket connection.

        Clears the keep_running flag and, if a socket exists, closes it
        (forwarding ``kwargs`` to its close method) and drops the
        reference.
        """
        self.keep_running = False
        if self.sock:
            self.sock.close(**kwargs)
            self.sock = None

    def _send_ping(self, interval, event, payload):
        """
        Ping loop run in a background thread by run_forever().

        Sends a ping with ``payload`` every ``interval`` seconds until
        ``event`` is set or sending a ping fails.
        """
        # event.wait() returns True once the event is set, ending the loop.
        while not event.wait(interval):
            self.last_ping_tm = time.time()
            if self.sock:
                try:
                    self.sock.ping(payload)
                except Exception as ex:
                    _logging.warning("send_ping routine terminated: {}".format(ex))
                    break

    def run_forever(self, sockopt=None, sslopt=None,
                    ping_interval=0, ping_timeout=None,
                    ping_payload="",
                    http_proxy_host=None, http_proxy_port=None,
                    http_no_proxy=None, http_proxy_auth=None,
                    skip_utf8_validation=False,
                    host=None, origin=None, dispatcher=None,
                    suppress_origin=False, proxy_type=None,
                    enable_multithread=True):
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            optional dict object for ssl socket option.
        ping_interval: int or float
            automatically send "ping" command
            every specified period (in seconds)
            if set to 0, not send automatically.
        ping_timeout: int or float
            timeout (in seconds) if the pong message is not received.
        ping_payload: str
            payload message to send with each ping.
        http_proxy_host: <type>
            http proxy host name.
        http_proxy_port: <type>
            http proxy port. If not set, set to 80.
        http_no_proxy: <type>
            host names, which doesn't use proxy.
        http_proxy_auth: <type>
            passed through unchanged to WebSocket.connect().
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher: <type>
            customize reading data from socket. When not given, one is
            built with create_dispatcher().
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type: <type>
            passed through unchanged to WebSocket.connect().
        enable_multithread: bool
            passed through unchanged to the WebSocket constructor.

        Returns
        -------
        teardown: bool
            False if caught KeyboardInterrupt, True if other exception was raised during a loop.
            None if the loop ended without an exception.

        Raises
        ------
        WebSocketException
            if the ping settings are inconsistent or a socket is already open.
        SystemExit
            re-raised after on_error has been invoked; note that the
            connection teardown is not run on this path.
        """

        if ping_timeout is not None and ping_timeout <= 0:
            raise WebSocketException("Ensure ping_timeout > 0")
        if ping_interval is not None and ping_interval < 0:
            raise WebSocketException("Ensure ping_interval >= 0")
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")
        # Both are rebound below only when pinging is enabled; teardown()
        # reads them through its closure.
        thread = None
        event = None
        self.keep_running = True
        self.last_ping_tm = 0
        self.last_pong_tm = 0

        def teardown(close_frame=None):
            """
            Tears down the connection.

            If close_frame is set, we will invoke the on_close handler with the
            statusCode and reason from there.
            """
            if thread and thread.is_alive():
                event.set()
                thread.join()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            close_status_code, close_reason = self._get_close_args(
                close_frame if close_frame else None)
            self._callback(self.on_close, close_status_code, close_reason)
            self.sock = None

        try:
            self.sock = WebSocket(
                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=enable_multithread)
            self.sock.settimeout(getdefaulttimeout())
            self.sock.connect(
                self.url, header=self.header, cookie=self.cookie,
                http_proxy_host=http_proxy_host,
                http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
                http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
                host=host, origin=origin, suppress_origin=suppress_origin,
                proxy_type=proxy_type)
            if not dispatcher:
                dispatcher = self.create_dispatcher(ping_timeout)

            self._callback(self.on_open)

            if ping_interval:
                event = threading.Event()
                thread = threading.Thread(
                    target=self._send_ping, args=(ping_interval, event, ping_payload))
                thread.daemon = True
                thread.start()

            def read():
                # Returns True to keep the dispatcher loop going; the
                # teardown() paths return None, which stops it.
                if not self.keep_running:
                    return teardown()

                op_code, frame = self.sock.recv_data_frame(True)
                if op_code == ABNF.OPCODE_CLOSE:
                    return teardown(frame)
                elif op_code == ABNF.OPCODE_PING:
                    self._callback(self.on_ping, frame.data)
                elif op_code == ABNF.OPCODE_PONG:
                    self.last_pong_tm = time.time()
                    self._callback(self.on_pong, frame.data)
                elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                    self._callback(self.on_data, frame.data,
                                   frame.opcode, frame.fin)
                    self._callback(self.on_cont_message,
                                   frame.data, frame.fin)
                else:
                    data = frame.data
                    if op_code == ABNF.OPCODE_TEXT:
                        data = data.decode("utf-8")
                    self._callback(self.on_data, data, frame.opcode, True)
                    self._callback(self.on_message, data)

                return True

            def check():
                # Raises when a ping has been sent, ping_timeout seconds
                # have passed since, and the matching pong is either
                # missing or arrived later than the timeout allows.
                if (ping_timeout):
                    has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
                    has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
                    has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout

                    if (self.last_ping_tm and
                            has_timeout_expired and
                            (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
                        raise WebSocketTimeoutException("ping/pong timed out")
                return True

            dispatcher.read(self.sock.sock, read, check)
        except (Exception, KeyboardInterrupt, SystemExit) as e:
            self._callback(self.on_error, e)
            if isinstance(e, SystemExit):
                # propagate SystemExit further
                raise
            teardown()
            return not isinstance(e, KeyboardInterrupt)

    def create_dispatcher(self, ping_timeout):
        """
        Build the default dispatcher for the current socket.

        Uses ``ping_timeout`` as the wait timeout, falling back to 10
        when it is falsy, and picks SSLDispatcher for SSL sockets and
        Dispatcher otherwise.
        """
        timeout = ping_timeout or 10
        if self.sock.is_ssl():
            return SSLDispatcher(self, timeout)

        return Dispatcher(self, timeout)

    def _get_close_args(self, close_frame):
        """
        _get_close_args extracts the close code and reason from the close body
        if it exists (RFC6455 says WebSocket Connection Close Code is optional)

        Returns
        -------
        A two-element list ``[close_status_code, reason]``; both are None
        when no on_close handler is set, no frame was given, or the frame
        carries fewer than 2 bytes of data.
        """
        # Need to catch the case where close_frame is None
        # Otherwise the following if statement causes an error
        if not self.on_close or not close_frame:
            return [None, None]

        # Extract close frame status code
        if close_frame.data and len(close_frame.data) >= 2:
            # The first two bytes are the status code, most significant
            # byte first; the remainder is the utf-8 encoded reason.
            close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
            reason = close_frame.data[2:].decode('utf-8')
            return [close_status_code, reason]
        else:
            # Most likely reached this because len(close_frame_data.data) < 2
            return [None, None]

    def _callback(self, callback, *args):
        """
        Invoke ``callback(self, *args)`` if a callback is set.

        Exceptions derived from Exception are logged (with a traceback
        when debug logging is enabled) and not propagated.
        """
        if callback:
            try:
                callback(self, *args)

            except Exception as e:
                _logging.error("error from callback {}: {}".format(callback, e))
                if _logging.isEnabledForDebug():
                    _, _, tb = sys.exc_info()
                    traceback.print_tb(tb)