diff --git a/resources/lib/websocket/__init__.py b/resources/lib/websocket/__init__.py index 38da7b30..2cf07af7 100644 --- a/resources/lib/websocket/__init__.py +++ b/resources/lib/websocket/__init__.py @@ -25,4 +25,4 @@ from ._exceptions import * from ._logging import * from ._socket import * -__version__ = "1.0.0" +__version__ = "1.1.0" diff --git a/resources/lib/websocket/_abnf.py b/resources/lib/websocket/_abnf.py index 554f3b08..e14acbc8 100644 --- a/resources/lib/websocket/_abnf.py +++ b/resources/lib/websocket/_abnf.py @@ -25,30 +25,31 @@ Copyright (C) 2010 Hiroki Ohtani(liris) import array import os import struct +import sys from ._exceptions import * from ._utils import validate_utf8 from threading import Lock try: - import numpy -except ImportError: - numpy = None + # If wsaccel is available, use compiled routines to mask data. + # wsaccel only provides around a 10% speed boost compared + # to the websocket-client _mask() implementation. + # Note that wsaccel is unmaintained. + from wsaccel.xormask import XorMaskerSimple -try: - # If wsaccel is available we use compiled routines to mask data. - if not numpy: - from wsaccel.xormask import XorMaskerSimple - - def _mask(_m, _d): - return XorMaskerSimple(_m).process(_d) -except ImportError: - # wsaccel is not available, we rely on python implementations. def _mask(_m, _d): - for i in range(len(_d)): - _d[i] ^= _m[i % 4] + return XorMaskerSimple(_m).process(_d) - return _d.tobytes() +except ImportError: + # wsaccel is not available, use websocket-client _mask() + native_byteorder = sys.byteorder + + def _mask(mask_value, data_value): + datalen = len(data_value) + data_value = int.from_bytes(data_value, native_byteorder) + mask_value = int.from_bytes(mask_value * (datalen // 4) + mask_value[: datalen % 4], native_byteorder) + return (data_value ^ mask_value).to_bytes(datalen, native_byteorder) __all__ = [ @@ -266,19 +267,7 @@ class ABNF(object): if isinstance(data, str): data = data.encode('latin-1') - if numpy: - origlen = len(data) - _mask_key = mask_key[3] << 24 | mask_key[2] << 16 | mask_key[1] << 8 | mask_key[0] - - # We need data to be a multiple of four... - data += b' ' * (4 - (len(data) % 4)) - a = numpy.frombuffer(data, dtype="uint32") - masked = numpy.bitwise_xor(a, [_mask_key]).astype("uint32") - if len(data) > origlen: - return masked.tobytes()[:origlen] - return masked.tobytes() - else: - return _mask(array.array("B", mask_key), array.array("B", data)) + return _mask(array.array("B", mask_key), array.array("B", data)) class frame_buffer(object): @@ -374,7 +363,7 @@ class frame_buffer(object): return frame def recv_strict(self, bufsize): - shortage = bufsize - sum(len(x) for x in self.recv_buffer) + shortage = bufsize - sum(map(len, self.recv_buffer)) while shortage > 0: # Limit buffer size that we pass to socket.recv() to avoid # fragmenting the heap -- the number of bytes recv() actually diff --git a/resources/lib/websocket/_app.py b/resources/lib/websocket/_app.py index 2326796b..8e885e2b 100644 --- a/resources/lib/websocket/_app.py +++ b/resources/lib/websocket/_app.py @@ -105,52 +105,56 @@ class WebSocketApp(object): Parameters ---------- - url: - websocket url. + url: str + Websocket url. header: list or dict - custom header for websocket handshake. - on_open: - callable object which is called at opening websocket. - this function has one argument. The argument is this class object. - on_message: - callable object which is called when received data. + Custom header for websocket handshake. + on_open: function + Callback object which is called at opening websocket. + on_open has one argument. + The 1st argument is this class object. + on_message: function + Callback object which is called when received data. on_message has 2 arguments. The 1st argument is this class object. - The 2nd argument is utf-8 string which we get from the server. - on_error: - callable object which is called when we get error. + The 2nd argument is utf-8 data received from the server. + on_error: function + Callback object which is called when we get error. on_error has 2 arguments. The 1st argument is this class object. The 2nd argument is exception object. - on_close: - callable object which is called when closed the connection. - this function has one argument. The argument is this class object. - on_cont_message: - callback object which is called when receive continued - frame data. + on_close: function + Callback object which is called when connection is closed. + on_close has 3 arguments. + The 1st argument is this class object. + The 2nd argument is close_status_code. + The 3rd argument is close_msg. + on_cont_message: function + Callback object which is called when a continuation + frame is received. on_cont_message has 3 arguments. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is continue flag. if 0, the data continue to next frame data - on_data: - callback object which is called when a message received. + on_data: function + Callback object which is called when a message received. This is called before on_message or on_cont_message, and then on_message or on_cont_message is called. on_data has 4 argument. The 1st argument is this class object. The 2nd argument is utf-8 string which we get from the server. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. - The 4th argument is continue flag. if 0, the data continue - keep_running: - this parameter is obsolete and ignored. - get_mask_key: func - a callable to produce new mask keys, - see the WebSocket.set_mask_key's docstring for more information + The 4th argument is continue flag. If 0, the data continue + keep_running: bool + This parameter is obsolete and ignored. + get_mask_key: function + A callable function to get new mask keys, see the + WebSocket.set_mask_key's docstring for more information. cookie: str - cookie value. - subprotocols: - array of available sub protocols. default is None. + Cookie value. + subprotocols: list + List of available sub protocols. Default is None. """ self.url = url self.header = header if header is not None else [] @@ -177,11 +181,11 @@ class WebSocketApp(object): Parameters ---------- - data: + data: str Message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. - opcode: - Operation code of data. default is OPCODE_TEXT. + opcode: int + Operation code of data. Default is OPCODE_TEXT. """ if not self.sock or self.sock.send(data, opcode) == 0: @@ -214,8 +218,7 @@ class WebSocketApp(object): http_no_proxy=None, http_proxy_auth=None, skip_utf8_validation=False, host=None, origin=None, dispatcher=None, - suppress_origin=False, proxy_type=None, - enable_multithread=True): + suppress_origin=False, proxy_type=None): """ Run event loop for WebSocket framework. @@ -224,32 +227,32 @@ class WebSocketApp(object): Parameters ---------- sockopt: tuple - values for socket.setsockopt. + Values for socket.setsockopt. sockopt must be tuple and each element is argument of sock.setsockopt. sslopt: dict - optional dict object for ssl socket option. + Optional dict object for ssl socket option. ping_interval: int or float - automatically send "ping" command - every specified period (in seconds) - if set to 0, not send automatically. + Automatically send "ping" command + every specified period (in seconds). + If set to 0, no ping is sent periodically. ping_timeout: int or float - timeout (in seconds) if the pong message is not received. + Timeout (in seconds) if the pong message is not received. ping_payload: str - payload message to send with each ping. - http_proxy_host: - http proxy host name. - http_proxy_port: - http proxy port. If not set, set to 80. - http_no_proxy: - host names, which doesn't use proxy. + Payload message to send with each ping. + http_proxy_host: str + HTTP proxy host name. + http_proxy_port: int or str + HTTP proxy port. If not set, set to 80. + http_no_proxy: list + Whitelisted host names that don't use the proxy. skip_utf8_validation: bool skip utf8 validation. host: str update host header. origin: str update origin header. - dispatcher: + dispatcher: Dispatcher object customize reading data from socket. suppress_origin: bool suppress outputting origin header. @@ -281,8 +284,11 @@ class WebSocketApp(object): """ Tears down the connection. - If close_frame is set, we will invoke the on_close handler with the - statusCode and reason from there. + Parameters + ---------- + close_frame: ABNF frame + If close_frame is set, the on_close handler is invoked + with the statusCode and reason from the provided frame. """ if thread and thread.is_alive(): @@ -301,7 +307,7 @@ class WebSocketApp(object): self.get_mask_key, sockopt=sockopt, sslopt=sslopt, fire_cont_frame=self.on_cont_message is not None, skip_utf8_validation=skip_utf8_validation, - enable_multithread=enable_multithread) + enable_multithread=True) self.sock.settimeout(getdefaulttimeout()) self.sock.connect( self.url, header=self.header, cookie=self.cookie, diff --git a/resources/lib/websocket/_core.py b/resources/lib/websocket/_core.py index 1e36d90b..39cd7e4c 100644 --- a/resources/lib/websocket/_core.py +++ b/resources/lib/websocket/_core.py @@ -62,30 +62,31 @@ class WebSocket(object): Parameters ---------- get_mask_key: func - a callable to produce new mask keys, see the set_mask_key - function's docstring for more details + A callable function to get new mask keys, see the + WebSocket.set_mask_key's docstring for more information. sockopt: tuple - values for socket.setsockopt. + Values for socket.setsockopt. sockopt must be tuple and each element is argument of sock.setsockopt. sslopt: dict - optional dict object for ssl socket option. + Optional dict object for ssl socket options. fire_cont_frame: bool - fire recv event for each cont frame. default is False + Fire recv event for each cont frame. Default is False. enable_multithread: bool - if set to True, lock send method. + If set to True, lock send method. skip_utf8_validation: bool - skip utf8 validation. + Skip utf8 validation. """ def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, - fire_cont_frame=False, enable_multithread=False, + fire_cont_frame=False, enable_multithread=True, skip_utf8_validation=False, **_): """ Initialize WebSocket object. Parameters ---------- - sslopt: specify ssl certification verification options + sslopt: dict + Optional dict object for ssl socket options. """ self.sock_opt = sock_opt(sockopt, sslopt) self.handshake_response = None @@ -194,7 +195,10 @@ class WebSocket(object): return None def is_ssl(self): - return isinstance(self.sock, ssl.SSLSocket) + try: + return isinstance(self.sock, ssl.SSLSocket) + except: + return False headers = property(getheaders) @@ -210,40 +214,38 @@ class WebSocket(object): ... header=["User-Agent: MyProgram", ... "x-custom: header"]) - timeout: - socket timeout time. This value is an integer or float. - if you set None for this value, it means "use default_timeout value" - Parameters ---------- - options: - - header: list or dict - custom http header list or dict. - - cookie: str - cookie value. - - origin: str - custom origin url. - - connection: str - custom connection header value. - default value "Upgrade" set in _handshake.py - - suppress_origin: bool - suppress outputting origin header. - - host: str - custom host header string. - - http_proxy_host: - http proxy host name. - - http_proxy_port: - http proxy port. If not set, set to 80. - - http_no_proxy: - host names, which doesn't use proxy. - - http_proxy_auth: - http proxy auth information. tuple of username and password. default is None - - redirect_limit: - number of redirects to follow. - - subprotocols: - array of available sub protocols. default is None. - - socket: - pre-initialized stream socket. + header: list or dict + Custom http header list or dict. + cookie: str + Cookie value. + origin: str + Custom origin url. + connection: str + Custom connection header value. + Default value "Upgrade" set in _handshake.py + suppress_origin: bool + Suppress outputting origin header. + host: str + Custom host header string. + timeout: int or float + Socket timeout time. This value is an integer or float. + If you set None for this value, it means "use default_timeout value" + http_proxy_host: str + HTTP proxy host name. + http_proxy_port: str or int + HTTP proxy port. Default is 80. + http_no_proxy: list + Whitelisted host names that don't use the proxy. + http_proxy_auth: tuple + HTTP proxy auth information. Tuple of username and password. Default is None. + redirect_limit: int + Number of redirects to follow. + subprotocols: list + List of available subprotocols. Default is None. + socket: socket + Pre-initialized stream socket. """ self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), @@ -271,12 +273,12 @@ class WebSocket(object): Parameters ---------- - payload: str - Payload must be utf-8 string or unicode, - if the opcode is OPCODE_TEXT. - Otherwise, it must be string(byte array) - opcode: int - operation code to send. Please see OPCODE_XXX. + payload: str + Payload must be utf-8 string or unicode, + If the opcode is OPCODE_TEXT. + Otherwise, it must be string(byte array). + opcode: int + Operation code (opcode) to send. """ frame = ABNF.create_frame(payload, opcode) @@ -440,10 +442,10 @@ class WebSocket(object): Parameters ---------- - status: - status code to send. see STATUS_XXX. + status: int + Status code to send. See STATUS_XXX. reason: str or bytes - the reason to close. This must be string or bytes. + The reason to close. This must be string or bytes. """ if status < 0 or status >= ABNF.LENGTH_16: raise ValueError("code is invalid range") @@ -457,11 +459,11 @@ class WebSocket(object): Parameters ---------- status: int - status code to send. see STATUS_XXX. + Status code to send. See STATUS_XXX. reason: bytes - the reason to close. + The reason to close. timeout: int or float - timeout until receive a close frame. + Timeout until receive a close frame. If None, it will wait forever until receive a close frame. """ if self.connected: @@ -490,10 +492,8 @@ class WebSocket(object): break self.sock.settimeout(sock_timeout) self.sock.shutdown(socket.SHUT_RDWR) - except OSError: # This happens often on Mac - pass except: - raise + pass self.shutdown() @@ -544,52 +544,51 @@ def create_connection(url, timeout=None, class_=WebSocket, **options): Parameters ---------- + class_: class + class to instantiate when creating the connection. It has to implement + settimeout and connect. It's __init__ should be compatible with + WebSocket.__init__, i.e. accept all of it's kwargs. + header: list or dict + custom http header list or dict. + cookie: str + Cookie value. + origin: str + custom origin url. + suppress_origin: bool + suppress outputting origin header. + host: str + custom host header string. timeout: int or float - socket timeout time. This value could be either float/integer. - if you set None for this value, - it means "use default_timeout value" - class_: - class to instantiate when creating the connection. It has to implement - settimeout and connect. It's __init__ should be compatible with - WebSocket.__init__, i.e. accept all of it's kwargs. - options: - - header: list or dict - custom http header list or dict. - - cookie: str - cookie value. - - origin: str - custom origin url. - - suppress_origin: bool - suppress outputting origin header. - - host: - custom host header string. - - http_proxy_host: - http proxy host name. - - http_proxy_port: - http proxy port. If not set, set to 80. - - http_no_proxy: - host names, which doesn't use proxy. - - http_proxy_auth: - http proxy auth information. tuple of username and password. default is None - - enable_multithread: bool - enable lock for multithread. - - redirect_limit: - number of redirects to follow. - - sockopt: - socket options - - sslopt: - ssl option - - subprotocols: - array of available sub protocols. default is None. - - skip_utf8_validation: bool - skip utf8 validation. - - socket: - pre-initialized stream socket. + socket timeout time. This value could be either float/integer. + If set to None, it uses the default_timeout value. + http_proxy_host: str + HTTP proxy host name. + http_proxy_port: str or int + HTTP proxy port. If not set, set to 80. + http_no_proxy: list + Whitelisted host names that don't use the proxy. + http_proxy_auth: tuple + HTTP proxy auth information. tuple of username and password. Default is None. + enable_multithread: bool + Enable lock for multithread. + redirect_limit: int + Number of redirects to follow. + sockopt: tuple + Values for socket.setsockopt. + sockopt must be a tuple and each element is an argument of sock.setsockopt. + sslopt: dict + Optional dict object for ssl socket options. + subprotocols: list + List of available subprotocols. Default is None. + skip_utf8_validation: bool + Skip utf8 validation. + socket: socket + Pre-initialized stream socket. """ sockopt = options.pop("sockopt", []) sslopt = options.pop("sslopt", {}) fire_cont_frame = options.pop("fire_cont_frame", False) - enable_multithread = options.pop("enable_multithread", False) + enable_multithread = options.pop("enable_multithread", True) skip_utf8_validation = options.pop("skip_utf8_validation", False) websock = class_(sockopt=sockopt, sslopt=sslopt, fire_cont_frame=fire_cont_frame, diff --git a/resources/lib/websocket/_http.py b/resources/lib/websocket/_http.py index a13667a2..5db4d02a 100644 --- a/resources/lib/websocket/_http.py +++ b/resources/lib/websocket/_http.py @@ -79,7 +79,7 @@ def _open_proxied_socket(url, options, proxy): (hostname, port), proxy_type=ptype, proxy_addr=proxy.host, - proxy_port=proxy.port, + proxy_port=int(proxy.port), proxy_rdns=rdns, proxy_username=proxy.auth[0] if proxy.auth else None, proxy_password=proxy.auth[1] if proxy.auth else None, @@ -200,7 +200,7 @@ def _open_socket(addrinfo_list, sockopt, timeout): def _wrap_sni_socket(sock, sslopt, hostname, check_hostname): - context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_SSLv23)) + context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_TLS)) if sslopt.get('cert_reqs', ssl.CERT_NONE) != ssl.CERT_NONE: cafile = sslopt.get('ca_certs', None) @@ -248,6 +248,9 @@ def _ssl_socket(sock, user_sslopt, hostname): and user_sslopt.get('ca_cert_path', None) is None: sslopt['ca_cert_path'] = certPath + if sslopt.get('server_hostname', None): + hostname = sslopt['server_hostname'] + check_hostname = sslopt["cert_reqs"] != ssl.CERT_NONE and sslopt.pop( 'check_hostname', True) sock = _wrap_sni_socket(sock, sslopt, hostname, check_hostname) diff --git a/resources/lib/websocket/_socket.py b/resources/lib/websocket/_socket.py index 70e163f2..29a8b4c9 100644 --- a/resources/lib/websocket/_socket.py +++ b/resources/lib/websocket/_socket.py @@ -125,7 +125,7 @@ def recv(sock, bufsize): if not bytes_: raise WebSocketConnectionClosedException( - "Connection is already closed.") + "Connection to remote host was lost.") return bytes_ diff --git a/resources/lib/websocket/_url.py b/resources/lib/websocket/_url.py index 32aaf23f..08679118 100644 --- a/resources/lib/websocket/_url.py +++ b/resources/lib/websocket/_url.py @@ -26,7 +26,7 @@ import os import socket import struct -from urllib.parse import urlparse +from urllib.parse import unquote, urlparse __all__ = ["parse_url", "get_proxy_info"] @@ -109,7 +109,7 @@ def _is_address_in_network(ip, net): def _is_no_proxy_host(hostname, no_proxy): if not no_proxy: - v = os.environ.get("no_proxy", "").replace(" ", "") + v = os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(" ", "") if v: no_proxy = v.split(",") if not no_proxy: @@ -139,22 +139,21 @@ def get_proxy_info( Parameters ---------- - hostname: - websocket server name. - is_secure: - is the connection secure? (wss) looks for "https_proxy" in env + hostname: str + Websocket server name. + is_secure: bool + Is the connection secure? (wss) looks for "https_proxy" in env before falling back to "http_proxy" - options: - - http_proxy_host: - http proxy host name. - - http_proxy_port: - http proxy port. - - http_no_proxy: - host names, which doesn't use proxy. - - http_proxy_auth: - http proxy auth information. tuple of username and password. default is None - - proxy_type: - if set to "socks5" PySocks wrapper will be used in place of a http proxy. default is "http" + proxy_host: str + http proxy host name. + http_proxy_port: str or int + http proxy port. + http_no_proxy: list + Whitelisted host names that don't use the proxy. + http_proxy_auth: tuple + HTTP proxy auth information. Tuple of username and password. Default is None. + proxy_type: str + If set to "socks4" or "socks5", a PySocks wrapper will be used in place of a HTTP proxy. Default is "http". """ if _is_no_proxy_host(hostname, no_proxy): return None, 0, None @@ -169,10 +168,10 @@ def get_proxy_info( env_keys.insert(0, "https_proxy") for key in env_keys: - value = os.environ.get(key, None) + value = os.environ.get(key, os.environ.get(key.upper(), "")).replace(" ", "") if value: proxy = urlparse(value) - auth = (proxy.username, proxy.password) if proxy.username else None + auth = (unquote(proxy.username), unquote(proxy.password)) if proxy.username else None return proxy.hostname, proxy.port, auth return None, 0, None diff --git a/resources/lib/websocket/tests/__init__.py b/resources/lib/websocket/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/resources/lib/websocket/tests/data/header01.txt b/resources/lib/websocket/tests/data/header01.txt deleted file mode 100644 index d44d24c2..00000000 --- a/resources/lib/websocket/tests/data/header01.txt +++ /dev/null @@ -1,6 +0,0 @@ -HTTP/1.1 101 WebSocket Protocol Handshake -Connection: Upgrade -Upgrade: WebSocket -Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0= -some_header: something - diff --git a/resources/lib/websocket/tests/data/header02.txt b/resources/lib/websocket/tests/data/header02.txt deleted file mode 100644 index f481de92..00000000 --- a/resources/lib/websocket/tests/data/header02.txt +++ /dev/null @@ -1,6 +0,0 @@ -HTTP/1.1 101 WebSocket Protocol Handshake -Connection: Upgrade -Upgrade WebSocket -Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0= -some_header: something - diff --git a/resources/lib/websocket/tests/data/header03.txt b/resources/lib/websocket/tests/data/header03.txt deleted file mode 100644 index 030e13a8..00000000 --- a/resources/lib/websocket/tests/data/header03.txt +++ /dev/null @@ -1,7 +0,0 @@ -HTTP/1.1 101 WebSocket Protocol Handshake -Connection: Upgrade, Keep-Alive -Upgrade: WebSocket -Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0= -Set-Cookie: Token=ABCDE -some_header: something - diff --git a/resources/lib/websocket/tests/test_abnf.py b/resources/lib/websocket/tests/test_abnf.py deleted file mode 100644 index d629c5c2..00000000 --- a/resources/lib/websocket/tests/test_abnf.py +++ /dev/null @@ -1,94 +0,0 @@ -# -*- coding: utf-8 -*- -# -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -""" - -import os -import websocket as ws -from websocket._abnf import * -import sys -import unittest -sys.path[0:0] = [""] - - -class ABNFTest(unittest.TestCase): - - def testInit(self): - a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING) - self.assertEqual(a.fin, 0) - self.assertEqual(a.rsv1, 0) - self.assertEqual(a.rsv2, 0) - self.assertEqual(a.rsv3, 0) - self.assertEqual(a.opcode, 9) - self.assertEqual(a.data, '') - a_bad = ABNF(0,1,0,0, opcode=77) - self.assertEqual(a_bad.rsv1, 1) - self.assertEqual(a_bad.opcode, 77) - - def testValidate(self): - a_invalid_ping = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING) - self.assertRaises(ws._exceptions.WebSocketProtocolException, a_invalid_ping.validate, skip_utf8_validation=False) - a_bad_rsv_value = ABNF(0,1,0,0, opcode=ABNF.OPCODE_TEXT) - self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_rsv_value.validate, skip_utf8_validation=False) - a_bad_opcode = ABNF(0,0,0,0, opcode=77) - self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_opcode.validate, skip_utf8_validation=False) - a_bad_close_frame = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01') - self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame.validate, skip_utf8_validation=False) - a_bad_close_frame_2 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01\x8a\xaa\xff\xdd') - self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_2.validate, skip_utf8_validation=False) - a_bad_close_frame_3 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x03\xe7') - self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_3.validate, skip_utf8_validation=True) - - def testMask(self): - abnf_none_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data=None) - bytes_val = bytes("aaaa", 'utf-8') - self.assertEqual(abnf_none_data._get_masked(bytes_val), bytes_val) - abnf_str_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data="a") - self.assertEqual(abnf_str_data._get_masked(bytes_val), b'aaaa\x00') - - def testFormat(self): - abnf_bad_rsv_bits = ABNF(2,0,0,0, opcode=ABNF.OPCODE_TEXT) - self.assertRaises(ValueError, abnf_bad_rsv_bits.format) - abnf_bad_opcode = ABNF(0,0,0,0, opcode=5) - self.assertRaises(ValueError, abnf_bad_opcode.format) - abnf_length_10 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, data="abcdefghij") - self.assertEqual(b'\x01', abnf_length_10.format()[0].to_bytes(1, 'big')) - self.assertEqual(b'\x8a', abnf_length_10.format()[1].to_bytes(1, 'big')) - self.assertEqual("fin=0 opcode=1 data=abcdefghij", abnf_length_10.__str__()) - abnf_length_20 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_BINARY, data="abcdefghijabcdefghij") - self.assertEqual(b'\x02', abnf_length_20.format()[0].to_bytes(1, 'big')) - self.assertEqual(b'\x94', abnf_length_20.format()[1].to_bytes(1, 'big')) - abnf_no_mask = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, mask=0, data=b'\x01\x8a\xcc') - self.assertEqual(b'\x01\x03\x01\x8a\xcc', abnf_no_mask.format()) - - def testFrameBuffer(self): - fb = frame_buffer(0, True) - self.assertEqual(fb.recv, 0) - self.assertEqual(fb.skip_utf8_validation, True) - fb.clear - self.assertEqual(fb.header, None) - self.assertEqual(fb.length, None) - self.assertEqual(fb.mask, None) - self.assertEqual(fb.has_mask(), False) - - -if __name__ == "__main__": - unittest.main() diff --git a/resources/lib/websocket/tests/test_app.py b/resources/lib/websocket/tests/test_app.py deleted file mode 100644 index 2678793a..00000000 --- a/resources/lib/websocket/tests/test_app.py +++ /dev/null @@ -1,176 +0,0 @@ -# -*- coding: utf-8 -*- -# -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -""" - -import os -import os.path -import websocket as ws -import sys -import ssl -import unittest -sys.path[0:0] = [""] - -# Skip test to access the internet. -TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' -TRACEABLE = True - - -class WebSocketAppTest(unittest.TestCase): - - class NotSetYet(object): - """ A marker class for signalling that a value hasn't been set yet. - """ - - def setUp(self): - ws.enableTrace(TRACEABLE) - - WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() - WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() - WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() - - def tearDown(self): - WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet() - WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet() - WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet() - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testKeepRunning(self): - """ A WebSocketApp should keep running as long as its self.keep_running - is not False (in the boolean context). - """ - - def on_open(self, *args, **kwargs): - """ Set the keep_running flag for later inspection and immediately - close the connection. - """ - WebSocketAppTest.keep_running_open = self.keep_running - self.close() - - def on_close(self, *args, **kwargs): - """ Set the keep_running flag for the test to use. - """ - WebSocketAppTest.keep_running_close = self.keep_running - self.send("connection should be closed here") - - app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close) - app.run_forever() - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testSockMaskKey(self): - """ A WebSocketApp should forward the received mask_key function down - to the actual socket. - """ - - def my_mask_key_func(): - return "\x00\x00\x00\x00" - - app = ws.WebSocketApp('wss://stream.meetup.com/2/rsvps', get_mask_key=my_mask_key_func) - - # if numpy is installed, this assertion fail - # Note: We can't use 'is' for comparing the functions directly, need to use 'id'. - self.assertEqual(id(app.get_mask_key), id(my_mask_key_func)) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testInvalidPingIntervalPingTimeout(self): - """ Test exception handling if ping_interval < ping_timeout - """ - - def on_ping(app, msg): - print("Got a ping!") - app.close() - - def on_pong(app, msg): - print("Got a pong! No need to respond") - app.close() - - app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong) - self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=1, ping_timeout=2, sslopt={"cert_reqs": ssl.CERT_NONE}) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testPingInterval(self): - """ Test WebSocketApp proper ping functionality - """ - - def on_ping(app, msg): - print("Got a ping!") - app.close() - - def on_pong(app, msg): - print("Got a pong! No need to respond") - app.close() - - app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong) - app.run_forever(ping_interval=2, ping_timeout=1, sslopt={"cert_reqs": ssl.CERT_NONE}) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testOpcodeClose(self): - """ Test WebSocketApp close opcode - """ - - app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect') - app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload") - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testOpcodeBinary(self): - """ Test WebSocketApp binary opcode - """ - - app = ws.WebSocketApp('streaming.vn.teslamotors.com/streaming/') - app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload") - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testBadPingInterval(self): - """ A WebSocketApp handling of negative ping_interval - """ - app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1') - self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=-5, sslopt={"cert_reqs": ssl.CERT_NONE}) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testBadPingTimeout(self): - """ A WebSocketApp handling of negative ping_timeout - """ - app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1') - self.assertRaises(ws.WebSocketException, app.run_forever, ping_timeout=-3, sslopt={"cert_reqs": ssl.CERT_NONE}) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testCloseStatusCode(self): - """ Test extraction of close frame status code and close reason in WebSocketApp - """ - def on_close(wsapp, close_status_code, close_msg): - print("on_close reached") - - app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect', on_close=on_close) - closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'\x03\xe8no-init-from-client') - self.assertEqual([1000, 'no-init-from-client'], app._get_close_args(closeframe)) - - closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'') - self.assertEqual([None, None], app._get_close_args(closeframe)) - - app2 = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect') - closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'') - self.assertEqual([None, None], app2._get_close_args(closeframe)) - - self.assertRaises(ws.WebSocketConnectionClosedException, app.send, data="test if connection is closed") - - -if __name__ == "__main__": - unittest.main() diff --git a/resources/lib/websocket/tests/test_cookiejar.py b/resources/lib/websocket/tests/test_cookiejar.py deleted file mode 100644 index 8d2dff50..00000000 --- a/resources/lib/websocket/tests/test_cookiejar.py +++ /dev/null @@ -1,118 +0,0 @@ -""" - -""" - -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -""" -import unittest - -from websocket._cookiejar import SimpleCookieJar - - -class CookieJarTest(unittest.TestCase): - def testAdd(self): - cookie_jar = SimpleCookieJar() - cookie_jar.add("") - self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar") - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b") - self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar") - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b; domain=.abc") - self.assertTrue(".abc" in cookie_jar.jar) - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b; domain=abc") - self.assertTrue(".abc" in cookie_jar.jar) - self.assertTrue("abc" not in cookie_jar.jar) - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b; c=d; domain=abc") - self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") - self.assertEqual(cookie_jar.get(None), "") - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b; c=d; domain=abc") - cookie_jar.add("e=f; domain=abc") - self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f") - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b; c=d; domain=abc") - cookie_jar.add("e=f; domain=.abc") - self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f") - - cookie_jar = SimpleCookieJar() - cookie_jar.add("a=b; c=d; domain=abc") - cookie_jar.add("e=f; domain=xyz") - self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") - self.assertEqual(cookie_jar.get("xyz"), "e=f") - self.assertEqual(cookie_jar.get("something"), "") - - def testSet(self): - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b") - self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar") - - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; domain=.abc") - self.assertTrue(".abc" in cookie_jar.jar) - - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; domain=abc") - self.assertTrue(".abc" in cookie_jar.jar) - self.assertTrue("abc" not in cookie_jar.jar) - - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; c=d; domain=abc") - self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") - - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; c=d; domain=abc") - cookie_jar.set("e=f; domain=abc") - self.assertEqual(cookie_jar.get("abc"), "e=f") - - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; c=d; domain=abc") - cookie_jar.set("e=f; domain=.abc") - self.assertEqual(cookie_jar.get("abc"), "e=f") - - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; c=d; domain=abc") - cookie_jar.set("e=f; domain=xyz") - self.assertEqual(cookie_jar.get("abc"), "a=b; c=d") - self.assertEqual(cookie_jar.get("xyz"), "e=f") - self.assertEqual(cookie_jar.get("something"), "") - - def testGet(self): - cookie_jar = SimpleCookieJar() - cookie_jar.set("a=b; c=d; domain=abc.com") - self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d") - self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d") - self.assertEqual(cookie_jar.get("abc.com.es"), "") - self.assertEqual(cookie_jar.get("xabc.com"), "") - - cookie_jar.set("a=b; c=d; domain=.abc.com") - self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d") - self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d") - self.assertEqual(cookie_jar.get("abc.com.es"), "") - self.assertEqual(cookie_jar.get("xabc.com"), "") diff --git a/resources/lib/websocket/tests/test_http.py b/resources/lib/websocket/tests/test_http.py deleted file mode 100644 index 2a059eac..00000000 --- a/resources/lib/websocket/tests/test_http.py +++ /dev/null @@ -1,150 +0,0 @@ -# -*- coding: utf-8 -*- -# -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -""" - -import os -import os.path -import websocket as ws -from websocket._http import proxy_info, read_headers, _open_proxied_socket, _tunnel, _get_addrinfo_list, connect -import sys -import unittest -import ssl -import websocket -import socks -import socket -sys.path[0:0] = [""] - -# Skip test to access the internet. -TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' - - -class SockMock(object): - def __init__(self): - self.data = [] - self.sent = [] - - def add_packet(self, data): - self.data.append(data) - - def gettimeout(self): - return None - - def recv(self, bufsize): - if self.data: - e = self.data.pop(0) - if isinstance(e, Exception): - raise e - if len(e) > bufsize: - self.data.insert(0, e[bufsize:]) - return e[:bufsize] - - def send(self, data): - self.sent.append(data) - return len(data) - - def close(self): - pass - - -class HeaderSockMock(SockMock): - - def __init__(self, fname): - SockMock.__init__(self) - path = os.path.join(os.path.dirname(__file__), fname) - with open(path, "rb") as f: - self.add_packet(f.read()) - - -class OptsList(): - - def __init__(self): - self.timeout = 1 - self.sockopt = [] - - -class HttpTest(unittest.TestCase): - - def testReadHeader(self): - status, header, status_message = read_headers(HeaderSockMock("data/header01.txt")) - self.assertEqual(status, 101) - self.assertEqual(header["connection"], "Upgrade") - # header02.txt is intentionally malformed - self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")) - - def testTunnel(self): - self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header01.txt"), "example.com", 80, ("username", "password")) - self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header02.txt"), "example.com", 80, ("username", "password")) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testConnect(self): - # Not currently testing an actual proxy connection, so just check whether TypeError is raised. This requires internet for a DNS lookup - self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host=None, http_proxy_port=None, proxy_type=None)) - self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http")) - self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks4")) - self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks5h")) - self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http")) - self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http")) - self.assertRaises(socks.ProxyConnectionError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=8080, proxy_type="socks4"), None) - self.assertRaises(socket.timeout, connect, "wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), None) - self.assertEqual( - connect("wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), True), - (True, ("google.com", 443, "/"))) - # The following test fails on Mac OS with a gaierror, not an OverflowError - # self.assertRaises(OverflowError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=99999, proxy_type="socks4", timeout=2), False) - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testSSLopt(self): - ssloptions = { - "cert_reqs": ssl.CERT_NONE, - "check_hostname": False, - "ssl_version": ssl.PROTOCOL_SSLv23, - "ciphers": "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:\ - TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:\ - ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:\ - ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:\ - DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:\ - ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES128-GCM-SHA256:\ - ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:\ - DHE-RSA-AES256-SHA256:ECDHE-ECDSA-AES128-SHA256:\ - ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256:\ - ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA", - "ecdh_curve": "prime256v1" - } - ws_ssl1 = websocket.WebSocket(sslopt=ssloptions) - ws_ssl1.connect("wss://api.bitfinex.com/ws/2") - ws_ssl1.send("Hello") - ws_ssl1.close() - - ws_ssl2 = websocket.WebSocket(sslopt={"check_hostname": True}) - ws_ssl2.connect("wss://api.bitfinex.com/ws/2") - ws_ssl2.close - - def testProxyInfo(self): - self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").type, "http") - self.assertRaises(ValueError, proxy_info, http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="badval") - self.assertEqual(proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http").host, "example.com") - self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").port, "8080") - self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").auth, None) - - -if __name__ == "__main__": - unittest.main() diff --git a/resources/lib/websocket/tests/test_url.py b/resources/lib/websocket/tests/test_url.py deleted file mode 100644 index a33e9343..00000000 --- a/resources/lib/websocket/tests/test_url.py +++ /dev/null @@ -1,301 +0,0 @@ -# -*- coding: utf-8 -*- -# -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -""" - -import sys -import os -import unittest -sys.path[0:0] = [""] -from websocket._url import get_proxy_info, parse_url, _is_address_in_network, _is_no_proxy_host - - -class UrlTest(unittest.TestCase): - - def test_address_in_network(self): - self.assertTrue(_is_address_in_network('127.0.0.1', '127.0.0.0/8')) - self.assertTrue(_is_address_in_network('127.1.0.1', '127.0.0.0/8')) - self.assertFalse(_is_address_in_network('127.1.0.1', '127.0.0.0/24')) - - def testParseUrl(self): - p = parse_url("ws://www.example.com/r") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 80) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], False) - - p = parse_url("ws://www.example.com/r/") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 80) - self.assertEqual(p[2], "/r/") - self.assertEqual(p[3], False) - - p = parse_url("ws://www.example.com/") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 80) - self.assertEqual(p[2], "/") - self.assertEqual(p[3], False) - - p = parse_url("ws://www.example.com") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 80) - self.assertEqual(p[2], "/") - self.assertEqual(p[3], False) - - p = parse_url("ws://www.example.com:8080/r") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], False) - - p = parse_url("ws://www.example.com:8080/") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/") - self.assertEqual(p[3], False) - - p = parse_url("ws://www.example.com:8080") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/") - self.assertEqual(p[3], False) - - p = parse_url("wss://www.example.com:8080/r") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], True) - - p = parse_url("wss://www.example.com:8080/r?key=value") - self.assertEqual(p[0], "www.example.com") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/r?key=value") - self.assertEqual(p[3], True) - - self.assertRaises(ValueError, parse_url, "http://www.example.com/r") - - p = parse_url("ws://[2a03:4000:123:83::3]/r") - self.assertEqual(p[0], "2a03:4000:123:83::3") - self.assertEqual(p[1], 80) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], False) - - p = parse_url("ws://[2a03:4000:123:83::3]:8080/r") - self.assertEqual(p[0], "2a03:4000:123:83::3") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], False) - - p = parse_url("wss://[2a03:4000:123:83::3]/r") - self.assertEqual(p[0], "2a03:4000:123:83::3") - self.assertEqual(p[1], 443) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], True) - - p = parse_url("wss://[2a03:4000:123:83::3]:8080/r") - self.assertEqual(p[0], "2a03:4000:123:83::3") - self.assertEqual(p[1], 8080) - self.assertEqual(p[2], "/r") - self.assertEqual(p[3], True) - - -class IsNoProxyHostTest(unittest.TestCase): - def setUp(self): - self.no_proxy = os.environ.get("no_proxy", None) - if "no_proxy" in os.environ: - del os.environ["no_proxy"] - - def tearDown(self): - if self.no_proxy: - os.environ["no_proxy"] = self.no_proxy - elif "no_proxy" in os.environ: - del os.environ["no_proxy"] - - def testMatchAll(self): - self.assertTrue(_is_no_proxy_host("any.websocket.org", ['*'])) - self.assertTrue(_is_no_proxy_host("192.168.0.1", ['*'])) - self.assertTrue(_is_no_proxy_host("any.websocket.org", ['other.websocket.org', '*'])) - os.environ['no_proxy'] = '*' - self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) - self.assertTrue(_is_no_proxy_host("192.168.0.1", None)) - os.environ['no_proxy'] = 'other.websocket.org, *' - self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) - - def testIpAddress(self): - self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.1'])) - self.assertFalse(_is_no_proxy_host("127.0.0.2", ['127.0.0.1'])) - self.assertTrue(_is_no_proxy_host("127.0.0.1", ['other.websocket.org', '127.0.0.1'])) - self.assertFalse(_is_no_proxy_host("127.0.0.2", ['other.websocket.org', '127.0.0.1'])) - os.environ['no_proxy'] = '127.0.0.1' - self.assertTrue(_is_no_proxy_host("127.0.0.1", None)) - self.assertFalse(_is_no_proxy_host("127.0.0.2", None)) - os.environ['no_proxy'] = 'other.websocket.org, 127.0.0.1' - self.assertTrue(_is_no_proxy_host("127.0.0.1", None)) - self.assertFalse(_is_no_proxy_host("127.0.0.2", None)) - - def testIpAddressInRange(self): - self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.0/8'])) - self.assertTrue(_is_no_proxy_host("127.0.0.2", ['127.0.0.0/8'])) - self.assertFalse(_is_no_proxy_host("127.1.0.1", ['127.0.0.0/24'])) - os.environ['no_proxy'] = '127.0.0.0/8' - self.assertTrue(_is_no_proxy_host("127.0.0.1", None)) - self.assertTrue(_is_no_proxy_host("127.0.0.2", None)) - os.environ['no_proxy'] = '127.0.0.0/24' - self.assertFalse(_is_no_proxy_host("127.1.0.1", None)) - - def testHostnameMatch(self): - self.assertTrue(_is_no_proxy_host("my.websocket.org", ['my.websocket.org'])) - self.assertTrue(_is_no_proxy_host("my.websocket.org", ['other.websocket.org', 'my.websocket.org'])) - self.assertFalse(_is_no_proxy_host("my.websocket.org", ['other.websocket.org'])) - os.environ['no_proxy'] = 'my.websocket.org' - self.assertTrue(_is_no_proxy_host("my.websocket.org", None)) - self.assertFalse(_is_no_proxy_host("other.websocket.org", None)) - os.environ['no_proxy'] = 'other.websocket.org, my.websocket.org' - self.assertTrue(_is_no_proxy_host("my.websocket.org", None)) - - def testHostnameMatchDomain(self): - self.assertTrue(_is_no_proxy_host("any.websocket.org", ['.websocket.org'])) - self.assertTrue(_is_no_proxy_host("my.other.websocket.org", ['.websocket.org'])) - self.assertTrue(_is_no_proxy_host("any.websocket.org", ['my.websocket.org', '.websocket.org'])) - self.assertFalse(_is_no_proxy_host("any.websocket.com", ['.websocket.org'])) - os.environ['no_proxy'] = '.websocket.org' - self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) - self.assertTrue(_is_no_proxy_host("my.other.websocket.org", None)) - self.assertFalse(_is_no_proxy_host("any.websocket.com", None)) - os.environ['no_proxy'] = 'my.websocket.org, .websocket.org' - self.assertTrue(_is_no_proxy_host("any.websocket.org", None)) - - -class ProxyInfoTest(unittest.TestCase): - def setUp(self): - self.http_proxy = os.environ.get("http_proxy", None) - self.https_proxy = os.environ.get("https_proxy", None) - self.no_proxy = os.environ.get("no_proxy", None) - if "http_proxy" in os.environ: - del os.environ["http_proxy"] - if "https_proxy" in os.environ: - del os.environ["https_proxy"] - if "no_proxy" in os.environ: - del os.environ["no_proxy"] - - def tearDown(self): - if self.http_proxy: - os.environ["http_proxy"] = self.http_proxy - elif "http_proxy" in os.environ: - del os.environ["http_proxy"] - - if self.https_proxy: - os.environ["https_proxy"] = self.https_proxy - elif "https_proxy" in os.environ: - del os.environ["https_proxy"] - - if self.no_proxy: - os.environ["no_proxy"] = self.no_proxy - elif "no_proxy" in os.environ: - del os.environ["no_proxy"] - - def testProxyFromArgs(self): - self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None)) - self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128), - ("localhost", 3128, None)) - self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None)) - self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128), - ("localhost", 3128, None)) - - self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")), - ("localhost", 0, ("a", "b"))) - self.assertEqual( - get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")), - ("localhost", 3128, ("a", "b"))) - self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")), - ("localhost", 0, ("a", "b"))) - self.assertEqual( - get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")), - ("localhost", 3128, ("a", "b"))) - - self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, - no_proxy=["example.com"], proxy_auth=("a", "b")), - ("localhost", 3128, ("a", "b"))) - self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, - no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")), - (None, 0, None)) - - def testProxyFromEnv(self): - os.environ["http_proxy"] = "http://localhost/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None)) - os.environ["http_proxy"] = "http://localhost:3128/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None)) - - os.environ["http_proxy"] = "http://localhost/" - os.environ["https_proxy"] = "http://localhost2/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None)) - os.environ["http_proxy"] = "http://localhost:3128/" - os.environ["https_proxy"] = "http://localhost2:3128/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None)) - - os.environ["http_proxy"] = "http://localhost/" - os.environ["https_proxy"] = "http://localhost2/" - self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None)) - os.environ["http_proxy"] = "http://localhost:3128/" - os.environ["https_proxy"] = "http://localhost2:3128/" - self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None)) - - os.environ["http_proxy"] = "http://a:b@localhost/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b"))) - os.environ["http_proxy"] = "http://a:b@localhost:3128/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b"))) - - os.environ["http_proxy"] = "http://a:b@localhost/" - os.environ["https_proxy"] = "http://a:b@localhost2/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b"))) - os.environ["http_proxy"] = "http://a:b@localhost:3128/" - os.environ["https_proxy"] = "http://a:b@localhost2:3128/" - self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b"))) - - os.environ["http_proxy"] = "http://a:b@localhost/" - os.environ["https_proxy"] = "http://a:b@localhost2/" - self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b"))) - os.environ["http_proxy"] = "http://a:b@localhost:3128/" - os.environ["https_proxy"] = "http://a:b@localhost2:3128/" - self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b"))) - - os.environ["http_proxy"] = "http://a:b@localhost/" - os.environ["https_proxy"] = "http://a:b@localhost2/" - os.environ["no_proxy"] = "example1.com,example2.com" - self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b"))) - os.environ["http_proxy"] = "http://a:b@localhost:3128/" - os.environ["https_proxy"] = "http://a:b@localhost2:3128/" - os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org" - self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None)) - os.environ["http_proxy"] = "http://a:b@localhost:3128/" - os.environ["https_proxy"] = "http://a:b@localhost2:3128/" - os.environ["no_proxy"] = "example1.com,example2.com, .websocket.org" - self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None)) - - os.environ["http_proxy"] = "http://a:b@localhost:3128/" - os.environ["https_proxy"] = "http://a:b@localhost2:3128/" - os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16" - self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None)) - self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None)) - - -if __name__ == "__main__": - unittest.main() diff --git a/resources/lib/websocket/tests/test_websocket.py b/resources/lib/websocket/tests/test_websocket.py deleted file mode 100644 index 35a928cd..00000000 --- a/resources/lib/websocket/tests/test_websocket.py +++ /dev/null @@ -1,452 +0,0 @@ -# -*- coding: utf-8 -*- -# -""" - -""" - -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - -""" - -import sys -sys.path[0:0] = [""] -import os -import os.path -import socket -import websocket as ws -from websocket._handshake import _create_sec_websocket_key, \ - _validate as _validate_header -from websocket._http import read_headers -from websocket._utils import validate_utf8 -from base64 import decodebytes as base64decode - -import unittest - -try: - import ssl - from ssl import SSLError -except ImportError: - # dummy class of SSLError for ssl none-support environment. - class SSLError(Exception): - pass - -# Skip test to access the internet. -TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1' -TRACEABLE = True - - -def create_mask_key(_): - return "abcd" - - -class SockMock(object): - def __init__(self): - self.data = [] - self.sent = [] - - def add_packet(self, data): - self.data.append(data) - - def gettimeout(self): - return None - - def recv(self, bufsize): - if self.data: - e = self.data.pop(0) - if isinstance(e, Exception): - raise e - if len(e) > bufsize: - self.data.insert(0, e[bufsize:]) - return e[:bufsize] - - def send(self, data): - self.sent.append(data) - return len(data) - - def close(self): - pass - - -class HeaderSockMock(SockMock): - - def __init__(self, fname): - SockMock.__init__(self) - path = os.path.join(os.path.dirname(__file__), fname) - with open(path, "rb") as f: - self.add_packet(f.read()) - - -class WebSocketTest(unittest.TestCase): - def setUp(self): - ws.enableTrace(TRACEABLE) - - def tearDown(self): - pass - - def testDefaultTimeout(self): - self.assertEqual(ws.getdefaulttimeout(), None) - ws.setdefaulttimeout(10) - self.assertEqual(ws.getdefaulttimeout(), 10) - ws.setdefaulttimeout(None) - - def testWSKey(self): - key = _create_sec_websocket_key() - self.assertTrue(key != 24) - self.assertTrue(str("¥n") not in key) - - def testNonce(self): - """ WebSocket key should be a random 16-byte nonce. - """ - key = _create_sec_websocket_key() - nonce = base64decode(key.encode("utf-8")) - self.assertEqual(16, len(nonce)) - - def testWsUtils(self): - key = "c6b8hTg4EeGb2gQMztV1/g==" - required_header = { - "upgrade": "websocket", - "connection": "upgrade", - "sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0="} - self.assertEqual(_validate_header(required_header, key, None), (True, None)) - - header = required_header.copy() - header["upgrade"] = "http" - self.assertEqual(_validate_header(header, key, None), (False, None)) - del header["upgrade"] - self.assertEqual(_validate_header(header, key, None), (False, None)) - - header = required_header.copy() - header["connection"] = "something" - self.assertEqual(_validate_header(header, key, None), (False, None)) - del header["connection"] - self.assertEqual(_validate_header(header, key, None), (False, None)) - - header = required_header.copy() - header["sec-websocket-accept"] = "something" - self.assertEqual(_validate_header(header, key, None), (False, None)) - del header["sec-websocket-accept"] - self.assertEqual(_validate_header(header, key, None), (False, None)) - - header = required_header.copy() - header["sec-websocket-protocol"] = "sub1" - self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1")) - # This case will print out a logging error using the error() function, but that is expected - self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None)) - - header = required_header.copy() - header["sec-websocket-protocol"] = "sUb1" - self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1")) - - header = required_header.copy() - # This case will print out a logging error using the error() function, but that is expected - self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None)) - - def testReadHeader(self): - status, header, status_message = read_headers(HeaderSockMock("data/header01.txt")) - self.assertEqual(status, 101) - self.assertEqual(header["connection"], "Upgrade") - - status, header, status_message = read_headers(HeaderSockMock("data/header03.txt")) - self.assertEqual(status, 101) - self.assertEqual(header["connection"], "Upgrade, Keep-Alive") - - HeaderSockMock("data/header02.txt") - self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt")) - - def testSend(self): - # TODO: add longer frame data - sock = ws.WebSocket() - sock.set_mask_key(create_mask_key) - s = sock.sock = HeaderSockMock("data/header01.txt") - sock.send("Hello") - self.assertEqual(s.sent[0], b'\x81\x85abcd)\x07\x0f\x08\x0e') - - sock.send("こんにちは") - self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc') - -# sock.send("x" * 5000) -# self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc") - - self.assertEqual(sock.send_binary(b'1111111111101'), 19) - - def testRecv(self): - # TODO: add longer frame data - sock = ws.WebSocket() - s = sock.sock = SockMock() - something = b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc' - s.add_packet(something) - data = sock.recv() - self.assertEqual(data, "こんにちは") - - s.add_packet(b'\x81\x85abcd)\x07\x0f\x08\x0e') - data = sock.recv() - self.assertEqual(data, "Hello") - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testIter(self): - count = 2 - for _ in ws.create_connection('wss://stream.meetup.com/2/rsvps'): - count -= 1 - if count == 0: - break - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testNext(self): - sock = ws.create_connection('wss://stream.meetup.com/2/rsvps') - self.assertEqual(str, type(next(sock))) - - def testInternalRecvStrict(self): - sock = ws.WebSocket() - s = sock.sock = SockMock() - s.add_packet(b'foo') - s.add_packet(socket.timeout()) - s.add_packet(b'bar') - # s.add_packet(SSLError("The read operation timed out")) - s.add_packet(b'baz') - with self.assertRaises(ws.WebSocketTimeoutException): - sock.frame_buffer.recv_strict(9) - # with self.assertRaises(SSLError): - # data = sock._recv_strict(9) - data = sock.frame_buffer.recv_strict(9) - self.assertEqual(data, b'foobarbaz') - with self.assertRaises(ws.WebSocketConnectionClosedException): - sock.frame_buffer.recv_strict(1) - - def testRecvTimeout(self): - sock = ws.WebSocket() - s = sock.sock = SockMock() - s.add_packet(b'\x81') - s.add_packet(socket.timeout()) - s.add_packet(b'\x8dabcd\x29\x07\x0f\x08\x0e') - s.add_packet(socket.timeout()) - s.add_packet(b'\x4e\x43\x33\x0e\x10\x0f\x00\x40') - with self.assertRaises(ws.WebSocketTimeoutException): - sock.recv() - with self.assertRaises(ws.WebSocketTimeoutException): - sock.recv() - data = sock.recv() - self.assertEqual(data, "Hello, World!") - with self.assertRaises(ws.WebSocketConnectionClosedException): - sock.recv() - - def testRecvWithSimpleFragmentation(self): - sock = ws.WebSocket() - s = sock.sock = SockMock() - # OPCODE=TEXT, FIN=0, MSG="Brevity is " - s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') - # OPCODE=CONT, FIN=1, MSG="the soul of wit" - s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17') - data = sock.recv() - self.assertEqual(data, "Brevity is the soul of wit") - with self.assertRaises(ws.WebSocketConnectionClosedException): - sock.recv() - - def testRecvWithFireEventOfFragmentation(self): - sock = ws.WebSocket(fire_cont_frame=True) - s = sock.sock = SockMock() - # OPCODE=TEXT, FIN=0, MSG="Brevity is " - s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') - # OPCODE=CONT, FIN=0, MSG="Brevity is " - s.add_packet(b'\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') - # OPCODE=CONT, FIN=1, MSG="the soul of wit" - s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17') - - _, data = sock.recv_data() - self.assertEqual(data, b'Brevity is ') - _, data = sock.recv_data() - self.assertEqual(data, b'Brevity is ') - _, data = sock.recv_data() - self.assertEqual(data, b'the soul of wit') - - # OPCODE=CONT, FIN=0, MSG="Brevity is " - s.add_packet(b'\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C') - - with self.assertRaises(ws.WebSocketException): - sock.recv_data() - - with self.assertRaises(ws.WebSocketConnectionClosedException): - sock.recv() - - def testClose(self): - sock = ws.WebSocket() - sock.connected = True - self.assertRaises(ws._exceptions.WebSocketConnectionClosedException, sock.close) - - sock = ws.WebSocket() - s = sock.sock = SockMock() - sock.connected = True - s.add_packet(b'\x88\x80\x17\x98p\x84') - sock.recv() - self.assertEqual(sock.connected, False) - - def testRecvContFragmentation(self): - sock = ws.WebSocket() - s = sock.sock = SockMock() - # OPCODE=CONT, FIN=1, MSG="the soul of wit" - s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17') - self.assertRaises(ws.WebSocketException, sock.recv) - - def testRecvWithProlongedFragmentation(self): - sock = ws.WebSocket() - s = sock.sock = SockMock() - # OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, " - s.add_packet(b'\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC') - # OPCODE=CONT, FIN=0, MSG="dear friends, " - s.add_packet(b'\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07\x17MB') - # OPCODE=CONT, FIN=1, MSG="once more" - s.add_packet(b'\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04') - data = sock.recv() - self.assertEqual( - data, - "Once more unto the breach, dear friends, once more") - with self.assertRaises(ws.WebSocketConnectionClosedException): - sock.recv() - - def testRecvWithFragmentationAndControlFrame(self): - sock = ws.WebSocket() - sock.set_mask_key(create_mask_key) - s = sock.sock = SockMock() - # OPCODE=TEXT, FIN=0, MSG="Too much " - s.add_packet(b'\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA') - # OPCODE=PING, FIN=1, MSG="Please PONG this" - s.add_packet(b'\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17') - # OPCODE=CONT, FIN=1, MSG="of a good thing" - s.add_packet(b'\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c\x08\x0c\x04') - data = sock.recv() - self.assertEqual(data, "Too much of a good thing") - with self.assertRaises(ws.WebSocketConnectionClosedException): - sock.recv() - self.assertEqual( - s.sent[0], - b'\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17') - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testWebSocket(self): - s = ws.create_connection("ws://echo.websocket.org/") - self.assertNotEqual(s, None) - s.send("Hello, World") - result = s.recv() - self.assertEqual(result, "Hello, World") - - s.send("こにゃにゃちは、世界") - result = s.recv() - self.assertEqual(result, "こにゃにゃちは、世界") - self.assertRaises(ValueError, s.send_close, -1, "") - s.close() - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testPingPong(self): - s = ws.create_connection("ws://echo.websocket.org/") - self.assertNotEqual(s, None) - s.ping("Hello") - s.pong("Hi") - s.close() - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testSecureWebSocket(self): - import ssl - s = ws.create_connection("wss://api.bitfinex.com/ws/2") - self.assertNotEqual(s, None) - self.assertTrue(isinstance(s.sock, ssl.SSLSocket)) - self.assertEqual(s.getstatus(), 101) - self.assertNotEqual(s.getheaders(), None) - s.settimeout(10) - self.assertEqual(s.gettimeout(), 10) - self.assertEqual(s.getsubprotocol(), None) - s.abort() - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testWebSocketWithCustomHeader(self): - s = ws.create_connection("ws://echo.websocket.org/", - headers={"User-Agent": "PythonWebsocketClient"}) - self.assertNotEqual(s, None) - s.send("Hello, World") - result = s.recv() - self.assertEqual(result, "Hello, World") - self.assertRaises(ValueError, s.close, -1, "") - s.close() - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testAfterClose(self): - s = ws.create_connection("ws://echo.websocket.org/") - self.assertNotEqual(s, None) - s.close() - self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello") - self.assertRaises(ws.WebSocketConnectionClosedException, s.recv) - - -class SockOptTest(unittest.TestCase): - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testSockOpt(self): - sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),) - s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt) - self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0) - s.close() - - -class UtilsTest(unittest.TestCase): - def testUtf8Validator(self): - state = validate_utf8(b'\xf0\x90\x80\x80') - self.assertEqual(state, True) - state = validate_utf8(b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited') - self.assertEqual(state, False) - state = validate_utf8(b'') - self.assertEqual(state, True) - - -class HandshakeTest(unittest.TestCase): - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def test_http_SSL(self): - websock1 = ws.WebSocket(sslopt={"cert_chain": ssl.get_default_verify_paths().capath}) - self.assertRaises(ValueError, - websock1.connect, "wss://api.bitfinex.com/ws/2") - websock2 = ws.WebSocket(sslopt={"certfile": "myNonexistentCertFile"}) - self.assertRaises(FileNotFoundError, - websock2.connect, "wss://api.bitfinex.com/ws/2") - - @unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled") - def testManualHeaders(self): - websock3 = ws.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE, - "ca_certs": ssl.get_default_verify_paths().capath, - "ca_cert_path": ssl.get_default_verify_paths().openssl_cafile}) - self.assertRaises(ws._exceptions.WebSocketBadStatusException, - websock3.connect, "wss://api.bitfinex.com/ws/2", cookie="chocolate", - origin="testing_websockets.com", - host="echo.websocket.org/websocket-client-test", - subprotocols=["testproto"], - connection="Upgrade", - header={"CustomHeader1":"123", - "Cookie":"TestValue", - "Sec-WebSocket-Key":"k9kFAUWNAMmf5OEMfTlOEA==", - "Sec-WebSocket-Protocol":"newprotocol"}) - - def testIPv6(self): - websock2 = ws.WebSocket() - self.assertRaises(ValueError, websock2.connect, "2001:4860:4860::8888") - - def testBadURLs(self): - websock3 = ws.WebSocket() - self.assertRaises(ValueError, websock3.connect, "ws//example.com") - self.assertRaises(ws.WebSocketAddressException, websock3.connect, "ws://example") - self.assertRaises(ValueError, websock3.connect, "example.com") - - -if __name__ == "__main__": - unittest.main()