# -*- coding: utf-8 -*-
#
# This file is part of xrootdfs
# Copyright (C) 2015 CERN.
#
# xrootdfs is free software; you can redistribute it and/or modify it under the
# terms of the Revised BSD License; see LICENSE file for more details.
"""File-like interface for interacting with files over the XRootD protocol."""
from __future__ import absolute_import, print_function
import sys
from fs import SEEK_CUR, SEEK_END, SEEK_SET
from fs.errors import InvalidPathError, PathError, ResourceNotFoundError, \
UnsupportedError
from fs.path import basename
from six import b, binary_type, text_type
from XRootD.client import File
from .utils import is_valid_path, is_valid_url, spliturl, \
translate_file_mode_to_flags
class XRootDFile(object):
    r"""File-like interface for working with files over XRootD protocol.

    This class understands and will accept the following mode strings,
    with any additional characters being ignored:

    * ``r`` - Open the file for reading only.
    * ``r+`` - Open the file for reading and writing.
    * ``r-`` - Open the file for streamed reading; seeking is not allowed.
    * ``w`` - Open the file for writing only; create the file if
      it doesn't exist; truncate it to zero length.
    * ``w+`` - Open the file for reading and writing; create the file
      if it doesn't exist; truncate it to zero length.
    * ``w-`` - Open the file for streamed writing; seeking is not allowed.
    * ``a`` - Open the file for writing only; create the file if it
      doesn't exist; place pointer at end of file.
    * ``a+`` - Open the file for reading and writing; create the file
      if it doesn't exist; place pointer at end of file.

    .. note::

       Streamed reading/writing modes have no performance advantages over
       non-streamed reading/writing for XRootD.

    :param path: Path to file that should be opened.
    :type path: string
    :param mode: Mode of file to open, identical to the mode string used
        in 'file' and 'open' builtins.
    :type mode: string
    :param buffering: An optional integer used to set the buffering policy.
        Pass 0 to switch buffering off (only allowed in binary mode),
        1 to select line buffering (only usable in text mode), and
        an integer > 1 to indicate the size of a fixed-size chunk buffer.
    :param encoding: Determines encoding used when writing unicode data
        (defaults to ``sys.getdefaultencoding()``).
    :param errors: An optional string that specifies how encoding and
        decoding errors are to be handled (e.g. ``strict``, ``ignore`` or
        ``replace``). Defaults to ``strict``.
    :param newline: Newline character to use (either ``\\n``, ``\\r``,
        ``\\r\\n``, ``''`` or ``None``). ``''`` and ``None`` both select
        ``\\n``.
    :param line_buffering: Unsupported. Anything but False will raise an
        error.
    :param buffer_size: Buffer size used when reading files (defaults to 64K).
        This can likely be optimized to chunks up to 2MB depending on your
        desired memory usage.
    """

    def __init__(self, path, mode='r', buffering=-1, encoding=None,
                 errors=None, newline=None, line_buffering=False,
                 buffer_size=None, **kwargs):
        """XRootDFile constructor.

        Raises PathError if the given path isn't a valid XRootD URL,
        InvalidPathError if it isn't a valid XRootD file path,
        UnsupportedError for an unsupported ``newline`` or for line
        buffering of a binary file, NotImplementedError if
        ``line_buffering`` is requested, and ResourceNotFoundError or
        IOError if XRootD fails to open the file. Extra ``kwargs`` are
        accepted and ignored.
        """
        if not is_valid_url(path):
            raise PathError(path)

        xpath = spliturl(path)[1]
        if not is_valid_path(xpath):
            raise InvalidPathError(xpath)

        if newline not in [None, '', '\n', '\r', '\r\n']:
            raise UnsupportedError(
                "Newline character {0} not supported".format(newline))

        if line_buffering is not False:
            raise NotImplementedError("Line buffering for writing is not "
                                      "supported.")

        buffering = int(buffering)
        if buffering == 1 and 'b' in mode:
            raise UnsupportedError(
                "Line buffering is not supported for "
                "binary files.")

        # PyFS attributes
        self.mode = mode

        # XRootD attributes & internals
        self.path = path
        self.encoding = encoding or sys.getdefaultencoding()
        self.errors = errors or 'strict'
        self.buffer_size = buffer_size or 64 * 1024
        self.buffering = buffering
        self._file = File()
        self._ipp = 0  # internal position pointer, reported by tell()
        self._size = -1  # cached file size; -1 means "not fetched yet"
        self._iterator = None
        # readline() searches raw bytes, so a text newline must be encoded;
        # on Python 3, bytes.find(str) raises TypeError.
        if isinstance(newline, text_type):
            newline = newline.encode('ascii')
        self._newline = newline or b"\n"
        # Read-ahead bytes left over by readline(); they are reused only
        # while tell() still equals _buffer_pos.
        self._buffer = b""
        self._buffer_pos = 0

        # flag translation
        self._flags = translate_file_mode_to_flags(mode)

        statmsg, response = self._file.open(path, flags=self._flags)
        if not statmsg.ok:
            self._raise_status(self.path, statmsg,
                               "instantiating file ({0})".format(path))

        # Deal with the modes
        if 'a' in self.mode:
            self.seek(self.size, SEEK_SET)

    def _raise_status(self, path, status, source=None):
        """Raise an error based on an XRootD status object.

        :param path: Path reported by ResourceNotFoundError.
        :param status: XRootD status exposing ``errno`` and ``message``.
        :param source: Optional description of the failed operation
            (e.g. ``"reading"``), inserted into the error message.
        :raises ResourceNotFoundError: If ``status.errno`` is 3011.
        :raises IOError: For any other status.
        """
        if status.errno == 3011:
            raise ResourceNotFoundError(path)
        # ``source`` is optional: leave it out of the message when absent
        # instead of failing on an unset message variable.
        prefix = "{0} ".format(source) if source else ""
        raise IOError(
            "XRootD error {0}file: {1}".format(prefix, status.message))

    def __del__(self):
        """Close file on object deletion."""
        # __init__ may have raised before _file was created (e.g. on an
        # invalid path); there is nothing to close in that case.
        if getattr(self, '_file', None) is not None:
            self.close()

    def __iter__(self):
        """Initialize the internal iterator.

        Iteration yields lines (via ``readline``) when ``buffering`` is 1,
        or when it is -1 and the mode is not binary. Otherwise it yields
        chunks of ``buffering`` bytes (if ``buffering > 1``) or of
        ``buffer_size`` bytes.
        """
        self._next_func = self.read
        self._next_args = ([], dict(sizehint=self.buffer_size))
        if self.buffering == 1 or \
                (self.buffering == -1 and 'b' not in self.mode):
            self._next_func = self.readline
            self._next_args = ([], dict())
        elif self.buffering > 1:
            self._next_args = ([], dict(sizehint=self.buffering))
        return self

    def __enter__(self):
        """Enter context manager method."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit context manager method."""
        self.close()

    def next(self):
        """Return next item for file iteration.

        :raises StopIteration: When the underlying read returns nothing.
        """
        if getattr(self, '_next_func', None) is None:
            # next() was called without iter() first; set up lazily.
            self.__iter__()
        item = self._next_func(*self._next_args[0], **self._next_args[1])
        if not item:
            raise StopIteration
        return item

    # Python 3's iterator protocol calls __next__, not next.
    __next__ = next

    def read(self, sizehint=-1):
        """Read ``sizehint`` bytes from the file object.

        If no positive ``sizehint`` is provided the rest of the file is read!
        Multiple calls to this method after EOF has been reached will return
        an empty string.

        :param sizehint: Number of bytes to read from file object.
        :raises ValueError: If the file is closed.
        :raises IOError: If the file is not readable or XRootD reports an
            error.
        """
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        self._assert_mode("r-")

        # Requesting ``size`` bytes from any offset covers everything up to
        # the end of the file.
        chunksize = sizehint if sizehint > 0 else self.size

        statmsg, res = self._file.read(offset=self._ipp, size=chunksize)
        if not statmsg.ok:
            self._raise_status(self.path, statmsg, "reading")

        # Advance by what was actually returned, so short reads (EOF, or a
        # stale cached size) never skip or repeat data.
        self._ipp += len(res)
        return res

    def readline(self):
        """Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). Returns an empty string
        at EOF.
        """
        newline = self._newline
        # Reuse the read-ahead from the previous call only if nothing has
        # moved the file pointer since.
        data = self._buffer if self._buffer_pos == self.tell() else b""
        indx = data.find(newline)

        if indx == -1:
            # Read chunks until a newline is found or the file is exhausted.
            acc = bytearray(data)
            while indx == -1:
                chunk = self.read(self.buffer_size)
                if not chunk:
                    break
                # Start slightly before the new chunk so that a multi-byte
                # newline (``\r\n``) split across two chunks is detected.
                start = max(0, len(acc) - len(newline) + 1)
                acc.extend(chunk)
                indx = acc.find(newline, start)
            data = bytes(acc)

        if indx == -1:
            # EOF without a trailing newline: return the remainder once and
            # drop the buffer so the next call returns an empty string.
            self._buffer = b""
            self._buffer_pos = self.tell()
            return data

        end = indx + len(newline)
        self._buffer = data[end:]
        self._buffer_pos = self.tell()
        return data[:end]

    def readlines(self):
        """Read until EOF using readline().

        .. warning::

           This methods reads the entire file into memory! You are probably
           better off using either ``xreadlines`` or just normal iteration
           over the file object.
        """
        return list(self.xreadlines())

    def xreadlines(self, sizehint=-1):
        """Get an iterator over the lines of the file.

        ``sizehint`` is accepted for API compatibility and ignored.
        """
        while True:
            line = self.readline()
            if not line:
                break
            yield line

    def write(self, data, flushing=False):
        """Write the given string to the file.

        Text data is encoded with ``self.encoding`` and ``self.errors``, and
        a ``bytearray`` is converted to bytes. In append mode the pointer is
        first moved to the end of the file.

        If the keyword argument 'flushing' is true, it indicates that the
        internal write buffers are being flushed, and *all* the given data
        is expected to be written to the file; ``flush()`` is then called.
        """
        self._assert_mode("w-")
        if 'a' in self.mode:
            self.seek(0, SEEK_END)

        if not isinstance(data, binary_type):
            if isinstance(data, bytearray):
                data = bytes(data)
            elif isinstance(data, text_type):
                data = data.encode(self.encoding, self.errors)

        statmsg, res = self._file.write(data, offset=self._ipp)
        if not statmsg.ok:
            self._raise_status(self.path, statmsg, "writing")

        self._ipp += len(data)
        # Keep the cached size in sync when the write extends the file.
        self._size = max(self.size, self.tell())
        if flushing:
            self.flush()

    def writelines(self, sequence):
        """Write a sequence of lines to file."""
        for s in sequence:
            self.write(s)

    def seek(self, offset, whence=SEEK_SET):
        """Set the file's internal position pointer, approximately.

        The possible values of whence and their meaning are defined
        in the Linux man pages for `lseek()`:
        http://man7.org/linux/man-pages/man2/lseek.2.html

        ``SEEK_SET``
            The internal position pointer is set to offset bytes.
        ``SEEK_CUR``
            The ipp is set to its current position plus offset bytes.
        ``SEEK_END``
            The ipp is set to the size of the file plus offset bytes.

        :raises IOError: If the file is streamed (mode contains ``-``) or
            the resulting position would be negative; the position is then
            left unchanged.
        :raises NotImplementedError: For any other ``whence``.
        """
        if "-" in self.mode:
            raise IOError("File is not seekable.")

        # Convert to integer by rounding down/omitting everything after
        # the decimal point
        offset = int(offset)

        if whence == SEEK_SET:
            position = offset
        elif whence == SEEK_CUR:
            position = self._ipp + offset
        elif whence == SEEK_END:
            position = self.size + offset
        else:
            raise NotImplementedError(whence)

        # As with lseek(), negative offsets are allowed as long as the
        # resulting position is not before the start of the file.
        if position < 0:
            raise IOError("Invalid argument.")
        self._ipp = position

    def tell(self):
        """Get the location of the file's internal position pointer."""
        return self._ipp

    def truncate(self, size=None):
        """Truncate the file's size to ``size``.

        If ``size`` is omitted (or None) the current file position is used.
        """
        self._assert_mode('w')
        if size is None:
            size = self.tell()
        statmsg = self._file.truncate(size)[0]
        if not statmsg.ok:
            self._raise_status(self.path, statmsg, "truncating")
        self._size = size

    def close(self):
        """Close the file.

        The file may not be accessed further once it is closed. Closing an
        already closed file does nothing. NOTE: no explicit ``flush()`` is
        issued here.
        """
        if not self.closed:
            self._file.close()

    def flush(self):
        """Flush write buffers (XRootD ``sync``); no-op if closed."""
        if not self.closed:
            statmsg, dummy = self._file.sync()
            if not statmsg.ok:
                self._raise_status(self.path, statmsg, "flushing write buffer")

    def seekable(self):
        """Check if file is seekable."""
        return '-' not in self.mode

    def readable(self):
        """Check if file is readable."""
        return 'r' in self.mode or '+' in self.mode

    def writable(self):
        """Check if file is writable."""
        return 'w' in self.mode or '+' in self.mode or 'a' in self.mode

    def isatty(self):
        """Check if file is a TTY (false always).

        Added for ``io`` module compatibility.
        """
        return False

    def fileno(self):
        """Get the underlying file descriptor.

        Unsupported by XRootDFS (added for ``io`` module compatibility).
        """
        raise IOError("File descriptor is unsupported by xrootd.")

    @property
    def name(self):
        """Get filename."""
        return basename(self.path)

    @property
    def closed(self):
        """Check if file is closed (the XRootD handle is not open)."""
        return not self._file.is_open()

    @property
    def size(self):
        """Get file size.

        Fetched with an XRootD stat on first access, then cached; ``write``
        and ``truncate`` update the cached value.
        """
        if self._size == -1:
            statmsg, res = self._file.stat()
            if not statmsg.ok:
                self._raise_status(self.path, statmsg, "retrieving size")
            self._size = res.size
        return self._size

    def _assert_mode(self, mode, mstr=None):
        """Check whether the file may be accessed in the given mode.

        :param mode: Requested access: contains ``r`` and/or ``w``, plus
            ``-`` when streamed access is sufficient.
        :param mstr: Mode string to check against (defaults to self.mode).
        :returns: True when access is allowed.
        :raises IOError: When the requested access is not allowed.
        """
        if mstr is None:
            try:
                mstr = self.mode
            except AttributeError:
                raise AttributeError("Mode attribute missing -- "
                                     "was it deleted? "
                                     "Close and re-open the file.")
        if "+" in mstr:
            return True
        if "-" in mstr and "-" not in mode:
            raise IOError("File does not support seeking.")
        if "r" in mode:
            if "r" not in mstr:
                raise IOError("File not opened for reading")
        if "w" in mode:
            if "w" not in mstr and "a" not in mstr:
                raise IOError("File not opened for writing")
        return True