Merge pull request #1574 from croneter/python3-beta

Bump Python 3 master
This commit is contained in:
croneter 2021-08-01 16:00:57 +02:00 committed by GitHub
commit ab954350b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 1340 additions and 1647 deletions

View file

@ -1,5 +1,4 @@
exclude_paths: exclude_paths:
- 'resources/lib/watchdog/**' - 'resources/lib/watchdog/**'
- 'resources/lib/pathtools/**' - 'resources/lib/pathtools/**'
- 'resources/lib/pathtools/**' - 'resources/lib/defusedxml/**'
- 'resources/lib/defused_etree.py'

View file

@ -1,9 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="plugin.video.plexkodiconnect" name="PlexKodiConnect" version="3.3.2" provider-name="croneter"> <addon id="plugin.video.plexkodiconnect" name="PlexKodiConnect" version="3.4.0" provider-name="croneter">
<requires> <requires>
<import addon="xbmc.python" version="3.0.0"/> <import addon="xbmc.python" version="3.0.0"/>
<import addon="script.module.requests" version="2.22.0+matrix.1" /> <import addon="script.module.requests" version="2.22.0+matrix.1" />
<import addon="script.module.defusedxml" version="0.6.0+matrix.1"/>
<import addon="plugin.video.plexkodiconnect.movies" version="3.0.1" /> <import addon="plugin.video.plexkodiconnect.movies" version="3.0.1" />
<import addon="plugin.video.plexkodiconnect.tvshows" version="3.0.1" /> <import addon="plugin.video.plexkodiconnect.tvshows" version="3.0.1" />
<import addon="metadata.themoviedb.org.python" version="1.3.1+matrix.1" /> <import addon="metadata.themoviedb.org.python" version="1.3.1+matrix.1" />
@ -92,7 +91,28 @@
<summary lang="ko_KR">Plex를 Kodi에 기본 통합</summary> <summary lang="ko_KR">Plex를 Kodi에 기본 통합</summary>
<description lang="ko_KR">Kodi를 Plex Media Server에 연결합니다. 이 플러그인은 Plex로 모든 비디오를 관리하고 Kodi로는 관리하지 않는다고 가정합니다. Kodi 비디오 및 음악 데이터베이스에 이미 저장된 데이터가 손실 될 수 있습니다 (이 플러그인이 직접 변경하므로). 자신의 책임하에 사용하십시오!</description> <description lang="ko_KR">Kodi를 Plex Media Server에 연결합니다. 이 플러그인은 Plex로 모든 비디오를 관리하고 Kodi로는 관리하지 않는다고 가정합니다. Kodi 비디오 및 음악 데이터베이스에 이미 저장된 데이터가 손실 될 수 있습니다 (이 플러그인이 직접 변경하므로). 자신의 책임하에 사용하십시오!</description>
<disclaimer lang="ko_KR">자신의 책임하에 사용</disclaimer> <disclaimer lang="ko_KR">자신의 책임하에 사용</disclaimer>
<news>version 3.3.2: <news>version 3.4.0:
- Improve logging for converting Unix timestamps
- Remove dependency on script.module.defusedxml - that module is now included in PKC
- version 3.3.3-3.3.5 for everyone
version 3.3.5 (beta only):
- Rewire defusedxml and xml.etree.ElementTree: Fix AttributeError: module 'resources.lib.utils' has no attribute 'ParseError'
- Fix errors when PKC tries to edit files that don't exist yet
version 3.3.4 (beta only):
- Fix a racing condition that could lead to the sync getting stuck
- Fix RecursionError: maximum recursion depth exceeded
- Bump websocket client: fix AttributeError: 'NoneType' object has no attribute 'is_ssl'
version 3.3.3 (beta only):
- Fix a racing condition that could lead to the sync process getting stuck
- Fix likelyhood of `database is locked` error occuring
- Fix AttributeError: module 'urllib' has no attribute 'parse'
- Support for the Plex HAMA agent to let Kodi identify animes (using Kodi's uniqueID 'anidb')
- Support forced HAMA IDs when using tvdb uniqueID
version 3.3.2:
- version 3.3.1 for everyone - version 3.3.1 for everyone
version 3.3.1 (beta only): version 3.3.1 (beta only):

View file

@ -1,3 +1,24 @@
version 3.4.0:
- Improve logging for converting Unix timestamps
- Remove dependency on script.module.defusedxml - that module is now included in PKC
- version 3.3.3-3.3.5 for everyone
version 3.3.5 (beta only):
- Rewire defusedxml and xml.etree.ElementTree: Fix AttributeError: module 'resources.lib.utils' has no attribute 'ParseError'
- Fix errors when PKC tries to edit files that don't exist yet
version 3.3.4 (beta only):
- Fix a racing condition that could lead to the sync getting stuck
- Fix RecursionError: maximum recursion depth exceeded
- Bump websocket client: fix AttributeError: 'NoneType' object has no attribute 'is_ssl'
version 3.3.3 (beta only):
- Fix a racing condition that could lead to the sync process getting stuck
- Fix likelyhood of `database is locked` error occuring
- Fix AttributeError: module 'urllib' has no attribute 'parse'
- Support for the Plex HAMA agent to let Kodi identify animes (using Kodi's uniqueID 'anidb')
- Support forced HAMA IDs when using tvdb uniqueID
version 3.3.2: version 3.3.2:
- version 3.3.1 for everyone - version 3.3.1 for everyone

View file

@ -135,43 +135,6 @@ class ProcessingQueue(queue.Queue, object):
def _qsize(self): def _qsize(self):
return self._current_queue._qsize() if self._current_queue else 0 return self._current_queue._qsize() if self._current_queue else 0
def _total_qsize(self):
"""
This method is BROKEN as it can lead to a deadlock when a single item
from the current section takes longer to download then any new items
coming in
"""
return sum(q._qsize() for q in self._queues) if self._queues else 0
def put(self, item, block=True, timeout=None):
"""
PKC customization of Queue.put. item needs to be the tuple
(count [int], {'section': [Section], 'xml': [etree xml]})
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise queue.Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise queue.Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def _put(self, item): def _put(self, item):
for i, section in enumerate(self._sections): for i, section in enumerate(self._sections):
if item[1]['section'] == section: if item[1]['section'] == section:
@ -188,16 +151,13 @@ class ProcessingQueue(queue.Queue, object):
Once the get()-method returns None, you've received the sentinel and Once the get()-method returns None, you've received the sentinel and
you've thus exhausted the queue you've thus exhausted the queue
""" """
self.not_full.acquire() with self.not_full:
try:
section.number_of_items = 1 section.number_of_items = 1
self._add_section(section) self._add_section(section)
# Add the actual sentinel to the queue we just added # Add the actual sentinel to the queue we just added
self._queues[-1]._put((None, None)) self._queues[-1]._put((None, None))
self.unfinished_tasks += 1 self.unfinished_tasks += 1
self.not_empty.notify() self.not_empty.notify()
finally:
self.not_full.release()
def add_section(self, section): def add_section(self, section):
""" """
@ -207,11 +167,26 @@ class ProcessingQueue(queue.Queue, object):
Be sure to set section.number_of_items correctly as it will signal Be sure to set section.number_of_items correctly as it will signal
when processing is completely done for a specific section! when processing is completely done for a specific section!
""" """
self.mutex.acquire() with self.mutex:
try:
self._add_section(section) self._add_section(section)
finally:
self.mutex.release() def change_section_number_of_items(self, section, number_of_items):
"""
Hit this method if you've reset section.number_of_items to make
sure we're not blocking
"""
with self.mutex:
self._change_section_number_of_items(section, number_of_items)
def _change_section_number_of_items(self, section, number_of_items):
section.number_of_items = number_of_items
if (self._current_section == section
and self._counter == number_of_items):
# We were actually waiting for more items to come in - but there
# aren't any!
self._init_next_section()
if self._qsize() > 0:
self.not_empty.notify()
def _add_section(self, section): def _add_section(self, section):
self._sections.append(section) self._sections.append(section)

View file

@ -6,6 +6,7 @@ from functools import wraps
from . import variables as v, app from . import variables as v, app
DB_WRITE_ATTEMPTS = 100 DB_WRITE_ATTEMPTS = 100
DB_WRITE_ATTEMPTS_TIMEOUT = 1 # in seconds
DB_CONNECTION_TIMEOUT = 10 DB_CONNECTION_TIMEOUT = 10
@ -43,7 +44,7 @@ def catch_operationalerrors(method):
self.kodiconn.commit() self.kodiconn.commit()
if self.artconn: if self.artconn:
self.artconn.commit() self.artconn.commit()
if app.APP.monitor.waitForAbort(0.1): if app.APP.monitor.waitForAbort(DB_WRITE_ATTEMPTS_TIMEOUT):
# PKC needs to quit # PKC needs to quit
return return
# Start new transactions # Start new transactions

View file

@ -1,41 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
xml.etree.ElementTree tries to encode with text.encode('ascii') - which is
just plain BS. This etree will always return unicode, not string
"""
# Originally tried faster cElementTree, but does NOT work reliably with Kodi
from defusedxml.ElementTree import DefusedXMLParser, _generate_etree_functions
from xml.etree.ElementTree import TreeBuilder as _TreeBuilder
from xml.etree.ElementTree import parse as _parse
from xml.etree.ElementTree import iterparse as _iterparse
from xml.etree.ElementTree import tostring
# Enable creation of new xmls and xml elements
from xml.etree.ElementTree import ElementTree, Element, SubElement, ParseError
class UnicodeXMLParser(DefusedXMLParser):
"""
PKC Hack to ensure we're always receiving unicode, not str
"""
@staticmethod
def _fixtext(text):
"""
Do NOT try to convert every entry to str with entry.encode('ascii')!
"""
return text
# aliases
XMLTreeBuilder = XMLParse = UnicodeXMLParser
parse, iterparse, fromstring = _generate_etree_functions(UnicodeXMLParser,
_TreeBuilder, _parse,
_iterparse)
XML = fromstring
__all__ = ['XML', 'XMLParse', 'XMLTreeBuilder', 'fromstring', 'iterparse',
'parse', 'tostring']

View file

@ -0,0 +1,188 @@
# defusedxml
#
# Copyright (c) 2013-2020 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.etree.ElementTree facade
"""
from __future__ import print_function, absolute_import
import sys
import warnings
from xml.etree.ElementTree import ParseError
from xml.etree.ElementTree import TreeBuilder as _TreeBuilder
from xml.etree.ElementTree import parse as _parse
from xml.etree.ElementTree import tostring
import importlib
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xml.etree.ElementTree"
def _get_py3_cls():
"""Python 3.3 hides the pure Python code but defusedxml requires it.
The code is based on test.support.import_fresh_module().
"""
pymodname = "xml.etree.ElementTree"
cmodname = "_elementtree"
pymod = sys.modules.pop(pymodname, None)
cmod = sys.modules.pop(cmodname, None)
sys.modules[cmodname] = None
try:
pure_pymod = importlib.import_module(pymodname)
finally:
# restore module
sys.modules[pymodname] = pymod
if cmod is not None:
sys.modules[cmodname] = cmod
else:
sys.modules.pop(cmodname, None)
# restore attribute on original package
etree_pkg = sys.modules["xml.etree"]
if pymod is not None:
etree_pkg.ElementTree = pymod
elif hasattr(etree_pkg, "ElementTree"):
del etree_pkg.ElementTree
_XMLParser = pure_pymod.XMLParser
_iterparse = pure_pymod.iterparse
# patch pure module to use ParseError from C extension
pure_pymod.ParseError = ParseError
return _XMLParser, _iterparse
_XMLParser, _iterparse = _get_py3_cls()
_sentinel = object()
class DefusedXMLParser(_XMLParser):
def __init__(
self,
html=_sentinel,
target=None,
encoding=None,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
super().__init__(target=target, encoding=encoding)
if html is not _sentinel:
# the 'html' argument has been deprecated and ignored in all
# supported versions of Python. Python 3.8 finally removed it.
if html:
raise TypeError("'html=True' is no longer supported.")
else:
warnings.warn(
"'html' keyword argument is no longer supported. Pass "
"in arguments as keyword arguments.",
category=DeprecationWarning,
)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
parser = self.parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
# aliases
# XMLParse is a typo, keep it for backwards compatibility
XMLTreeBuilder = XMLParse = XMLParser = DefusedXMLParser
def parse(source, parser=None, forbid_dtd=False, forbid_entities=True, forbid_external=True):
if parser is None:
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
return _parse(source, parser)
def iterparse(
source,
events=None,
parser=None,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
if parser is None:
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
return _iterparse(source, events, parser)
def fromstring(text, forbid_dtd=False, forbid_entities=True, forbid_external=True):
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
parser.feed(text)
return parser.close()
XML = fromstring
def fromstringlist(sequence, forbid_dtd=False, forbid_entities=True, forbid_external=True):
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
for text in sequence:
parser.feed(text)
return parser.close()
__all__ = [
"ParseError",
"XML",
"XMLParse",
"XMLParser",
"XMLTreeBuilder",
"fromstring",
"fromstringlist",
"iterparse",
"parse",
"tostring",
]

View file

@ -0,0 +1,67 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defuse XML bomb denial of service vulnerabilities
"""
from __future__ import print_function, absolute_import
import warnings
from .common import (
DefusedXmlException,
DTDForbidden,
EntitiesForbidden,
ExternalReferenceForbidden,
NotSupportedError,
_apply_defusing,
)
def defuse_stdlib():
"""Monkey patch and defuse all stdlib packages
:warning: The monkey patch is an EXPERIMETNAL feature.
"""
defused = {}
with warnings.catch_warnings():
from . import cElementTree
from . import ElementTree
from . import minidom
from . import pulldom
from . import sax
from . import expatbuilder
from . import expatreader
from . import xmlrpc
xmlrpc.monkey_patch()
defused[xmlrpc] = None
defused_mods = [
cElementTree,
ElementTree,
minidom,
pulldom,
sax,
expatbuilder,
expatreader,
]
for defused_mod in defused_mods:
stdlib_mod = _apply_defusing(defused_mod)
defused[defused_mod] = stdlib_mod
return defused
__version__ = "0.8.0.dev1"
__all__ = [
"DefusedXmlException",
"DTDForbidden",
"EntitiesForbidden",
"ExternalReferenceForbidden",
"NotSupportedError",
]

View file

@ -0,0 +1,47 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.etree.cElementTree
"""
import warnings
# This module is an alias for ElementTree just like xml.etree.cElementTree
from .ElementTree import (
XML,
XMLParse,
XMLParser,
XMLTreeBuilder,
fromstring,
fromstringlist,
iterparse,
parse,
tostring,
DefusedXMLParser,
ParseError,
)
__origin__ = "xml.etree.cElementTree"
warnings.warn(
"defusedxml.cElementTree is deprecated, import from defusedxml.ElementTree instead.",
category=DeprecationWarning,
stacklevel=2,
)
__all__ = [
"ParseError",
"XML",
"XMLParse",
"XMLParser",
"XMLTreeBuilder",
"fromstring",
"fromstringlist",
"iterparse",
"parse",
"tostring",
# backwards compatibility
"DefusedXMLParser",
]

View file

@ -0,0 +1,85 @@
# defusedxml
#
# Copyright (c) 2013-2020 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Common constants, exceptions and helpe functions
"""
import sys
import xml.parsers.expat
PY3 = True
# Fail early when pyexpat is not installed correctly
if not hasattr(xml.parsers.expat, "ParserCreate"):
raise ImportError("pyexpat") # pragma: no cover
class DefusedXmlException(ValueError):
"""Base exception"""
def __repr__(self):
return str(self)
class DTDForbidden(DefusedXmlException):
"""Document type definition is forbidden"""
def __init__(self, name, sysid, pubid):
super().__init__()
self.name = name
self.sysid = sysid
self.pubid = pubid
def __str__(self):
tpl = "DTDForbidden(name='{}', system_id={!r}, public_id={!r})"
return tpl.format(self.name, self.sysid, self.pubid)
class EntitiesForbidden(DefusedXmlException):
"""Entity definition is forbidden"""
def __init__(self, name, value, base, sysid, pubid, notation_name):
super().__init__()
self.name = name
self.value = value
self.base = base
self.sysid = sysid
self.pubid = pubid
self.notation_name = notation_name
def __str__(self):
tpl = "EntitiesForbidden(name='{}', system_id={!r}, public_id={!r})"
return tpl.format(self.name, self.sysid, self.pubid)
class ExternalReferenceForbidden(DefusedXmlException):
"""Resolving an external reference is forbidden"""
def __init__(self, context, base, sysid, pubid):
super().__init__()
self.context = context
self.base = base
self.sysid = sysid
self.pubid = pubid
def __str__(self):
tpl = "ExternalReferenceForbidden(system_id='{}', public_id={})"
return tpl.format(self.sysid, self.pubid)
class NotSupportedError(DefusedXmlException):
"""The operation is not supported"""
def _apply_defusing(defused_mod):
assert defused_mod is sys.modules[defused_mod.__name__]
stdlib_name = defused_mod.__origin__
__import__(stdlib_name, {}, {}, ["*"])
stdlib_mod = sys.modules[stdlib_name]
stdlib_names = set(dir(stdlib_mod))
for name, obj in vars(defused_mod).items():
if name.startswith("_") or name not in stdlib_names:
continue
setattr(stdlib_mod, name, obj)
return stdlib_mod

View file

@ -0,0 +1,107 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.dom.expatbuilder
"""
from __future__ import print_function, absolute_import
from xml.dom.expatbuilder import ExpatBuilder as _ExpatBuilder
from xml.dom.expatbuilder import Namespaces as _Namespaces
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xml.dom.expatbuilder"
class DefusedExpatBuilder(_ExpatBuilder):
"""Defused document builder"""
def __init__(
self, options=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
_ExpatBuilder.__init__(self, options)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def install(self, parser):
_ExpatBuilder.install(self, parser)
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
# if self._options.entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
class DefusedExpatBuilderNS(_Namespaces, DefusedExpatBuilder):
"""Defused document builder that supports namespaces."""
def install(self, parser):
DefusedExpatBuilder.install(self, parser)
if self._options.namespace_declarations:
parser.StartNamespaceDeclHandler = self.start_namespace_decl_handler
def reset(self):
DefusedExpatBuilder.reset(self)
self._initNamespaces()
def parse(file, namespaces=True, forbid_dtd=False, forbid_entities=True, forbid_external=True):
"""Parse a document, returning the resulting Document node.
'file' may be either a file name or an open file object.
"""
if namespaces:
build_builder = DefusedExpatBuilderNS
else:
build_builder = DefusedExpatBuilder
builder = build_builder(
forbid_dtd=forbid_dtd, forbid_entities=forbid_entities, forbid_external=forbid_external
)
if isinstance(file, str):
fp = open(file, "rb")
try:
result = builder.parseFile(fp)
finally:
fp.close()
else:
result = builder.parseFile(file)
return result
def parseString(
string, namespaces=True, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
"""Parse a document from a string, returning the resulting
Document node.
"""
if namespaces:
build_builder = DefusedExpatBuilderNS
else:
build_builder = DefusedExpatBuilder
builder = build_builder(
forbid_dtd=forbid_dtd, forbid_entities=forbid_entities, forbid_external=forbid_external
)
return builder.parseString(string)

View file

@ -0,0 +1,61 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.sax.expatreader
"""
from __future__ import print_function, absolute_import
from xml.sax.expatreader import ExpatParser as _ExpatParser
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xml.sax.expatreader"
class DefusedExpatParser(_ExpatParser):
"""Defused SAX driver for the pyexpat C module."""
def __init__(
self,
namespaceHandling=0,
bufsize=2 ** 16 - 20,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
super().__init__(namespaceHandling, bufsize)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def reset(self):
super().reset()
parser = self._parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def create_parser(*args, **kwargs):
return DefusedExpatParser(*args, **kwargs)

View file

@ -0,0 +1,153 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""DEPRECATED Example code for lxml.etree protection
The code has NO protection against decompression bombs.
"""
from __future__ import print_function, absolute_import
import threading
import warnings
from lxml import etree as _etree
from .common import DTDForbidden, EntitiesForbidden, NotSupportedError
LXML3 = _etree.LXML_VERSION[0] >= 3
__origin__ = "lxml.etree"
tostring = _etree.tostring
warnings.warn(
"defusedxml.lxml is no longer supported and will be removed in a future release.",
category=DeprecationWarning,
stacklevel=2,
)
class RestrictedElement(_etree.ElementBase):
"""A restricted Element class that filters out instances of some classes"""
__slots__ = ()
# blacklist = (etree._Entity, etree._ProcessingInstruction, etree._Comment)
blacklist = _etree._Entity
def _filter(self, iterator):
blacklist = self.blacklist
for child in iterator:
if isinstance(child, blacklist):
continue
yield child
def __iter__(self):
iterator = super(RestrictedElement, self).__iter__()
return self._filter(iterator)
def iterchildren(self, tag=None, reversed=False):
iterator = super(RestrictedElement, self).iterchildren(tag=tag, reversed=reversed)
return self._filter(iterator)
def iter(self, tag=None, *tags):
iterator = super(RestrictedElement, self).iter(tag=tag, *tags)
return self._filter(iterator)
def iterdescendants(self, tag=None, *tags):
iterator = super(RestrictedElement, self).iterdescendants(tag=tag, *tags)
return self._filter(iterator)
def itersiblings(self, tag=None, preceding=False):
iterator = super(RestrictedElement, self).itersiblings(tag=tag, preceding=preceding)
return self._filter(iterator)
def getchildren(self):
iterator = super(RestrictedElement, self).__iter__()
return list(self._filter(iterator))
def getiterator(self, tag=None):
iterator = super(RestrictedElement, self).getiterator(tag)
return self._filter(iterator)
class GlobalParserTLS(threading.local):
"""Thread local context for custom parser instances"""
parser_config = {
"resolve_entities": False,
# 'remove_comments': True,
# 'remove_pis': True,
}
element_class = RestrictedElement
def createDefaultParser(self):
parser = _etree.XMLParser(**self.parser_config)
element_class = self.element_class
if self.element_class is not None:
lookup = _etree.ElementDefaultClassLookup(element=element_class)
parser.set_element_class_lookup(lookup)
return parser
def setDefaultParser(self, parser):
self._default_parser = parser
def getDefaultParser(self):
parser = getattr(self, "_default_parser", None)
if parser is None:
parser = self.createDefaultParser()
self.setDefaultParser(parser)
return parser
_parser_tls = GlobalParserTLS()
getDefaultParser = _parser_tls.getDefaultParser
def check_docinfo(elementtree, forbid_dtd=False, forbid_entities=True):
"""Check docinfo of an element tree for DTD and entity declarations
The check for entity declarations needs lxml 3 or newer. lxml 2.x does
not support dtd.iterentities().
"""
docinfo = elementtree.docinfo
if docinfo.doctype:
if forbid_dtd:
raise DTDForbidden(docinfo.doctype, docinfo.system_url, docinfo.public_id)
if forbid_entities and not LXML3:
# lxml < 3 has no iterentities()
raise NotSupportedError("Unable to check for entity declarations " "in lxml 2.x")
if forbid_entities:
for dtd in docinfo.internalDTD, docinfo.externalDTD:
if dtd is None:
continue
for entity in dtd.iterentities():
raise EntitiesForbidden(entity.name, entity.content, None, None, None, None)
def parse(source, parser=None, base_url=None, forbid_dtd=False, forbid_entities=True):
if parser is None:
parser = getDefaultParser()
elementtree = _etree.parse(source, parser, base_url=base_url)
check_docinfo(elementtree, forbid_dtd, forbid_entities)
return elementtree
def fromstring(text, parser=None, base_url=None, forbid_dtd=False, forbid_entities=True):
if parser is None:
parser = getDefaultParser()
rootelement = _etree.fromstring(text, parser, base_url=base_url)
elementtree = rootelement.getroottree()
check_docinfo(elementtree, forbid_dtd, forbid_entities)
return rootelement
XML = fromstring
def iterparse(*args, **kwargs):
raise NotSupportedError("defused lxml.etree.iterparse not available")

View file

@ -0,0 +1,63 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.dom.minidom
"""
from __future__ import print_function, absolute_import
from xml.dom.minidom import _do_pulldom_parse
from . import expatbuilder as _expatbuilder
from . import pulldom as _pulldom
__origin__ = "xml.dom.minidom"
def parse(
file, parser=None, bufsize=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
"""Parse a file into a DOM by filename or file object."""
if parser is None and not bufsize:
return _expatbuilder.parse(
file,
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
else:
return _do_pulldom_parse(
_pulldom.parse,
(file,),
{
"parser": parser,
"bufsize": bufsize,
"forbid_dtd": forbid_dtd,
"forbid_entities": forbid_entities,
"forbid_external": forbid_external,
},
)
def parseString(
string, parser=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
"""Parse a file into a DOM from a string."""
if parser is None:
return _expatbuilder.parseString(
string,
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
else:
return _do_pulldom_parse(
_pulldom.parseString,
(string,),
{
"parser": parser,
"forbid_dtd": forbid_dtd,
"forbid_entities": forbid_entities,
"forbid_external": forbid_external,
},
)

View file

@ -0,0 +1,41 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.dom.pulldom
"""
from __future__ import print_function, absolute_import
from xml.dom.pulldom import parse as _parse
from xml.dom.pulldom import parseString as _parseString
from .sax import make_parser
__origin__ = "xml.dom.pulldom"
def parse(
stream_or_string,
parser=None,
bufsize=None,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
if parser is None:
parser = make_parser()
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
return _parse(stream_or_string, parser, bufsize)
def parseString(
string, parser=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
if parser is None:
parser = make_parser()
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
return _parseString(string, parser)

View file

@ -0,0 +1,60 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.sax
"""
from __future__ import print_function, absolute_import
from xml.sax import InputSource as _InputSource
from xml.sax import ErrorHandler as _ErrorHandler
from . import expatreader
__origin__ = "xml.sax"
def parse(
source,
handler,
errorHandler=_ErrorHandler(),
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
parser = make_parser()
parser.setContentHandler(handler)
parser.setErrorHandler(errorHandler)
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
parser.parse(source)
def parseString(
string,
handler,
errorHandler=_ErrorHandler(),
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
from io import BytesIO
if errorHandler is None:
errorHandler = _ErrorHandler()
parser = make_parser()
parser.setContentHandler(handler)
parser.setErrorHandler(errorHandler)
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
inpsrc = _InputSource()
inpsrc.setByteStream(BytesIO(string))
parser.parse(inpsrc)
def make_parser(parser_list=[]):
return expatreader.create_parser()

View file

@ -0,0 +1,144 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xmlrpclib
Also defuses gzip bomb
"""
from __future__ import print_function, absolute_import
import io
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xmlrpc.client"
from xmlrpc.client import ExpatParser
from xmlrpc import client as xmlrpc_client
from xmlrpc import server as xmlrpc_server
from xmlrpc.client import gzip_decode as _orig_gzip_decode
from xmlrpc.client import GzipDecodedResponse as _OrigGzipDecodedResponse
try:
import gzip
except ImportError: # pragma: no cover
gzip = None
# Limit maximum request size to prevent resource exhaustion DoS
# Also used to limit maximum amount of gzip decoded data in order to prevent
# decompression bombs
# A value of -1 or smaller disables the limit
MAX_DATA = 30 * 1024 * 1024 # 30 MB
def defused_gzip_decode(data, limit=None):
"""gzip encoded data -> unencoded data
Decode data using the gzip content encoding as described in RFC 1952
"""
if not gzip: # pragma: no cover
raise NotImplementedError
if limit is None:
limit = MAX_DATA
f = io.BytesIO(data)
gzf = gzip.GzipFile(mode="rb", fileobj=f)
try:
if limit < 0: # no limit
decoded = gzf.read()
else:
decoded = gzf.read(limit + 1)
except IOError: # pragma: no cover
raise ValueError("invalid data")
f.close()
gzf.close()
if limit >= 0 and len(decoded) > limit:
raise ValueError("max gzipped payload length exceeded")
return decoded
class DefusedGzipDecodedResponse(gzip.GzipFile if gzip else object):
"""a file-like object to decode a response encoded with the gzip
method, as described in RFC 1952.
"""
def __init__(self, response, limit=None):
# response doesn't support tell() and read(), required by
# GzipFile
if not gzip: # pragma: no cover
raise NotImplementedError
self.limit = limit = limit if limit is not None else MAX_DATA
if limit < 0: # no limit
data = response.read()
self.readlength = None
else:
data = response.read(limit + 1)
self.readlength = 0
if limit >= 0 and len(data) > limit:
raise ValueError("max payload length exceeded")
self.stringio = io.BytesIO(data)
super().__init__(mode="rb", fileobj=self.stringio)
def read(self, n):
if self.limit >= 0:
left = self.limit - self.readlength
n = min(n, left + 1)
data = gzip.GzipFile.read(self, n)
self.readlength += len(data)
if self.readlength > self.limit:
raise ValueError("max payload length exceeded")
return data
else:
return super().read(n)
def close(self):
super().close()
self.stringio.close()
class DefusedExpatParser(ExpatParser):
def __init__(self, target, forbid_dtd=False, forbid_entities=True, forbid_external=True):
super().__init__(target)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
parser = self._parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def monkey_patch():
xmlrpc_client.FastParser = DefusedExpatParser
xmlrpc_client.GzipDecodedResponse = DefusedGzipDecodedResponse
xmlrpc_client.gzip_decode = defused_gzip_decode
if xmlrpc_server:
xmlrpc_server.gzip_decode = defused_gzip_decode
def unmonkey_patch():
xmlrpc_client.FastParser = None
xmlrpc_client.GzipDecodedResponse = _OrigGzipDecodedResponse
xmlrpc_client.gzip_decode = _orig_gzip_decode
if xmlrpc_server:
xmlrpc_server.gzip_decode = _orig_gzip_decode

View file

@ -7,6 +7,7 @@ e.g. plugin://... calls. Hence be careful to only rely on window variables.
from logging import getLogger from logging import getLogger
import sys import sys
import copy import copy
import xml.etree.ElementTree as etree
import xbmc import xbmc
import xbmcplugin import xbmcplugin
@ -509,7 +510,7 @@ def browse_plex(key=None, plex_type=None, section_id=None, synched=True,
return return
if xml[0].tag == 'Hub': if xml[0].tag == 'Hub':
# E.g. when hitting the endpoint '/hubs/search' # E.g. when hitting the endpoint '/hubs/search'
answ = utils.etree.Element(xml.tag, attrib=xml.attrib) answ = etree.Element(xml.tag, attrib=xml.attrib)
for hub in xml: for hub in xml:
if not utils.cast(int, hub.get('size')): if not utils.cast(int, hub.get('size')):
# Empty category # Empty category

View file

@ -5,7 +5,7 @@ from logging import getLogger
from xbmc import executebuiltin from xbmc import executebuiltin
from . import utils from . import utils
from .utils import etree import xml.etree.ElementTree as etree
from . import path_ops from . import path_ops
from . import migration from . import migration
from .downloadutils import DownloadUtils as DU, exceptions from .downloadutils import DownloadUtils as DU, exceptions

View file

@ -45,6 +45,10 @@ class FillMetadataQueue(common.LibrarySyncMixin,
if (not self.repair and if (not self.repair and
plexdb.checksum(plex_id, section.plex_type) == checksum): plexdb.checksum(plex_id, section.plex_type) == checksum):
continue continue
if not do_process_section:
do_process_section = True
self.processing_queue.add_section(section)
LOG.debug('Put section in processing queue: %s', section)
try: try:
self.get_metadata_queue.put((count, plex_id, section), self.get_metadata_queue.put((count, plex_id, section),
timeout=QUEUE_TIMEOUT) timeout=QUEUE_TIMEOUT)
@ -53,16 +57,14 @@ class FillMetadataQueue(common.LibrarySyncMixin,
'aborting sync now', plex_id) 'aborting sync now', plex_id)
section.sync_successful = False section.sync_successful = False
break break
count += 1 else:
if not do_process_section: count += 1
do_process_section = True
self.processing_queue.add_section(section)
LOG.debug('Put section in queue with %s items: %s',
section.number_of_items, section)
# We might have received LESS items from the PMS than anticipated. # We might have received LESS items from the PMS than anticipated.
# Ensures that our queues finish # Ensures that our queues finish
LOG.debug('%s items to process for section %s', count, section) self.processing_queue.change_section_number_of_items(section,
section.number_of_items = count count)
LOG.debug('%s items to process for section %s',
section.number_of_items, section)
def _run(self): def _run(self):
while not self.should_cancel(): while not self.should_cancel():

View file

@ -3,7 +3,7 @@
import urllib.request, urllib.parse, urllib.error import urllib.request, urllib.parse, urllib.error
import copy import copy
from ..utils import etree import xml.etree.ElementTree as etree
from .. import variables as v, utils from .. import variables as v, utils
ICON_PATH = 'special://home/addons/plugin.video.plexkodiconnect/icon.png' ICON_PATH = 'special://home/addons/plugin.video.plexkodiconnect/icon.png'

View file

@ -9,7 +9,7 @@ from ..plex_api import API
from .. import kodi_db from .. import kodi_db
from .. import itemtypes, path_ops from .. import itemtypes, path_ops
from .. import plex_functions as PF, music, utils, variables as v, app from .. import plex_functions as PF, music, utils, variables as v, app
from ..utils import etree import xml.etree.ElementTree as etree
LOG = getLogger('PLEX.sync.sections') LOG = getLogger('PLEX.sync.sections')
@ -92,6 +92,7 @@ class Section(object):
"'name': '{self.name}', " "'name': '{self.name}', "
"'section_id': {self.section_id}, " "'section_id': {self.section_id}, "
"'section_type': '{self.section_type}', " "'section_type': '{self.section_type}', "
"'plex_type': '{self.plex_type}', "
"'sync_to_kodi': {self.sync_to_kodi}, " "'sync_to_kodi': {self.sync_to_kodi}, "
"'last_sync': {self.last_sync}" "'last_sync': {self.last_sync}"
"}}").format(self=self) "}}").format(self=self)
@ -105,6 +106,8 @@ class Section(object):
def __eq__(self, section): def __eq__(self, section):
"""Sections compare equal if their section_id, name and plex_type (first prio) OR section_type (if there is no plex_type is set) compare equal. """Sections compare equal if their section_id, name and plex_type (first prio) OR section_type (if there is no plex_type is set) compare equal.
""" """
if not isinstance(section, Section):
return False
return (self.section_id == section.section_id and return (self.section_id == section.section_id and
self.name == section.name and self.name == section.name and
(self.plex_type == section.plex_type if self.plex_type else (self.plex_type == section.plex_type if self.plex_type else

View file

@ -14,8 +14,8 @@ LOG = getLogger('PLEX.api')
METADATA_PROVIDERS = (('imdb', utils.REGEX_IMDB), METADATA_PROVIDERS = (('imdb', utils.REGEX_IMDB),
('tvdb', utils.REGEX_TVDB), ('tvdb', utils.REGEX_TVDB),
('tmdb', utils.REGEX_TMDB)) ('tmdb', utils.REGEX_TMDB),
('anidb', utils.REGEX_ANIDB))
class Base(object): class Base(object):
""" """
Processes a Plex media server's XML response Processes a Plex media server's XML response

View file

@ -33,10 +33,11 @@ def unix_date_to_kodi(unix_kodi_time):
""" """
try: try:
return strftime('%Y-%m-%d %H:%M:%S', localtime(float(unix_kodi_time))) return strftime('%Y-%m-%d %H:%M:%S', localtime(float(unix_kodi_time)))
except Exception: except Exception as exception:
LOG.exception('Received an illegal timestamp from Plex: %s. ' LOG.error(exception)
'Using 1970-01-01 12:00:00', LOG.error('Received an illegal timestamp from Plex: %s, type %s. '
unix_kodi_time) 'Using 1970-01-01 12:00:00',
unix_kodi_time, type(unix_kodi_time))
return '1970-01-01 12:00:00' return '1970-01-01 12:00:00'

View file

@ -9,9 +9,15 @@ from datetime import datetime
from unicodedata import normalize from unicodedata import normalize
from threading import Lock from threading import Lock
import urllib import urllib
# even with the above import urllib, Python3 sometimes runs into this issue
# AttributeError: module 'urllib' has no attribute 'parse'
# Hence import explicitly
import urllib.parse
# Originally tried faster cElementTree, but does NOT work reliably with Kodi # Originally tried faster cElementTree, but does NOT work reliably with Kodi
# etree parse unsafe; make sure we're always receiving unicode # etree parse unsafe; make sure we're always receiving unicode
from . import defused_etree as etree from .defusedxml import ElementTree as etree
from .defusedxml.ElementTree import ParseError
import xml.etree.ElementTree as undefused_etree
from functools import wraps from functools import wraps
import re import re
import gc import gc
@ -44,8 +50,9 @@ REGEX_END_DIGITS = re.compile(r'''/(.+)/(\d+)$''')
REGEX_PLEX_DIRECT = re.compile(r'''\.plex\.direct:\d+$''') REGEX_PLEX_DIRECT = re.compile(r'''\.plex\.direct:\d+$''')
# Plex API # Plex API
REGEX_IMDB = re.compile(r'''/(tt\d+)''') REGEX_IMDB = re.compile(r'''/(tt\d+)''')
REGEX_TVDB = re.compile(r'''thetvdb:\/\/(.+?)\?''') REGEX_TVDB = re.compile(r'''(?:the)?tvdb(?::\/\/|[2-5]?-)(\d+?)\?''')
REGEX_TMDB = re.compile(r'''themoviedb:\/\/(.+?)\?''') REGEX_TMDB = re.compile(r'''themoviedb:\/\/(.+?)\?''')
REGEX_ANIDB = re.compile(r'''anidb[2-4]?-(\d+?)\?''')
# Plex music # Plex music
REGEX_MUSICPATH = re.compile(r'''^\^(.+)\$$''') REGEX_MUSICPATH = re.compile(r'''^\^(.+)\$$''')
# Grab Plex id from an URL-encoded string # Grab Plex id from an URL-encoded string
@ -692,19 +699,19 @@ class XmlKodiSetting(object):
# Document is blank or missing # Document is blank or missing
if self.force_create is False: if self.force_create is False:
LOG.debug('%s does not seem to exist; not creating', self.path) LOG.debug('%s does not seem to exist; not creating', self.path)
# This will abort __enter__ raise
self.__exit__(IOError('File not found'), None, None)
# Create topmost xml entry # Create topmost xml entry
self.tree = etree.ElementTree(etree.Element(self.top_element)) self.tree = undefused_etree.ElementTree(
undefused_etree.Element(self.top_element))
self.write_xml = True self.write_xml = True
except etree.ParseError: except ParseError:
LOG.error('Error parsing %s', self.path) LOG.error('Error parsing %s', self.path)
# "Kodi cannot parse {0}. PKC will not function correctly. Please # "Kodi cannot parse {0}. PKC will not function correctly. Please
# visit {1} and correct your file!" # visit {1} and correct your file!"
messageDialog(lang(29999), lang(39716).format( messageDialog(lang(29999), lang(39716).format(
self.filename, self.filename,
'http://kodi.wiki')) 'http://kodi.wiki'))
self.__exit__(etree.ParseError('Error parsing XML'), None, None) raise
self.root = self.tree.getroot() self.root = self.tree.getroot()
return self return self
@ -730,6 +737,7 @@ class XmlKodiSetting(object):
lang(30417).format(self.filename, err)) lang(30417).format(self.filename, err))
settings('%s_ioerror' % self.filename, settings('%s_ioerror' % self.filename,
value='warning_shown') value='warning_shown')
return True
def _is_empty(self, element, empty_elements): def _is_empty(self, element, empty_elements):
empty = True empty = True
@ -766,7 +774,7 @@ class XmlKodiSetting(object):
""" """
answ = element.find(subelement) answ = element.find(subelement)
if answ is None: if answ is None:
answ = etree.SubElement(element, subelement) answ = undefused_etree.SubElement(element, subelement)
return answ return answ
def get_setting(self, node_list): def get_setting(self, node_list):
@ -842,7 +850,7 @@ class XmlKodiSetting(object):
for node in nodes: for node in nodes:
element = self._set_sub_element(element, node) element = self._set_sub_element(element, node)
if append: if append:
element = etree.SubElement(element, node_list[-1]) element = undefused_etree.SubElement(element, node_list[-1])
# Write new values # Write new values
element.text = value element.text = value
if attrib: if attrib:

View file

@ -25,4 +25,4 @@ from ._exceptions import *
from ._logging import * from ._logging import *
from ._socket import * from ._socket import *
__version__ = "1.0.0" __version__ = "1.1.0"

View file

@ -25,30 +25,31 @@ Copyright (C) 2010 Hiroki Ohtani(liris)
import array import array
import os import os
import struct import struct
import sys
from ._exceptions import * from ._exceptions import *
from ._utils import validate_utf8 from ._utils import validate_utf8
from threading import Lock from threading import Lock
try: try:
import numpy # If wsaccel is available, use compiled routines to mask data.
except ImportError: # wsaccel only provides around a 10% speed boost compared
numpy = None # to the websocket-client _mask() implementation.
# Note that wsaccel is unmaintained.
from wsaccel.xormask import XorMaskerSimple
try:
# If wsaccel is available we use compiled routines to mask data.
if not numpy:
from wsaccel.xormask import XorMaskerSimple
def _mask(_m, _d):
return XorMaskerSimple(_m).process(_d)
except ImportError:
# wsaccel is not available, we rely on python implementations.
def _mask(_m, _d): def _mask(_m, _d):
for i in range(len(_d)): return XorMaskerSimple(_m).process(_d)
_d[i] ^= _m[i % 4]
return _d.tobytes() except ImportError:
# wsaccel is not available, use websocket-client _mask()
native_byteorder = sys.byteorder
def _mask(mask_value, data_value):
datalen = len(data_value)
data_value = int.from_bytes(data_value, native_byteorder)
mask_value = int.from_bytes(mask_value * (datalen // 4) + mask_value[: datalen % 4], native_byteorder)
return (data_value ^ mask_value).to_bytes(datalen, native_byteorder)
__all__ = [ __all__ = [
@ -266,19 +267,7 @@ class ABNF(object):
if isinstance(data, str): if isinstance(data, str):
data = data.encode('latin-1') data = data.encode('latin-1')
if numpy: return _mask(array.array("B", mask_key), array.array("B", data))
origlen = len(data)
_mask_key = mask_key[3] << 24 | mask_key[2] << 16 | mask_key[1] << 8 | mask_key[0]
# We need data to be a multiple of four...
data += b' ' * (4 - (len(data) % 4))
a = numpy.frombuffer(data, dtype="uint32")
masked = numpy.bitwise_xor(a, [_mask_key]).astype("uint32")
if len(data) > origlen:
return masked.tobytes()[:origlen]
return masked.tobytes()
else:
return _mask(array.array("B", mask_key), array.array("B", data))
class frame_buffer(object): class frame_buffer(object):
@ -374,7 +363,7 @@ class frame_buffer(object):
return frame return frame
def recv_strict(self, bufsize): def recv_strict(self, bufsize):
shortage = bufsize - sum(len(x) for x in self.recv_buffer) shortage = bufsize - sum(map(len, self.recv_buffer))
while shortage > 0: while shortage > 0:
# Limit buffer size that we pass to socket.recv() to avoid # Limit buffer size that we pass to socket.recv() to avoid
# fragmenting the heap -- the number of bytes recv() actually # fragmenting the heap -- the number of bytes recv() actually

View file

@ -105,52 +105,56 @@ class WebSocketApp(object):
Parameters Parameters
---------- ----------
url: <type> url: str
websocket url. Websocket url.
header: list or dict header: list or dict
custom header for websocket handshake. Custom header for websocket handshake.
on_open: <type> on_open: function
callable object which is called at opening websocket. Callback object which is called at opening websocket.
this function has one argument. The argument is this class object. on_open has one argument.
on_message: <type> The 1st argument is this class object.
callable object which is called when received data. on_message: function
Callback object which is called when received data.
on_message has 2 arguments. on_message has 2 arguments.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server. The 2nd argument is utf-8 data received from the server.
on_error: <type> on_error: function
callable object which is called when we get error. Callback object which is called when we get error.
on_error has 2 arguments. on_error has 2 arguments.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is exception object. The 2nd argument is exception object.
on_close: <type> on_close: function
callable object which is called when closed the connection. Callback object which is called when connection is closed.
this function has one argument. The argument is this class object. on_close has 3 arguments.
on_cont_message: <type> The 1st argument is this class object.
callback object which is called when receive continued The 2nd argument is close_status_code.
frame data. The 3rd argument is close_msg.
on_cont_message: function
Callback object which is called when a continuation
frame is received.
on_cont_message has 3 arguments. on_cont_message has 3 arguments.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server. The 2nd argument is utf-8 string which we get from the server.
The 3rd argument is continue flag. if 0, the data continue The 3rd argument is continue flag. if 0, the data continue
to next frame data to next frame data
on_data: <type> on_data: function
callback object which is called when a message received. Callback object which is called when a message received.
This is called before on_message or on_cont_message, This is called before on_message or on_cont_message,
and then on_message or on_cont_message is called. and then on_message or on_cont_message is called.
on_data has 4 argument. on_data has 4 argument.
The 1st argument is this class object. The 1st argument is this class object.
The 2nd argument is utf-8 string which we get from the server. The 2nd argument is utf-8 string which we get from the server.
The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
The 4th argument is continue flag. if 0, the data continue The 4th argument is continue flag. If 0, the data continue
keep_running: <type> keep_running: bool
this parameter is obsolete and ignored. This parameter is obsolete and ignored.
get_mask_key: func get_mask_key: function
a callable to produce new mask keys, A callable function to get new mask keys, see the
see the WebSocket.set_mask_key's docstring for more information WebSocket.set_mask_key's docstring for more information.
cookie: str cookie: str
cookie value. Cookie value.
subprotocols: <type> subprotocols: list
array of available sub protocols. default is None. List of available sub protocols. Default is None.
""" """
self.url = url self.url = url
self.header = header if header is not None else [] self.header = header if header is not None else []
@ -177,11 +181,11 @@ class WebSocketApp(object):
Parameters Parameters
---------- ----------
data: <type> data: str
Message to send. If you set opcode to OPCODE_TEXT, Message to send. If you set opcode to OPCODE_TEXT,
data must be utf-8 string or unicode. data must be utf-8 string or unicode.
opcode: <type> opcode: int
Operation code of data. default is OPCODE_TEXT. Operation code of data. Default is OPCODE_TEXT.
""" """
if not self.sock or self.sock.send(data, opcode) == 0: if not self.sock or self.sock.send(data, opcode) == 0:
@ -214,8 +218,7 @@ class WebSocketApp(object):
http_no_proxy=None, http_proxy_auth=None, http_no_proxy=None, http_proxy_auth=None,
skip_utf8_validation=False, skip_utf8_validation=False,
host=None, origin=None, dispatcher=None, host=None, origin=None, dispatcher=None,
suppress_origin=False, proxy_type=None, suppress_origin=False, proxy_type=None):
enable_multithread=True):
""" """
Run event loop for WebSocket framework. Run event loop for WebSocket framework.
@ -224,32 +227,32 @@ class WebSocketApp(object):
Parameters Parameters
---------- ----------
sockopt: tuple sockopt: tuple
values for socket.setsockopt. Values for socket.setsockopt.
sockopt must be tuple sockopt must be tuple
and each element is argument of sock.setsockopt. and each element is argument of sock.setsockopt.
sslopt: dict sslopt: dict
optional dict object for ssl socket option. Optional dict object for ssl socket option.
ping_interval: int or float ping_interval: int or float
automatically send "ping" command Automatically send "ping" command
every specified period (in seconds) every specified period (in seconds).
if set to 0, not send automatically. If set to 0, no ping is sent periodically.
ping_timeout: int or float ping_timeout: int or float
timeout (in seconds) if the pong message is not received. Timeout (in seconds) if the pong message is not received.
ping_payload: str ping_payload: str
payload message to send with each ping. Payload message to send with each ping.
http_proxy_host: <type> http_proxy_host: str
http proxy host name. HTTP proxy host name.
http_proxy_port: <type> http_proxy_port: int or str
http proxy port. If not set, set to 80. HTTP proxy port. If not set, set to 80.
http_no_proxy: <type> http_no_proxy: list
host names, which doesn't use proxy. Whitelisted host names that don't use the proxy.
skip_utf8_validation: bool skip_utf8_validation: bool
skip utf8 validation. skip utf8 validation.
host: str host: str
update host header. update host header.
origin: str origin: str
update origin header. update origin header.
dispatcher: <type> dispatcher: Dispatcher object
customize reading data from socket. customize reading data from socket.
suppress_origin: bool suppress_origin: bool
suppress outputting origin header. suppress outputting origin header.
@ -281,8 +284,11 @@ class WebSocketApp(object):
""" """
Tears down the connection. Tears down the connection.
If close_frame is set, we will invoke the on_close handler with the Parameters
statusCode and reason from there. ----------
close_frame: ABNF frame
If close_frame is set, the on_close handler is invoked
with the statusCode and reason from the provided frame.
""" """
if thread and thread.is_alive(): if thread and thread.is_alive():
@ -301,7 +307,7 @@ class WebSocketApp(object):
self.get_mask_key, sockopt=sockopt, sslopt=sslopt, self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=self.on_cont_message is not None, fire_cont_frame=self.on_cont_message is not None,
skip_utf8_validation=skip_utf8_validation, skip_utf8_validation=skip_utf8_validation,
enable_multithread=enable_multithread) enable_multithread=True)
self.sock.settimeout(getdefaulttimeout()) self.sock.settimeout(getdefaulttimeout())
self.sock.connect( self.sock.connect(
self.url, header=self.header, cookie=self.cookie, self.url, header=self.header, cookie=self.cookie,

View file

@ -62,30 +62,31 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
get_mask_key: func get_mask_key: func
a callable to produce new mask keys, see the set_mask_key A callable function to get new mask keys, see the
function's docstring for more details WebSocket.set_mask_key's docstring for more information.
sockopt: tuple sockopt: tuple
values for socket.setsockopt. Values for socket.setsockopt.
sockopt must be tuple and each element is argument of sock.setsockopt. sockopt must be tuple and each element is argument of sock.setsockopt.
sslopt: dict sslopt: dict
optional dict object for ssl socket option. Optional dict object for ssl socket options.
fire_cont_frame: bool fire_cont_frame: bool
fire recv event for each cont frame. default is False Fire recv event for each cont frame. Default is False.
enable_multithread: bool enable_multithread: bool
if set to True, lock send method. If set to True, lock send method.
skip_utf8_validation: bool skip_utf8_validation: bool
skip utf8 validation. Skip utf8 validation.
""" """
def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, def __init__(self, get_mask_key=None, sockopt=None, sslopt=None,
fire_cont_frame=False, enable_multithread=False, fire_cont_frame=False, enable_multithread=True,
skip_utf8_validation=False, **_): skip_utf8_validation=False, **_):
""" """
Initialize WebSocket object. Initialize WebSocket object.
Parameters Parameters
---------- ----------
sslopt: specify ssl certification verification options sslopt: dict
Optional dict object for ssl socket options.
""" """
self.sock_opt = sock_opt(sockopt, sslopt) self.sock_opt = sock_opt(sockopt, sslopt)
self.handshake_response = None self.handshake_response = None
@ -194,7 +195,10 @@ class WebSocket(object):
return None return None
def is_ssl(self): def is_ssl(self):
return isinstance(self.sock, ssl.SSLSocket) try:
return isinstance(self.sock, ssl.SSLSocket)
except:
return False
headers = property(getheaders) headers = property(getheaders)
@ -210,40 +214,38 @@ class WebSocket(object):
... header=["User-Agent: MyProgram", ... header=["User-Agent: MyProgram",
... "x-custom: header"]) ... "x-custom: header"])
timeout: <type>
socket timeout time. This value is an integer or float.
if you set None for this value, it means "use default_timeout value"
Parameters Parameters
---------- ----------
options: header: list or dict
- header: list or dict Custom http header list or dict.
custom http header list or dict. cookie: str
- cookie: str Cookie value.
cookie value. origin: str
- origin: str Custom origin url.
custom origin url. connection: str
- connection: str Custom connection header value.
custom connection header value. Default value "Upgrade" set in _handshake.py
default value "Upgrade" set in _handshake.py suppress_origin: bool
- suppress_origin: bool Suppress outputting origin header.
suppress outputting origin header. host: str
- host: str Custom host header string.
custom host header string. timeout: int or float
- http_proxy_host: <type> Socket timeout time. This value is an integer or float.
http proxy host name. If you set None for this value, it means "use default_timeout value"
- http_proxy_port: <type> http_proxy_host: str
http proxy port. If not set, set to 80. HTTP proxy host name.
- http_no_proxy: <type> http_proxy_port: str or int
host names, which doesn't use proxy. HTTP proxy port. Default is 80.
- http_proxy_auth: <type> http_no_proxy: list
http proxy auth information. tuple of username and password. default is None Whitelisted host names that don't use the proxy.
- redirect_limit: <type> http_proxy_auth: tuple
number of redirects to follow. HTTP proxy auth information. Tuple of username and password. Default is None.
- subprotocols: <type> redirect_limit: int
array of available sub protocols. default is None. Number of redirects to follow.
- socket: <type> subprotocols: list
pre-initialized stream socket. List of available subprotocols. Default is None.
socket: socket
Pre-initialized stream socket.
""" """
self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout)
self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options),
@ -271,12 +273,12 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
payload: str payload: str
Payload must be utf-8 string or unicode, Payload must be utf-8 string or unicode,
if the opcode is OPCODE_TEXT. If the opcode is OPCODE_TEXT.
Otherwise, it must be string(byte array) Otherwise, it must be string(byte array).
opcode: int opcode: int
operation code to send. Please see OPCODE_XXX. Operation code (opcode) to send.
""" """
frame = ABNF.create_frame(payload, opcode) frame = ABNF.create_frame(payload, opcode)
@ -440,10 +442,10 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
status: <type> status: int
status code to send. see STATUS_XXX. Status code to send. See STATUS_XXX.
reason: str or bytes reason: str or bytes
the reason to close. This must be string or bytes. The reason to close. This must be string or bytes.
""" """
if status < 0 or status >= ABNF.LENGTH_16: if status < 0 or status >= ABNF.LENGTH_16:
raise ValueError("code is invalid range") raise ValueError("code is invalid range")
@ -457,11 +459,11 @@ class WebSocket(object):
Parameters Parameters
---------- ----------
status: int status: int
status code to send. see STATUS_XXX. Status code to send. See STATUS_XXX.
reason: bytes reason: bytes
the reason to close. The reason to close.
timeout: int or float timeout: int or float
timeout until receive a close frame. Timeout until receive a close frame.
If None, it will wait forever until receive a close frame. If None, it will wait forever until receive a close frame.
""" """
if self.connected: if self.connected:
@ -490,10 +492,8 @@ class WebSocket(object):
break break
self.sock.settimeout(sock_timeout) self.sock.settimeout(sock_timeout)
self.sock.shutdown(socket.SHUT_RDWR) self.sock.shutdown(socket.SHUT_RDWR)
except OSError: # This happens often on Mac
pass
except: except:
raise pass
self.shutdown() self.shutdown()
@ -544,52 +544,51 @@ def create_connection(url, timeout=None, class_=WebSocket, **options):
Parameters Parameters
---------- ----------
class_: class
class to instantiate when creating the connection. It has to implement
settimeout and connect. It's __init__ should be compatible with
WebSocket.__init__, i.e. accept all of it's kwargs.
header: list or dict
custom http header list or dict.
cookie: str
Cookie value.
origin: str
custom origin url.
suppress_origin: bool
suppress outputting origin header.
host: str
custom host header string.
timeout: int or float timeout: int or float
socket timeout time. This value could be either float/integer. socket timeout time. This value could be either float/integer.
if you set None for this value, If set to None, it uses the default_timeout value.
it means "use default_timeout value" http_proxy_host: str
class_: <type> HTTP proxy host name.
class to instantiate when creating the connection. It has to implement http_proxy_port: str or int
settimeout and connect. It's __init__ should be compatible with HTTP proxy port. If not set, set to 80.
WebSocket.__init__, i.e. accept all of it's kwargs. http_no_proxy: list
options: <type> Whitelisted host names that don't use the proxy.
- header: list or dict http_proxy_auth: tuple
custom http header list or dict. HTTP proxy auth information. tuple of username and password. Default is None.
- cookie: str enable_multithread: bool
cookie value. Enable lock for multithread.
- origin: str redirect_limit: int
custom origin url. Number of redirects to follow.
- suppress_origin: bool sockopt: tuple
suppress outputting origin header. Values for socket.setsockopt.
- host: <type> sockopt must be a tuple and each element is an argument of sock.setsockopt.
custom host header string. sslopt: dict
- http_proxy_host: <type> Optional dict object for ssl socket options.
http proxy host name. subprotocols: list
- http_proxy_port: <type> List of available subprotocols. Default is None.
http proxy port. If not set, set to 80. skip_utf8_validation: bool
- http_no_proxy: <type> Skip utf8 validation.
host names, which doesn't use proxy. socket: socket
- http_proxy_auth: <type> Pre-initialized stream socket.
http proxy auth information. tuple of username and password. default is None
- enable_multithread: bool
enable lock for multithread.
- redirect_limit: <type>
number of redirects to follow.
- sockopt: <type>
socket options
- sslopt: <type>
ssl option
- subprotocols: <type>
array of available sub protocols. default is None.
- skip_utf8_validation: bool
skip utf8 validation.
- socket: <type>
pre-initialized stream socket.
""" """
sockopt = options.pop("sockopt", []) sockopt = options.pop("sockopt", [])
sslopt = options.pop("sslopt", {}) sslopt = options.pop("sslopt", {})
fire_cont_frame = options.pop("fire_cont_frame", False) fire_cont_frame = options.pop("fire_cont_frame", False)
enable_multithread = options.pop("enable_multithread", False) enable_multithread = options.pop("enable_multithread", True)
skip_utf8_validation = options.pop("skip_utf8_validation", False) skip_utf8_validation = options.pop("skip_utf8_validation", False)
websock = class_(sockopt=sockopt, sslopt=sslopt, websock = class_(sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=fire_cont_frame, fire_cont_frame=fire_cont_frame,

View file

@ -79,7 +79,7 @@ def _open_proxied_socket(url, options, proxy):
(hostname, port), (hostname, port),
proxy_type=ptype, proxy_type=ptype,
proxy_addr=proxy.host, proxy_addr=proxy.host,
proxy_port=proxy.port, proxy_port=int(proxy.port),
proxy_rdns=rdns, proxy_rdns=rdns,
proxy_username=proxy.auth[0] if proxy.auth else None, proxy_username=proxy.auth[0] if proxy.auth else None,
proxy_password=proxy.auth[1] if proxy.auth else None, proxy_password=proxy.auth[1] if proxy.auth else None,
@ -200,7 +200,7 @@ def _open_socket(addrinfo_list, sockopt, timeout):
def _wrap_sni_socket(sock, sslopt, hostname, check_hostname): def _wrap_sni_socket(sock, sslopt, hostname, check_hostname):
context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_SSLv23)) context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_TLS))
if sslopt.get('cert_reqs', ssl.CERT_NONE) != ssl.CERT_NONE: if sslopt.get('cert_reqs', ssl.CERT_NONE) != ssl.CERT_NONE:
cafile = sslopt.get('ca_certs', None) cafile = sslopt.get('ca_certs', None)
@ -248,6 +248,9 @@ def _ssl_socket(sock, user_sslopt, hostname):
and user_sslopt.get('ca_cert_path', None) is None: and user_sslopt.get('ca_cert_path', None) is None:
sslopt['ca_cert_path'] = certPath sslopt['ca_cert_path'] = certPath
if sslopt.get('server_hostname', None):
hostname = sslopt['server_hostname']
check_hostname = sslopt["cert_reqs"] != ssl.CERT_NONE and sslopt.pop( check_hostname = sslopt["cert_reqs"] != ssl.CERT_NONE and sslopt.pop(
'check_hostname', True) 'check_hostname', True)
sock = _wrap_sni_socket(sock, sslopt, hostname, check_hostname) sock = _wrap_sni_socket(sock, sslopt, hostname, check_hostname)

View file

@ -125,7 +125,7 @@ def recv(sock, bufsize):
if not bytes_: if not bytes_:
raise WebSocketConnectionClosedException( raise WebSocketConnectionClosedException(
"Connection is already closed.") "Connection to remote host was lost.")
return bytes_ return bytes_

View file

@ -26,7 +26,7 @@ import os
import socket import socket
import struct import struct
from urllib.parse import urlparse from urllib.parse import unquote, urlparse
__all__ = ["parse_url", "get_proxy_info"] __all__ = ["parse_url", "get_proxy_info"]
@ -109,7 +109,7 @@ def _is_address_in_network(ip, net):
def _is_no_proxy_host(hostname, no_proxy): def _is_no_proxy_host(hostname, no_proxy):
if not no_proxy: if not no_proxy:
v = os.environ.get("no_proxy", "").replace(" ", "") v = os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(" ", "")
if v: if v:
no_proxy = v.split(",") no_proxy = v.split(",")
if not no_proxy: if not no_proxy:
@ -139,22 +139,21 @@ def get_proxy_info(
Parameters Parameters
---------- ----------
hostname: <type> hostname: str
websocket server name. Websocket server name.
is_secure: <type> is_secure: bool
is the connection secure? (wss) looks for "https_proxy" in env Is the connection secure? (wss) looks for "https_proxy" in env
before falling back to "http_proxy" before falling back to "http_proxy"
options: <type> proxy_host: str
- http_proxy_host: <type> http proxy host name.
http proxy host name. http_proxy_port: str or int
- http_proxy_port: <type> http proxy port.
http proxy port. http_no_proxy: list
- http_no_proxy: <type> Whitelisted host names that don't use the proxy.
host names, which doesn't use proxy. http_proxy_auth: tuple
- http_proxy_auth: <type> HTTP proxy auth information. Tuple of username and password. Default is None.
http proxy auth information. tuple of username and password. default is None proxy_type: str
- proxy_type: <type> If set to "socks4" or "socks5", a PySocks wrapper will be used in place of a HTTP proxy. Default is "http".
if set to "socks5" PySocks wrapper will be used in place of a http proxy. default is "http"
""" """
if _is_no_proxy_host(hostname, no_proxy): if _is_no_proxy_host(hostname, no_proxy):
return None, 0, None return None, 0, None
@ -169,10 +168,10 @@ def get_proxy_info(
env_keys.insert(0, "https_proxy") env_keys.insert(0, "https_proxy")
for key in env_keys: for key in env_keys:
value = os.environ.get(key, None) value = os.environ.get(key, os.environ.get(key.upper(), "")).replace(" ", "")
if value: if value:
proxy = urlparse(value) proxy = urlparse(value)
auth = (proxy.username, proxy.password) if proxy.username else None auth = (unquote(proxy.username), unquote(proxy.password)) if proxy.username else None
return proxy.hostname, proxy.port, auth return proxy.hostname, proxy.port, auth
return None, 0, None return None, 0, None

View file

@ -1,6 +0,0 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade
Upgrade: WebSocket
Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
some_header: something

View file

@ -1,6 +0,0 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade
Upgrade WebSocket
Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
some_header: something

View file

@ -1,7 +0,0 @@
HTTP/1.1 101 WebSocket Protocol Handshake
Connection: Upgrade, Keep-Alive
Upgrade: WebSocket
Sec-WebSocket-Accept: Kxep+hNu9n51529fGidYu7a3wO0=
Set-Cookie: Token=ABCDE
some_header: something

View file

@ -1,94 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import websocket as ws
from websocket._abnf import *
import sys
import unittest
sys.path[0:0] = [""]
class ABNFTest(unittest.TestCase):
def testInit(self):
a = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
self.assertEqual(a.fin, 0)
self.assertEqual(a.rsv1, 0)
self.assertEqual(a.rsv2, 0)
self.assertEqual(a.rsv3, 0)
self.assertEqual(a.opcode, 9)
self.assertEqual(a.data, '')
a_bad = ABNF(0,1,0,0, opcode=77)
self.assertEqual(a_bad.rsv1, 1)
self.assertEqual(a_bad.opcode, 77)
def testValidate(self):
a_invalid_ping = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING)
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_invalid_ping.validate, skip_utf8_validation=False)
a_bad_rsv_value = ABNF(0,1,0,0, opcode=ABNF.OPCODE_TEXT)
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_rsv_value.validate, skip_utf8_validation=False)
a_bad_opcode = ABNF(0,0,0,0, opcode=77)
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_opcode.validate, skip_utf8_validation=False)
a_bad_close_frame = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01')
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame.validate, skip_utf8_validation=False)
a_bad_close_frame_2 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x01\x8a\xaa\xff\xdd')
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_2.validate, skip_utf8_validation=False)
a_bad_close_frame_3 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_CLOSE, data=b'\x03\xe7')
self.assertRaises(ws._exceptions.WebSocketProtocolException, a_bad_close_frame_3.validate, skip_utf8_validation=True)
def testMask(self):
abnf_none_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data=None)
bytes_val = bytes("aaaa", 'utf-8')
self.assertEqual(abnf_none_data._get_masked(bytes_val), bytes_val)
abnf_str_data = ABNF(0,0,0,0, opcode=ABNF.OPCODE_PING, mask=1, data="a")
self.assertEqual(abnf_str_data._get_masked(bytes_val), b'aaaa\x00')
def testFormat(self):
abnf_bad_rsv_bits = ABNF(2,0,0,0, opcode=ABNF.OPCODE_TEXT)
self.assertRaises(ValueError, abnf_bad_rsv_bits.format)
abnf_bad_opcode = ABNF(0,0,0,0, opcode=5)
self.assertRaises(ValueError, abnf_bad_opcode.format)
abnf_length_10 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, data="abcdefghij")
self.assertEqual(b'\x01', abnf_length_10.format()[0].to_bytes(1, 'big'))
self.assertEqual(b'\x8a', abnf_length_10.format()[1].to_bytes(1, 'big'))
self.assertEqual("fin=0 opcode=1 data=abcdefghij", abnf_length_10.__str__())
abnf_length_20 = ABNF(0,0,0,0, opcode=ABNF.OPCODE_BINARY, data="abcdefghijabcdefghij")
self.assertEqual(b'\x02', abnf_length_20.format()[0].to_bytes(1, 'big'))
self.assertEqual(b'\x94', abnf_length_20.format()[1].to_bytes(1, 'big'))
abnf_no_mask = ABNF(0,0,0,0, opcode=ABNF.OPCODE_TEXT, mask=0, data=b'\x01\x8a\xcc')
self.assertEqual(b'\x01\x03\x01\x8a\xcc', abnf_no_mask.format())
def testFrameBuffer(self):
fb = frame_buffer(0, True)
self.assertEqual(fb.recv, 0)
self.assertEqual(fb.skip_utf8_validation, True)
fb.clear
self.assertEqual(fb.header, None)
self.assertEqual(fb.length, None)
self.assertEqual(fb.mask, None)
self.assertEqual(fb.has_mask(), False)
if __name__ == "__main__":
unittest.main()

View file

@ -1,176 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import os.path
import websocket as ws
import sys
import ssl
import unittest
sys.path[0:0] = [""]
# Skip test to access the internet.
TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
TRACEABLE = True
class WebSocketAppTest(unittest.TestCase):
class NotSetYet(object):
""" A marker class for signalling that a value hasn't been set yet.
"""
def setUp(self):
ws.enableTrace(TRACEABLE)
WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
def tearDown(self):
WebSocketAppTest.keep_running_open = WebSocketAppTest.NotSetYet()
WebSocketAppTest.keep_running_close = WebSocketAppTest.NotSetYet()
WebSocketAppTest.get_mask_key_id = WebSocketAppTest.NotSetYet()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testKeepRunning(self):
""" A WebSocketApp should keep running as long as its self.keep_running
is not False (in the boolean context).
"""
def on_open(self, *args, **kwargs):
""" Set the keep_running flag for later inspection and immediately
close the connection.
"""
WebSocketAppTest.keep_running_open = self.keep_running
self.close()
def on_close(self, *args, **kwargs):
""" Set the keep_running flag for the test to use.
"""
WebSocketAppTest.keep_running_close = self.keep_running
self.send("connection should be closed here")
app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, on_close=on_close)
app.run_forever()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSockMaskKey(self):
""" A WebSocketApp should forward the received mask_key function down
to the actual socket.
"""
def my_mask_key_func():
return "\x00\x00\x00\x00"
app = ws.WebSocketApp('wss://stream.meetup.com/2/rsvps', get_mask_key=my_mask_key_func)
# if numpy is installed, this assertion fail
# Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
self.assertEqual(id(app.get_mask_key), id(my_mask_key_func))
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testInvalidPingIntervalPingTimeout(self):
""" Test exception handling if ping_interval < ping_timeout
"""
def on_ping(app, msg):
print("Got a ping!")
app.close()
def on_pong(app, msg):
print("Got a pong! No need to respond")
app.close()
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong)
self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=1, ping_timeout=2, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testPingInterval(self):
""" Test WebSocketApp proper ping functionality
"""
def on_ping(app, msg):
print("Got a ping!")
app.close()
def on_pong(app, msg):
print("Got a pong! No need to respond")
app.close()
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1', on_ping=on_ping, on_pong=on_pong)
app.run_forever(ping_interval=2, ping_timeout=1, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testOpcodeClose(self):
""" Test WebSocketApp close opcode
"""
app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect')
app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testOpcodeBinary(self):
""" Test WebSocketApp binary opcode
"""
app = ws.WebSocketApp('streaming.vn.teslamotors.com/streaming/')
app.run_forever(ping_interval=2, ping_timeout=1, ping_payload="Ping payload")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testBadPingInterval(self):
""" A WebSocketApp handling of negative ping_interval
"""
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1')
self.assertRaises(ws.WebSocketException, app.run_forever, ping_interval=-5, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testBadPingTimeout(self):
""" A WebSocketApp handling of negative ping_timeout
"""
app = ws.WebSocketApp('wss://api-pub.bitfinex.com/ws/1')
self.assertRaises(ws.WebSocketException, app.run_forever, ping_timeout=-3, sslopt={"cert_reqs": ssl.CERT_NONE})
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testCloseStatusCode(self):
""" Test extraction of close frame status code and close reason in WebSocketApp
"""
def on_close(wsapp, close_status_code, close_msg):
print("on_close reached")
app = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect', on_close=on_close)
closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'\x03\xe8no-init-from-client')
self.assertEqual([1000, 'no-init-from-client'], app._get_close_args(closeframe))
closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'')
self.assertEqual([None, None], app._get_close_args(closeframe))
app2 = ws.WebSocketApp('wss://tsock.us1.twilio.com/v3/wsconnect')
closeframe = ws.ABNF(opcode=ws.ABNF.OPCODE_CLOSE, data=b'')
self.assertEqual([None, None], app2._get_close_args(closeframe))
self.assertRaises(ws.WebSocketConnectionClosedException, app.send, data="test if connection is closed")
if __name__ == "__main__":
unittest.main()

View file

@ -1,118 +0,0 @@
"""
"""
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import unittest
from websocket._cookiejar import SimpleCookieJar
class CookieJarTest(unittest.TestCase):
def testAdd(self):
cookie_jar = SimpleCookieJar()
cookie_jar.add("")
self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b")
self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; domain=.abc")
self.assertTrue(".abc" in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; domain=abc")
self.assertTrue(".abc" in cookie_jar.jar)
self.assertTrue("abc" not in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
self.assertEqual(cookie_jar.get(None), "")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
cookie_jar.add("e=f; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
cookie_jar.add("e=f; domain=.abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d; e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.add("a=b; c=d; domain=abc")
cookie_jar.add("e=f; domain=xyz")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
self.assertEqual(cookie_jar.get("xyz"), "e=f")
self.assertEqual(cookie_jar.get("something"), "")
def testSet(self):
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b")
self.assertFalse(cookie_jar.jar, "Cookie with no domain should not be added to the jar")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; domain=.abc")
self.assertTrue(".abc" in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; domain=abc")
self.assertTrue(".abc" in cookie_jar.jar)
self.assertTrue("abc" not in cookie_jar.jar)
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
cookie_jar.set("e=f; domain=abc")
self.assertEqual(cookie_jar.get("abc"), "e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
cookie_jar.set("e=f; domain=.abc")
self.assertEqual(cookie_jar.get("abc"), "e=f")
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc")
cookie_jar.set("e=f; domain=xyz")
self.assertEqual(cookie_jar.get("abc"), "a=b; c=d")
self.assertEqual(cookie_jar.get("xyz"), "e=f")
self.assertEqual(cookie_jar.get("something"), "")
def testGet(self):
cookie_jar = SimpleCookieJar()
cookie_jar.set("a=b; c=d; domain=abc.com")
self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("abc.com.es"), "")
self.assertEqual(cookie_jar.get("xabc.com"), "")
cookie_jar.set("a=b; c=d; domain=.abc.com")
self.assertEqual(cookie_jar.get("abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("x.abc.com"), "a=b; c=d")
self.assertEqual(cookie_jar.get("abc.com.es"), "")
self.assertEqual(cookie_jar.get("xabc.com"), "")

View file

@ -1,150 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import os.path
import websocket as ws
from websocket._http import proxy_info, read_headers, _open_proxied_socket, _tunnel, _get_addrinfo_list, connect
import sys
import unittest
import ssl
import websocket
import socks
import socket
sys.path[0:0] = [""]
# Skip test to access the internet.
TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
class SockMock(object):
def __init__(self):
self.data = []
self.sent = []
def add_packet(self, data):
self.data.append(data)
def gettimeout(self):
return None
def recv(self, bufsize):
if self.data:
e = self.data.pop(0)
if isinstance(e, Exception):
raise e
if len(e) > bufsize:
self.data.insert(0, e[bufsize:])
return e[:bufsize]
def send(self, data):
self.sent.append(data)
return len(data)
def close(self):
pass
class HeaderSockMock(SockMock):
def __init__(self, fname):
SockMock.__init__(self)
path = os.path.join(os.path.dirname(__file__), fname)
with open(path, "rb") as f:
self.add_packet(f.read())
class OptsList():
def __init__(self):
self.timeout = 1
self.sockopt = []
class HttpTest(unittest.TestCase):
def testReadHeader(self):
status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
self.assertEqual(status, 101)
self.assertEqual(header["connection"], "Upgrade")
# header02.txt is intentionally malformed
self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
def testTunnel(self):
self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header01.txt"), "example.com", 80, ("username", "password"))
self.assertRaises(ws.WebSocketProxyException, _tunnel, HeaderSockMock("data/header02.txt"), "example.com", 80, ("username", "password"))
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testConnect(self):
# Not currently testing an actual proxy connection, so just check whether TypeError is raised. This requires internet for a DNS lookup
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host=None, http_proxy_port=None, proxy_type=None))
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http"))
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks4"))
self.assertRaises(TypeError, _open_proxied_socket, "wss://example.com", OptsList(), proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="socks5h"))
self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"))
self.assertRaises(TypeError, _get_addrinfo_list, None, 80, True, proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http"))
self.assertRaises(socks.ProxyConnectionError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=8080, proxy_type="socks4"), None)
self.assertRaises(socket.timeout, connect, "wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), None)
self.assertEqual(
connect("wss://google.com", OptsList(), proxy_info(http_proxy_host="8.8.8.8", http_proxy_port=8080, proxy_type="http"), True),
(True, ("google.com", 443, "/")))
# The following test fails on Mac OS with a gaierror, not an OverflowError
# self.assertRaises(OverflowError, connect, "wss://example.com", OptsList(), proxy_info(http_proxy_host="127.0.0.1", http_proxy_port=99999, proxy_type="socks4", timeout=2), False)
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSSLopt(self):
ssloptions = {
"cert_reqs": ssl.CERT_NONE,
"check_hostname": False,
"ssl_version": ssl.PROTOCOL_SSLv23,
"ciphers": "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:\
TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:\
ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:\
ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:\
DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:\
ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES128-GCM-SHA256:\
ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:\
DHE-RSA-AES256-SHA256:ECDHE-ECDSA-AES128-SHA256:\
ECDHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA256:\
ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA",
"ecdh_curve": "prime256v1"
}
ws_ssl1 = websocket.WebSocket(sslopt=ssloptions)
ws_ssl1.connect("wss://api.bitfinex.com/ws/2")
ws_ssl1.send("Hello")
ws_ssl1.close()
ws_ssl2 = websocket.WebSocket(sslopt={"check_hostname": True})
ws_ssl2.connect("wss://api.bitfinex.com/ws/2")
ws_ssl2.close
def testProxyInfo(self):
self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").type, "http")
self.assertRaises(ValueError, proxy_info, http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="badval")
self.assertEqual(proxy_info(http_proxy_host="example.com", http_proxy_port="8080", proxy_type="http").host, "example.com")
self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").port, "8080")
self.assertEqual(proxy_info(http_proxy_host="127.0.0.1", http_proxy_port="8080", proxy_type="http").auth, None)
if __name__ == "__main__":
unittest.main()

View file

@ -1,301 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import sys
import os
import unittest
sys.path[0:0] = [""]
from websocket._url import get_proxy_info, parse_url, _is_address_in_network, _is_no_proxy_host
class UrlTest(unittest.TestCase):
def test_address_in_network(self):
self.assertTrue(_is_address_in_network('127.0.0.1', '127.0.0.0/8'))
self.assertTrue(_is_address_in_network('127.1.0.1', '127.0.0.0/8'))
self.assertFalse(_is_address_in_network('127.1.0.1', '127.0.0.0/24'))
def testParseUrl(self):
p = parse_url("ws://www.example.com/r")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com/r/")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/r/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com/")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com:8080/r")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com:8080/")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("ws://www.example.com:8080")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/")
self.assertEqual(p[3], False)
p = parse_url("wss://www.example.com:8080/r")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], True)
p = parse_url("wss://www.example.com:8080/r?key=value")
self.assertEqual(p[0], "www.example.com")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r?key=value")
self.assertEqual(p[3], True)
self.assertRaises(ValueError, parse_url, "http://www.example.com/r")
p = parse_url("ws://[2a03:4000:123:83::3]/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 80)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("ws://[2a03:4000:123:83::3]:8080/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], False)
p = parse_url("wss://[2a03:4000:123:83::3]/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 443)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], True)
p = parse_url("wss://[2a03:4000:123:83::3]:8080/r")
self.assertEqual(p[0], "2a03:4000:123:83::3")
self.assertEqual(p[1], 8080)
self.assertEqual(p[2], "/r")
self.assertEqual(p[3], True)
class IsNoProxyHostTest(unittest.TestCase):
def setUp(self):
self.no_proxy = os.environ.get("no_proxy", None)
if "no_proxy" in os.environ:
del os.environ["no_proxy"]
def tearDown(self):
if self.no_proxy:
os.environ["no_proxy"] = self.no_proxy
elif "no_proxy" in os.environ:
del os.environ["no_proxy"]
def testMatchAll(self):
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['*']))
self.assertTrue(_is_no_proxy_host("192.168.0.1", ['*']))
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['other.websocket.org', '*']))
os.environ['no_proxy'] = '*'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
self.assertTrue(_is_no_proxy_host("192.168.0.1", None))
os.environ['no_proxy'] = 'other.websocket.org, *'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
def testIpAddress(self):
self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.1']))
self.assertFalse(_is_no_proxy_host("127.0.0.2", ['127.0.0.1']))
self.assertTrue(_is_no_proxy_host("127.0.0.1", ['other.websocket.org', '127.0.0.1']))
self.assertFalse(_is_no_proxy_host("127.0.0.2", ['other.websocket.org', '127.0.0.1']))
os.environ['no_proxy'] = '127.0.0.1'
self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
os.environ['no_proxy'] = 'other.websocket.org, 127.0.0.1'
self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
self.assertFalse(_is_no_proxy_host("127.0.0.2", None))
def testIpAddressInRange(self):
self.assertTrue(_is_no_proxy_host("127.0.0.1", ['127.0.0.0/8']))
self.assertTrue(_is_no_proxy_host("127.0.0.2", ['127.0.0.0/8']))
self.assertFalse(_is_no_proxy_host("127.1.0.1", ['127.0.0.0/24']))
os.environ['no_proxy'] = '127.0.0.0/8'
self.assertTrue(_is_no_proxy_host("127.0.0.1", None))
self.assertTrue(_is_no_proxy_host("127.0.0.2", None))
os.environ['no_proxy'] = '127.0.0.0/24'
self.assertFalse(_is_no_proxy_host("127.1.0.1", None))
def testHostnameMatch(self):
self.assertTrue(_is_no_proxy_host("my.websocket.org", ['my.websocket.org']))
self.assertTrue(_is_no_proxy_host("my.websocket.org", ['other.websocket.org', 'my.websocket.org']))
self.assertFalse(_is_no_proxy_host("my.websocket.org", ['other.websocket.org']))
os.environ['no_proxy'] = 'my.websocket.org'
self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
self.assertFalse(_is_no_proxy_host("other.websocket.org", None))
os.environ['no_proxy'] = 'other.websocket.org, my.websocket.org'
self.assertTrue(_is_no_proxy_host("my.websocket.org", None))
def testHostnameMatchDomain(self):
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['.websocket.org']))
self.assertTrue(_is_no_proxy_host("my.other.websocket.org", ['.websocket.org']))
self.assertTrue(_is_no_proxy_host("any.websocket.org", ['my.websocket.org', '.websocket.org']))
self.assertFalse(_is_no_proxy_host("any.websocket.com", ['.websocket.org']))
os.environ['no_proxy'] = '.websocket.org'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
self.assertTrue(_is_no_proxy_host("my.other.websocket.org", None))
self.assertFalse(_is_no_proxy_host("any.websocket.com", None))
os.environ['no_proxy'] = 'my.websocket.org, .websocket.org'
self.assertTrue(_is_no_proxy_host("any.websocket.org", None))
class ProxyInfoTest(unittest.TestCase):
def setUp(self):
self.http_proxy = os.environ.get("http_proxy", None)
self.https_proxy = os.environ.get("https_proxy", None)
self.no_proxy = os.environ.get("no_proxy", None)
if "http_proxy" in os.environ:
del os.environ["http_proxy"]
if "https_proxy" in os.environ:
del os.environ["https_proxy"]
if "no_proxy" in os.environ:
del os.environ["no_proxy"]
def tearDown(self):
if self.http_proxy:
os.environ["http_proxy"] = self.http_proxy
elif "http_proxy" in os.environ:
del os.environ["http_proxy"]
if self.https_proxy:
os.environ["https_proxy"] = self.https_proxy
elif "https_proxy" in os.environ:
del os.environ["https_proxy"]
if self.no_proxy:
os.environ["no_proxy"] = self.no_proxy
elif "no_proxy" in os.environ:
del os.environ["no_proxy"]
def testProxyFromArgs(self):
self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost"), ("localhost", 0, None))
self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128),
("localhost", 3128, None))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost"), ("localhost", 0, None))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128),
("localhost", 3128, None))
self.assertEqual(get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_auth=("a", "b")),
("localhost", 0, ("a", "b")))
self.assertEqual(
get_proxy_info("echo.websocket.org", False, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
("localhost", 3128, ("a", "b")))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_auth=("a", "b")),
("localhost", 0, ("a", "b")))
self.assertEqual(
get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128, proxy_auth=("a", "b")),
("localhost", 3128, ("a", "b")))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128,
no_proxy=["example.com"], proxy_auth=("a", "b")),
("localhost", 3128, ("a", "b")))
self.assertEqual(get_proxy_info("echo.websocket.org", True, proxy_host="localhost", proxy_port=3128,
no_proxy=["echo.websocket.org"], proxy_auth=("a", "b")),
(None, 0, None))
def testProxyFromEnv(self):
os.environ["http_proxy"] = "http://localhost/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
os.environ["http_proxy"] = "http://localhost:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
os.environ["http_proxy"] = "http://localhost/"
os.environ["https_proxy"] = "http://localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, None))
os.environ["http_proxy"] = "http://localhost:3128/"
os.environ["https_proxy"] = "http://localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, None))
os.environ["http_proxy"] = "http://localhost/"
os.environ["https_proxy"] = "http://localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, None))
os.environ["http_proxy"] = "http://localhost:3128/"
os.environ["https_proxy"] = "http://localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, None))
os.environ["http_proxy"] = "http://a:b@localhost/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost/"
os.environ["https_proxy"] = "http://a:b@localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", False), ("localhost", 3128, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost/"
os.environ["https_proxy"] = "http://a:b@localhost2/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
self.assertEqual(get_proxy_info("echo.websocket.org", True), ("localhost2", 3128, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost/"
os.environ["https_proxy"] = "http://a:b@localhost2/"
os.environ["no_proxy"] = "example1.com,example2.com"
self.assertEqual(get_proxy_info("example.1.com", True), ("localhost2", None, ("a", "b")))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
os.environ["no_proxy"] = "example1.com,example2.com, echo.websocket.org"
self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
os.environ["no_proxy"] = "example1.com,example2.com, .websocket.org"
self.assertEqual(get_proxy_info("echo.websocket.org", True), (None, 0, None))
os.environ["http_proxy"] = "http://a:b@localhost:3128/"
os.environ["https_proxy"] = "http://a:b@localhost2:3128/"
os.environ["no_proxy"] = "127.0.0.0/8, 192.168.0.0/16"
self.assertEqual(get_proxy_info("127.0.0.1", False), (None, 0, None))
self.assertEqual(get_proxy_info("192.168.1.1", False), (None, 0, None))
if __name__ == "__main__":
unittest.main()

View file

@ -1,452 +0,0 @@
# -*- coding: utf-8 -*-
#
"""
"""
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""
import sys
sys.path[0:0] = [""]
import os
import os.path
import socket
import websocket as ws
from websocket._handshake import _create_sec_websocket_key, \
_validate as _validate_header
from websocket._http import read_headers
from websocket._utils import validate_utf8
from base64 import decodebytes as base64decode
import unittest
try:
import ssl
from ssl import SSLError
except ImportError:
# dummy class of SSLError for ssl none-support environment.
class SSLError(Exception):
pass
# Skip test to access the internet.
TEST_WITH_INTERNET = os.environ.get('TEST_WITH_INTERNET', '0') == '1'
TRACEABLE = True
def create_mask_key(_):
return "abcd"
class SockMock(object):
def __init__(self):
self.data = []
self.sent = []
def add_packet(self, data):
self.data.append(data)
def gettimeout(self):
return None
def recv(self, bufsize):
if self.data:
e = self.data.pop(0)
if isinstance(e, Exception):
raise e
if len(e) > bufsize:
self.data.insert(0, e[bufsize:])
return e[:bufsize]
def send(self, data):
self.sent.append(data)
return len(data)
def close(self):
pass
class HeaderSockMock(SockMock):
def __init__(self, fname):
SockMock.__init__(self)
path = os.path.join(os.path.dirname(__file__), fname)
with open(path, "rb") as f:
self.add_packet(f.read())
class WebSocketTest(unittest.TestCase):
def setUp(self):
ws.enableTrace(TRACEABLE)
def tearDown(self):
pass
def testDefaultTimeout(self):
self.assertEqual(ws.getdefaulttimeout(), None)
ws.setdefaulttimeout(10)
self.assertEqual(ws.getdefaulttimeout(), 10)
ws.setdefaulttimeout(None)
def testWSKey(self):
key = _create_sec_websocket_key()
self.assertTrue(key != 24)
self.assertTrue(str("¥n") not in key)
def testNonce(self):
""" WebSocket key should be a random 16-byte nonce.
"""
key = _create_sec_websocket_key()
nonce = base64decode(key.encode("utf-8"))
self.assertEqual(16, len(nonce))
def testWsUtils(self):
key = "c6b8hTg4EeGb2gQMztV1/g=="
required_header = {
"upgrade": "websocket",
"connection": "upgrade",
"sec-websocket-accept": "Kxep+hNu9n51529fGidYu7a3wO0="}
self.assertEqual(_validate_header(required_header, key, None), (True, None))
header = required_header.copy()
header["upgrade"] = "http"
self.assertEqual(_validate_header(header, key, None), (False, None))
del header["upgrade"]
self.assertEqual(_validate_header(header, key, None), (False, None))
header = required_header.copy()
header["connection"] = "something"
self.assertEqual(_validate_header(header, key, None), (False, None))
del header["connection"]
self.assertEqual(_validate_header(header, key, None), (False, None))
header = required_header.copy()
header["sec-websocket-accept"] = "something"
self.assertEqual(_validate_header(header, key, None), (False, None))
del header["sec-websocket-accept"]
self.assertEqual(_validate_header(header, key, None), (False, None))
header = required_header.copy()
header["sec-websocket-protocol"] = "sub1"
self.assertEqual(_validate_header(header, key, ["sub1", "sub2"]), (True, "sub1"))
# This case will print out a logging error using the error() function, but that is expected
self.assertEqual(_validate_header(header, key, ["sub2", "sub3"]), (False, None))
header = required_header.copy()
header["sec-websocket-protocol"] = "sUb1"
self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (True, "sub1"))
header = required_header.copy()
# This case will print out a logging error using the error() function, but that is expected
self.assertEqual(_validate_header(header, key, ["Sub1", "suB2"]), (False, None))
def testReadHeader(self):
status, header, status_message = read_headers(HeaderSockMock("data/header01.txt"))
self.assertEqual(status, 101)
self.assertEqual(header["connection"], "Upgrade")
status, header, status_message = read_headers(HeaderSockMock("data/header03.txt"))
self.assertEqual(status, 101)
self.assertEqual(header["connection"], "Upgrade, Keep-Alive")
HeaderSockMock("data/header02.txt")
self.assertRaises(ws.WebSocketException, read_headers, HeaderSockMock("data/header02.txt"))
def testSend(self):
# TODO: add longer frame data
sock = ws.WebSocket()
sock.set_mask_key(create_mask_key)
s = sock.sock = HeaderSockMock("data/header01.txt")
sock.send("Hello")
self.assertEqual(s.sent[0], b'\x81\x85abcd)\x07\x0f\x08\x0e')
sock.send("こんにちは")
self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc')
# sock.send("x" * 5000)
# self.assertEqual(s.sent[1], b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc")
self.assertEqual(sock.send_binary(b'1111111111101'), 19)
def testRecv(self):
# TODO: add longer frame data
sock = ws.WebSocket()
s = sock.sock = SockMock()
something = b'\x81\x8fabcd\x82\xe3\xf0\x87\xe3\xf1\x80\xe5\xca\x81\xe2\xc5\x82\xe3\xcc'
s.add_packet(something)
data = sock.recv()
self.assertEqual(data, "こんにちは")
s.add_packet(b'\x81\x85abcd)\x07\x0f\x08\x0e')
data = sock.recv()
self.assertEqual(data, "Hello")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testIter(self):
count = 2
for _ in ws.create_connection('wss://stream.meetup.com/2/rsvps'):
count -= 1
if count == 0:
break
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testNext(self):
sock = ws.create_connection('wss://stream.meetup.com/2/rsvps')
self.assertEqual(str, type(next(sock)))
def testInternalRecvStrict(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
s.add_packet(b'foo')
s.add_packet(socket.timeout())
s.add_packet(b'bar')
# s.add_packet(SSLError("The read operation timed out"))
s.add_packet(b'baz')
with self.assertRaises(ws.WebSocketTimeoutException):
sock.frame_buffer.recv_strict(9)
# with self.assertRaises(SSLError):
# data = sock._recv_strict(9)
data = sock.frame_buffer.recv_strict(9)
self.assertEqual(data, b'foobarbaz')
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.frame_buffer.recv_strict(1)
def testRecvTimeout(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
s.add_packet(b'\x81')
s.add_packet(socket.timeout())
s.add_packet(b'\x8dabcd\x29\x07\x0f\x08\x0e')
s.add_packet(socket.timeout())
s.add_packet(b'\x4e\x43\x33\x0e\x10\x0f\x00\x40')
with self.assertRaises(ws.WebSocketTimeoutException):
sock.recv()
with self.assertRaises(ws.WebSocketTimeoutException):
sock.recv()
data = sock.recv()
self.assertEqual(data, "Hello, World!")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testRecvWithSimpleFragmentation(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17')
data = sock.recv()
self.assertEqual(data, "Brevity is the soul of wit")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testRecvWithFireEventOfFragmentation(self):
sock = ws.WebSocket(fire_cont_frame=True)
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x01\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
# OPCODE=CONT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x00\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17')
_, data = sock.recv_data()
self.assertEqual(data, b'Brevity is ')
_, data = sock.recv_data()
self.assertEqual(data, b'Brevity is ')
_, data = sock.recv_data()
self.assertEqual(data, b'the soul of wit')
# OPCODE=CONT, FIN=0, MSG="Brevity is "
s.add_packet(b'\x80\x8babcd#\x10\x06\x12\x08\x16\x1aD\x08\x11C')
with self.assertRaises(ws.WebSocketException):
sock.recv_data()
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testClose(self):
sock = ws.WebSocket()
sock.connected = True
self.assertRaises(ws._exceptions.WebSocketConnectionClosedException, sock.close)
sock = ws.WebSocket()
s = sock.sock = SockMock()
sock.connected = True
s.add_packet(b'\x88\x80\x17\x98p\x84')
sock.recv()
self.assertEqual(sock.connected, False)
def testRecvContFragmentation(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
# OPCODE=CONT, FIN=1, MSG="the soul of wit"
s.add_packet(b'\x80\x8fabcd\x15\n\x06D\x12\r\x16\x08A\r\x05D\x16\x0b\x17')
self.assertRaises(ws.WebSocketException, sock.recv)
def testRecvWithProlongedFragmentation(self):
sock = ws.WebSocket()
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Once more unto the breach, "
s.add_packet(b'\x01\x9babcd.\x0c\x00\x01A\x0f\x0c\x16\x04B\x16\n\x15\rC\x10\t\x07C\x06\x13\x07\x02\x07\tNC')
# OPCODE=CONT, FIN=0, MSG="dear friends, "
s.add_packet(b'\x00\x8eabcd\x05\x07\x02\x16A\x04\x11\r\x04\x0c\x07\x17MB')
# OPCODE=CONT, FIN=1, MSG="once more"
s.add_packet(b'\x80\x89abcd\x0e\x0c\x00\x01A\x0f\x0c\x16\x04')
data = sock.recv()
self.assertEqual(
data,
"Once more unto the breach, dear friends, once more")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
def testRecvWithFragmentationAndControlFrame(self):
sock = ws.WebSocket()
sock.set_mask_key(create_mask_key)
s = sock.sock = SockMock()
# OPCODE=TEXT, FIN=0, MSG="Too much "
s.add_packet(b'\x01\x89abcd5\r\x0cD\x0c\x17\x00\x0cA')
# OPCODE=PING, FIN=1, MSG="Please PONG this"
s.add_packet(b'\x89\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17')
# OPCODE=CONT, FIN=1, MSG="of a good thing"
s.add_packet(b'\x80\x8fabcd\x0e\x04C\x05A\x05\x0c\x0b\x05B\x17\x0c\x08\x0c\x04')
data = sock.recv()
self.assertEqual(data, "Too much of a good thing")
with self.assertRaises(ws.WebSocketConnectionClosedException):
sock.recv()
self.assertEqual(
s.sent[0],
b'\x8a\x90abcd1\x0e\x06\x05\x12\x07C4.,$D\x15\n\n\x17')
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testWebSocket(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.send("こにゃにゃちは、世界")
result = s.recv()
self.assertEqual(result, "こにゃにゃちは、世界")
self.assertRaises(ValueError, s.send_close, -1, "")
s.close()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testPingPong(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.ping("Hello")
s.pong("Hi")
s.close()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSecureWebSocket(self):
import ssl
s = ws.create_connection("wss://api.bitfinex.com/ws/2")
self.assertNotEqual(s, None)
self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
self.assertEqual(s.getstatus(), 101)
self.assertNotEqual(s.getheaders(), None)
s.settimeout(10)
self.assertEqual(s.gettimeout(), 10)
self.assertEqual(s.getsubprotocol(), None)
s.abort()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testWebSocketWithCustomHeader(self):
s = ws.create_connection("ws://echo.websocket.org/",
headers={"User-Agent": "PythonWebsocketClient"})
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
self.assertRaises(ValueError, s.close, -1, "")
s.close()
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testAfterClose(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.close()
self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
class SockOptTest(unittest.TestCase):
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testSockOpt(self):
sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt)
self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
s.close()
class UtilsTest(unittest.TestCase):
def testUtf8Validator(self):
state = validate_utf8(b'\xf0\x90\x80\x80')
self.assertEqual(state, True)
state = validate_utf8(b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80edited')
self.assertEqual(state, False)
state = validate_utf8(b'')
self.assertEqual(state, True)
class HandshakeTest(unittest.TestCase):
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def test_http_SSL(self):
websock1 = ws.WebSocket(sslopt={"cert_chain": ssl.get_default_verify_paths().capath})
self.assertRaises(ValueError,
websock1.connect, "wss://api.bitfinex.com/ws/2")
websock2 = ws.WebSocket(sslopt={"certfile": "myNonexistentCertFile"})
self.assertRaises(FileNotFoundError,
websock2.connect, "wss://api.bitfinex.com/ws/2")
@unittest.skipUnless(TEST_WITH_INTERNET, "Internet-requiring tests are disabled")
def testManualHeaders(self):
websock3 = ws.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE,
"ca_certs": ssl.get_default_verify_paths().capath,
"ca_cert_path": ssl.get_default_verify_paths().openssl_cafile})
self.assertRaises(ws._exceptions.WebSocketBadStatusException,
websock3.connect, "wss://api.bitfinex.com/ws/2", cookie="chocolate",
origin="testing_websockets.com",
host="echo.websocket.org/websocket-client-test",
subprotocols=["testproto"],
connection="Upgrade",
header={"CustomHeader1":"123",
"Cookie":"TestValue",
"Sec-WebSocket-Key":"k9kFAUWNAMmf5OEMfTlOEA==",
"Sec-WebSocket-Protocol":"newprotocol"})
def testIPv6(self):
websock2 = ws.WebSocket()
self.assertRaises(ValueError, websock2.connect, "2001:4860:4860::8888")
def testBadURLs(self):
websock3 = ws.WebSocket()
self.assertRaises(ValueError, websock3.connect, "ws//example.com")
self.assertRaises(ws.WebSocketAddressException, websock3.connect, "ws://example")
self.assertRaises(ValueError, websock3.connect, "example.com")
if __name__ == "__main__":
unittest.main()

View file

@ -7,6 +7,7 @@
from logging import getLogger from logging import getLogger
import re import re
import socket import socket
import xml.etree.ElementTree as etree
import xbmc import xbmc
@ -25,7 +26,7 @@ def get_etree(topelement):
except IOError: except IOError:
# Document is blank or missing # Document is blank or missing
LOG.info('%s.xml is missing or blank, creating it', topelement) LOG.info('%s.xml is missing or blank, creating it', topelement)
root = utils.etree.Element(topelement) root = etree.Element(topelement)
except utils.ParseError: except utils.ParseError:
LOG.error('Error parsing %s', topelement) LOG.error('Error parsing %s', topelement)
# "Kodi cannot parse {0}. PKC will not function correctly. Please visit # "Kodi cannot parse {0}. PKC will not function correctly. Please visit
@ -107,10 +108,10 @@ def start():
top_element='sources') as xml: top_element='sources') as xml:
files = xml.root.find('files') files = xml.root.find('files')
if files is None: if files is None:
files = utils.etree.SubElement(xml.root, 'files') files = etree.SubElement(xml.root, 'files')
utils.etree.SubElement(files, etree.SubElement(files,
'default', 'default',
attrib={'pathversion': '1'}) attrib={'pathversion': '1'})
for source in files: for source in files:
entry = source.find('path') entry = source.find('path')
if entry is None: if entry is None:
@ -123,12 +124,12 @@ def start():
else: else:
# Need to add an element for our hostname # Need to add an element for our hostname
LOG.debug('Adding subelement to sources.xml for %s', hostname) LOG.debug('Adding subelement to sources.xml for %s', hostname)
source = utils.etree.SubElement(files, 'source') source = etree.SubElement(files, 'source')
utils.etree.SubElement(source, 'name').text = 'PKC %s' % hostname etree.SubElement(source, 'name').text = 'PKC %s' % hostname
utils.etree.SubElement(source, etree.SubElement(source,
'path', 'path',
attrib={'pathversion': '1'}).text = '%s/' % path attrib={'pathversion': '1'}).text = '%s/' % path
utils.etree.SubElement(source, 'allowsharing').text = 'false' etree.SubElement(source, 'allowsharing').text = 'false'
xml.write_xml = True xml.write_xml = True
except utils.ParseError: except utils.ParseError:
return return
@ -146,7 +147,7 @@ def start():
'replacing it', 'replacing it',
path) path)
xml.root.remove(entry) xml.root.remove(entry)
entry = utils.etree.SubElement(xml.root, 'path') entry = etree.SubElement(xml.root, 'path')
# "Username" # "Username"
user = utils.dialog('input', utils.lang(1014)) user = utils.dialog('input', utils.lang(1014))
if user is None: if user is None:
@ -162,13 +163,13 @@ def start():
type='{alphanum}', type='{alphanum}',
option='{hide}') option='{hide}')
password = utils.quote(password) password = utils.quote(password)
utils.etree.SubElement(entry, etree.SubElement(entry,
'from', 'from',
attrib={'pathversion': '1'}).text = f'{path}/' attrib={'pathversion': '1'}).text = f'{path}/'
login = f'{protocol}://{user}:{password}@{hostname}/' login = f'{protocol}://{user}:{password}@{hostname}/'
utils.etree.SubElement(entry, etree.SubElement(entry,
'to', 'to',
attrib={'pathversion': '1'}).text = login attrib={'pathversion': '1'}).text = login
xml.write_xml = True xml.write_xml = True
except utils.ParseError: except utils.ParseError:
return return