import base
import util
import header
import copy
import os
[docs]class File(object):
''' Base file object in laspy. Provides access to most laspy functionality,
and holds references to the HeaderManager, Reader, and potentially Writer objects.
'''
[docs] def __init__(self, filename,
header=None,
vlrs=False,
mode='r',
in_srs=None,
out_srs=None,
evlrs = False):
'''Instantiate a file object to represent an LAS file.
:arg filename: The filename to open
:keyword header: A header open the file with. Not required in modes "r" and "rw"
:type header: a :obj:`laspy.header.Header` instance
:keyword mode: "r" for read, "rw" for modify/update, "w" for write, and "w+" for append (not implemented)
:type mode: string
:keyword in_srs: Input SRS to override the existing file's SRS with (not implemented)
:type in_srs: a :obj:`laspy.SRS` instance
:keyword out_srs: Output SRS to reproject points on-the-fly to as \
they are read/written. (not implemented)
:type out_srs: a :obj:`laspy.SRS` instance (not implemented)
.. note::
To open a file in write mode, you must provide a
laspy.header.Header instance which will be immediately written to
the file. If you provide a header instance in read mode, the
values of that header will be used in place of those in the actual
file.
.. note::
If a file is open for write, it cannot be opened for read and vice
versa.
>>> from laspy import file
>>> f = file.File('file.las', mode='r')
>>> for p in f:
... print 'X,Y,Z: ', p.x, p.y, p.z
>>> h = f.header
>>> f2 = file.File('file2.las', mode = "w", header=h)
>>> points = f.points
>>> f2.points = points
>>> f2.close()
'''
self.filename = os.path.abspath(filename)
self._header = header
self._vlrs = vlrs
self._evlrs = evlrs
self._mode = mode.lower()
self.in_srs = in_srs
self.out_srs = out_srs
self.open()
[docs] def open(self):
'''Open the file for processing, called by __init__
'''
if self._mode == 'r':
if not os.path.exists(self.filename):
raise OSError("No such file or directory: '%s'" % self.filename)
## Make sure we have a header
if self._header == None:
self._reader = base.Reader(self.filename,mode= self._mode)
self._header = self._reader.get_header()
else:
raise util.LaspyException("Headers must currently be stored in the file, you provided: " + str(self._header))
self._reader = base.Reader(self.filename, mode = self._mode, header=self._header)
if self.in_srs:
self._reader.SetInputSRS(self.in_srs)
if self.out_srs:
self._reader.SetOutputSRS(self.out_srs)
## Wire up API for extra dimensions
if self._reader.extra_dimensions != []:
for dimension in self._reader.extra_dimensions:
dimname = dimension.name.replace("\x00", "").replace(" ", "_").lower()
self.addProperty(dimname)
if self._mode == 'rw':
if self._header == None:
self._writer = base.Writer(self.filename,mode = self._mode)
self._reader = self._writer
self._header = self._reader.get_header()
## Wire up API for any extra Dimensions
if self._writer.extra_dimensions != []:
for dimension in self._writer.extra_dimensions:
dimname = dimension.name.replace("\x00", "").replace(" ", "_").lower()
self.addProperty(dimname)
else:
raise util.LaspyException("Headers must currently be stored in the file, you provided: " + str(self._header))
if self._mode == 'w':
if self._header == None:
raise util.LaspyException("Creation of a file in write mode requires a header object.")
if isinstance(self._header, header.HeaderManager):
vlrs = self._header.vlrs
evlrs = self._header.evlrs
self._header = self._header.copy()
if self._vlrs != False:
self._vlrs.extend(vlrs)
else:
self._vlrs = vlrs
if self._evlrs != False:
self._evlrs.extend(evlrs)
else:
self._evlrs = evlrs
self._writer = base.Writer(self.filename, mode = "w",
header = self._header,
vlrs = self._vlrs, evlrs = self._evlrs)
self._reader = self._writer
## Wire up API for any extra Dimensions
if self._writer.extra_dimensions != []:
for dimension in self._writer.extra_dimensions:
dimname = dimension.name.replace("\x00", "").replace(" ", "_").lower()
self.addProperty(dimname)
if self._mode == 'w+':
raise NotImplementedError
[docs] def close(self, ignore_header_changes = False):
'''Closes the LAS file
'''
if self._mode == "r":
self._reader.close()
self._reader = None
self._header = None
else:
self._writer.close(ignore_header_changes)
self._reader = None
self._writer = None
self._header = None
def addProperty(self, name):
def fget(self):
return(self._writer.get_dimension(name))
def fset(self, value):
self.assertWriteMode()
self._writer.set_dimension(name, value)
setattr(self.__class__, name, property(fget, fset, None, None))
def assertWriteMode(self):
if self._mode == "r":
raise util.LaspyException("File is not opened in a write mode.")
# TO BE IMPLEMENTED
def set_srs(self, value):
if self._mode == "r":
return
else:
return
def set_output_srs(self,value):
return(set_srs(value))
def get_output_srs(self):
return self.out_srs
doc = '''The output :obj:`laspy.SRS` for the file. Data will be
reprojected to this SRS according to either the :obj:`input_srs` if it
was set or default to the :obj:`laspy.header.Header.SRS` if it was
not set. The header's SRS must be valid and exist for reprojection
to occur. GDAL support must also be enabled for the library for
reprojection to happen.'''
output_srs = property(get_output_srs, set_output_srs, None, doc)
def set_input_srs(self, value):
if self._mode == "r":
return
else:
return
def get_input_srs(self):
return self.in_srs
doc = '''The input :obj:`laspy.SRS` for the file. This overrides the
:obj:`laspy.header.Header.SRS`. It is useful in cases where the header's
SRS is not valid or does not exist.'''
input_srs = property(get_input_srs, set_input_srs, None, doc)
def get_header(self):
'''Returns the laspy.header.Header for the file'''
if self._mode == "r":
return self._reader.get_header()
else:
return self._writer.get_header()
return None
def set_header(self, header):
'''Sets the laspy.header.Header for the file. If the file is in \
append mode, the header will be overwritten in the file.'''
# append mode
if self._mode == "w+":
self._writer.set_header(header)
return True
raise util.LaspyException("The header can only be set "
"after file creation for files in append mode")
doc = '''The file's :obj:`laspy.header.Header`
.. note::
The header class supports .xml and .etree methods.
.. note::
If the file is in append mode, the header will be overwritten in the
file. Setting the header for the file when it is in read mode has no
effect. If you wish to override existing header information with your
own at read time, you must instantiate a new :obj:`laspy.file.File`
instance.
'''
header = property(get_header, set_header, None, doc)
def get_writer(self):
return(self._writer)
def set_writer(self, writer):
self._writer = writer
def get_reader(self):
return(self._reader)
def set_reader(self, reader):
self._reader = reader
doc = '''The file's :obj:`laspy.base.Reader` object.'''
reader = property(get_reader, set_reader, None, doc)
doc = '''The file's :obj:`laspy.base.Writer` object, if applicable.'''
writer = property(get_writer,set_writer, None, doc)
def get_points(self):
'''Return a numpy array of all point data in the file'''
return self._reader.get_points()
def set_points(self, new_points):
'''Set the points in the file from a valid numpy array, as generated from get_points,
or a list/array of laspy.base.Point instances.'''
self.assertWriteMode()
self._writer.set_points(new_points)
return
doc = '''The point data from the file. Get or set the points as either a valid numpy array, or
a list/array of laspy.base.Point instances. In write mode, the number of point records is set the
first time a dimension or point array is supplied to the file.'''
points = property(get_points, set_points, None, doc)
def read(self, index, nice = True):
'''Reads the point at the given index'''
if self._reader.get_pointrecordscount() >= index:
return(self._reader.get_point(index, nice))
else:
raise util.LaspyException("Index greater than point records count")
def get_x(self):
return(self._reader.get_x())
def set_x(self,x):
self.assertWriteMode()
self._writer.set_x(x)
return
def get_x_scaled(self):
return(self._reader.get_x(scale =True))
def set_x_scaled(self,x):
self.assertWriteMode()
self._writer.set_x(x, scale = True)
return
X = property(get_x, set_x, None, None)
x = property(get_x_scaled, set_x_scaled, None, None)
def get_y(self):
return(self._reader.get_y())
def set_y(self, y):
self.assertWriteMode()
self._writer.set_y(y)
return
def get_y_scaled(self):
return(self._reader.get_y(scale = True))
def set_y_scaled(self, y):
self.assertWriteMode()
self._writer.set_y(y, scale = True)
return
Y = property(get_y, set_y, None, None)
y = property(get_y_scaled, set_y_scaled, None, None)
def get_z(self):
return(self._reader.get_z())
def set_z(self, z):
self.assertWriteMode()
self._writer.set_z(z)
return
def get_z_scaled(self):
return(self._reader.get_z(scale = True))
def set_z_scaled(self, z):
self.assertWriteMode()
self._writer.set_z(z, scale = True)
return
Z = property(get_z, set_z, None, None)
z = property(get_z_scaled, set_z_scaled, None, None)
def get_intensity(self):
return(self._reader.get_intensity())
def set_intensity(self, intensity):
self.assertWriteMode()
self._writer.set_intensity(intensity)
return
intensity = property(get_intensity, set_intensity, None, None)
Intensity = intensity
def get_flag_byte(self):
return(self._reader.get_flag_byte())
def set_flag_byte(self, byte):
self.assertWriteMode()
self._writer.set_flag_byte(byte)
return
flag_byte = property(get_flag_byte, set_flag_byte, None, None)
def get_return_num(self):
return(self._reader.get_return_num())
def set_return_num(self, num):
self.assertWriteMode()
self._writer.set_return_num(num)
return_num = property(get_return_num, set_return_num, None, None)
def get_num_returns(self):
return(self._reader.get_num_returns())
def set_num_returns(self, num):
self.assertWriteMode()
self._writer.set_num_returns(num)
return
num_returns = property(get_num_returns, set_num_returns, None, None)
def get_scan_dir_flag(self):
return(self._reader.get_scan_dir_flag())
def set_scan_dir_flag(self,flag):
self.assertWriteMode()
self._writer.set_scan_dir_flag(flag)
return
scan_dir_flag = property(get_scan_dir_flag, set_scan_dir_flag, None, None)
def get_edge_flight_line(self):
return(self._reader.get_edge_flight_line())
def set_edge_flight_line(self,line):
self.assertWriteMode()
self._writer.set_edge_flight_line(line)
return
edge_flight_line = property(get_edge_flight_line,
set_edge_flight_line, None, None)
def get_raw_classification(self):
return(self._reader.get_raw_classification())
def set_raw_classification(self, classification):
self.assertWriteMode()
self._writer.set_raw_classification(classification)
return
raw_classification = property(get_raw_classification,
set_raw_classification, None, None)
Raw_Classification = raw_classification
def get_classification(self):
return(self._reader.get_classification())
def set_classification(self, classification):
self.assertWriteMode()
self._writer.set_classification(classification)
return
classification = property(get_classification,
set_classification, None, None)
Classification = classification
def get_classification_flags(self):
return(self._reader.get_classification_flags())
def set_classification_flags(self,value):
self.assertWriteMode()
self._writer.set_classification_flags(value)
classification_flags = property(get_classification_flags, set_classification_flags, None, None)
def get_scanner_channel(self):
return(self._writer.get_scanner_channel())
def set_scanner_channel(self, value):
self.assertWriteMode()
self._writer.set_scanner_channel(value)
scanner_channel = property(get_scanner_channel, set_scanner_channel, None, None)
def get_synthetic(self):
return(self._reader.get_synthetic())
def set_synthetic(self, synthetic):
self.assertWriteMode()
self._writer.set_synthetic(synthetic)
return
synthetic = property(get_synthetic, set_synthetic, None, None)
Synthetic = synthetic
def get_key_point(self):
return(self._reader.get_key_point())
def set_key_point(self, pt):
self.assertWriteMode()
self._writer.set_key_point(pt)
return
key_point = property(get_key_point, set_key_point, None, None)
Key_Point = key_point
def get_withheld(self):
return(self._reader.get_withheld())
def set_withheld(self, withheld):
self.assertWriteMode()
self._writer.set_withheld(withheld)
return
withheld = property(get_withheld, set_withheld, None, None)
Withheld = withheld
def get_scan_angle_rank(self):
return(self._reader.get_scan_angle_rank())
def set_scan_angle_rank(self, rank):
self.assertWriteMode()
self._writer.set_scan_angle_rank(rank)
return
scan_angle_rank = property(get_scan_angle_rank, set_scan_angle_rank,None,None)
def get_user_data(self):
return(self._reader.get_user_data())
def set_user_data(self, data):
self.assertWriteMode()
self._writer.set_user_data(data)
return
user_data = property(get_user_data, set_user_data, None, None)
def get_pt_src_id(self):
return(self._reader.get_pt_src_id())
def set_pt_src_id(self, data):
self.assertWriteMode()
self._writer.set_pt_src_id(data)
return
pt_src_id = property(get_pt_src_id, set_pt_src_id, None, None)
def get_gps_time(self):
return(self._reader.get_gps_time())
def set_gps_time(self, data):
self.assertWriteMode()
self._writer.set_gps_time(data)
return
gps_time = property(get_gps_time, set_gps_time, None, None)
def get_red(self):
return(self._reader.get_red())
def set_red(self, red):
self.assertWriteMode()
self._writer.set_red(red)
red = property(get_red, set_red, None, None)
Red = red
def get_green(self):
return(self._reader.get_green())
def set_green(self, green):
self.assertWriteMode()
self._writer.set_green(green)
return
green = property(get_green, set_green, None, None)
Green = green
def get_blue(self):
return(self._reader.get_blue())
def set_blue(self, blue):
self.assertWriteMode()
self._writer.set_blue(blue)
return
blue = property(get_blue, set_blue)
Blue = blue
def get_wave_packet_desc_index(self):
return(self._reader.get_wave_packet_desc_index())
def set_wave_packet_desc_index(self,idx):
self.assertWriteMode()
self._writer.set_wave_packet_desc_index(idx)
return
def get_nir(self):
return(self._reader.get_nir())
def set_nir(self, value):
self.assertWriteMode()
self._writer.set_nir(value)
nir = property(get_nir, set_nir, None, None)
wave_packet_desc_index = property(get_wave_packet_desc_index,
set_wave_packet_desc_index, None, None)
def get_byte_offset_to_waveform_data(self):
return(self._reader.get_byte_offset_to_waveform_data())
def set_byte_offset_to_waveform_data(self, idx):
self.assertWriteMode()
self._writer.set_byte_offset_to_waveform_data(idx)
return
byte_offset_to_waveform_data = property(get_byte_offset_to_waveform_data,
set_byte_offset_to_waveform_data,
None, None)
def get_waveform_packet_size(self):
return(self._reader.get_waveform_packet_size())
def set_waveform_packet_size(self, size):
self.assertWriteMode()
self._writer.set_waveform_packet_size(size)
return
waveform_packet_size = property(get_waveform_packet_size,
set_waveform_packet_size,
None, None)
def get_return_point_waveform_loc(self):
return(self._reader.get_return_point_waveform_loc())
def set_return_point_waveform_loc(self, loc):
self.assertWriteMode()
self._writer.set_return_point_waveform_loc(loc)
return
return_point_waveform_loc = property(get_return_point_waveform_loc,
set_return_point_waveform_loc,
None, None)
def get_x_t(self):
return(self._reader.get_x_t())
def set_x_t(self,x):
self.assertWriteMode()
self._writer.set_x_t(x)
return
x_t = property(get_x_t, set_x_t, None, None)
def get_y_t(self):
return(self._reader.get_y_t())
def set_y_t(self,y):
self.assertWriteMode()
self._writer.set_y_t(y)
return
y_t = property(get_y_t, set_y_t, None, None)
def get_z_t(self):
return(self._reader.get_z_t())
def set_z_t(self, z):
self.assertWriteMode()
self._writer.set_z_t(z)
z_t = property(get_z_t, set_z_t, None, None)
def get_extra_bytes(self):
return(self._reader.get_extra_bytes())
def set_extra_bytes(self, new):
self.assertWriteMode()
self._writer.set_extra_bytes(new)
doc = '''It is possible to specify a data_record_length longer than the default,
and the extra space is treated by laspy as raw bytes accessable via this extra_bytes property.
This dimension is only assignable for files in write mode which were instantiated with the appropriate
data_record_length from the header.'''
extra_bytes = property(get_extra_bytes, set_extra_bytes, None, doc)
[docs] def __iter__(self):
'''Iterator support (read mode only)
>>> points = []
>>> for i in f:
... points.append(i)
... print i # doctest: +ELLIPSIS
<laspy.base.Point object at ...>
'''
if self._mode == "r":
self.at_end = False
p = self._reader.get_point(0)
while p and not self.at_end:
yield p
p = self._reader.get_next_point()
if not p:
self.at_end = True
else:
self.close()
self.open()
### END OF GB REVISIONS ###
[docs] def __getitem__(self, index):
'''Index and slicing support
>>> out = f[0:3]
[<laspy.base.Point object at ...>,
<laspy.base.Point object at ...>,
<laspy.base.Point object at ...>]
'''
try:
index.stop
except AttributeError:
return self.read(index)
output = []
if index.step:
step = index.step
else:
step = 1
for i in range(index.start, index.stop, step):
output.append(self.read(i))
return output
[docs] def __len__(self):
'''Returns the number of points in the file according to the header'''
return self.header.point_records_count
def write(self, pt):
'''Writes the point to the file if it is append or write mode. LAS
files are written sequentially starting from the first point (in pure
write mode) or from the last point that exists (in append mode).
:param pt: The point to write.
:type pt: :obj:`laspy.util.Point` instance to write
.. note::
At this time, it is not possible to duck-type point objects and
have them be written into the LAS file (from say numpy or
something). You have to take care of this adaptation yourself.
'''
if not isinstance(pt, util.Point):
raise util.LaspyException('cannot write %s, it must '
'be of type laspy.point.Point' % pt)
if self._mode != "r":
#core.las.LASWriter_WritePoint(self.handle, pt.handle)
pass
def __enter__(self):
return(self)
def __exit__(self,type, value, traceback):
## Updating header changes is slow.
self.close(ignore_header_changes = True)
def get_point_format(self):
return self._reader.point_format
doc = '''The point format of the file, stored as a laspy.util.Format instance. Supports .xml and .etree methods.'''
point_format = property(get_point_format, None, None, doc)
# def get_xmlsummary(self):
# '''Returns an XML string summarizing all of the points in the reader
#
# .. note::
# This method will reset the reader's read position to the 0th
# point to summarize the entire file, and it will again reset the
# read position to the 0th point upon completion.'''
# if self._mode != 0:
# raise util.LaspyException("file must be in read mode, not append or write mode to provide xml summary")
# return
#
# summary = property(get_xmlsummary, None, None, None)