Merge pull request #1562 from croneter/py3-fix-attributeerror

Rewire defusedxml and xml.etree.ElementTree: Fix AttributeError: module 'resources.lib.utils' has no attribute 'ParseError'
This commit is contained in:
croneter 2021-07-25 15:25:09 +02:00 committed by GitHub
commit 0835869256
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1051 additions and 71 deletions

View file

@ -1,5 +1,4 @@
exclude_paths: exclude_paths:
- 'resources/lib/watchdog/**' - 'resources/lib/watchdog/**'
- 'resources/lib/pathtools/**' - 'resources/lib/pathtools/**'
- 'resources/lib/pathtools/**' - 'resources/lib/defusedxml/**'
- 'resources/lib/defused_etree.py'

View file

@ -1,41 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
xml.etree.ElementTree tries to encode with text.encode('ascii') - which is
just plain BS. This etree will always return unicode, not string
"""
# Originally tried faster cElementTree, but does NOT work reliably with Kodi
from defusedxml.ElementTree import DefusedXMLParser, _generate_etree_functions
from xml.etree.ElementTree import TreeBuilder as _TreeBuilder
from xml.etree.ElementTree import parse as _parse
from xml.etree.ElementTree import iterparse as _iterparse
from xml.etree.ElementTree import tostring
# Enable creation of new xmls and xml elements
from xml.etree.ElementTree import ElementTree, Element, SubElement, ParseError
class UnicodeXMLParser(DefusedXMLParser):
"""
PKC Hack to ensure we're always receiving unicode, not str
"""
@staticmethod
def _fixtext(text):
"""
Do NOT try to convert every entry to str with entry.encode('ascii')!
"""
return text
# aliases
XMLTreeBuilder = XMLParse = UnicodeXMLParser
parse, iterparse, fromstring = _generate_etree_functions(UnicodeXMLParser,
_TreeBuilder, _parse,
_iterparse)
XML = fromstring
__all__ = ['XML', 'XMLParse', 'XMLTreeBuilder', 'fromstring', 'iterparse',
'parse', 'tostring']

View file

@ -0,0 +1,188 @@
# defusedxml
#
# Copyright (c) 2013-2020 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.etree.ElementTree facade
"""
from __future__ import print_function, absolute_import
import sys
import warnings
from xml.etree.ElementTree import ParseError
from xml.etree.ElementTree import TreeBuilder as _TreeBuilder
from xml.etree.ElementTree import parse as _parse
from xml.etree.ElementTree import tostring
import importlib
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xml.etree.ElementTree"
def _get_py3_cls():
"""Python 3.3 hides the pure Python code but defusedxml requires it.
The code is based on test.support.import_fresh_module().
"""
pymodname = "xml.etree.ElementTree"
cmodname = "_elementtree"
pymod = sys.modules.pop(pymodname, None)
cmod = sys.modules.pop(cmodname, None)
sys.modules[cmodname] = None
try:
pure_pymod = importlib.import_module(pymodname)
finally:
# restore module
sys.modules[pymodname] = pymod
if cmod is not None:
sys.modules[cmodname] = cmod
else:
sys.modules.pop(cmodname, None)
# restore attribute on original package
etree_pkg = sys.modules["xml.etree"]
if pymod is not None:
etree_pkg.ElementTree = pymod
elif hasattr(etree_pkg, "ElementTree"):
del etree_pkg.ElementTree
_XMLParser = pure_pymod.XMLParser
_iterparse = pure_pymod.iterparse
# patch pure module to use ParseError from C extension
pure_pymod.ParseError = ParseError
return _XMLParser, _iterparse
_XMLParser, _iterparse = _get_py3_cls()
_sentinel = object()
class DefusedXMLParser(_XMLParser):
def __init__(
self,
html=_sentinel,
target=None,
encoding=None,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
super().__init__(target=target, encoding=encoding)
if html is not _sentinel:
# the 'html' argument has been deprecated and ignored in all
# supported versions of Python. Python 3.8 finally removed it.
if html:
raise TypeError("'html=True' is no longer supported.")
else:
warnings.warn(
"'html' keyword argument is no longer supported. Pass "
"in arguments as keyword arguments.",
category=DeprecationWarning,
)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
parser = self.parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
# aliases
# XMLParse is a typo, keep it for backwards compatibility
XMLTreeBuilder = XMLParse = XMLParser = DefusedXMLParser
def parse(source, parser=None, forbid_dtd=False, forbid_entities=True, forbid_external=True):
if parser is None:
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
return _parse(source, parser)
def iterparse(
source,
events=None,
parser=None,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
if parser is None:
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
return _iterparse(source, events, parser)
def fromstring(text, forbid_dtd=False, forbid_entities=True, forbid_external=True):
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
parser.feed(text)
return parser.close()
XML = fromstring
def fromstringlist(sequence, forbid_dtd=False, forbid_entities=True, forbid_external=True):
parser = DefusedXMLParser(
target=_TreeBuilder(),
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
for text in sequence:
parser.feed(text)
return parser.close()
__all__ = [
"ParseError",
"XML",
"XMLParse",
"XMLParser",
"XMLTreeBuilder",
"fromstring",
"fromstringlist",
"iterparse",
"parse",
"tostring",
]

View file

@ -0,0 +1,67 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defuse XML bomb denial of service vulnerabilities
"""
from __future__ import print_function, absolute_import
import warnings
from .common import (
DefusedXmlException,
DTDForbidden,
EntitiesForbidden,
ExternalReferenceForbidden,
NotSupportedError,
_apply_defusing,
)
def defuse_stdlib():
"""Monkey patch and defuse all stdlib packages
:warning: The monkey patch is an EXPERIMETNAL feature.
"""
defused = {}
with warnings.catch_warnings():
from . import cElementTree
from . import ElementTree
from . import minidom
from . import pulldom
from . import sax
from . import expatbuilder
from . import expatreader
from . import xmlrpc
xmlrpc.monkey_patch()
defused[xmlrpc] = None
defused_mods = [
cElementTree,
ElementTree,
minidom,
pulldom,
sax,
expatbuilder,
expatreader,
]
for defused_mod in defused_mods:
stdlib_mod = _apply_defusing(defused_mod)
defused[defused_mod] = stdlib_mod
return defused
__version__ = "0.8.0.dev1"
__all__ = [
"DefusedXmlException",
"DTDForbidden",
"EntitiesForbidden",
"ExternalReferenceForbidden",
"NotSupportedError",
]

View file

@ -0,0 +1,47 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.etree.cElementTree
"""
import warnings
# This module is an alias for ElementTree just like xml.etree.cElementTree
from .ElementTree import (
XML,
XMLParse,
XMLParser,
XMLTreeBuilder,
fromstring,
fromstringlist,
iterparse,
parse,
tostring,
DefusedXMLParser,
ParseError,
)
__origin__ = "xml.etree.cElementTree"
warnings.warn(
"defusedxml.cElementTree is deprecated, import from defusedxml.ElementTree instead.",
category=DeprecationWarning,
stacklevel=2,
)
__all__ = [
"ParseError",
"XML",
"XMLParse",
"XMLParser",
"XMLTreeBuilder",
"fromstring",
"fromstringlist",
"iterparse",
"parse",
"tostring",
# backwards compatibility
"DefusedXMLParser",
]

View file

@ -0,0 +1,85 @@
# defusedxml
#
# Copyright (c) 2013-2020 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Common constants, exceptions and helpe functions
"""
import sys
import xml.parsers.expat
PY3 = True
# Fail early when pyexpat is not installed correctly
if not hasattr(xml.parsers.expat, "ParserCreate"):
raise ImportError("pyexpat") # pragma: no cover
class DefusedXmlException(ValueError):
"""Base exception"""
def __repr__(self):
return str(self)
class DTDForbidden(DefusedXmlException):
"""Document type definition is forbidden"""
def __init__(self, name, sysid, pubid):
super().__init__()
self.name = name
self.sysid = sysid
self.pubid = pubid
def __str__(self):
tpl = "DTDForbidden(name='{}', system_id={!r}, public_id={!r})"
return tpl.format(self.name, self.sysid, self.pubid)
class EntitiesForbidden(DefusedXmlException):
"""Entity definition is forbidden"""
def __init__(self, name, value, base, sysid, pubid, notation_name):
super().__init__()
self.name = name
self.value = value
self.base = base
self.sysid = sysid
self.pubid = pubid
self.notation_name = notation_name
def __str__(self):
tpl = "EntitiesForbidden(name='{}', system_id={!r}, public_id={!r})"
return tpl.format(self.name, self.sysid, self.pubid)
class ExternalReferenceForbidden(DefusedXmlException):
"""Resolving an external reference is forbidden"""
def __init__(self, context, base, sysid, pubid):
super().__init__()
self.context = context
self.base = base
self.sysid = sysid
self.pubid = pubid
def __str__(self):
tpl = "ExternalReferenceForbidden(system_id='{}', public_id={})"
return tpl.format(self.sysid, self.pubid)
class NotSupportedError(DefusedXmlException):
"""The operation is not supported"""
def _apply_defusing(defused_mod):
assert defused_mod is sys.modules[defused_mod.__name__]
stdlib_name = defused_mod.__origin__
__import__(stdlib_name, {}, {}, ["*"])
stdlib_mod = sys.modules[stdlib_name]
stdlib_names = set(dir(stdlib_mod))
for name, obj in vars(defused_mod).items():
if name.startswith("_") or name not in stdlib_names:
continue
setattr(stdlib_mod, name, obj)
return stdlib_mod

View file

@ -0,0 +1,107 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.dom.expatbuilder
"""
from __future__ import print_function, absolute_import
from xml.dom.expatbuilder import ExpatBuilder as _ExpatBuilder
from xml.dom.expatbuilder import Namespaces as _Namespaces
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xml.dom.expatbuilder"
class DefusedExpatBuilder(_ExpatBuilder):
"""Defused document builder"""
def __init__(
self, options=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
_ExpatBuilder.__init__(self, options)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def install(self, parser):
_ExpatBuilder.install(self, parser)
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
# if self._options.entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
class DefusedExpatBuilderNS(_Namespaces, DefusedExpatBuilder):
"""Defused document builder that supports namespaces."""
def install(self, parser):
DefusedExpatBuilder.install(self, parser)
if self._options.namespace_declarations:
parser.StartNamespaceDeclHandler = self.start_namespace_decl_handler
def reset(self):
DefusedExpatBuilder.reset(self)
self._initNamespaces()
def parse(file, namespaces=True, forbid_dtd=False, forbid_entities=True, forbid_external=True):
"""Parse a document, returning the resulting Document node.
'file' may be either a file name or an open file object.
"""
if namespaces:
build_builder = DefusedExpatBuilderNS
else:
build_builder = DefusedExpatBuilder
builder = build_builder(
forbid_dtd=forbid_dtd, forbid_entities=forbid_entities, forbid_external=forbid_external
)
if isinstance(file, str):
fp = open(file, "rb")
try:
result = builder.parseFile(fp)
finally:
fp.close()
else:
result = builder.parseFile(file)
return result
def parseString(
string, namespaces=True, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
"""Parse a document from a string, returning the resulting
Document node.
"""
if namespaces:
build_builder = DefusedExpatBuilderNS
else:
build_builder = DefusedExpatBuilder
builder = build_builder(
forbid_dtd=forbid_dtd, forbid_entities=forbid_entities, forbid_external=forbid_external
)
return builder.parseString(string)

View file

@ -0,0 +1,61 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.sax.expatreader
"""
from __future__ import print_function, absolute_import
from xml.sax.expatreader import ExpatParser as _ExpatParser
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xml.sax.expatreader"
class DefusedExpatParser(_ExpatParser):
"""Defused SAX driver for the pyexpat C module."""
def __init__(
self,
namespaceHandling=0,
bufsize=2 ** 16 - 20,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
super().__init__(namespaceHandling, bufsize)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def reset(self):
super().reset()
parser = self._parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def create_parser(*args, **kwargs):
return DefusedExpatParser(*args, **kwargs)

View file

@ -0,0 +1,153 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""DEPRECATED Example code for lxml.etree protection
The code has NO protection against decompression bombs.
"""
from __future__ import print_function, absolute_import
import threading
import warnings
from lxml import etree as _etree
from .common import DTDForbidden, EntitiesForbidden, NotSupportedError
LXML3 = _etree.LXML_VERSION[0] >= 3
__origin__ = "lxml.etree"
tostring = _etree.tostring
warnings.warn(
"defusedxml.lxml is no longer supported and will be removed in a future release.",
category=DeprecationWarning,
stacklevel=2,
)
class RestrictedElement(_etree.ElementBase):
"""A restricted Element class that filters out instances of some classes"""
__slots__ = ()
# blacklist = (etree._Entity, etree._ProcessingInstruction, etree._Comment)
blacklist = _etree._Entity
def _filter(self, iterator):
blacklist = self.blacklist
for child in iterator:
if isinstance(child, blacklist):
continue
yield child
def __iter__(self):
iterator = super(RestrictedElement, self).__iter__()
return self._filter(iterator)
def iterchildren(self, tag=None, reversed=False):
iterator = super(RestrictedElement, self).iterchildren(tag=tag, reversed=reversed)
return self._filter(iterator)
def iter(self, tag=None, *tags):
iterator = super(RestrictedElement, self).iter(tag=tag, *tags)
return self._filter(iterator)
def iterdescendants(self, tag=None, *tags):
iterator = super(RestrictedElement, self).iterdescendants(tag=tag, *tags)
return self._filter(iterator)
def itersiblings(self, tag=None, preceding=False):
iterator = super(RestrictedElement, self).itersiblings(tag=tag, preceding=preceding)
return self._filter(iterator)
def getchildren(self):
iterator = super(RestrictedElement, self).__iter__()
return list(self._filter(iterator))
def getiterator(self, tag=None):
iterator = super(RestrictedElement, self).getiterator(tag)
return self._filter(iterator)
class GlobalParserTLS(threading.local):
"""Thread local context for custom parser instances"""
parser_config = {
"resolve_entities": False,
# 'remove_comments': True,
# 'remove_pis': True,
}
element_class = RestrictedElement
def createDefaultParser(self):
parser = _etree.XMLParser(**self.parser_config)
element_class = self.element_class
if self.element_class is not None:
lookup = _etree.ElementDefaultClassLookup(element=element_class)
parser.set_element_class_lookup(lookup)
return parser
def setDefaultParser(self, parser):
self._default_parser = parser
def getDefaultParser(self):
parser = getattr(self, "_default_parser", None)
if parser is None:
parser = self.createDefaultParser()
self.setDefaultParser(parser)
return parser
_parser_tls = GlobalParserTLS()
getDefaultParser = _parser_tls.getDefaultParser
def check_docinfo(elementtree, forbid_dtd=False, forbid_entities=True):
"""Check docinfo of an element tree for DTD and entity declarations
The check for entity declarations needs lxml 3 or newer. lxml 2.x does
not support dtd.iterentities().
"""
docinfo = elementtree.docinfo
if docinfo.doctype:
if forbid_dtd:
raise DTDForbidden(docinfo.doctype, docinfo.system_url, docinfo.public_id)
if forbid_entities and not LXML3:
# lxml < 3 has no iterentities()
raise NotSupportedError("Unable to check for entity declarations " "in lxml 2.x")
if forbid_entities:
for dtd in docinfo.internalDTD, docinfo.externalDTD:
if dtd is None:
continue
for entity in dtd.iterentities():
raise EntitiesForbidden(entity.name, entity.content, None, None, None, None)
def parse(source, parser=None, base_url=None, forbid_dtd=False, forbid_entities=True):
if parser is None:
parser = getDefaultParser()
elementtree = _etree.parse(source, parser, base_url=base_url)
check_docinfo(elementtree, forbid_dtd, forbid_entities)
return elementtree
def fromstring(text, parser=None, base_url=None, forbid_dtd=False, forbid_entities=True):
if parser is None:
parser = getDefaultParser()
rootelement = _etree.fromstring(text, parser, base_url=base_url)
elementtree = rootelement.getroottree()
check_docinfo(elementtree, forbid_dtd, forbid_entities)
return rootelement
XML = fromstring
def iterparse(*args, **kwargs):
raise NotSupportedError("defused lxml.etree.iterparse not available")

View file

@ -0,0 +1,63 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.dom.minidom
"""
from __future__ import print_function, absolute_import
from xml.dom.minidom import _do_pulldom_parse
from . import expatbuilder as _expatbuilder
from . import pulldom as _pulldom
__origin__ = "xml.dom.minidom"
def parse(
file, parser=None, bufsize=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
"""Parse a file into a DOM by filename or file object."""
if parser is None and not bufsize:
return _expatbuilder.parse(
file,
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
else:
return _do_pulldom_parse(
_pulldom.parse,
(file,),
{
"parser": parser,
"bufsize": bufsize,
"forbid_dtd": forbid_dtd,
"forbid_entities": forbid_entities,
"forbid_external": forbid_external,
},
)
def parseString(
string, parser=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
"""Parse a file into a DOM from a string."""
if parser is None:
return _expatbuilder.parseString(
string,
forbid_dtd=forbid_dtd,
forbid_entities=forbid_entities,
forbid_external=forbid_external,
)
else:
return _do_pulldom_parse(
_pulldom.parseString,
(string,),
{
"parser": parser,
"forbid_dtd": forbid_dtd,
"forbid_entities": forbid_entities,
"forbid_external": forbid_external,
},
)

View file

@ -0,0 +1,41 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.dom.pulldom
"""
from __future__ import print_function, absolute_import
from xml.dom.pulldom import parse as _parse
from xml.dom.pulldom import parseString as _parseString
from .sax import make_parser
__origin__ = "xml.dom.pulldom"
def parse(
stream_or_string,
parser=None,
bufsize=None,
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
if parser is None:
parser = make_parser()
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
return _parse(stream_or_string, parser, bufsize)
def parseString(
string, parser=None, forbid_dtd=False, forbid_entities=True, forbid_external=True
):
if parser is None:
parser = make_parser()
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
return _parseString(string, parser)

View file

@ -0,0 +1,60 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xml.sax
"""
from __future__ import print_function, absolute_import
from xml.sax import InputSource as _InputSource
from xml.sax import ErrorHandler as _ErrorHandler
from . import expatreader
__origin__ = "xml.sax"
def parse(
source,
handler,
errorHandler=_ErrorHandler(),
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
parser = make_parser()
parser.setContentHandler(handler)
parser.setErrorHandler(errorHandler)
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
parser.parse(source)
def parseString(
string,
handler,
errorHandler=_ErrorHandler(),
forbid_dtd=False,
forbid_entities=True,
forbid_external=True,
):
from io import BytesIO
if errorHandler is None:
errorHandler = _ErrorHandler()
parser = make_parser()
parser.setContentHandler(handler)
parser.setErrorHandler(errorHandler)
parser.forbid_dtd = forbid_dtd
parser.forbid_entities = forbid_entities
parser.forbid_external = forbid_external
inpsrc = _InputSource()
inpsrc.setByteStream(BytesIO(string))
parser.parse(inpsrc)
def make_parser(parser_list=[]):
return expatreader.create_parser()

View file

@ -0,0 +1,144 @@
# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xmlrpclib
Also defuses gzip bomb
"""
from __future__ import print_function, absolute_import
import io
from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden
__origin__ = "xmlrpc.client"
from xmlrpc.client import ExpatParser
from xmlrpc import client as xmlrpc_client
from xmlrpc import server as xmlrpc_server
from xmlrpc.client import gzip_decode as _orig_gzip_decode
from xmlrpc.client import GzipDecodedResponse as _OrigGzipDecodedResponse
try:
import gzip
except ImportError: # pragma: no cover
gzip = None
# Limit maximum request size to prevent resource exhaustion DoS
# Also used to limit maximum amount of gzip decoded data in order to prevent
# decompression bombs
# A value of -1 or smaller disables the limit
MAX_DATA = 30 * 1024 * 1024 # 30 MB
def defused_gzip_decode(data, limit=None):
"""gzip encoded data -> unencoded data
Decode data using the gzip content encoding as described in RFC 1952
"""
if not gzip: # pragma: no cover
raise NotImplementedError
if limit is None:
limit = MAX_DATA
f = io.BytesIO(data)
gzf = gzip.GzipFile(mode="rb", fileobj=f)
try:
if limit < 0: # no limit
decoded = gzf.read()
else:
decoded = gzf.read(limit + 1)
except IOError: # pragma: no cover
raise ValueError("invalid data")
f.close()
gzf.close()
if limit >= 0 and len(decoded) > limit:
raise ValueError("max gzipped payload length exceeded")
return decoded
class DefusedGzipDecodedResponse(gzip.GzipFile if gzip else object):
"""a file-like object to decode a response encoded with the gzip
method, as described in RFC 1952.
"""
def __init__(self, response, limit=None):
# response doesn't support tell() and read(), required by
# GzipFile
if not gzip: # pragma: no cover
raise NotImplementedError
self.limit = limit = limit if limit is not None else MAX_DATA
if limit < 0: # no limit
data = response.read()
self.readlength = None
else:
data = response.read(limit + 1)
self.readlength = 0
if limit >= 0 and len(data) > limit:
raise ValueError("max payload length exceeded")
self.stringio = io.BytesIO(data)
super().__init__(mode="rb", fileobj=self.stringio)
def read(self, n):
if self.limit >= 0:
left = self.limit - self.readlength
n = min(n, left + 1)
data = gzip.GzipFile.read(self, n)
self.readlength += len(data)
if self.readlength > self.limit:
raise ValueError("max payload length exceeded")
return data
else:
return super().read(n)
def close(self):
super().close()
self.stringio.close()
class DefusedExpatParser(ExpatParser):
def __init__(self, target, forbid_dtd=False, forbid_entities=True, forbid_external=True):
super().__init__(target)
self.forbid_dtd = forbid_dtd
self.forbid_entities = forbid_entities
self.forbid_external = forbid_external
parser = self._parser
if self.forbid_dtd:
parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
if self.forbid_entities:
parser.EntityDeclHandler = self.defused_entity_decl
parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
if self.forbid_external:
parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
raise DTDForbidden(name, sysid, pubid)
def defused_entity_decl(
self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
):
raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
# expat 1.2
raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name) # pragma: no cover
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
raise ExternalReferenceForbidden(context, base, sysid, pubid)
def monkey_patch():
xmlrpc_client.FastParser = DefusedExpatParser
xmlrpc_client.GzipDecodedResponse = DefusedGzipDecodedResponse
xmlrpc_client.gzip_decode = defused_gzip_decode
if xmlrpc_server:
xmlrpc_server.gzip_decode = defused_gzip_decode
def unmonkey_patch():
xmlrpc_client.FastParser = None
xmlrpc_client.GzipDecodedResponse = _OrigGzipDecodedResponse
xmlrpc_client.gzip_decode = _orig_gzip_decode
if xmlrpc_server:
xmlrpc_server.gzip_decode = _orig_gzip_decode

View file

@ -7,6 +7,7 @@ e.g. plugin://... calls. Hence be careful to only rely on window variables.
from logging import getLogger from logging import getLogger
import sys import sys
import copy import copy
import xml.etree.ElementTree as etree
import xbmc import xbmc
import xbmcplugin import xbmcplugin
@ -509,7 +510,7 @@ def browse_plex(key=None, plex_type=None, section_id=None, synched=True,
return return
if xml[0].tag == 'Hub': if xml[0].tag == 'Hub':
# E.g. when hitting the endpoint '/hubs/search' # E.g. when hitting the endpoint '/hubs/search'
answ = utils.etree.Element(xml.tag, attrib=xml.attrib) answ = etree.Element(xml.tag, attrib=xml.attrib)
for hub in xml: for hub in xml:
if not utils.cast(int, hub.get('size')): if not utils.cast(int, hub.get('size')):
# Empty category # Empty category

View file

@ -5,7 +5,7 @@ from logging import getLogger
from xbmc import executebuiltin from xbmc import executebuiltin
from . import utils from . import utils
from .utils import etree import xml.etree.ElementTree as etree
from . import path_ops from . import path_ops
from . import migration from . import migration
from .downloadutils import DownloadUtils as DU, exceptions from .downloadutils import DownloadUtils as DU, exceptions

View file

@ -3,7 +3,7 @@
import urllib.request, urllib.parse, urllib.error import urllib.request, urllib.parse, urllib.error
import copy import copy
from ..utils import etree import xml.etree.ElementTree as etree
from .. import variables as v, utils from .. import variables as v, utils
ICON_PATH = 'special://home/addons/plugin.video.plexkodiconnect/icon.png' ICON_PATH = 'special://home/addons/plugin.video.plexkodiconnect/icon.png'

View file

@ -9,7 +9,7 @@ from ..plex_api import API
from .. import kodi_db from .. import kodi_db
from .. import itemtypes, path_ops from .. import itemtypes, path_ops
from .. import plex_functions as PF, music, utils, variables as v, app from .. import plex_functions as PF, music, utils, variables as v, app
from ..utils import etree import xml.etree.ElementTree as etree
LOG = getLogger('PLEX.sync.sections') LOG = getLogger('PLEX.sync.sections')

View file

@ -15,7 +15,9 @@ import urllib
import urllib.parse import urllib.parse
# Originally tried faster cElementTree, but does NOT work reliably with Kodi # Originally tried faster cElementTree, but does NOT work reliably with Kodi
# etree parse unsafe; make sure we're always receiving unicode # etree parse unsafe; make sure we're always receiving unicode
from . import defused_etree as etree from .defusedxml import ElementTree as etree
from .defusedxml.ElementTree import ParseError
import xml.etree.ElementTree as undefused_etree
from functools import wraps from functools import wraps
import re import re
import gc import gc
@ -700,16 +702,17 @@ class XmlKodiSetting(object):
# This will abort __enter__ # This will abort __enter__
self.__exit__(IOError('File not found'), None, None) self.__exit__(IOError('File not found'), None, None)
# Create topmost xml entry # Create topmost xml entry
self.tree = etree.ElementTree(etree.Element(self.top_element)) self.tree = undefused_etree.ElementTree(
undefused_etree.Element(self.top_element))
self.write_xml = True self.write_xml = True
except etree.ParseError: except ParseError:
LOG.error('Error parsing %s', self.path) LOG.error('Error parsing %s', self.path)
# "Kodi cannot parse {0}. PKC will not function correctly. Please # "Kodi cannot parse {0}. PKC will not function correctly. Please
# visit {1} and correct your file!" # visit {1} and correct your file!"
messageDialog(lang(29999), lang(39716).format( messageDialog(lang(29999), lang(39716).format(
self.filename, self.filename,
'http://kodi.wiki')) 'http://kodi.wiki'))
self.__exit__(etree.ParseError('Error parsing XML'), None, None) raise
self.root = self.tree.getroot() self.root = self.tree.getroot()
return self return self
@ -735,6 +738,7 @@ class XmlKodiSetting(object):
lang(30417).format(self.filename, err)) lang(30417).format(self.filename, err))
settings('%s_ioerror' % self.filename, settings('%s_ioerror' % self.filename,
value='warning_shown') value='warning_shown')
return True
def _is_empty(self, element, empty_elements): def _is_empty(self, element, empty_elements):
empty = True empty = True
@ -771,7 +775,7 @@ class XmlKodiSetting(object):
""" """
answ = element.find(subelement) answ = element.find(subelement)
if answ is None: if answ is None:
answ = etree.SubElement(element, subelement) answ = undefused_etree.SubElement(element, subelement)
return answ return answ
def get_setting(self, node_list): def get_setting(self, node_list):
@ -847,7 +851,7 @@ class XmlKodiSetting(object):
for node in nodes: for node in nodes:
element = self._set_sub_element(element, node) element = self._set_sub_element(element, node)
if append: if append:
element = etree.SubElement(element, node_list[-1]) element = undefused_etree.SubElement(element, node_list[-1])
# Write new values # Write new values
element.text = value element.text = value
if attrib: if attrib:

View file

@ -7,6 +7,7 @@
from logging import getLogger from logging import getLogger
import re import re
import socket import socket
import xml.etree.ElementTree as etree
import xbmc import xbmc
@ -25,7 +26,7 @@ def get_etree(topelement):
except IOError: except IOError:
# Document is blank or missing # Document is blank or missing
LOG.info('%s.xml is missing or blank, creating it', topelement) LOG.info('%s.xml is missing or blank, creating it', topelement)
root = utils.etree.Element(topelement) root = etree.Element(topelement)
except utils.ParseError: except utils.ParseError:
LOG.error('Error parsing %s', topelement) LOG.error('Error parsing %s', topelement)
# "Kodi cannot parse {0}. PKC will not function correctly. Please visit # "Kodi cannot parse {0}. PKC will not function correctly. Please visit
@ -107,10 +108,10 @@ def start():
top_element='sources') as xml: top_element='sources') as xml:
files = xml.root.find('files') files = xml.root.find('files')
if files is None: if files is None:
files = utils.etree.SubElement(xml.root, 'files') files = etree.SubElement(xml.root, 'files')
utils.etree.SubElement(files, etree.SubElement(files,
'default', 'default',
attrib={'pathversion': '1'}) attrib={'pathversion': '1'})
for source in files: for source in files:
entry = source.find('path') entry = source.find('path')
if entry is None: if entry is None:
@ -123,12 +124,12 @@ def start():
else: else:
# Need to add an element for our hostname # Need to add an element for our hostname
LOG.debug('Adding subelement to sources.xml for %s', hostname) LOG.debug('Adding subelement to sources.xml for %s', hostname)
source = utils.etree.SubElement(files, 'source') source = etree.SubElement(files, 'source')
utils.etree.SubElement(source, 'name').text = 'PKC %s' % hostname etree.SubElement(source, 'name').text = 'PKC %s' % hostname
utils.etree.SubElement(source, etree.SubElement(source,
'path', 'path',
attrib={'pathversion': '1'}).text = '%s/' % path attrib={'pathversion': '1'}).text = '%s/' % path
utils.etree.SubElement(source, 'allowsharing').text = 'false' etree.SubElement(source, 'allowsharing').text = 'false'
xml.write_xml = True xml.write_xml = True
except utils.ParseError: except utils.ParseError:
return return
@ -146,7 +147,7 @@ def start():
'replacing it', 'replacing it',
path) path)
xml.root.remove(entry) xml.root.remove(entry)
entry = utils.etree.SubElement(xml.root, 'path') entry = etree.SubElement(xml.root, 'path')
# "Username" # "Username"
user = utils.dialog('input', utils.lang(1014)) user = utils.dialog('input', utils.lang(1014))
if user is None: if user is None:
@ -162,13 +163,13 @@ def start():
type='{alphanum}', type='{alphanum}',
option='{hide}') option='{hide}')
password = utils.quote(password) password = utils.quote(password)
utils.etree.SubElement(entry, etree.SubElement(entry,
'from', 'from',
attrib={'pathversion': '1'}).text = f'{path}/' attrib={'pathversion': '1'}).text = f'{path}/'
login = f'{protocol}://{user}:{password}@{hostname}/' login = f'{protocol}://{user}:{password}@{hostname}/'
utils.etree.SubElement(entry, etree.SubElement(entry,
'to', 'to',
attrib={'pathversion': '1'}).text = login attrib={'pathversion': '1'}).text = login
xml.write_xml = True xml.write_xml = True
except utils.ParseError: except utils.ParseError:
return return