Source code for pysparkling.fileio.fs.hdfs

from __future__ import absolute_import, unicode_literals

from fnmatch import fnmatch
import logging
from io import BytesIO, StringIO

from ...exceptions import FileSystemNotSupported
from ...utils import Tokenizer
from .file_system import FileSystem

log = logging.getLogger(__name__)

try:
    import hdfs
except ImportError:
    hdfs = None


class Hdfs(FileSystem):
    """:class:`.FileSystem` implementation for HDFS.

    File names are URIs of the form ``scheme://domain[:port]/path``. Access
    goes through ``hdfs.InsecureClient`` objects talking HTTP to
    ``domain:port``; clients are created lazily and cached per
    domain/port pair on the class.
    """

    # Cache of client objects keyed by "<domain>__<port>"; shared by all
    # instances because it lives on the class.
    _conn = {}

    def __init__(self, file_name):
        """Create a handle for ``file_name``.

        :param file_name: URI of the file on HDFS.
        :raises FileSystemNotSupported:
            If the optional ``hdfs`` package could not be imported.
        """
        if hdfs is None:
            raise FileSystemNotSupported(
                'hdfs not supported. Install the python package "hdfs".'
            )

        super(Hdfs, self).__init__(file_name)

    @staticmethod
    def client_and_path(path):
        """Return a (cached) client and the absolute path for a URI.

        :param path: URI of the form ``scheme://domain[:port]/path``.
        :returns:
            Tuple ``(client, absolute_path)`` where ``absolute_path`` always
            starts with ``/``.
        :raises ValueError: If the port part of the URI is not an integer.
        """
        # obtain key
        t = Tokenizer(path)
        t.next('://')  # skip scheme
        domain = t.next('/')
        path = t.next()

        if ':' not in domain:
            port = 50070  # fallback when the URI carries no explicit port
        else:
            # Split on the last colon only, so that authorities containing
            # further colons (e.g. a bracketed IPv6 address followed by a
            # port) do not fail on tuple unpacking.
            domain, port = domain.rsplit(':', 1)
            port = int(port)
        cache_id = domain + '__' + str(port)

        if cache_id not in Hdfs._conn:
            Hdfs._conn[cache_id] = hdfs.InsecureClient(
                'http://{0}:{1}'.format(domain, port)
            )
        return Hdfs._conn[cache_id], '/' + path

    def exists(self):
        """Return ``True`` if a status lookup for the file succeeds.

        An ``hdfs.util.HdfsError`` from the status call is interpreted as
        "does not exist"; other exceptions propagate.
        """
        c, p = Hdfs.client_and_path(self.file_name)
        try:
            c.status(p)
        except hdfs.util.HdfsError:
            return False
        return True

    @staticmethod
    def resolve_filenames(expr):
        """Expand a URI with optional ``*``/``?`` wildcards into file URIs.

        Only the last path component is treated as a pattern: the directory
        in front of it is listed once and every entry name is matched with
        :func:`fnmatch.fnmatch`.

        :param expr:
            URI of the form ``scheme://domain[:port]/dir/pattern``.
        :returns: List of matching URIs (``scheme://domain/dir/name``).
        """
        c, _ = Hdfs.client_and_path(expr)

        # assumes Tokenizer.next(sep) consumes a string separator and, for a
        # list of separators, stops in front of the first one found without
        # consuming it -- TODO confirm against pysparkling.utils.Tokenizer
        t = Tokenizer(expr)
        scheme = t.next('://')
        domain = t.next('/')
        fixed_path = t.next(['*', '?'])
        file_expr = t.next()

        # Split at the last '/' before the first wildcard: everything in
        # front is the directory to list, everything after (plus the
        # wildcard remainder, if any) is the pattern. Without a '/', the
        # whole prefix belongs to the pattern and the directory is the root.
        cut = fixed_path.rfind('/')
        file_expr = fixed_path[cut + 1:] + file_expr
        fixed_path = fixed_path[:cut] if cut >= 0 else ''

        # List with an absolute path, consistent with client_and_path();
        # a relative path would be resolved by the client against its own
        # root instead of the filesystem root.
        if fixed_path:
            list_dir = '/{0}/'.format(fixed_path)
            uri_prefix = '{0}://{1}/{2}/'.format(scheme, domain, fixed_path)
        else:
            list_dir = '/'
            uri_prefix = '{0}://{1}/'.format(scheme, domain)

        # NOTE(review): the '/part*' pattern can only match entry names that
        # contain a '/'; if list() yields bare names it never matches.
        # Kept as in the original -- confirm intended behavior.
        part_expr = file_expr + '/part*'

        return [
            uri_prefix + fn
            for fn in c.list(list_dir, status=False)
            if fnmatch(fn, file_expr) or fnmatch(fn, part_expr)
        ]

    def load(self):
        """Read the whole file and return its content as a ``BytesIO``."""
        log.debug('Hdfs read for {0}.'.format(self.file_name))
        c, path = Hdfs.client_and_path(self.file_name)

        with c.read(path) as reader:
            r = BytesIO(reader.read())

        return r

    def load_text(self, encoding='utf8', encoding_errors='ignore'):
        """Read the whole file, decode it and return it as a ``StringIO``.

        :param encoding: Codec used to decode the raw bytes.
        :param encoding_errors:
            Error handler passed to ``decode``; the default ``'ignore'``
            silently drops undecodable bytes.
        """
        log.debug('Hdfs text read for {0}.'.format(self.file_name))
        c, path = Hdfs.client_and_path(self.file_name)

        with c.read(path) as reader:
            r = StringIO(reader.read().decode(encoding, encoding_errors))

        return r

    def dump(self, stream):
        """Write ``stream`` to the file and return ``self``.

        :param stream: Data handed unchanged to the client's ``write``.
        """
        # NOTE(review): behavior for an already existing target depends on
        # the client's write() defaults -- confirm.
        log.debug('Dump to {0} with hdfs write.'.format(self.file_name))
        c, path = Hdfs.client_and_path(self.file_name)
        c.write(path, stream)
        return self