Module curlew.geology.model
A class representing a time-aware geological model and facilitating interactions with the underlying linked-list of SF instances (that represent each geological structure in the model).
Classes
class GeoModel (fields: list, forward=None, lr=0.01, grid=None)
```python
class GeoModel(object):
    """
    A class representing a time-aware geological model and facilitating interactions
    with the underlying linked list of SF instances (that represent each geological
    structure in the model).
    """

    def __init__(self, fields: list, forward=None, lr=0.01, grid=None):
        """
        Construct a GeoModel from a list of SFs.

        Parameters
        ----------
        fields : list
            A list of SF instances representing geological events, from oldest to
            youngest. This list can include domain boundaries if needed, but
            non-domain fields (e.g., faults, stratigraphy, etc.) should not be
            older than these.
        forward : curlew.fields.NF, optional
            A neural field taking 2 inputs (scalar value and structure ID) and
            outputting predictions matching any defined property measurements,
            such that it learns a forward model used to constrain model geometry
            with arbitrary (informative) quantitative data.
        lr : float, optional
            The learning rate to use for any optimised parameters outside of the
            neural fields. This includes e.g., fault offsets.
        grid : curlew.geometry.Grid, optional
            An optional grid to associate with this GeoModel instance. This will
            set the `M.grid` variable but is not necessary (i.e. can be `None`,
            which is the default).
        """
        # set parent and child properties of underlying SFs
        # (i.e. build our linked list / binary tree of SFs)
        _linkF(fields)

        # traverse back down linked list / tree and define IDs
        def traverse(node, i=1):
            node.eid = i
            if isinstance(node.parent, SF):
                i = traverse(node.parent, i + 1)
            if isinstance(node.parent2, SF):
                i = traverse(node.parent2, i + 1)
            return i
        traverse(fields[-1])

        # accumulate all fields in this model
        self.fields = []
        def traverse_fields(node):
            if isinstance(node, SF):
                self.fields.append(node)
                if isinstance(node.parent, SF):
                    traverse_fields(node.parent)
                if isinstance(node.parent2, SF):
                    traverse_fields(node.parent2)
        traverse_fields(fields[-1])       # traverse from last field in the list
        self.fields = self.fields[::-1]   # we want the youngest field last, so reverse the list
        self.lastEvent = self.fields[-1]  # change to evaluate model in some paleo-space
        self.eidLookup = {f.eid: f for f in self.fields}  # lookup table translating event IDs to SF instances

        # store forward model, if defined
        self.forward = forward

        # store grid if defined
        self.grid = grid

        # init optimiser for custom parameters (e.g., fault slip)
        # (these are gathered from all the fields in this model)
        # also store references to each field's optimiser for easy access.
        self.optim = {}
        self.frozen = {'forward': False}  # all frozen optimisers will be stored here and prevented from stepping
        for F in self.fields:
            # create all "non-mlp" params
            params = [param for name, param in F.field.named_parameters() if 'mlp' not in name]
            if len(params) > 0:
                self.optim[F.name + '_params'] = (  # create optimiser and store params
                    torch.optim.SGD(params, lr=lr, momentum=0),
                    params
                )
        self.freeze(geometry=False, params=False)  # start with everything "unfrozen"

    # ... (public methods are listed individually under "Methods" below) ...

    def _getPositions(self, G, node, first_x=0, first_y=0, step_x=10, step_y=10, pos=None):
        """
        Recursively calculate the 2D positions of nodes in a hierarchical structure.
        Used when plotting the model tree as a 2D graph.

        Parameters
        ----------
        G : networkx.DiGraph
            The directed graph containing the nodes to be positioned.
        node : str
            The current node for which to calculate the position.
        first_x : int, optional
            The initial x-coordinate for the current node.
        first_y : int, optional
            The initial y-coordinate for the current node.
        step_x : int, optional
            The horizontal step size for moving to the right.
        step_y : int, optional
            The vertical step size for moving down.
        pos : dict, optional
            A dictionary to store the positions of nodes.

        Returns
        -------
        pos : dict
            A dictionary mapping each node to its (x, y) position.
        """
        if pos is None:
            pos = {}

        # Assign the position to the current node
        pos[node] = (first_x, first_y)

        # Get the children of the current node
        children = list(G.successors(node))
        if not children:
            return pos

        # If the node is a domain boundary, handle its children differently
        node_field = next((field for field in self.fields if field.name == node), None)
        if node_field.parent2 is not None and isinstance(node_field, SF):
            # Move to the right
            pos = self._getPositions(G, children[0], first_x + step_x, first_y, step_x, step_y, pos)
            # Move down
            pos = self._getPositions(G, children[1], first_x, first_y - step_y, step_x, step_y, pos)
        else:
            # For non-domain boundary nodes, move to the right
            pos = self._getPositions(G, children[0], first_x + step_x, first_y, step_x, step_y, pos)
        return pos

    def _repr_svg_(self):
        """
        Visualize the model tree of a GeoModel and return it as an SVG string.

        Returns
        -------
        str
            An SVG string representation of the visualized model tree.
        """
        # Create an empty graph
        try:
            import networkx as nx
        except ImportError:
            assert False, "Please install networkx using `pip install networkx`"
        graph = nx.DiGraph()
        domain_boundary_color = '#E35B0E'
        dilative_event_color = "#F0C419"
        generative_event_color = '#31B4C2'
        kinematic_event_color = "#A6340B"
        fixed_value_color = "#FAE8B6"
        for field in self.fields[::-1]:
            # Determine the color based on the event type
            color = None
            if field.parent2 is not None:  # domain boundary
                color = domain_boundary_color
            elif field.bound is not None and field.deformation is not None:  # dilative event
                color = dilative_event_color
            elif field.bound is not None:  # generative event
                color = generative_event_color
            elif field.deformation is not None:  # kinematic event
                color = kinematic_event_color
            graph.add_node(field.name, label=field.name, color=color)

            # Add edges
            if isinstance(field.parent, SF):
                graph.add_edge(field.name, field.parent.name)
            if isinstance(field.parent2, SF):
                graph.add_edge(field.name, field.parent2.name)
            if not isinstance(field.parent, SF) and field.parent is not None:
                # Handle fixed values
                graph.add_edge(field.name, str(field.parent))
            if not isinstance(field.parent2, SF) and field.parent2 is not None:
                graph.add_edge(field.name, str(field.parent2))

        # Plot
        try:
            import matplotlib.pyplot as plt
        except ImportError:
            assert False, "Please install matplotlib to use plotting tools."
        fig, ax = plt.subplots(1, 1, figsize=(8, 4))
        pos = self._getPositions(graph, list(graph.nodes())[0], step_x=1, step_y=1)
        node_colors = [graph.nodes[node].get('color', fixed_value_color) for node in graph.nodes()]
        nx.draw(graph, pos, with_labels=True, arrows=True, node_size=2000,
                node_color=node_colors, font_size=8, ax=ax)

        # Legend
        legend_labels = {
            domain_boundary_color: 'Domain',
            dilative_event_color: 'Dilative',
            generative_event_color: 'Generative',
            kinematic_event_color: 'Kinematic',
            fixed_value_color: 'Fixed'
        }
        for color, label in legend_labels.items():
            ax.scatter([], [], color=color, label=label, s=200)
        ax.legend(loc='lower center', ncol=5)

        # Save the figure
        buffer = io.StringIO()
        fig.savefig(buffer, format='svg')
        plt.close(fig)
        svg = buffer.getvalue()
        buffer.close()
        return svg
```
A class representing a time-aware geological model and facilitating interactions with the underlying linked-list of SF instances (that represent each geological structure in the model).
Construct a GeoModel from a list of SFs.
Parameters
fields
:list
- A list of SF instances representing geological events, from oldest to youngest. This list can include domain boundaries if needed, but non-domain fields (e.g., faults, stratigraphy, etc.) should not be older than these.
forward
:curlew.fields.NF | optional
- A neural field taking 2-inputs (scalar value and structure ID) and outputting predictions matching any defined property measurements, such that it learns a forward model used to constrain model geometry with arbitrary (informative) quantitative data.
lr
:float | optional
- The learning rate to use for any optimised parameters outside of the neural fields. This includes e.g., fault offsets.
grid
:curlew.geometry.Grid | optional
- An optional grid to associate with this GeoModel instance. This will set the `M.grid` variable but is not necessary (i.e. can be `None`, which is the default).
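For orientation, a minimal construction sketch is shown below. The `strat` and `fault` SF instances are hypothetical and assumed to be built elsewhere (their construction is not documented in this module).

```python
# A minimal sketch, assuming `strat` and `fault` are SF instances
# created elsewhere, ordered from oldest to youngest.
from curlew.geology.model import GeoModel

M = GeoModel(
    [strat, fault],  # geological events, oldest first
    forward=None,    # no property forward model
    lr=0.01,         # learning rate for non-field parameters (e.g., fault offsets)
)
```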
Methods
def classify(self, x: numpy.ndarray, return_vals=False)
```python
def classify(self, x: np.ndarray, return_vals=False):
    """
    Evaluate this model using `self.predict(x, **kwds)` and then bin the resulting
    scalar values according to the isosurfaces associated with the scalar field
    responsible for each prediction (i.e. after accounting for overprinting
    relations between the various fields).

    Parameters
    ----------
    x : np.ndarray
        An (N,d) array containing the locations at which to evaluate the model.
    return_vals : bool
        True if evaluated scalar values should be returned as well as classes.
        Default is False.

    Returns
    -------
    class_ids : np.ndarray
        Class IDs after classifying according to the defined isosurfaces.
    class_names : np.ndarray
        Array of unit names corresponding to the above class_ids.
    """
    # evaluate all scalar values
    pred = self.predict(x)
    sfv = pred[:, 0].copy()       # scalar field values
    sid = pred[:, 1].astype(int)  # structural field IDs

    # individually classify each of them, and aggregate classes
    labels = []
    classes = pred.copy()
    classes[:, 0] = classes[:, 1]
    unique_ids = np.array([f.eid for f in self.fields]).astype(int)
    for i in range(len(unique_ids)):
        # mask to get pixels this SF is responsible for
        mask = (sid == unique_ids[i])
        if mask.sum() > 0:
            # Belongs to normal fields
            iso = self.fields[i].getIsovalues()
            isov = np.array(list(iso.values()))
            ixx = np.argsort(isov)
            inames = np.array(list(iso.keys()))[ixx]
            cids = np.digitize(sfv[mask], bins=isov[ixx])

            # store IDs and corresponding labels
            classes[mask, 0] = cids + len(labels)
            labels += [('%d_' % i) + n for n in inames]
            labels += ['overburden']  # we need to add one extra class
    if return_vals:
        return classes, labels, pred
    else:
        return classes, labels
```
Evaluate this model using `self.predict(x, **kwds)` and then bin the resulting scalar values according to the isosurfaces associated with the scalar field responsible for each prediction (i.e. after accounting for overprinting relations between the various fields).
Parameters
x
:np.ndarray
- An (N,d) array containing the locations at which to evaluate the model.
return_vals
:bool
- True if evaluated scalar values should be returned as well as classes. Default is False.
Returns
class_ids
:np.ndarray
- Class IDs after classifying according to the defined isosurfaces.
class_names
:np.ndarray
- Array of unit names corresponding to the above class_ids.
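A short usage sketch (the trained model `M` and the query coordinates are hypothetical):

```python
import numpy as np

# hypothetical 2D query points; `M` is a trained GeoModel
x = np.random.uniform(0, 100, size=(500, 2))
cids, names = M.classify(x)
print(cids[:, 0].astype(int))  # class ID per point
print(names)                   # unit name for each class ID
```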
def drill(self, start, end, step)
```python
def drill(self, start, end, step):
    """
    Evaluate the model along a line between start and end with an interval of step.

    Parameters
    ----------
    start : np.ndarray
        The start coordinate of the "drillhole".
    end : np.ndarray
        The end coordinate of the "drillhole".
    step : float
        The distance between points along this line.

    Returns
    -------
    A dictionary containing the following model output arrays:

        {"pos":        [...positions],
         "scalar":     [...scalar values],
         "fieldID":    [...scalar field IDs],
         "classID":    [...class IDs (as returned by classify(...))],
         "classNames": [...class names (as returned by classify(...))]}
    """
    direction = np.array(end) - np.array(start)  # renamed from `dir` to avoid shadowing the builtin
    length = np.linalg.norm(direction)
    direction = (direction / length) * step
    pos = np.array([start + direction * i for i in range(int(length / step))])

    # evaluate model
    cids, inames, pred = self.classify(pos, return_vals=True)

    # find contacts
    cmask = np.abs(np.diff(cids[:, 0], prepend=cids[0, 0])) > 0
    ori = self.gradient(pos[cmask], return_vals=False, normalize=True)
    contacts = dict(pos=pos[cmask], ori=ori, scalar=pred[cmask, 0],
                    fieldID=pred[cmask, 1].astype(int),
                    classID=cids[cmask, 0].astype(int),
                    className=inames)

    # return
    return dict(pos=pos, scalar=pred[:, 0], fieldID=pred[:, 1].astype(int),
                classID=cids[:, 0].astype(int), className=inames,
                contacts=contacts)
```
Evaluate the model along a line between start and end with an interval of step.
Parameters
start
:np.ndarray
- The start coordinate of the "drillhole"
end
:np.ndarray
- The end coordinate of the "drillhole"
step
:float
- The distance between points along this line
Returns
A dictionary containing the following model output arrays
{"pos":[...positions], "scalar":[...scalarValues], "fieldID":[...scalar field id], "classID":[...class IDs (as returned by classify(...))], "classNames":[...class Names (as returned by classify(...))] }
def evaluate(self, grid, topology=False, buffer=None, surfaces=None, batch_size=50000, vb=True)
```python
def evaluate(self, grid, topology=False, buffer=None, surfaces=None, batch_size=50000, vb=True):
    """
    Evaluate a *curlew* model on a grid and extract isosurfaces, topology and/or
    fault buffers.

    Parameters
    ----------
    grid : curlew.geometry.Grid | np.ndarray
        A structured Grid to evaluate the model on (if surfaces are to be
        calculated), or an array of coordinates (unstructured grid). Isosurfaces
        cannot be calculated for unstructured grids.
    topology : bool, optional
        True if model topology (fault hangingwall and footwall relations) should
        be calculated and returned. Default is False.
    buffer : float, optional
        If not None, this distance (in model coordinates) will be used to compute
        a buffer of this size on either side of each fault surface.
    surfaces : str | bool, optional
        If not None, isosurfaces will be computed and returned. If a string is
        passed, these will also be saved to PLY in the specified folder.

    Returns
    -------
    A dict containing some of the following keys: 'topology', 'buffer', 'surfaces'.
    """
    # TODO - extend this to include e.g., lithological classifications,
    # stratigraphic contacts, etc.
    from curlew.geometry import Grid
    if isinstance(grid, Grid):
        gxy = grid.coords()
    else:
        surfaces = None  # disable surfaces
        gxy = grid

    # setup output array
    out = dict()
    if buffer:
        out['buffer'] = np.zeros(len(gxy))  # initialise fault buffer
    if topology:
        out['topology'] = np.zeros((len(gxy), len(self.fields)))  # array to store hanging-wall & footwall information
    if surfaces:
        out['surfaces'] = {}

    # recurse through model extracting required info
    def recurse(f, dmask, i=0):
        # evaluate model
        from curlew.utils import batchEval
        if (f.parent2 is not None) or (f.deformation is not None):  # ignore stratigraphic fields
            if vb:
                print(f"Evaluating field {i}/{len(self.fields)}", end='\r')
            pred = batchEval(gxy, f.predict, batch_size=batch_size, vb=False)[:, 0]
            pred[dmask] = np.nan  # remove masked areas

        # evaluate topology, buffer & recurse
        if f.parent2 is not None:
            # this is a domain boundary
            iso = f.getIsovalue(f.bound)
            if buffer:
                i0 = f.getIsovalue(f.bound, offset=-buffer)  # lower buffer
                i1 = f.getIsovalue(f.bound, offset=buffer)   # upper buffer
                out['buffer'][(pred >= min(i0, i1)) & (pred <= max(i0, i1)) & (out['buffer'] == 0)] = f.eid
            footwall = pred < iso
            hangingwall = pred >= iso
            if topology:
                out['topology'][footwall, i] = -1
                out['topology'][hangingwall, i] = 1
            if isinstance(f.parent, SF):
                recurse(f.parent, dmask=(dmask + footwall), i=i + 1)      # recurse hangingwall objects
            if isinstance(f.parent2, SF):
                recurse(f.parent2, dmask=(dmask + hangingwall), i=i + 1)  # recurse footwall objects
        elif (f.deformation is not None) and (f.deformation == faultOffset):
            # this is a fault surface
            if topology:
                iso = f.getIsovalue(f.deformation_args['contact'], offset=0)  # fault surface
                footwall = pred < iso
                hangingwall = pred > iso
                out['topology'][footwall, i] = -1
                out['topology'][hangingwall, i] = 1
            if buffer:
                i0 = f.getIsovalue(f.deformation_args['contact'], offset=-buffer)  # lower buffer
                i1 = f.getIsovalue(f.deformation_args['contact'], offset=buffer)   # upper buffer
                out['buffer'][(pred >= min(i0, i1)) & (pred <= max(i0, i1)) & (out['buffer'] == 0)] = f.eid
            if isinstance(f.parent, SF):
                recurse(f.parent, dmask=dmask, i=i + 1)  # recurse older objects
        else:
            iso = None

        if surfaces:
            # compute isosurface meshes
            out['surfaces'][f.name] = {}
            for k in f.isosurfaces.keys():
                if grid.ndim == 3:  # 3D
                    verts, faces = grid.contour(pred, iso=f.getIsovalue(k))
                    out['surfaces'][f.name][k] = (verts, np.array(faces))
                    if isinstance(surfaces, str) or isinstance(surfaces, Path):
                        from curlew.io import savePLY
                        savePLY(Path(surfaces) / str(f.name) / f'{str(k)}.ply',
                                xyz=verts, faces=faces)
                elif grid.ndim == 2:  # 2D
                    contours = grid.contour(pred, iso=f.getIsovalue(k))
                    out['surfaces'][f.name][k] = contours

    # traverse from last event in model
    recurse(self.fields[-1], np.full(len(gxy), False))
    return out
```
Evaluate a curlew model on a grid and extract isosurfaces, topology and/or fault buffers.
Parameters
grid
:curlew.geometry.Grid | np.ndarray
- A structured Grid to evaluate the model on (if surfaces are to be calculated), or an array of coordinates (unstructured grid). Isosurfaces cannot be calculated for unstructured grids.
topology
:bool, optional
- True if model topology (fault hangingwall and footwall relations) should be calculated and returned. Default is False.
buffer
:float, optional
- If not None, this distance (in model coordinates) will be used to compute a buffer of this size on either side of each fault surface.
surfaces
:str | bool, optional
- If not None, isosurfaces will be computed and returned. If a string is passed, these will also be saved to PLY in the specified folder.
Returns
A dict containing some of the following keys: 'topology', 'buffer', 'surfaces'.
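A usage sketch (assumes a structured `Grid` was associated with the model at construction; the output folder is hypothetical):

```python
# evaluate the model and extract topology, fault buffers and isosurfaces
out = M.evaluate(M.grid, topology=True, buffer=5.0, surfaces='output/surfaces')
hw_fw = out['topology']      # (N, n_fields) array of -1/+1 footwall/hangingwall flags
fault_zone = out['buffer']   # event ID of the fault buffering each cell (0 = none)
meshes = out['surfaces']     # {field_name: {isosurface_name: (verts, faces)}}
```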
def fit(self, epochs, learning_rate=None, early_stop=(100, 0.0001), best=True, vb=True, prefix='Training')
```python
def fit(self, epochs, learning_rate=None, early_stop=(100, 1e-4), best=True, vb=True, prefix='Training'):
    """
    Train all SFs in this model to fit the specified constraints simultaneously.

    Parameters
    ----------
    epochs : int
        The number of epochs to train each SF for.
    learning_rate : float, optional
        Reset each SF's optimiser to the specified learning rate before training.
    early_stop : tuple, optional
        Tuple containing the early stopping criterion. This should be (n,t) such
        that optimisation stops after n iterations with <= t improvement in the
        loss. Set to None to disable. Note that early stopping is only applied
        if `best = True`.
    best : bool, optional
        After training, set the neural field weights to the best loss.
    vb : bool, optional
        Display a tqdm progress bar to monitor training.
    prefix : str, optional
        The prefix used for the tqdm progress bar.

    Returns
    -------
    loss : float
        The loss of the final (best if best=True) model state.
    details : dict
        A more detailed breakdown of the final loss.
    """
    # set learning rate if specified
    if learning_rate is not None:
        for F in self.fields:
            F.field.set_rate(learning_rate)
        if self.forward is not None:
            self.forward.set_rate(learning_rate)
        for opt in self.optim.values():
            for param_group in opt[0].param_groups:
                param_group['lr'] = learning_rate

    # setup progress bar
    bar = range(epochs)
    if vb:
        bar = tqdm(range(epochs), desc=prefix,
                   bar_format="{desc}: {n_fmt}/{total_fmt}|{postfix}")

    # iterate
    out = {}
    best_state = []
    best_loss = np.inf
    best_count = 0
    eps = 0
    if early_stop is not None:
        eps = early_stop[1]
    for epoch in bar:
        loss = 0
        for F in self.fields[::-1]:
            ll, details = F.field.loss()  # compute loss for this field
            loss = loss + ll              # accumulate loss
            out.update(details)           # store for output

        # also add forward (property) reconstruction loss
        if self.forward is not None:
            pp = self.forward.C.pp  # position of property constraints
            pv = self.forward.C.pv  # value of property constraints
            spred = self.fields[-1].predict(pp, combine=True, to_numpy=False)  # automatically recursed back through the linked list

            # One-hot encoding
            if self.forward.H.one_hot:
                one_hot_encoder = torch.nn.functional.one_hot(
                    (spred[:, 1] - 1).long(), num_classes=len(self.fields))
                encoded_spred = one_hot_encoder * spred[:, 0][:, None]
                ppred = self.forward(encoded_spred)
            else:
                ppred = self.forward(spred)  # generate property predictions
            prop_loss = self.forward.loss_func(ppred, pv)  # compute loss
            if isinstance(self.forward.H.prop_loss, str):
                self.forward.H.prop_loss = float(self.forward.H.prop_loss) / prop_loss.item()
            loss = loss + self.forward.H.prop_loss * prop_loss
            out['forward'] = (prop_loss.item(), {})

        # store best state(s)
        if loss.item() < (best_loss + eps):
            best_state = [copy.deepcopy(F.field.state_dict()) for F in self.fields]
            if self.forward is not None:
                best_state.append(copy.deepcopy(self.forward.state_dict()))
            best_loss = loss.item()
            best_count = 0
        else:
            best_count += 1

        # early stopping
        if (early_stop is not None) and (best_count > early_stop[0]):
            break

        # update progress
        if vb:
            bar.set_postfix({k: v[0] for k, v in out.items()})
        self.zero()      # zero gradients
        loss.backward()  # backprop losses
        self.step()

    # set best state
    if best_state:
        for i, F in enumerate(self.fields):
            F.field.load_state_dict(best_state[i])
        if self.forward is not None:
            self.forward.load_state_dict(best_state[-1])

    # return
    return loss.item(), out
```
Train all SFs in this model to fit the specified constraints simultaneously.
Parameters
epochs
:int
- The number of epochs to train each SF for.
learning_rate
:float, optional
- Reset each SF's optimiser to the specified learning rate before training.
early_stop
:tuple, optional
- Tuple containing the early stopping criterion. This should be (n,t) such that optimisation stops after n iterations with <= t improvement in the loss. Set to None to disable. Note that early stopping is only applied if `best = True`.
best
:bool, optional
- After training, set the neural field weights to the best loss.
vb
:bool, optional
- Display a tqdm progress bar to monitor training.
prefix
:str, optional
- The prefix used for the tqdm progress bar.
Returns
loss
:float
- The loss of the final (best if best=True) model state.
details
:dict
- A more detailed breakdown of the final loss.
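A usage sketch of a typical joint-training call (epoch counts and learning rate are illustrative):

```python
# jointly fit all fields, keeping the best weights seen during training
loss, details = M.fit(
    2000,
    learning_rate=1e-3,
    early_stop=(100, 1e-4),  # stop after 100 epochs with <= 1e-4 improvement
    best=True,               # restore the best weights afterwards
)
print(loss, details.keys())
```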
def freeze(self, name=None, geometry=True, params=False)
```python
def freeze(self, name=None, geometry=True, params=False):
    """
    Freeze the specified field or parameter. Used to e.g., optimise fault offset
    while keeping fault geometry fixed.

    Parameters
    ----------
    name : str | SF | list
        The name of the SF to freeze. Can also be a list of names or instances.
        If None, the specified freeze will be applied to all SFs in this model.
        Use `'forward'` to address any defined forward model.
    geometry : bool
        True if the geometry of the specified SF should be frozen. Default is True.
    params : bool
        True if other parameters (e.g., fault slip) associated with the specified
        SFs should be frozen. Default is False.
    """
    if name is None:
        name = [f.name for f in self.fields]  # apply to all
    if not isinstance(name, (list, tuple)):  # n.b. fixed precedence of the original list/tuple check
        name = [name]
    for f in name:
        if not isinstance(f, str):
            f = f.name
        self.frozen[f] = geometry
        if f + '_params' in self.optim:
            self.frozen[f + '_params'] = params
```
Freeze the specified field or parameter. Used to e.g., optimise fault offset while keeping fault geometry fixed.
Parameters
name
:str | SF | list
- The name of the SF to freeze. Can also be a list of names or instances. If None, the specified freeze will be applied to all SFs in this model. Use `'forward'` to address any defined forward model.
geometry
:bool
- True if the geometry of the specified SF should be frozen. Default is True.
params
:bool
- True if other parameters (e.g., fault slip) associated with the specified SFs should be frozen. Default is False.
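A usage sketch (the field names `'fault1'` and `'strat'` are hypothetical):

```python
# keep the fault's geometry fixed while its slip parameters stay optimisable
M.freeze('fault1', geometry=True, params=False)
M.fit(500)

# freeze everything, then unfreeze a single field
# (note that params defaults to False, so this also unfreezes its extra parameters)
M.freeze(geometry=True, params=True)
M.freeze('strat', geometry=False)
```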
def getField(self, eid)
```python
def getField(self, eid):
    """
    Get the scalar field associated with the specified event ID.

    Parameters
    ----------
    eid : int
        The event ID of the scalar field to retrieve.

    Returns
    -------
    SF
        The scalar field instance associated with the specified event ID.
    """
    return self.eidLookup.get(int(eid), None)
Get the scalar field associated with the specified event ID.
Parameters
eid
:int
- The event ID of the scalar field to retrieve.
Returns
SF
- The scalar field instance associated with the specified event ID.
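A minimal sketch (note that the traversal in `__init__` assigns `eid = 1` to the youngest event):

```python
F = M.getField(1)  # returns None if no field has this event ID
print(F.name if F is not None else 'no field with this event ID')
```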
def gradient(self, x: numpy.ndarray, return_vals: bool = False, normalize: bool = True)
```python
def gradient(self, x: np.ndarray, return_vals: bool = False, normalize: bool = True):
    """
    Evaluate this model using `self.predict(x, **kwds)` and then compute and
    return the gradients of the underlying scalar fields (as these represent
    e.g., bedding orientation).

    Parameters
    ----------
    x : np.ndarray
        An (N,d) array containing the locations at which to evaluate the model.
    return_vals : bool
        True if evaluated scalar values should be returned as well as gradients.
        Default is False.
    normalize : bool
        True if gradient vectors should be normalised to length 1 (i.e. to
        represent poles to planes). Default is True.

    Returns
    -------
    Gradient vectors at the specified locations (`x`). If `return_vals` is
    `True`, a tuple (gradients, values) will be returned.
    """
    # compute gradients from the final scalar field
    # (n.b. this automatically recurses back through the linked list)
    return self.fields[-1].gradient(x, combine=True, to_numpy=True,
                                    normalize=normalize, return_vals=return_vals)
```
Evaluate this model using `self.predict(x, **kwds)` and then compute and return the gradients of the underlying scalar fields (as these represent e.g., bedding orientation).
Parameters
x
:np.ndarray
- An (N,d) array containing the locations at which to evaluate the model.
return_vals
:bool
- True if evaluated scalar values should be returned as well as gradients. Default is False.
normalize
:bool
- True if gradient vectors should be normalised to length 1 (i.e. to represent poles to planes). Default is True.
Returns
Gradient vectors at the specified locations (`x`). If `return_vals` is `True`, a tuple (gradients, values) will be returned.
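A usage sketch (coordinates are hypothetical; `M` is a trained GeoModel):

```python
import numpy as np

# estimate bedding orientation at a few points
x = np.array([[10.0, 5.0], [20.0, 8.0]])
grads, vals = M.gradient(x, return_vals=True, normalize=True)
print(grads)  # unit vectors (poles to planes, e.g. bedding)
print(vals)   # corresponding scalar-field predictions
```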
def predict(self, x: numpy.ndarray)
```python
def predict(self, x: np.ndarray):
    """
    Create model predictions at the specified points.

    Parameters
    ----------
    x : np.ndarray
        An array of shape (N, input_dim) containing the coordinates at which to
        evaluate this GeoModel.

    Returns
    -------
    S : np.ndarray
        An array of shape (N,2) containing the predicted scalar values and the
        ID of the corresponding SF that "created" them.
    """
    out = self.fields[-1].predict(x, combine=True, to_numpy=False)  # automatically recursed back through the linked list
    out = out.cpu().detach().numpy()
    if out.shape[1] == 1:
        # add "structure ID" for consistency if needed
        out = np.hstack([out, np.zeros_like(out)])
    out[:, 1] = out[:, 1].astype(int)  # convert field IDs to integers
    return out
```
Create model predictions at the specified points.
Parameters
x
:np.ndarray
- An array of shape (N, input_dim) containing the coordinates at which to evaluate this GeoModel.
Returns
S
:np.ndarray
- An array of shape (N,2) containing the predicted scalar values and the ID of the corresponding SF that "created" them.
def prefit(self, epochs, **kwargs)
```python
def prefit(self, epochs, **kwargs):
    """
    Train all SFs in this model to fit their respective constraints in isolation,
    starting with the youngest field.

    Parameters
    ----------
    epochs : int
        The number of epochs to train for.

    Keywords
    --------
    All keywords are passed to `curlew.fields.NF.fit(...)`. These include:

    learning_rate : float, optional
        Reset each SF's optimiser to the specified learning rate before training.
    best : bool, optional
        After training, set neural field weights to the best loss.
    vb : bool, optional
        Display a tqdm progress bar to monitor training.

    Returns
    -------
    loss : float
        The loss of the final (best if best=True) model state.
    details : dict
        A more detailed breakdown of the final loss.
    """
    out = {}
    for F in self.fields[::-1]:
        _, loss = F.fit(epochs, prefix=F.name, **kwargs)
        out.update(loss)  # add outputs
    return out
```
Train all SFs in this model to fit their respective constraints in isolation, starting with the youngest field.
Parameters
epochs
:int
- The number of epochs to train for.
Keywords
All keywords are passed to `curlew.fields.NF.fit(...)`. These include:
learning_rate
:float, optional
- Reset each SF's optimiser to the specified learning rate before training.
best
:bool, optional
- After training, set neural field weights to the best loss.
vb
:bool, optional
- Display a tqdm progress bar to monitor training.
Returns
loss
:float
- The loss of the final (best if best=True) model state.
details
:dict
- A more detailed breakdown of the final loss.
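A usage sketch (keyword values are illustrative); `prefit(...)` is typically called before a joint `fit(...)`:

```python
# pre-train each field in isolation before joint optimisation
losses = M.prefit(1000, learning_rate=1e-3, best=True, vb=True)
print(losses)  # per-constraint loss breakdown from the final state
```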
def step(self, fields=True, forward=True, params=True)
```python
def step(self, fields=True, forward=True, params=True):
    """
    Step all (unfrozen) optimisers associated with the neural fields and other
    learned parameters (e.g., fault offsets) in this model.
    """
    if forward and (self.forward is not None) and not self.frozen.get('forward', False):
        self.forward.step()
    if fields:
        for f in self.fields:
            if not self.frozen.get(f.name, False):
                f.field.step()  # underlying field
    if params:
        for k, v in self.optim.items():
            if not self.frozen.get(k, False):
                v[0].step()  # parameter optimiser
```
Step all (unfrozen) optimisers associated with the neural fields and other learned parameters (e.g., fault offsets) in this model.
def zero(self)
```python
def zero(self):
    """
    Zero all (unfrozen) optimisers associated with the neural fields and other
    learned parameters (e.g., fault offsets) in this model.
    """
    if (self.forward is not None) and not self.frozen.get('forward', False):
        self.forward.zero()
    for f in self.fields:
        if not self.frozen.get(f.name, False):
            f.field.zero()  # underlying field
    for k, v in self.optim.items():
        if not self.frozen.get(k, False):
            v[0].zero_grad()  # parameter optimiser
```
Zero all (unfrozen) optimisers associated with the neural fields and other learned parameters (e.g., fault offsets) in this model.
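Together, `zero()` and `step()` support custom training loops outside `fit(...)`. A schematic sketch, mirroring what `fit(...)` does internally (the loss accumulation shown is simplified):

```python
# one manual optimisation step over all unfrozen fields and parameters
M.zero()                                          # zero all unfrozen optimisers
loss = sum(F.field.loss()[0] for F in M.fields)   # accumulate each field's loss
loss.backward()                                   # backpropagate
M.step(fields=True, forward=True, params=True)    # step unfrozen optimisers
```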