Source code for pysparkling.fileio.fs.hdfs

from __future__ import absolute_import, unicode_literals

from fnmatch import fnmatch
import logging
from io import BytesIO, StringIO

from ...exceptions import FileSystemNotSupported
from ...utils import parse_file_uri, format_file_uri
from .file_system import FileSystem

log = logging.getLogger(__name__)

try:
    import hdfs
except ImportError:
    hdfs = None


[docs]class Hdfs(FileSystem): """:class:`.FileSystem` implementation for HDFS.""" _conn = {} def __init__(self, file_name): if hdfs is None: raise FileSystemNotSupported( 'hdfs not supported. Install the python package "hdfs".' ) super(Hdfs, self).__init__(file_name) @staticmethod def client_and_path(path): _, domain, folder_path, file_pattern = parse_file_uri(path) if ':' not in domain: port = 50070 else: domain, port = domain.split(':') port = int(port) cache_id = domain + '__' + str(port) if cache_id not in Hdfs._conn: if hdfs is None: raise FileSystemNotSupported( 'hdfs not supported. Install the python package "hdfs".' ) Hdfs._conn[cache_id] = hdfs.InsecureClient( # pylint: disable=no-member 'http://{0}:{1}'.format(domain, port) ) return Hdfs._conn[cache_id], folder_path + file_pattern def exists(self): c, p = Hdfs.client_and_path(self.file_name) try: c.status(p) except hdfs.util.HdfsError: # pylint: disable=no-member return False return True @staticmethod def resolve_filenames(expr): c, _ = Hdfs.client_and_path(expr) scheme, domain, folder_path, _ = parse_file_uri(expr) files = [] for fn, file_status in c.list(folder_path, status=True): file_local_path = '{0}{1}'.format(folder_path, fn) file_path = format_file_uri(scheme, domain, file_local_path) part_file_expr = expr + ("" if expr.endswith("/") else "/") + 'part*' if fnmatch(file_path, expr): if file_status["type"] != "DIRECTORY": files.append(file_path) else: files += Hdfs._get_folder_part_files( c, scheme, domain, file_local_path, part_file_expr ) elif fnmatch(file_path, part_file_expr): files.append(file_path) return files @staticmethod def _get_folder_part_files(c, scheme, domain, folder_local_path, expr_with_part): files = [] for fn, file_status in c.list(folder_local_path, status=True): sub_file_path = format_file_uri(scheme, domain, folder_local_path, fn) if fnmatch(sub_file_path, expr_with_part) and file_status["type"] != "DIRECTORY": files.append(sub_file_path) return files @classmethod def _get_folder_files_by_expr(cls, c, scheme, domain, folder_path, expr=None): """ Using client c, retrieves all files located in the folder `folder_path` that matches `expr` :param c: An HDFS client :param scheme: a scheme such as hdfs :param domain: a DFS web server :param folder_path: a folder path without patterns :param expr: a pattern :return: list of matching files absolute paths prefixed with the scheme and domain """ file_paths = [] for fn, file_status in c.list(folder_path, status=True): file_local_path = '{0}{1}'.format(folder_path, fn) if expr is None or fnmatch(file_local_path, expr): if file_status["type"] == "DIRECTORY": file_paths += cls._get_folder_files_by_expr( c, scheme, domain, file_local_path + "/", expr=None ) else: file_path = format_file_uri(scheme, domain, file_local_path) file_paths.append(file_path) elif file_status["type"] == "DIRECTORY": file_paths += cls._get_folder_files_by_expr( c, scheme, domain, file_local_path + "/", expr ) return file_paths @classmethod def resolve_content(cls, expr): c, _ = cls.client_and_path(expr) scheme, domain, folder_path, pattern = parse_file_uri(expr) expr = folder_path + pattern return cls._get_folder_files_by_expr(c, scheme, domain, folder_path, expr) def load(self): log.debug('Hdfs read for {0}.'.format(self.file_name)) c, path = Hdfs.client_and_path(self.file_name) with c.read(path) as reader: r = BytesIO(reader.read()) return r def load_text(self, encoding='utf8', encoding_errors='ignore'): log.debug('Hdfs text read for {0}.'.format(self.file_name)) c, path = Hdfs.client_and_path(self.file_name) with c.read(path) as reader: r = StringIO(reader.read().decode(encoding, encoding_errors)) return r def dump(self, stream): log.debug('Dump to {0} with hdfs write.'.format(self.file_name)) c, path = Hdfs.client_and_path(self.file_name) c.write(path, stream) return self