# Source code for pykickstart.load

#
# Tomas Radej <tradej@redhat.com>
#
# Copyright 2015 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use, modify,
# copy, or redistribute it subject to the terms and conditions of the GNU
# General Public License v.2.  This program is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY expressed or implied, including the
# implied warranties of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.  Any Red Hat
# trademarks that are incorporated in the source code or documentation are not
# subject to the GNU General Public License and may only be used or replicated
# with the express permission of Red Hat, Inc.
#
import requests
import shutil

from pykickstart.errors import KickstartError
from pykickstart.i18n import _
from requests.exceptions import SSLError, RequestException

def is_url(location):
    '''Tell whether location should be treated as a URL.

    The check is purely textual: a location counts as a URL when it
    contains the "://" separator that follows the scheme (RFC 3986).
    No further validation of the scheme or host is done.

    Arguments:
    location -- URL or file name to classify

    Returns: True if "://" occurs in location, False otherwise'''
    return '://' in location


# Passed as the "verify" argument to requests.get() in _load_url().
SSL_VERIFY = True

def load_to_str(location):
    '''Read the contents of a URL or a file and hand them back as a string.

    The kind of input is decided by is_url(): URLs are fetched with
    _load_url(), anything else is read with _load_file().

    Arguments:
    location -- URL or file name to load

    Returns: string with contents
    Raises: KickstartError on error reading'''
    # Pick the matching loader first, then call it once.
    loader = _load_url if is_url(location) else _load_file
    return loader(location)
def load_to_file(location, destination):
    '''Load a destination URL or file into a file name.
    Type of input is inferred automatically.

    A URL is downloaded with _load_url() and its text is written to
    destination; a file name is copied to destination with _copy_file().

    Arguments:
    location -- URL or file name to load
    destination -- destination file name to write to

    Returns: file name with contents
    Raises: KickstartError on error reading or writing'''
    if not is_url(location):
        _copy_file(location, destination)
        return destination

    contents = _load_url(location)

    # Write to file
    try:
        with open(destination, 'w') as fh:
            fh.write(contents)
    except IOError as e:
        # The failure is about the file being written, so name destination
        # rather than the source location.  The translated text already ends
        # in a colon, so only a space separates it from the error detail.
        raise KickstartError(_('Error writing file "%s":') % destination + ' {e}'.format(e=str(e)))

    return destination
def _load_url(location):
    '''Fetch a URL and return the response body as a string.

    Arguments:
    location -- URL to fetch

    Returns: decoded text of the response
    Raises: KickstartError if the request fails (SSL problems are reported
            with a dedicated message) or the status code is not OK'''
    try:
        response = requests.get(location, verify=SSL_VERIFY, timeout=120)
    except SSLError as e:
        # Handled before RequestException so SSL failures get their own message.
        raise KickstartError(_('Error securely accessing URL "%s"') % location + ': {e}'.format(e=str(e)))
    except RequestException as e:
        raise KickstartError(_('Error accessing URL "%s"') % location + ': {e}'.format(e=str(e)))

    if response.status_code != requests.codes.ok:  # pylint: disable=no-member
        raise KickstartError(_('Error accessing URL "%s"') % location + ': {c}'.format(c=str(response.status_code)))

    return response.text


def _load_file(filename):
    '''Load a file's contents and return them as a string.

    The file is read as bytes and decoded as UTF-8.

    Arguments:
    filename -- name of the file to read

    Returns: string with contents
    Raises: KickstartError if the file cannot be read or is not valid UTF-8'''
    try:
        with open(filename, 'rb') as fh:
            contents = fh.read().decode("utf-8")
    except (IOError, UnicodeDecodeError) as e:
        # A decoding failure is also an error reading the file; report it as
        # KickstartError instead of leaking a raw UnicodeDecodeError.
        raise KickstartError(_('Error opening file: %s') % str(e))

    return contents


def _copy_file(filename, destination):
    '''Copy file to destination.

    Arguments:
    filename -- name of the file to copy
    destination -- file name to copy to

    Raises: KickstartError if the copy fails'''
    try:
        shutil.copyfile(filename, destination)
    except (OSError, IOError) as e:
        raise KickstartError(_('Error copying file: %s') % str(e))