Source code for pynoddy.experiment

'''
Base class from which PyNoddy experiments should inherit.

Much basic functionality (random perturbation, plotting etc. is defined here).

Thought: perhaps drawing functions etc. should be moved into NoddyOutput class?

@author: flohorovicic, samthiele
'''
import os
import numpy as np

# from pynoddy.history import NoddyHistory
# from pynoddy.output import NoddyOutput

import pynoddy.history
import pynoddy.output

import util.sampling as Sample


[docs]class Experiment(pynoddy.history.NoddyHistory, pynoddy.output.NoddyOutput): '''Noddy experiment container, inheriting from both noddy history and output methods ''' def __init__(self, history=None, **kwds): """Combination of input and output methods for complete kinematic experiments with NOddy **Optional Keywords**: - *his_file* = string : filename of Noddy history input file """ super(Experiment, self).__init__(history, **kwds) # super(Experiment, self).test() # if kwds.has_key("history"): # self.load_history(kwds['history']) # print("Determine events") # self.determine_events()
[docs] def update(self): """Update model computation""" self.set_up_to_date()
[docs] def set_up_to_date(self): """Set boolean variable for valid object""" self.__is_up_to_date = True
[docs] def get_up_to_date(self): """Get model state""" return self.__is_up_to_date
is_up_to_date = property(get_up_to_date, set_up_to_date, None, "Model state")
[docs] def set_parameter_statistics(self, param_stats): """Define parameter statistics for uncertainty simulation and sensitivity analysis param_stats = list : list with relevant statistics defined for event parameters list is organised as: param_stats[event_id][parameter_name][stats_type] = value Example: param_stats[2]["Dip"]["min"] = 200. Possible statistics are: - min = float : minimum bound - max = float : maximum bound - type = 'normal', 'uniform' : distribution type - stdev = float : standard deviation (if normal distribution) """ self.param_stats = param_stats if not hasattr(self, 'base_events'): # if this model has not been frozen before self.freeze()
[docs] def load_parameter_file(self, filename, **kwds): """Load parameter statistics from external csv file The csv file should contain a header row with the relevant keywords identifying columns. In order to be read in correctly, the header should contain the labels: - 'event' : event id - 'parameter' : Noddy parameter ('Dip', 'Dip Direction', etc.) - 'mean' : mean parameter value - 'type' = 'normal', 'vonmises' or 'uniform'. In addition, it is necessary to define PDF type and parameters. For now, the following settings are supported: - '+-' = Defines the 2.5th and 97.5th percentiles of the distribution, similar to a 95% confidence interval. - 'stdev' = standard deviation. Only works if type='normal'. - 'min' = The minimum value of a uniform distribution (if type='uniform') - 'max' = The maximum value of a uniform distribution (if type='uniform') **Arguments**: - *filename* = string : filename **Optional arguments**: - *delim* = string : delimiter (default: ',' or ';', both checked) """ lines = open(filename).readlines() delim = kwds.get("delim", ",") # test if "," is actually the delimiter - if not, try ';' (stupid Excel standard...) header = lines[0].rstrip().split(delim) if len(header) == 1: delim = ';' header = lines[0].rstrip().split(delim) # set up parameter list self.param_stats = [] for line in lines[1:]: l = line.rstrip().split(delim) # set up parameter dictionary param_dict = {} if l[0] == '': break # end of entries for ele in header: if ele == '': continue # empty column in header if ele == 'event': event_code = l[header.index(ele)] evts = [] for e in event_code.split('|'): # multiple events can be separated by | evts.append(int(e)) # append events param_dict[ele] = tuple(evts) continue try: param_dict[ele] = float(l[header.index(ele)]) except ValueError: # not a number param_dict[ele] = l[header.index(ele)] self.param_stats.append(param_dict) if not hasattr(self, 'base_events'): # if this model has not been frozen before self.freeze()
[docs] def freeze(self, **kwds): """Freeze the current model state: store the event settings for later comparison""" self.base_events = self.copy_events()
[docs] def reset_base(self): """Set events back to base model (stored in self.base_events)""" import copy self.events = copy.deepcopy(self.base_events)
[docs] def set_random_seed(self, random_seed): """Set random seed for reproducible experiments **Arguments**: - *random_seed* = int (or array-like) : define seed """ self.seed = random_seed # apply random seed np.random.seed(random_seed)
[docs] def reset_random_seed(self): """Reset random seed to defined value (stored in self.seed, set with self.set_random_seed)""" if not hasattr(self, 'seed'): raise AttributeError("Random seed not defined! Set with self.set_random_seed()") np.random.seed(self.seed)
[docs] def random_draw(self, **kwds): """Perform a random draw for parameter distributions as defined, and calculate model This method is based on the model "base-state", and not the current state (as opposed to the self.random_perturbation() method). **Optional Keywords**: - *verbose* = bool: print out parameter changes as they happen (default: False) - *store_params* = bool : store random parameter set (default: True) """ self.reset_base() self.random_perturbation(**kwds)
[docs] def random_perturbation(self, **kwds): """Perform a random perturbation of the model according to parameter statistics defined in self.param_stats. Note that by default, this function is identical to random_draw. If model_type is set to 'current', then parameters are varied according using the *current values* as distribution means - this allows 'random walk' away from the initial model state, which is usually not desired. **Optional arguments**: - *store_params* = bool : store random parameter set (default: True) - *verbose* = bool: print out parameter changes as they happen (default: False) - *model_type* = 'base', 'current' : perturb on basis of current model, or use base model (default: 'base' model) """ store_params = kwds.get("store_params", True) verbose = kwds.get("verbose", False) # create a dictionary for event parameter changes: param_changes = {} # relative parameter changes absolute_changes = {} # absolute parameter changes # calculate new values according to 'statistics' for param in self.param_stats: # if param['event'] is not a tuple, make it one if not type(param['event']) is tuple: param['event'] = (param['event'],) # cast to tuple for e in param['event']: # intialise param changes dictionaries if not param_changes.has_key(e): param_changes[e] = {} absolute_changes[e] = {} # get original value model_type = kwds.get('model_type', 'base') if model_type == 'base': ori_val = self.base_events[param['event'][0]].properties[param['parameter']] else: ori_val = self.events[param['event'][0]].properties[param['parameter']] # sample value from appropriate distribution random_val = 0 if param.has_key("type"): if param['type'] == 'normal': # draw value of normal distribution: mean = param.get("mean", ori_val) # default mean is original value # use assigned standard deviation if param.has_key('stdev'): stdev = param.get("stdev") random_val = np.random.normal(mean, stdev) elif param.has_key('+-'): ci = param.get('+-') random_val = Sample.Normal(mean, ci, 1) else: # not enough information to calculate standard deviation raise AttributeError( "Error: Normal distribution is underdefined. Please assign either a 'stdev' value or a '+-' value (defining the interval between the 2.5th and 97.25th quantile)") if param['type'] == 'vonmises': mean = param.get("mean", ori_val) # sample distribution if param.has_key('+-'): ci = param.get('+-') random_val = Sample.VonMises(mean, ci, 1) else: # +- needs to be defined raise AttributeError( "Error: Von-Mises distribution is underdefined. Please assign either a '+-' value (defining the interval between the 2.5th and 97.25th quantile)") if param['type'] == 'uniform': # retrieve specified min/max values if param.has_key("min") and param.has_key("max"): minimum = param.get("min") maximum = param.get("max") random_val = np.random.uniform(minimum, maximum) elif param.has_key("+-") and param.has_key("mean"): # retrieve mean and confidence interval mean = param.get("mean") ci = param.get("+-") random_val = Sample.Uniform(mean, ci, 1) else: raise AttributeError( "Error: Sampling from a uniform distribution requires either a specified range ('min' and 'max' values) or a mean and '+-' value (95% confidence interval)") # throw error for other types of distribution if param['type'] != 'normal' and param['type'] != 'vonmises' and param['type'] != 'uniform': raise AttributeError("Sampling for type %s not yet implemented, sorry." % param['type']) # store relative changes for e in param['event']: param_changes[e][param['parameter']] = random_val - self.events[param['event'][0]].properties[ param['parameter']] # store absolute changes absolute_changes[e][param['parameter']] = random_val # print changes if verbose: print('Changing %s to %s' % (param['parameter'], random_val)) else: raise AttributeError("Please define type of parameter statistics ('type' keyword in table)") # assign changes to model: self.change_event_params(param_changes) # store results for later analysis if store_params: if not hasattr(self, 'random_parameter_changes'): # initialise array self.random_parameter_changes = [absolute_changes] else: self.random_parameter_changes.append(absolute_changes)
[docs] def shuffle_event_order(self, event_ids): """Randomly shuffle order of events **Arguments**: - *event_ids* = [list of event ids] : event ids to be randomly shuffeled """ new_order = np.random.choice(event_ids, size=len(event_ids), replace=False) # set up reorder-dictionary reorder_dict = {} for i, ev_id in enumerate(event_ids): reorder_dict[ev_id] = new_order[i] self.reorder_events(reorder_dict)
[docs] def get_sampling_line_data(self, xyz_from, xyz_to): """Get computed model along a line, for example as a drillhole position **Arguments**: - *xyz_from* = [x, y, z] : list of float values for starting position - *xyz_to* = [x, y, z] : list of float values for starting position """ pass
[docs] def plot_section(self, direction='y', position='center', **kwds): """Extended version of plot_section method from pynoddy.output class **Arguments**: - *direction* = 'x', 'y', 'z' : coordinate direction of section plot (default: 'y') - *position* = int or 'center' : cell position of section as integer value or identifier (default: 'center') **Optional Keywords**: - *ax* = matplotlib.axis : append plot to axis (default: create new plot) - *figsize* = (x,y) : matplotlib figsize - *colorbar* = bool : plot colorbar (default: True) - *colorbar_orientation* = 'horizontal' or 'vertical' : orientation of colorbar (default: 'vertical') - *title* = string : plot title - *savefig* = bool : save figure to file (default: show directly on screen) - *cmap* = matplotlib.cmap : colormap (default: YlOrRd) - *fig_filename* = string : figure filename - *ve* = float : vertical exaggeration - *layer_labels* = list of strings: labels for each unit in plot - *layers_from* = noddy history file : get labels automatically from history file - *resolution* = float : set resolution for section (default: self.cube_size) - *model_type* = 'current', 'base' : model type (base "freezed" model can be plotted for comparison) - *data* = np.array : data to plot, if different to block data itself """ if kwds.has_key("data"): super(Experiment, self).plot_section(direction, position, **kwds) else: # get model as section tmp_out = self.get_section(direction, position, **kwds) self.determine_model_stratigraphy() # tmp_out.plot_section(direction = direction, layer_labels = self.model_stratigraphy, **kwds) tmp_out.plot_section(direction=direction, **kwds)
[docs] def export_to_vtk(self, **kwds): """Export model to VTK Export the geology blocks to VTK for visualisation of the entire 3-D model in an external VTK viewer, e.g. Paraview. ..Note:: Requires pyevtk, available for free on: https://github.com/firedrakeproject/firedrake/tree/master/python/evtk **Optional keywords**: - *vtk_filename* = string : filename of VTK file (default: output_name) - *data* = np.array : data array to export to VKT (default: entire block model) - *recompute* = bool : recompute the block model (default: True) - *model_type* = 'current', 'base' : model type (base "freezed" model can be plotted for comparison) ..Note:: If data is defined, the model is not recomputed and the data from this array is plotted """ if kwds.has_key("data"): super(Experiment, self).export_to_vtk(**kwds) else: recompute = kwds.get("recompute", True) # recompute by default if recompute: import pynoddy import pynoddy.output # re-compute noddy model # save temporary file tmp_his_file = "tmp_section.his" tmp_out_file = "tmp_section_out" # reset to base model? if kwds.has_key("model_type") and (kwds['model_type'] == 'base'): # 1. create copy import copy tmp_his = copy.deepcopy(self) tmp_his.events = self.base_events.copy() tmp_his.write_history(tmp_his_file) else: self.write_history(tmp_his_file) pynoddy.compute_model(tmp_his_file, tmp_out_file) # open output # tmp_out = pynoddy.output.NoddyOutput(tmp_out_file) # tmp_out.export_to_vtk(**kwds) super(Experiment, self).set_basename(tmp_out_file) super(Experiment, self).load_model_info() super(Experiment, self).load_geology() super(Experiment, self).export_to_vtk(**kwds)
[docs] def get_section(self, direction='y', position='center', **kwds): """Get geological section of the model (re-computed at required resolution) as noddy object **Arguments**: - *direction* = 'x', 'y', 'z' : coordinate direction of section plot (default: 'y') - *position* = int or 'center' : cell position of section as integer value or identifier (default: 'center') **Optional arguments**: - *resolution* = float : set resolution for section (default: self.cube_size) - *model_type* = 'current', 'base' : model type (base "freezed" model can be plotted for comparison) - *compute_output* = bool : provide output from command line call (default: True) """ compute_output = kwds.get("compute_output", True) self.get_cube_size() self.get_extent() resolution = kwds.get("resolution", self.cube_size) model_type = kwds.get("model_type", 'current') # self.determine_model_stratigraphy() # code copied from noddy.history.HistoryFile.get_drillhole_data() self.get_origin() z_min = kwds.get("z_min", self.origin_z) z_max = kwds.get("z_max", self.extent_z) # 1. create copy import copy tmp_his = copy.deepcopy(self) # 2. set values if direction == 'y': x_min = self.origin_x x_max = self.extent_x if position == 'center' or position == 'centre': # AE and BE friendly :-) y_pos = (self.extent_y - self.origin_y - resolution) / 2. else: # set position excplicity to cell y_pos = position tmp_his.set_origin(x_min, y_pos, z_min) # z_min) tmp_his.set_extent(x_max, resolution, z_max) tmp_his.change_cube_size(resolution) elif direction == 'x': y_min = self.origin_y y_max = self.extent_y if position == 'center' or position == 'centre': x_pos = (self.extent_x - self.origin_x - resolution) / 2. else: # set position excplicity to cell x_pos = position tmp_his.set_origin(x_pos, y_min, z_min) # z_min) tmp_his.set_extent(resolution, y_max, z_max) tmp_his.change_cube_size(resolution) if model_type == 'base': tmp_his.events = self.base_events.copy() # 3. save temporary file tmp_his_file = "tmp_section.his" tmp_his.write_history(tmp_his_file) tmp_out_file = "tmp_section_out" # 4. run noddy import pynoddy.output pynoddy.compute_model(tmp_his_file, tmp_out_file, output=compute_output) # 5. open output tmp_out = pynoddy.output.NoddyOutput(tmp_out_file) # 6. # tmp_out.plot_section(direction = direction, player_labels = self.model_stratigraphy, **kwds) # return tmp_out.block # remove temorary file # find all files that match base name of output file (depends on Noddy compute type!) import os for f in os.listdir('.'): if os.path.splitext(f)[0] == tmp_out_file: os.remove(f) return tmp_out
[docs] def write_parameter_changes(self, filepath): if hasattr(self, 'random_parameter_changes'): # if parameter changes have been stored # ensure directory exists if (not os.path.exists(os.path.dirname(filepath))) and (not os.path.dirname(filepath) == ''): os.makedirs(os.path.dirname(filepath)) # open f = open(filepath, 'w') # todo: write initial values # write header f.write("ChangeNumber") change_list = [] for e, c in self.random_parameter_changes[ 0].iteritems(): # NB: This only works if the same parameters have been changed at each step! for p, v in c.iteritems(): f.write(",Event_%s_%s" % (e, p)) # write parameter: eg. Event_1_Amplitude # retrieve values i = 0 for change in self.random_parameter_changes: change_list.append([]) # list of changes for e, c in change.iteritems(): for p, v in c.iteritems(): change_list[i].append(v) # store value i = i + 1 # write values for i in range(0, len(change_list)): # write ChangeNumber f.write("\n%d" % (i + 1)) # write change values for v in change_list[i]: f.write(",%f" % v) f.close() else: print ("This experiment has no stored changes")