This commit is contained in:
Croneter 2018-06-15 14:40:29 +02:00
parent 1a58967111
commit 51444111d2
1 changed files with 84 additions and 98 deletions

View File

@ -27,8 +27,10 @@ try:
from ssl import SSLError
HAVE_SSL = True
except ImportError:
# dummy class of SSLError for ssl none-support environment.
class SSLError(Exception):
"""
Dummy class of SSLError for ssl none-support environment.
"""
pass
HAVE_SSL = False
@ -50,7 +52,7 @@ import utils
###############################################################################
log = logging.getLogger("PLEX."+__name__)
LOG = logging.getLogger("PLEX." + __name__)
###############################################################################
@ -95,28 +97,31 @@ class WebSocketConnectionClosedException(WebSocketException):
"""
pass
class WebSocketTimeoutException(WebSocketException):
"""
WebSocketTimeoutException will be raised at socket timeout during read/write data.
WebSocketTimeoutException will be raised at socket timeout during read and
write data.
"""
pass
default_timeout = None
traceEnabled = False
DEFAULT_TIMEOUT = None
TRACE_ENABLED = False
def enableTrace(tracable):
def enable_trace(tracable):
"""
turn on/off the tracability.
tracable: boolean value. if set True, tracability is enabled.
"""
global traceEnabled
traceEnabled = tracable
global TRACE_ENABLED
TRACE_ENABLED = tracable
if tracable:
if not log.handlers:
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)
if not LOG.handlers:
LOG.addHandler(logging.StreamHandler())
LOG.setLevel(logging.DEBUG)
def setdefaulttimeout(timeout):
@ -125,15 +130,15 @@ def setdefaulttimeout(timeout):
timeout: default socket timeout time. This value is second.
"""
global default_timeout
default_timeout = timeout
global DEFAULT_TIMEOUT
DEFAULT_TIMEOUT = timeout
def getdefaulttimeout():
"""
Return the global timeout setting(second) to connect.
"""
return default_timeout
return DEFAULT_TIMEOUT
def _parse_url(url):
@ -185,7 +190,8 @@ def create_connection(url, timeout=None, **options):
Connect to url and return the WebSocket object.
Passing optional timeout parameter will set the timeout on the socket.
If no timeout is supplied, the global default timeout setting returned by getdefauttimeout() is used.
If no timeout is supplied, the global default timeout setting returned by
getdefauttimeout() is used.
You can customize using 'options'.
If you set "header" list object, you can set your own custom header.
@ -195,21 +201,23 @@ def create_connection(url, timeout=None, **options):
timeout: socket timeout time. This value is integer.
if you set None for this value, it means "use default_timeout value"
if you set None for this value, it means "use DEFAULT_TIMEOUT
value"
options: current support option is only "header".
if you set header as dict value, the custom HTTP headers are added.
if you set header as dict value, the custom HTTP headers are added
"""
sockopt = options.get("sockopt", [])
sslopt = options.get("sslopt", {})
websock = WebSocket(sockopt=sockopt, sslopt=sslopt)
websock.settimeout(timeout if timeout is not None else default_timeout)
websock.settimeout(timeout if timeout is not None else DEFAULT_TIMEOUT)
websock.connect(url, **options)
return websock
_MAX_INTEGER = (1 << 32) -1
_MAX_INTEGER = (1 << 32) - 1
_AVAILABLE_KEY_CHARS = range(0x21, 0x2f + 1) + range(0x3a, 0x7e + 1)
_MAX_CHAR_BYTE = (1<<8) -1
_MAX_CHAR_BYTE = (1 << 8) - 1
# ref. Websocket gets an update, and it breaks stuff.
# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html
@ -220,10 +228,7 @@ def _create_sec_websocket_key():
return base64.encodestring(uid.bytes).strip()
_HEADERS_TO_CHECK = {
"upgrade": "websocket",
"connection": "upgrade",
}
_HEADERS_TO_CHECK = {"upgrade": "websocket", "connection": "upgrade"}
class ABNF(object):
@ -234,16 +239,16 @@ class ABNF(object):
"""
# operation code values.
OPCODE_CONT = 0x0
OPCODE_TEXT = 0x1
OPCODE_CONT = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xa
OPCODE_CLOSE = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xa
# available operation code value tuple
OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
OPCODE_PING, OPCODE_PONG)
OPCODE_PING, OPCODE_PONG)
# opcode human readable string
OPCODE_MAP = {
@ -253,10 +258,10 @@ class ABNF(object):
OPCODE_CLOSE: "close",
OPCODE_PING: "ping",
OPCODE_PONG: "pong"
}
}
# data length threashold.
LENGTH_7 = 0x7d
LENGTH_7 = 0x7d
LENGTH_16 = 1 << 16
LENGTH_63 = 1 << 63
@ -277,8 +282,8 @@ class ABNF(object):
def __str__(self):
return "fin=" + str(self.fin) \
+ " opcode=" + str(self.opcode) \
+ " data=" + str(self.data)
+ " opcode=" + str(self.opcode) \
+ " data=" + str(self.data)
@staticmethod
def create_frame(data, opcode):
@ -308,9 +313,9 @@ class ABNF(object):
if length >= ABNF.LENGTH_63:
raise ValueError("data is too long")
frame_header = chr(self.fin << 7
| self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4
| self.opcode)
frame_header = chr(self.fin << 7 |
self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 |
self.opcode)
if length < ABNF.LENGTH_7:
frame_header += chr(self.mask << 7 | length)
elif length < ABNF.LENGTH_16:
@ -395,6 +400,9 @@ class WebSocket(object):
self._cont_data = None
def fileno(self):
"""
Returns sock.fileno()
"""
return self.sock.fileno()
def set_mask_key(self, func):
@ -438,7 +446,7 @@ class WebSocket(object):
timeout: socket timeout time. This value is integer.
if you set None for this value,
it means "use default_timeout value"
it means "use DEFAULT_TIMEOUT value"
options: current support option is only "header".
if you set header as dict value,
@ -487,10 +495,10 @@ class WebSocket(object):
header_str = "\r\n".join(headers)
self._send(header_str)
if traceEnabled:
log.debug("--- request header ---")
log.debug(header_str)
log.debug("-----------------------")
if TRACE_ENABLED:
LOG.debug("--- request header ---")
LOG.debug(header_str)
LOG.debug("-----------------------")
status, resp_headers = self._read_headers()
if status != 101:
@ -526,16 +534,16 @@ class WebSocket(object):
def _read_headers(self):
status = None
headers = {}
if traceEnabled:
log.debug("--- response header ---")
if TRACE_ENABLED:
LOG.debug("--- response header ---")
while True:
line = self._recv_line()
if line == "\r\n":
break
line = line.strip()
if traceEnabled:
log.debug(line)
if TRACE_ENABLED:
LOG.debug(line)
if not status:
status_info = line.split(" ", 2)
status = int(status_info[1])
@ -547,8 +555,8 @@ class WebSocket(object):
else:
raise WebSocketException("Invalid header")
if traceEnabled:
log.debug("-----------------------")
if TRACE_ENABLED:
LOG.debug("-----------------------")
return status, headers
@ -567,14 +575,17 @@ class WebSocket(object):
frame.get_mask_key = self.get_mask_key
data = frame.format()
length = len(data)
if traceEnabled:
log.debug("send: " + repr(data))
if TRACE_ENABLED:
LOG.debug("send: %s", repr(data))
while data:
l = self._send(data)
data = data[l:]
return length
def send_binary(self, payload):
"""
send the payload
"""
return self.send(payload, ABNF.OPCODE_BINARY)
def ping(self, payload=""):
@ -693,34 +704,10 @@ class WebSocket(object):
reason: the reason to close. This must be string.
"""
try:
self.sock.shutdown(socket.SHUT_RDWR)
except:
pass
'''
if self.connected:
if status < 0 or status >= ABNF.LENGTH_16:
raise ValueError("code is invalid range")
try:
self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
timeout = self.sock.gettimeout()
self.sock.settimeout(3)
try:
frame = self.recv_frame()
if log.isEnabledFor(logging.ERROR):
recv_status = struct.unpack("!H", frame.data)[0]
if recv_status != STATUS_NORMAL:
log.error("close status: " + repr(recv_status))
except:
pass
self.sock.settimeout(timeout)
self.sock.shutdown(socket.SHUT_RDWR)
except:
pass
'''
self._closeInternal()
def _closeInternal(self):
@ -752,7 +739,6 @@ class WebSocket(object):
raise WebSocketConnectionClosedException()
return bytes_
def _recv_strict(self, bufsize):
shortage = bufsize - sum(len(x) for x in self._recv_buffer)
while shortage > 0:
@ -767,7 +753,6 @@ class WebSocket(object):
self._recv_buffer = [unified[bufsize:]]
return unified[:bufsize]
def _recv_line(self):
line = []
while True:
@ -846,9 +831,11 @@ class WebSocketApp(object):
run event loop for WebSocket framework.
This loop is infinite loop and is alive during websocket is available.
sockopt: values for socket.setsockopt.
sockopt must be tuple and each element is argument of sock.setscokopt.
sockopt must be tuple and each element is argument of
sock.setscokopt.
sslopt: ssl socket optional dict.
ping_interval: automatically send "ping" command every specified period(second)
ping_interval: automatically send "ping" command every specified
period(second)
if set to 0, not send automatically.
"""
if sockopt is None:
@ -861,26 +848,26 @@ class WebSocketApp(object):
self.keep_running = True
try:
self.sock = WebSocket(self.get_mask_key, sockopt=sockopt, sslopt=sslopt)
self.sock.settimeout(default_timeout)
self.sock = WebSocket(self.get_mask_key,
sockopt=sockopt,
sslopt=sslopt)
self.sock.settimeout(DEFAULT_TIMEOUT)
self.sock.connect(self.url, header=self.header)
self._callback(self.on_open)
if ping_interval:
thread = threading.Thread(target=self._send_ping, args=(ping_interval,))
thread = threading.Thread(target=self._send_ping,
args=(ping_interval,))
thread.setDaemon(True)
thread.start()
while self.keep_running:
try:
data = self.sock.recv()
if data is None or self.keep_running == False:
if data is None or self.keep_running is False:
break
self._callback(self.on_message, data)
except Exception, e:
#print str(e.args[0])
if "timed out" not in e.args[0]:
raise e
@ -898,19 +885,18 @@ class WebSocketApp(object):
try:
callback(self, *args)
except Exception, e:
log.error(e)
if True:#log.isEnabledFor(logging.DEBUG):
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
LOG.error(e)
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
if __name__ == "__main__":
enableTrace(True)
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()
enable_trace(True)
WEBSOCKET = create_connection("ws://echo.websocket.org/")
LOG.info("Sending 'Hello, World'...")
WEBSOCKET.send("Hello, World")
LOG.info("Sent")
LOG.info("Receiving...")
RESULT = WEBSOCKET.recv()
LOG.info("Received '%s'", RESULT)
WEBSOCKET.close()