Source code for pysparkling.fileio.fs.local

from __future__ import absolute_import, unicode_literals

import glob
from fnmatch import fnmatch
import io
import logging
import os

from ...utils import Tokenizer
from .file_system import FileSystem

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


class Local(FileSystem):
    """:class:`.FileSystem` implementation for the local file system."""

    @staticmethod
    def resolve_filenames(expr):
        """Expand a path expression into the local files it matches.

        A leading ``file://`` scheme is stripped. If the remaining
        expression names an existing regular file, that single path is
        returned unchanged. Otherwise the directory tree below the
        non-wildcard prefix of the expression is walked and every file
        whose path matches ``expr`` or ``expr + '/part*'`` (using
        :func:`fnmatch.fnmatch`) is collected.

        :param expr: file name or wildcard expression, optionally
            prefixed with ``file://``.
        :returns: list of matching file paths (possibly empty).
        """
        if expr.startswith('file://'):
            expr = expr[7:]

        if os.path.isfile(expr):
            return [expr]

        # A bare name is anchored to the current directory so that the
        # paths produced by os.walk() below can match the expression.
        if '/' not in expr:
            expr = '.' + os.path.sep + expr

        # NOTE(review): presumably returns the text before the first
        # '*' or '?' -- confirm against Tokenizer.next.
        t = Tokenizer(expr)
        prefix = t.next(['*', '?'])

        # Walk from a directory: drop a trailing partial path component.
        if not prefix.endswith('/') and '/' in prefix:
            prefix = os.path.dirname(prefix)

        files = []
        for root, _, filenames in os.walk(prefix):
            for filename in filenames:
                path = os.path.join(root, filename)
                if fnmatch(path, expr) or fnmatch(path, expr + '/part*'):
                    files.append(path)
        return files

    @staticmethod
    def resolve_content(expr):
        """List the files selected by a glob expression.

        A leading ``file://`` scheme is stripped and the expression is
        expanded with :func:`glob.glob`. Matches that are regular files
        are returned as they are. Every other match is walked
        recursively and the files below it are returned, skipping file
        names that start with ``_`` or ``.``. Only file names are
        filtered this way; directory names are not.

        :param expr: glob expression, optionally prefixed with
            ``file://``.
        :returns: list of file paths (possibly empty).
        """
        if expr.startswith('file://'):
            expr = expr[7:]

        matches = glob.glob(expr)
        file_paths = []
        for match in matches:
            if os.path.isfile(match):
                file_paths.append(match)
            else:
                file_paths += [
                    os.path.join(root, f)
                    for root, _, files in os.walk(match)
                    for f in files
                    if not f.startswith(("_", "."))
                ]
        return file_paths

    @property
    def file_path(self):
        """``self.file_name`` without a leading ``file://`` scheme."""
        # NOTE(review): file_name is not set in this class; presumably
        # it is assigned by the FileSystem base class -- confirm.
        if self.file_name.startswith('file://'):
            return self.file_name[7:]
        return self.file_name

    def exists(self):
        """Return whether :attr:`file_path` exists on disk."""
        return os.path.exists(self.file_path)

    def load(self):
        """Read the whole file in binary mode.

        :returns: :class:`io.BytesIO` holding the file's bytes.
        """
        with io.open(self.file_path, 'rb') as f:
            return io.BytesIO(f.read())

    def load_text(self, encoding='utf8', encoding_errors='ignore'):
        """Read the whole file in text mode.

        :param encoding: text encoding passed to :func:`io.open`.
        :param encoding_errors: decoding error handler passed to
            :func:`io.open` as ``errors``.
        :returns: :class:`io.StringIO` holding the decoded text.
        """
        with io.open(self.file_path, 'r',
                     encoding=encoding, errors=encoding_errors) as f:
            return io.StringIO(f.read())

    def dump(self, stream):
        """Write the chunks produced by iterating ``stream`` to the file.

        The parent directory is created first when it does not exist.
        The file is opened in binary write mode, so each item yielded by
        ``stream`` must be acceptable to a binary file's ``write``.

        :param stream: iterable of chunks to write.
        :returns: ``self``.
        """
        file_path = self.file_path  # caching

        # making sure directory exists
        dirname = os.path.dirname(file_path)
        if dirname and not os.path.exists(dirname):
            log.debug('creating local directory {0}'.format(dirname))
            try:
                os.makedirs(dirname)
            except OSError:
                # Another writer may have created the directory between
                # the existence check and makedirs(); that is fine. Only
                # a directory that is still missing is a real error.
                if not os.path.isdir(dirname):
                    raise

        log.debug('writing file {0}'.format(file_path))
        with io.open(file_path, 'wb') as f:
            for c in stream:
                f.write(c)
        return self