Merge pull request #1553 from croneter/py3-fix-websocket

Bump websocket client: fix AttributeError: 'NoneType' object has no attribute 'is_ssl'
This commit is contained in:
croneter 2021-07-25 10:57:42 +02:00 committed by GitHub
commit cbcc4d1a74
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 196 additions and 1510 deletions

View file

@ -25,4 +25,4 @@ from ._exceptions import *
from ._logging import * from ._logging import *
from ._socket import * from ._socket import *
__version__ = "1.0.0" __version__ = "1.1.0"

View file

@ -25,30 +25,31 @@ Copyright (C) 2010 Hiroki Ohtani(liris)
import array import array
import os import os
import struct import struct
import sys
from ._exceptions import * from ._exceptions import *
from ._utils import validate_utf8 from ._utils import validate_utf8
from threading import Lock from threading import Lock
try: try:
import numpy # If wsaccel is available, use compiled routines to mask data.
except ImportError: # wsaccel only provides around a 10% speed boost compared
numpy = None # to the websocket-client _mask() implementation.
# Note that wsaccel is unmaintained.
try:
# If wsaccel is available we use compiled routines to mask data.
if not numpy:
from wsaccel.xormask import XorMaskerSimple from wsaccel.xormask import XorMaskerSimple
def _mask(_m, _d): def _mask(_m, _d):
return XorMaskerSimple(_m).process(_d) return XorMaskerSimple(_m).process(_d)
except ImportError:
# wsaccel is not available, we rely on python implementations.
def _mask(_m, _d):
for i in range(len(_d)):
_d[i] ^= _m[i % 4]
return _d.tobytes() except ImportError:
# wsaccel is not available, use websocket-client _mask()
native_byteorder = sys.byteorder
def _mask(mask_value, data_value):
datalen = len(data_value)
data_value = int.from_bytes(data_value, native_byteorder)
mask_value = int.from_bytes(mask_value * (datalen // 4) + mask_value[: datalen % 4], native_byteorder)
return (data_value ^ mask_value).to_bytes(datalen, native_byteorder)
__all__ = [ __all__ = [
@ -266,18 +267,6 @@ class ABNF(object):
if isinstance(data, str): if isinstance(data, str):
data = data.encode('latin-1') data = data.encode('latin-1')
if numpy:
origlen = len(data)
_mask_key = mask_key[3] << 24 | mask_key[2] << 16 | mask_key[1] << 8 | mask_key[0]
# We need data to be a multiple of four...
data += b' ' * (4 - (len(data) % 4))
a = numpy.frombuffer(data, dtype="uint32")
masked = numpy.bitwise_xor(a, [_mask_key]).astype("uint32")
if len(data) > origlen:
return masked.tobytes()[:origlen]
return masked.tobytes()
else:
return _mask(array.array("B", mask_key), array.array("B", data)) return _mask(array.array("B", mask_key), array.array("B", data))
@ -374,7 +363,7 @@ class frame_buffer(object):
return frame return frame
def recv_strict(self, bufsize): def recv_strict(self, bufsize):
shortage = bufsize - sum(len(x) for x in self.recv_buffer) shortage = bufsize - sum(map(len, self.recv_buffer))
while shortage > 0: while shortage > 0:
# Limit buffer size that we pass to socket.recv() to avoid # Limit buffer size that we pass to socket.recv() to avoid
# fragmenting the heap -- the number of bytes recv() actually # fragmenting the heap -- the number of bytes recv() actually

View file

@ -105,52 +105,56 @@ class WebSocketApp(object):
Parameters Parameters
---------- ----------
url: <type> url: str
websocket url. Websocket url.
header: list or dict header: list or dict
custom header for websocket handshake. Custom header for websocket handshake.
on_open: <type> on_open: function
callable object which is called at opening websocket. Callback object which is called at opening websocket.
this function has one argument. The argument is this class object. on_open has one argument.
on_message: <type> The 1st argument is this class object.
callable object which is called when received data. on_message: function
Callback object which is called when received data.
on_message has 2 arguments. on_message has 2 arguments.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server. The 2nd argument is utf-8 data received from the server.
on_error: <type> on_error: function
callable object which is called when we get error. Callback object which is called when we get error.
on_error has 2 arguments. on_error has 2 arguments.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is exception object. The 2nd argument is exception object.
on_close: <type> on_close: function
callable object which is called when closed the connection. Callback object which is called when connection is closed.
this function has one argument. The argument is this class object. on_close has 3 arguments.
on_cont_message: <type> The 1st argument is this class object.
callback object which is called when receive continued The 2nd argument is close_status_code.
frame data. The 3rd argument is close_msg.
on_cont_message: function
Callback object which is called when a continuation
frame is received.
on_cont_message has 3 arguments. on_cont_message has 3 arguments.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server. The 2nd argument is utf-8 string which we get from the server.
The 3rd argument is continue flag. if 0, the data continue The 3rd argument is continue flag. if 0, the data continue
to next frame data to next frame data
on_data: <type> on_data: function
callback object which is called when a message received. Callback object which is called when a message received.
This is called before on_message or on_cont_message, This is called before on_message or on_cont_message,
and then on_message or on_cont_message is called. and then on_message or on_cont_message is called.
on_data has 4 argument. on_data has 4 argument.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server. The 2nd argument is utf-8 string which we get from the server.
The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
The 4th argument is continue flag. if 0, the data continue The 4th argument is continue flag. If 0, the data continue
keep_running: <type> keep_running: bool
this parameter is obsolete and ignored. This parameter is obsolete and ignored.
get_mask_key: func get_mask_key: function
a callable to produce new mask keys, A callable function to get new mask keys, see the
see the WebSocket.set_mask_key's docstring for more information WebSocket.set_mask_key's docstring for more information.
cookie: str cookie: str
cookie value. Cookie value.
subprotocols: <type> subprotocols: list
array of available sub protocols. default is None. List of available sub protocols. Default is None.
""" """
self.url = url self.url = url
self.header = header if header is not None else [] self.header = header if header is not None else []
@ -177,11 +181,11 @@ class WebSocketApp(object):
Parameters Parameters
---------- ----------
data: <type> data: str
Message to send. If you set opcode to OPCODE_TEXT, Message to send. If you set opcode to OPCODE_TEXT,
data must be utf-8 string or unicode. data must be utf-8 string or unicode.
opcode: <type> opcode: int
Operation code of data. default is OPCODE_TEXT. Operation code of data. Default is OPCODE_TEXT.
""" """
if not self.sock or self.sock.send(data, opcode) == 0: if not self.sock or self.sock.send(data, opcode) == 0:
@ -214,8 +218,7 @@ class WebSocketApp(object):
http_no_proxy=None, http_proxy_auth=None, http_no_proxy=None, http_proxy_auth=None,
skip_utf8_validation=False, skip_utf8_validation=False,
host=None, origin=None, dispatcher=None, host=None, origin=None, dispatcher=None,
suppress_origin=False, proxy_type=None, suppress_origin=False, proxy_type=None):
enable_multithread=True):
""" """
Run event loop for WebSocket framework. Run event loop for WebSocket framework.
@ -224,32 +227,32 @@ class WebSocketApp(object):
Parameters Parameters
---------- ----------
sockopt: tuple sockopt: tuple
values for socket.setsockopt. Values for socket.setsockopt.
sockopt must be tuple sockopt must be tuple
and each element is argument of sock.setsockopt. and each element is argument of sock.setsockopt.
sslopt: dict sslopt: dict
optional dict object for ssl socket option. Optional dict object for ssl socket option.
ping_interval: int or float ping_interval: int or float
automatically send "ping" command Automatically send "ping" command
every specified period (in seconds) every specified period (in seconds).
if set to 0, not send automatically. If set to 0, no ping is sent periodically.
ping_timeout: int or float ping_timeout: int or float
timeout (in seconds) if the pong message is not received. Timeout (in seconds) if the pong message is not received.
ping_payload: str ping_payload: str
payload message to send with each ping. Payload message to send with each ping.
http_proxy_host: <type> http_proxy_host: str
http proxy host name. HTTP proxy host name.
http_proxy_port: <type> http_proxy_port: int or str
http proxy port. If not set, set to 80. HTTP proxy port. If not set, set to 80.
http_no_proxy: <type> http_no_proxy: list
host names, which doesn't use proxy. Whitelisted host names that don't use the proxy.
skip_utf8_validation: bool skip_utf8_validation: bool
skip utf8 validation. skip utf8 validation.
host: str host: str
update host header. update host header.
origin: str origin: str
update origin header. update origin header.
dispatcher: <type> dispatcher: Dispatcher object
customize reading data from socket. customize reading data from socket.
suppress_origin: bool suppress_origin: bool
suppress outputting origin header. suppress outputting origin header.
@ -281,8 +284,11 @@ class WebSocketApp(object):
""" """
Tears down the connection. Tears down the connection.
If close_frame is set, we will invoke the on_close handler with the Parameters
statusCode and reason from there. ----------
close_frame: ABNF frame
If close_frame is set, the on_close handler is invoked
with the statusCode and reason from the provided frame.
""" """
if thread and thread.is_alive(): if thread and thread.is_alive():
@ -301,7 +307,7 @@ class WebSocketApp(object):
self.get_mask_key, sockopt=sockopt, sslopt=sslopt, self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=self.on_cont_message is not None, fire_cont_frame=self.on_cont_message is not None,
skip_utf8_validation=skip_utf8_validation, skip_utf8_validation=skip_utf8_validation,
enable_multithread=enable_multithread) enable_multithread=True)
self.sock.settimeout(getdefaulttimeout()) self.sock.settimeout(getdefaulttimeout())
self.sock.connect( self.sock.connect(
self.url, header=self.header, cookie=self.cookie, self.url, header=self.header, cookie=self.cookie,

View file

@ -62,30 +62,31 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
get_mask_key: func get_mask_key: func
a callable to produce new mask keys, see the set_mask_key A callable function to get new mask keys, see the
function's docstring for more details WebSocket.set_mask_key's docstring for more information.
sockopt: tuple sockopt: tuple
values for socket.setsockopt. Values for socket.setsockopt.
sockopt must be tuple and each element is argument of sock.setsockopt. sockopt must be tuple and each element is argument of sock.setsockopt.
sslopt: dict sslopt: dict
optional dict object for ssl socket option. Optional dict object for ssl socket options.
fire_cont_frame: bool fire_cont_frame: bool
fire recv event for each cont frame. default is False Fire recv event for each cont frame. Default is False.
enable_multithread: bool enable_multithread: bool
if set to True, lock send method. If set to True, lock send method.
skip_utf8_validation: bool skip_utf8_validation: bool
skip utf8 validation. Skip utf8 validation.
""" """
def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, def __init__(self, get_mask_key=None, sockopt=None, sslopt=None,
fire_cont_frame=False, enable_multithread=False, fire_cont_frame=False, enable_multithread=True,
skip_utf8_validation=False, **_): skip_utf8_validation=False, **_):
""" """
Initialize WebSocket object. Initialize WebSocket object.
Parameters Parameters
---------- ----------
sslopt: specify ssl certification verification options sslopt: dict
Optional dict object for ssl socket options.
""" """
self.sock_opt = sock_opt(sockopt, sslopt) self.sock_opt = sock_opt(sockopt, sslopt)
self.handshake_response = None self.handshake_response = None
@ -194,7 +195,10 @@ class WebSocket(object):
return None return None
def is_ssl(self): def is_ssl(self):
try:
return isinstance(self.sock, ssl.SSLSocket) return isinstance(self.sock, ssl.SSLSocket)
except:
return False
headers = property(getheaders) headers = property(getheaders)
@ -210,40 +214,38 @@ class WebSocket(object):
... header=["User-Agent: MyProgram", ... header=["User-Agent: MyProgram",
... "x-custom: header"]) ... "x-custom: header"])
timeout: <type>
socket timeout time. This value is an integer or float.
if you set None for this value, it means "use default_timeout value"
Parameters Parameters
---------- ----------
options: header: list or dict
- header: list or dict Custom http header list or dict.
custom http header list or dict. cookie: str
- cookie: str Cookie value.
cookie value. origin: str
- origin: str Custom origin url.
custom origin url. connection: str
- connection: str Custom connection header value.
custom connection header value. Default value "Upgrade" set in _handshake.py
default value "Upgrade" set in _handshake.py suppress_origin: bool
- suppress_origin: bool Suppress outputting origin header.
suppress outputting origin header. host: str
- host: str Custom host header string.
custom host header string. timeout: int or float
- http_proxy_host: <type> Socket timeout time. This value is an integer or float.
http proxy host name. If you set None for this value, it means "use default_timeout value"
- http_proxy_port: <type> http_proxy_host: str
http proxy port. If not set, set to 80. HTTP proxy host name.
- http_no_proxy: <type> http_proxy_port: str or int
host names, which doesn't use proxy. HTTP proxy port. Default is 80.
- http_proxy_auth: <type> http_no_proxy: list
http proxy auth information. tuple of username and password. default is None Whitelisted host names that don't use the proxy.
- redirect_limit: <type> http_proxy_auth: tuple
number of redirects to follow. HTTP proxy auth information. Tuple of username and password. Default is None.
- subprotocols: <type> redirect_limit: int
array of available sub protocols. default is None. Number of redirects to follow.
- socket: <type> subprotocols: list
pre-initialized stream socket. List of available subprotocols. Default is None.
socket: socket
Pre-initialized stream socket.
""" """
self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options),
@ -273,10 +275,10 @@ class WebSocket(object):
---------- ----------
payload: str payload: str
Payload must be utf-8 string or unicode, Payload must be utf-8 string or unicode,
if the opcode is OPCODE_TEXT. If the opcode is OPCODE_TEXT.
Otherwise, it must be string(byte array) Otherwise, it must be string(byte array).
opcode: int opcode: int
operation code to send. Please see OPCODE_XXX. Operation code (opcode) to send.
""" """
frame = ABNF.create_frame(payload, opcode) frame = ABNF.create_frame(payload, opcode)
@ -440,10 +442,10 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
status: <type> status: int
status code to send. see STATUS_XXX. Status code to send. See STATUS_XXX.
reason: str or bytes reason: str or bytes
the reason to close. This must be string or bytes. The reason to close. This must be string or bytes.
""" """
if status < 0 or status >= ABNF.LENGTH_16: if status < 0 or status >= ABNF.LENGTH_16:
raise ValueError("code is invalid range") raise ValueError("code is invalid range")
@ -457,11 +459,11 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
status: int status: int
status code to send. see STATUS_XXX. Status code to send. See STATUS_XXX.
reason: bytes reason: bytes
the reason to close. The reason to close.
timeout: int or float timeout: int or float
timeout until receive a close frame. Timeout until receive a close frame.
If None, it will wait forever until receive a close frame. If None, it will wait forever until receive a close frame.
""" """
if self.connected: if self.connected:
@ -490,10 +492,8 @@ class WebSocket(object):
break break
self.sock.settimeout(sock_timeout) self.sock.settimeout(sock_timeout)
self.sock.shutdown(socket.SHUT_RDWR) self.sock.shutdown(socket.SHUT_RDWR)
except OSError: # This happens often on Mac
pass
except: except:
raise pass
self.shutdown() self.shutdown()
@ -544,52 +544,51 @@ def create_connection(url, timeout=None, class_=WebSocket, **options):
Parameters Parameters
---------- ----------
timeout: int or float class_: class
socket timeout time. This value could be either float/integer.
if you set None for this value,
it means "use default_timeout value"
class_: <type>
class to instantiate when creating the connection. It has to implement class to instantiate when creating the connection. It has to implement
settimeout and connect. It's __init__ should be compatible with settimeout and connect. It's __init__ should be compatible with
WebSocket.__init__, i.e. accept all of it's kwargs. WebSocket.__init__, i.e. accept all of it's kwargs.
options: <type> header: list or dict
- header: list or dict
custom http header list or dict. custom http header list or dict.
- cookie: str cookie: str
cookie value. Cookie value.
- origin: str origin: str
custom origin url. custom origin url.
- suppress_origin: bool suppress_origin: bool
suppress outputting origin header. suppress outputting origin header.
- host: <type> host: str
custom host header string. custom host header string.
- http_proxy_host: <type> timeout: int or float
http proxy host name. socket timeout time. This value could be either float/integer.
- http_proxy_port: <type> If set to None, it uses the default_timeout value.
http proxy port. If not set, set to 80. http_proxy_host: str
- http_no_proxy: <type> HTTP proxy host name.
host names, which doesn't use proxy. http_proxy_port: str or int
- http_proxy_auth: <type> HTTP proxy port. If not set, set to 80.
http proxy auth information. tuple of username and password. default is None http_no_proxy: list
- enable_multithread: bool Whitelisted host names that don't use the proxy.
enable lock for multithread. http_proxy_auth: tuple
- redirect_limit: <type> HTTP proxy auth information. tuple of username and password. Default is None.
number of redirects to follow. enable_multithread: bool
- sockopt: <type> Enable lock for multithread.
socket options redirect_limit: int
- sslopt: <type> Number of redirects to follow.
ssl option sockopt: tuple
- subprotocols: <type> Values for socket.setsockopt.
array of available sub protocols. default is None. sockopt must be a tuple and each element is an argument of sock.setsockopt.
- skip_utf8_validation: bool sslopt: dict
skip utf8 validation. Optional dict object for ssl socket options.
- socket: <type> subprotocols: list
pre-initialized stream socket. List of available subprotocols. Default is None.
skip_utf8_validation: bool
Skip utf8 validation.
socket: socket
Pre-initialized stream socket.
""" """
sockopt = options.pop("sockopt", []) sockopt = options.pop("sockopt", [])
sslopt = options.pop("sslopt", {}) sslopt = options.pop("sslopt", {})
fire_cont_frame = options.pop("fire_cont_frame", False) fire_cont_frame = options.pop("fire_cont_frame", False)
enable_multithread = options.pop("enable_multithread", False) enable_multithread = options.pop("enable_multithread", True)
skip_utf8_validation = options.pop("skip_utf8_validation", False) skip_utf8_validation = options.pop("skip_utf8_validation", False)
websock = class_(sockopt=sockopt, sslopt=sslopt, websock = class_(sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=fire_cont_frame, fire_cont_frame=fire_cont_frame,

View file

@ -79,7 +79,7 @@ def _open_proxied_socket(url, options, proxy):
(hostname, port), (hostname, port),
proxy_type=ptype, proxy_type=ptype,
proxy_addr=proxy.host, proxy_addr=proxy.host,
proxy_port=proxy.port, proxy_port=int(proxy.port),
proxy_rdns=rdns, proxy_rdns=rdns,
proxy_username=proxy.auth[0] if proxy.auth else None, proxy_username=proxy.auth[0] if proxy.auth else None,
proxy_password=proxy.auth[1] if proxy.auth else None, proxy_password=proxy.auth[1] if proxy.auth else None,
@ -200,7 +200,7 @@ def _open_socket(addrinfo_list, sockopt, timeout):
def _wrap_sni_socket(sock, sslopt, hostname, check_hostname): def _wrap_sni_socket(sock, sslopt, hostname, check_hostname):
context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_SSLv23)) context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_TLS))
if sslopt.get('cert_reqs', ssl.CERT_NONE) != ssl.CERT_NONE: if sslopt.get('cert_reqs', ssl.CERT_NONE) != ssl.CERT_NONE:
cafile = sslopt.get('ca_certs', None) cafile = sslopt.get('ca_certs', None)
@ -248,6 +248,9 @@ def _ssl_socket(sock, user_sslopt, hostname):
and user_sslopt.get('ca_cert_path', None) is None: and user_sslopt.get('ca_cert_path', None) is None:
sslopt['ca_cert_path'] = certPath sslopt['ca_cert_path'] = certPath
if sslopt.get('server_hostname', None):
hostname = sslopt['server_hostname']
check_hostname = sslopt["cert_reqs"] != ssl.CERT_NONE and sslopt.pop( check_hostname = sslopt["cert_reqs"] != ssl.CERT_NONE and sslopt.pop(
'check_hostname', True) 'check_hostname', True)
sock = _wrap_sni_socket(sock, sslopt, hostname, check_hostname) sock = _wrap_sni_socket(sock, sslopt, hostname, check_hostname)

View file

@ -125,7 +125,7 @@ def recv(sock, bufsize):
if not bytes_: if not bytes_:
raise WebSocketConnectionClosedException( raise WebSocketConnectionClosedException(
"Connection is already closed.") "Connection to remote host was lost.")
return bytes_ return bytes_

View file

@ -26,7 +26,7 @@ import os
import socket import socket
import struct import struct
from urllib.parse import urlparse from urllib.parse import unquote, urlparse
__all__ = ["parse_url", "get_proxy_info"] __all__ = ["parse_url", "get_proxy_info"]
@ -109,7 +109,7 @@ def _is_address_in_network(ip, net):
def _is_no_proxy_host(hostname, no_proxy): def _is_no_proxy_host(hostname, no_proxy):
if not no_proxy: if not no_proxy:
v = os.environ.get("no_proxy", "").replace(" ", "") v = os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(" ", "")
if v: if v:
no_proxy = v.split(",") no_proxy = v.split(",")
if not no_proxy: if not no_proxy:
@ -139,22 +139,21 @@ def get_proxy_info(
Parameters Parameters
---------- ----------
hostname: <type> hostname: str
websocket server name. Websocket server name.
is_secure: <type> is_secure: bool
is the connection secure? (wss) looks for "https_proxy" in env Is the connection secure? (wss) looks for "https_proxy" in env
before falling back to "http_proxy" before falling back to "http_proxy"
options: <type> proxy_host: str
- http_proxy_host: <type>
http proxy host name. http proxy host name.
- http_proxy_port: <type> http_proxy_port: str or int
http proxy port. http proxy port.
- http_no_proxy: <type> http_no_proxy: list
host names, which doesn't use proxy. Whitelisted host names that don't use the proxy.
- http_proxy_auth: <type> http_proxy_auth: tuple
http proxy auth information. tuple of username and password. default is None HTTP proxy auth information. Tuple of username and password. Default is None.
- proxy_type: <type> proxy_type: str
if set to "socks5" PySocks wrapper will be used in place of a http proxy. default is "http" If set to "socks4" or "socks5", a PySocks wrapper will be used in place of a HTTP proxy. Default is "http".
""" """
if _is_no_proxy_host(hostname, no_proxy): if _is_no_proxy_host(hostname, no_proxy):
return None, 0, None return None, 0, None
@ -169,10 +168,10 @@ def get_proxy_info(
env_keys.insert(0, "https_proxy") env_keys.insert(0, "https_proxy")
for key in env_keys: for key in env_keys:
value = os.environ.get(key, None) value = os.environ.get(key, os.environ.get(key.upper(), "")).replace(" ", "")
if value: if value:
proxy = urlparse(value) proxy = urlparse(value)
auth = (proxy.username, proxy.password) if proxy.username else None auth = (unquote(proxy.username), unquote(proxy.password)) if proxy.username else None
return proxy.hostname, proxy.port, auth return proxy.hostname, proxy.port, auth
return None, 0, None return None, 0, None

View file

@ -1,6 +0,0 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade
Upgrade: WebSocket
Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
some_header: something

View file

@ -1,6 +0,0 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade
Upgrade WebSocket
Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
some_header: something

View file

@ -1,7 +0,0 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade, Keep-Alive
Upgrade: WebSocket
Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
Set-Cookie: Token=ABCDE
some_header: something

View file

@ -1,94 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import websocket as ws
from websocket._abnf import *
import sys
import unittest
sys.path[0:0] = [""]
class ABNFTest(unittest.TestCase):
def testInit(self):
a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
self.assertEqual(a.fin, 0)
self.assertEqual(a.rsv1, 0)
self.assertEqual(a.rsv2, 0)
self.assertEqual(a.rsv3, 0)
self.assertEqual(a.opcode, 9)
self.assertEqual(a.data, '')
a_bad = ABNF(0,1,0,0, opcode=77)
self.assertEqual(a_bad.rsv1, 1)
self.assertEqual(a_bad.opcode, 77)
def testValidate(self):
a_invalid_ping = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_invalid_ping.validate, skip_utf8_validation=False)
a_bad_rsv_value = ABNF(0,1,0,0, opcode=ABNF.OPCODE_TEXT)
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_rsv_value.validate, skip_utf8_validation=False)
a_bad_opcode = ABNF(0,0,0,0, opcode=77)
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_opcode.validate, skip_utf8_validation=False)
a_bad_close_frame = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01')
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame.validate, skip_utf8_validation=False)
a_bad_close_frame_2 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01\x8a\xaa\xff\xdd')
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_2.validate, skip_utf8_validation=False)
a_bad_close_frame_3 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x03\xe7')
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_3.validate, skip_utf8_validation=True)
def testMask(self):
abnf_none_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data=None)
bytes_val = bytes("aaaa", 'utf-8')
self.assertEqual(abnf_none_data._get_masked(bytes_val), bytes_val)
abnf_str_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data="a")
self.assertEqual(abnf_str_data._get_masked(bytes_val), b'aaaa\x00')
def testFormat(self):
abnf_bad_rsv_bits = ABNF(2,0,0,0, opcode=ABNF.OPCODE_TEXT)
self.assertRaises(ValueError, abnf_bad_rsv_bits.format)
abnf_bad_opcode = ABNF(0,0,0,0, opcode=5)
self.assertRaises(ValueError, abnf_bad_opcode.format)
abnf_length_10 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, data="abcdefghij")
self.assertEqual(b'\x01', abnf_length_10.format()[0].to_bytes(1, 'big'))
self.assertEqual(b'\x8a', abnf_length_10.format()[1].to_bytes(1, 'big'))
self.assertEqual("fin=0 opcode=1 data=abcdefghij", abnf_length_10.__str__())
abnf_length_20 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_BINARY, data="abcdefghijabcdefghij")
self.assertEqual(b'\x02', abnf_length_20.format()[0].to_bytes(1, 'big'))
self.assertEqual(b'\x94', abnf_length_20.format()[1].to_bytes(1, 'big'))
abnf_no_mask = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, mask=0, data=b'\x01\x8a\xcc')
self.assertEqual(b'\x01\x03\x01\x8a\xcc', abnf_no_mask.format())
def testFrameBuffer(self):
fb = frame_buffer(0, True)
self.assertEqual(fb.recv, 0)
self.assertEqual(fb.skip_utf8_validation, True)
fb.clear
self.assertEqual(fb.header, None)
self.assertEqual(fb.length, None)
self.assertEqual(fb.mask, None)
self.assertEqual(fb.has_mask(), False)
if __name__ == "__main__":
unittest.main()

View file

@ -1,176 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import os.path
import websocket as ws
import sys
import ssl
import unittest
sys.path[0:0] = [""]
# Skip test to access the internet.
TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
TRACEABLE = True
class WebSocketAppTest(unittest.TestCase):
class NotSetYet(object):
""" A marker class for signalling that a value hasn't been set yet.
"""
def setUp(self):
ws.enableTrace(TRACEABLE)
WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
def tearDown(self):
WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testKeepRunning(self):
""" A WebSocketApp should keep running as long as its self.keep_running
is not False (in the boolean context).
"""
def on_open(self, *args, **kwargs):
""" Set the keep_running flag for later inspection and immediately
close the connection.
"""
WebSocketAppTest.keep_running_open = self.keep_running
self.close()
def on_close(self, *args, **kwargs):
""" Set the keep_running flag for the test to use.
"""
WebSocketAppTest.keep_running_close = self.keep_running
self.send("connection should be closed here")
app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close)
app.run_forever()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSockMaskKey(self):
""" A WebSocketApp should forward the received mask_key function down
to the actual socket.
"""
def my_mask_key_func():
return "\x00\x00\x00\x00"
app = ws.WebSocketApp('wss://stream.meetup.com/2/rsvps', get_mask_key=my_mask_key_func)
# if numpy is installed, this assertion fail
# Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
self.assertEqual(id(app.get_mask_key), id(my_mask_key_func))
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testInvalidPingIntervalPingTimeout(self):
""" Test exception handling if ping_interval < ping_timeout
"""
def on_ping(app, msg):
print("Got a ping!")
app.close()
def on_pong(app, msg):
print("Got a pong! No need to respond")
app.close()
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong)
self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=1, ping_timeout=2, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testPingInterval(self):
""" Test WebSocketApp proper ping functionality
"""
def on_ping(app, msg):
print("Got a ping!")
app.close()
def on_pong(app, msg):
print("Got a pong! No need to respond")
app.close()
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong)
app.run_forever(ping_interval=2, ping_timeout=1, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testOpcodeClose(self):
""" Test WebSocketApp close opcode
"""
app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect')
app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testOpcodeBinary(self):
""" Test WebSocketApp binary opcode
"""
app = ws.WebSocketApp('streaming.vn.teslamotors.com/streaming/')
app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testBadPingInterval(self):
""" A WebSocketApp handling of negative ping_interval
"""
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1')
self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=-5, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testBadPingTimeout(self):
""" A WebSocketApp handling of negative ping_timeout
"""
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1')
self.assertRaises(ws.WebSocketException, app.run_forever, ping_timeout=-3, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testCloseStatusCode(self):
""" Test extraction of close frame status code and close reason in WebSocketApp
"""
def on_close(wsapp, close_status_code, close_msg):
print("on_close reached")
app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect', on_close=on_close)
closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'\x03\xe8no-init-from-client')
self.assertEqual([1000, 'no-init-from-client'], app._get_close_args(closeframe))
closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'')
self.assertEqual([None, None], app._get_close_args(closeframe))
app2 = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect')
closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'')
self.assertEqual([None, None], app2._get_close_args(closeframe))
self.assertRaises(ws.WebSocketConnectionClosedException, app.send, data="test if connection is closed")
if __name__ == "__main__":
unittest.main()

View file

@ -1,118 +0,0 @@
"""
"""
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import unittest
from websocket._cookiejar import SimpleCookieJar
class CookieJarTest(unittest.TestCase):
def testAdd(self):
cookie_jar = SimpleCookieJar()
cookie_jar.add("")
self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b")
self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; domain=.abc")
self.assertTrue(".abc" in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; domain=abc")
self.assertTrue(".abc" in cookie_jar.jar)
self.assertTrue("abc" not in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
self.assertEqual(cookie_jar.get(None), "")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
cookie_jar.add("e=f; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
cookie_jar.add("e=f; domain=.abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
cookie_jar.add("e=f; domain=xyz")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
self.assertEqual(cookie_jar.get("xyz"), "e=f")
self.assertEqual(cookie_jar.get("something"), "")
def testSet(self):
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b")
self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; domain=.abc")
self.assertTrue(".abc" in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; domain=abc")
self.assertTrue(".abc" in cookie_jar.jar)
self.assertTrue("abc" not in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
cookie_jar.set("e=f; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
cookie_jar.set("e=f; domain=.abc")
self.assertEqual(cookie_jar.get("abc"), "e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
cookie_jar.set("e=f; domain=xyz")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
self.assertEqual(cookie_jar.get("xyz"), "e=f")
self.assertEqual(cookie_jar.get("something"), "")
def testGet(self):
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc.com")
self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("abc.com.es"), "")
self.assertEqual(cookie_jar.get("xabc.com"), "")
cookie_jar.set("a=b; c=d; domain=.abc.com")
self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("abc.com.es"), "")
self.assertEqual(cookie_jar.get("xabc.com"), "")

View file

@ -1,150 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import os.path
import websocket as ws
from websocket._http import proxy_info, read_headers, _open_proxied_socket, _tunnel, _get_addrinfo_list, connect
import sys
import unittest
import ssl
import websocket
import socks
import socket
sys.path[0:0] = [""]
# Skip test to access the internet.
TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
class SockMock(object):
def __init__(self):
self.data = []
self.sent = []
def add_packet(self, data):
self.data.append(data)
def gettimeout(self):
return None
def recv(self, bufsize):
if self.data:
e = self.data.pop(0)
if isinstance(e, Exception):
raise e
if len(e) > bufsize:
self.data.insert(0, e[bufsize:])
return e[:bufsize]
def send(self, data):
self.sent.append(data)
return len(data)
def close(self):
pass
class HeaderSockMock(SockMock):
def __init__(self, fname):
SockMock.__init__(self)
path = os.path.join(os.path.dirname(__file__), fname)
with open(path, "rb") as f:
self.add_packet(f.read())
class OptsList():
def __init__(self):
self.timeout = 1
self.sockopt = []
class HttpTest(unittest.TestCase):
def testReadHeader(self):
status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
self.assertEqual(status, 101)
self.assertEqual(header["connection"], "Upgrade")
# header02.txt is intentionally malformed
self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
def testTunnel(self):
self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header01.txt"), "example.com", 80, ("username", "password"))
self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header02.txt"), "example.com", 80, ("username", "password"))
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testConnect(self):
# Not currently testing an actual proxy connection, so just check whether TypeError is raised. This requires internet for a DNS lookup
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host=None, http_proxy_port=None, proxy_type=None))
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http"))
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks4"))
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks5h"))
self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"))
self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"))
self.assertRaises(socks.ProxyConnectionError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=8080, proxy_type="socks4"), None)
self.assertRaises(socket.timeout, connect, "wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), None)
self.assertEqual(
connect("wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), True),
(True, ("google.com", 443, "/")))
# The following test fails on Mac OS with a gaierror, not an OverflowError
# self.assertRaises(OverflowError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=99999, proxy_type="socks4", timeout=2), False)
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSSLopt(self):
ssloptions = {
"cert_reqs": ssl.CERT_NONE,
"check_hostname": False,
"ssl_version": ssl.PROTOCOL_SSLv23,
"ciphers": "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:\
TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:\
ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:\
ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:\
DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:\
ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES128-GCM-SHA256:\
ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:\
DHE-RSA-AES256-SHA256:ECDHE-ECDSA-AES128-SHA256:\
ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256:\
ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA",
"ecdh_curve": "prime256v1"
}
ws_ssl1 = websocket.WebSocket(sslopt=ssloptions)
ws_ssl1.connect("wss://api.bitfinex.com/ws/2")
ws_ssl1.send("Hello")
ws_ssl1.close()
ws_ssl2 = websocket.WebSocket(sslopt={"check_hostname": True})
ws_ssl2.connect("wss://api.bitfinex.com/ws/2")
ws_ssl2.close
def testProxyInfo(self):
self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").type, "http")
self.assertRaises(ValueError, proxy_info, http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="badval")
self.assertEqual(proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http").host, "example.com")
self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").port, "8080")
self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").auth, None)
if __name__ == "__main__":
unittest.main()

View file

@ -1,301 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import sys
import os
import unittest
sys.path[0:0] = [""]
from websocket._url import get_proxy_info, parse_url, _is_address_in_network, _is_no_proxy_host
class UrlTest(unittest.TestCase):
def test_address_in_network(self):
self.assertTrue(_is_address_in_network('127.0.0.1', '127.0.0.0/8'))
self.assertTrue(_is_address_in_network('127.1.0.1', '127.0.0.0/8'))
self.assertFalse(_is_address_in_network('127.1.0.1', '127.0.0.0/24'))
def testParseUrl(self):
p = parse_url("ws://www.example.com/r")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com/r/")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/r/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com/")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com:8080/r")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com:8080/")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com:8080")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("wss://www.example.com:8080/r")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], True)
p = parse_url("wss://www.example.com:8080/r?key=value")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r?key=value")
self.assertEqual(p[3], True)
self.assertRaises(ValueError, parse_url, "http://www.example.com/r")
p = parse_url("ws://[2a03:4000:123:83::3]/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("ws://[2a03:4000:123:83::3]:8080/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("wss://[2a03:4000:123:83::3]/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 443)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], True)
p = parse_url("wss://[2a03:4000:123:83::3]:8080/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], True)
class IsNoProxyHostTest(unittest.TestCase):
def setUp(self):
self.no_proxy = os.environ.get("no_proxy", None)
if "no_proxy" in os.environ:
del os.environ["no_proxy"]
def tearDown(self):
if self.no_proxy:
os.environ["no_proxy"] = self.no_proxy
elif "no_proxy" in os.environ:
del os.environ["no_proxy"]
def testMatchAll(self):
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['*']))
self.assertTrue(_is_no_proxy_host("192.168.0.1", ['*']))
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['other.websocket.org', '*']))
os.environ['no_proxy'] = '*'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
self.assertTrue(_is_no_proxy_host("192.168.0.1", None))
os.environ['no_proxy'] = 'other.websocket.org, *'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
def testIpAddress(self):
self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.1']))
self.assertFalse(_is_no_proxy_host("127.0.0.2", ['127.0.0.1']))
self.assertTrue(_is_no_proxy_host("127.0.0.1", ['other.websocket.org', '127.0.0.1']))
self.assertFalse(_is_no_proxy_host("127.0.0.2", ['other.websocket.org', '127.0.0.1']))
os.environ['no_proxy'] = '127.0.0.1'
self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
os.environ['no_proxy'] = 'other.websocket.org, 127.0.0.1'
self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
def testIpAddressInRange(self):
self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.0/8']))
self.assertTrue(_is_no_proxy_host("127.0.0.2", ['127.0.0.0/8']))
self.assertFalse(_is_no_proxy_host("127.1.0.1", ['127.0.0.0/24']))
os.environ['no_proxy'] = '127.0.0.0/8'
self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
self.assertTrue(_is_no_proxy_host("127.0.0.2", None))
os.environ['no_proxy'] = '127.0.0.0/24'
self.assertFalse(_is_no_proxy_host("127.1.0.1", None))
def testHostnameMatch(self):
self.assertTrue(_is_no_proxy_host("my.websocket.org", ['my.websocket.org']))
self.assertTrue(_is_no_proxy_host("my.websocket.org", ['other.websocket.org', 'my.websocket.org']))
self.assertFalse(_is_no_proxy_host("my.websocket.org", ['other.websocket.org']))
os.environ['no_proxy'] = 'my.websocket.org'
self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
self.assertFalse(_is_no_proxy_host("other.websocket.org", None))
os.environ['no_proxy'] = 'other.websocket.org, my.websocket.org'
self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
def testHostnameMatchDomain(self):
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['.websocket.org']))
self.assertTrue(_is_no_proxy_host("my.other.websocket.org", ['.websocket.org']))
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['my.websocket.org', '.websocket.org']))
self.assertFalse(_is_no_proxy_host("any.websocket.com", ['.websocket.org']))
os.environ['no_proxy'] = '.websocket.org'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
self.assertTrue(_is_no_proxy_host("my.other.websocket.org", None))
self.assertFalse(_is_no_proxy_host("any.websocket.com", None))
os.environ['no_proxy'] = 'my.websocket.org, .websocket.org'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
class ProxyInfoTest(unittest.TestCase):
def setUp(self):
self.http_proxy = os.environ.get("http_proxy", None)
self.https_proxy = os.environ.get("https_proxy", None)
self.no_proxy = os.environ.get("no_proxy", None)
if "http_proxy" in os.environ:
del os.environ["http_proxy"]
if "https_proxy" in os.environ:
del os.environ["https_proxy"]
if "no_proxy" in os.environ:
del os.environ["no_proxy"]
def tearDown(self):
if self.http_proxy:
os.environ["http_proxy"] = self.http_proxy
elif "http_proxy" in os.environ:
del os.environ["http_proxy"]
if self.https_proxy:
os.environ["https_proxy"] = self.https_proxy
elif "https_proxy" in os.environ:
del os.environ["https_proxy"]
if self.no_proxy:
os.environ["no_proxy"] = self.no_proxy
elif "no_proxy" in os.environ:
del os.environ["no_proxy"]
def testProxyFromArgs(self):
self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None))
self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128),
("localhost", 3128, None))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128),
("localhost", 3128, None))
self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")),
("localhost", 0, ("a", "b")))
self.assertEqual(
get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
("localhost", 3128, ("a", "b")))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")),
("localhost", 0, ("a", "b")))
self.assertEqual(
get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
("localhost", 3128, ("a", "b")))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128,
no_proxy=["example.com"], proxy_auth=("a", "b")),
("localhost", 3128, ("a", "b")))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128,
no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")),
(None, 0, None))
def testProxyFromEnv(self):
os.environ["http_proxy"] = "http://localhost/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
os.environ["http_proxy"] = "http://localhost:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
os.environ["http_proxy"] = "http://localhost/"
os.environ["https_proxy"] = "http://localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
os.environ["http_proxy"] = "http://localhost:3128/"
os.environ["https_proxy"] = "http://localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
os.environ["http_proxy"] = "http://localhost/"
os.environ["https_proxy"] = "http://localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
os.environ["http_proxy"] = "http://localhost:3128/"
os.environ["https_proxy"] = "http://localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))
os.environ["http_proxy"] = "http://a:b@localhost/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost/"
os.environ["https_proxy"] = "http://a:b@localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost/"
os.environ["https_proxy"] = "http://a:b@localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost/"
os.environ["https_proxy"] = "http://a:b@localhost2/"
os.environ["no_proxy"] = "example1.com,example2.com"
self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org"
self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
os.environ["no_proxy"] = "example1.com,example2.com, .websocket.org"
self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16"
self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None))
self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None))
if __name__ == "__main__":
unittest.main()

View file

@ -1,452 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
"""
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import sys
sys.path[0:0] = [""]
import os
import os.path
import socket
import websocket as ws
from websocket._handshake import _create_sec_websocket_key, \
_validate as _validate_header
from websocket._http import read_headers
from websocket._utils import validate_utf8
from base64 import decodebytes as base64decode
import unittest
try:
import ssl
from ssl import SSLError
except ImportError:
# dummy class of SSLError for ssl none-support environment.
class SSLError(Exception):
pass
# Skip test to access the internet.
TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
TRACEABLE = True
def create_mask_key(_):
return "abcd"
class SockMock(object):
def __init__(self):
self.data = []
self.sent = []
def add_packet(self, data):
self.data.append(data)
def gettimeout(self):
return None
def recv(self, bufsize):
if self.data:
e = self.data.pop(0)
if isinstance(e, Exception):
raise e
if len(e) > bufsize:
self.data.insert(0, e[bufsize:])
return e[:bufsize]
def send(self, data):
self.sent.append(data)
return len(data)
def close(self):
pass
class HeaderSockMock(SockMock):
def __init__(self, fname):
SockMock.__init__(self)
path = os.path.join(os.path.dirname(__file__), fname)
with open(path, "rb") as f:
self.add_packet(f.read())
class WebSocketTest(unittest.TestCase):
def setUp(self):
ws.enableTrace(TRACEABLE)
def tearDown(self):
pass
def testDefaultTimeout(self):
self.assertEqual(ws.getdefaulttimeout(), None)
ws.setdefaulttimeout(10)
self.assertEqual(ws.getdefaulttimeout(), 10)
ws.setdefaulttimeout(None)
def testWSKey(self):
key = _create_sec_websocket_key()
self.assertTrue(key != 24)
self.assertTrue(str("¥n") not in key)
def testNonce(self):
""" WebSocket key should be a random 16-byte nonce.
"""
key = _create_sec_websocket_key()
nonce = base64decode(key.encode("utf-8"))
self.assertEqual(16, len(nonce))
def testWsUtils(self):
key = "c6b8hTg4EeGb2gQMztV1/g=="
required_header = {
"upgrade": "websocket",
"connection": "upgrade",
"sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0="}
self.assertEqual(_validate_header(required_header, key, None), (True, None))
header = required_header.copy()
header["upgrade"] = "http"
self.assertEqual(_validate_header(header, key, None), (False, None))
del header["upgrade"]
self.assertEqual(_validate_header(header, key, None), (False, None))
header = required_header.copy()
header["connection"] = "something"
self.assertEqual(_validate_header(header, key, None), (False, None))
del header["connection"]
self.assertEqual(_validate_header(header, key, None), (False, None))
header = required_header.copy()
header["sec-websocket-accept"] = "something"
self.assertEqual(_validate_header(header, key, None), (False, None))
del header["sec-websocket-accept"]
self.assertEqual(_validate_header(header, key, None), (False, None))
header = required_header.copy()
header["sec-websocket-protocol"] = "sub1"
self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1"))
# This case will print out a logging error using the error() function, but that is expected
self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))
header = required_header.copy()
header["sec-websocket-protocol"] = "sUb1"
self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1"))
header = required_header.copy()
# This case will print out a logging error using the error() function, but that is expected
self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None))
def testReadHeader(self):
status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
self.assertEqual(status, 101)
self.assertEqual(header["connection"], "Upgrade")
status, header, status_message = read_headers(HeaderSockMock("data/header03.txt"))
self.assertEqual(status, 101)
self.assertEqual(header["connection"], "Upgrade, Keep-Alive")
HeaderSockMock("data/header02.txt")
self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
def testSend(self):
# TODO: add longer frame data
sock = ws.WebSocket()
sock.set_mask_key(create_mask_key)
s = sock.sock = HeaderSockMock("data/header01.txt")
sock.send("Hello")
self.assertEqual(s.sent[0], b'\x81\x85abcd)\x07\x0f\x08\x0e')
sock.send("こんにちは")
self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc')
# sock.send("x" * 5000)
# self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
self.assertEqual(sock.send_binary(b'1111111111101'), 19)
def testRecv(self):
# TODO: add longer frame data
sock = ws.WebSocket()
s = sock.sock = SockMock()
something = b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc'
s.add_packet(something)
data = sock.recv()
self.assertEqual(data, "こんにちは")
s.add_packet(b'\x81\x85abcd)\x07\x0f\x08\x0e')
data = sock.recv()
self.assertEqual(data, "Hello")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testIter(self):
count = 2
for _ in ws.create_connection('wss://stream.meetup.com/2/rsvps'):
count -= 1
if count == 0:
break
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testNext(self):
sock = ws.create_connection('wss://stream.meetup.com/2/rsvps')
self.assertEqual(str, type(next(sock)))
def testInternalRecvStrict(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
s.add_packet(b'foo')
s.add_packet(socket.timeout())
s.add_packet(b'bar')
# s.add_packet(SSLError("The read operation timed out"))
s.add_packet(b'baz')
with self.assertRaises(ws.WebSocketTimeoutException):
sock.frame_buffer.recv_strict(9)
# with self.assertRaises(SSLError):
# data = sock._recv_strict(9)
data = sock.frame_buffer.recv_strict(9)
self.assertEqual(data, b'foobarbaz')
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.frame_buffer.recv_strict(1)
def testRecvTimeout(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
s.add_packet(b'\x81')
s.add_packet(socket.timeout())
s.add_packet(b'\x8dabcd\x29\x07\x0f\x08\x0e')
s.add_packet(socket.timeout())
s.add_packet(b'\x4e\x43\x33\x0e\x10\x0f\x00\x40')
with self.assertRaises(ws.WebSocketTimeoutException):
sock.recv()
with self.assertRaises(ws.WebSocketTimeoutException):
sock.recv()
data = sock.recv()
self.assertEqual(data, "Hello, World!")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testRecvWithSimpleFragmentation(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17')
data = sock.recv()
self.assertEqual(data, "Brevity is the soul of wit")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testRecvWithFireEventOfFragmentation(self):
sock = ws.WebSocket(fire_cont_frame=True)
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
# OPCODE=CONT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17')
_, data = sock.recv_data()
self.assertEqual(data, b'Brevity is ')
_, data = sock.recv_data()
self.assertEqual(data, b'Brevity is ')
_, data = sock.recv_data()
self.assertEqual(data, b'the soul of wit')
# OPCODE=CONT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
with self.assertRaises(ws.WebSocketException):
sock.recv_data()
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testClose(self):
sock = ws.WebSocket()
sock.connected = True
self.assertRaises(ws._exceptions.WebSocketConnectionClosedException, sock.close)
sock = ws.WebSocket()
s = sock.sock = SockMock()
sock.connected = True
s.add_packet(b'\x88\x80\x17\x98p\x84')
sock.recv()
self.assertEqual(sock.connected, False)
def testRecvContFragmentation(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17')
self.assertRaises(ws.WebSocketException, sock.recv)
def testRecvWithProlongedFragmentation(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
s.add_packet(b'\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC')
# OPCODE=CONT, FIN=0, MSG="dear friends, "
s.add_packet(b'\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07\x17MB')
# OPCODE=CONT, FIN=1, MSG="once more"
s.add_packet(b'\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04')
data = sock.recv()
self.assertEqual(
data,
"Once more unto the breach, dear friends, once more")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testRecvWithFragmentationAndControlFrame(self):
sock = ws.WebSocket()
sock.set_mask_key(create_mask_key)
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Too much "
s.add_packet(b'\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA')
# OPCODE=PING, FIN=1, MSG="Please PONG this"
s.add_packet(b'\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17')
# OPCODE=CONT, FIN=1, MSG="of a good thing"
s.add_packet(b'\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c\x08\x0c\x04')
data = sock.recv()
self.assertEqual(data, "Too much of a good thing")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
self.assertEqual(
s.sent[0],
b'\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17')
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testWebSocket(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.send("こにゃにゃちは、世界")
result = s.recv()
self.assertEqual(result, "こにゃにゃちは、世界")
self.assertRaises(ValueError, s.send_close, -1, "")
s.close()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testPingPong(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.ping("Hello")
s.pong("Hi")
s.close()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSecureWebSocket(self):
import ssl
s = ws.create_connection("wss://api.bitfinex.com/ws/2")
self.assertNotEqual(s, None)
self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
self.assertEqual(s.getstatus(), 101)
self.assertNotEqual(s.getheaders(), None)
s.settimeout(10)
self.assertEqual(s.gettimeout(), 10)
self.assertEqual(s.getsubprotocol(), None)
s.abort()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testWebSocketWithCustomHeader(self):
s = ws.create_connection("ws://echo.websocket.org/",
headers={"User-Agent": "PythonWebsocketClient"})
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
self.assertRaises(ValueError, s.close, -1, "")
s.close()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testAfterClose(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.close()
self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
class SockOptTest(unittest.TestCase):
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSockOpt(self):
sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt)
self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
s.close()
class UtilsTest(unittest.TestCase):
def testUtf8Validator(self):
state = validate_utf8(b'\xf0\x90\x80\x80')
self.assertEqual(state, True)
state = validate_utf8(b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited')
self.assertEqual(state, False)
state = validate_utf8(b'')
self.assertEqual(state, True)
class HandshakeTest(unittest.TestCase):
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def test_http_SSL(self):
websock1 = ws.WebSocket(sslopt={"cert_chain": ssl.get_default_verify_paths().capath})
self.assertRaises(ValueError,
websock1.connect, "wss://api.bitfinex.com/ws/2")
websock2 = ws.WebSocket(sslopt={"certfile": "myNonexistentCertFile"})
self.assertRaises(FileNotFoundError,
websock2.connect, "wss://api.bitfinex.com/ws/2")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testManualHeaders(self):
websock3 = ws.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE,
"ca_certs": ssl.get_default_verify_paths().capath,
"ca_cert_path": ssl.get_default_verify_paths().openssl_cafile})
self.assertRaises(ws._exceptions.WebSocketBadStatusException,
websock3.connect, "wss://api.bitfinex.com/ws/2", cookie="chocolate",
origin="testing_websockets.com",
host="echo.websocket.org/websocket-client-test",
subprotocols=["testproto"],
connection="Upgrade",
header={"CustomHeader1":"123",
"Cookie":"TestValue",
"Sec-WebSocket-Key":"k9kFAUWNAMmf5OEMfTlOEA==",
"Sec-WebSocket-Protocol":"newprotocol"})
def testIPv6(self):
websock2 = ws.WebSocket()
self.assertRaises(ValueError, websock2.connect, "2001:4860:4860::8888")
def testBadURLs(self):
websock3 = ws.WebSocket()
self.assertRaises(ValueError, websock3.connect, "ws//example.com")
self.assertRaises(ws.WebSocketAddressException, websock3.connect, "ws://example")
self.assertRaises(ValueError, websock3.connect, "example.com")
if __name__ == "__main__":
unittest.main()