Source code for mux

import pdb
import numpy as np
import openmdao
import openmdao.api as om


class Mux(om.ExplicitComponent):
    """
    Mux trajectory phases.

    For every variable registered through :meth:`add_var` the component
    exposes one input per entry of the ``size_inputs`` option (named
    ``<name>_0``, ``<name>_1``, ...) and a single output ``<name>`` that holds
    their concatenation. Every input except the last one contributes all of
    its entries but the final one; the last input contributes all entries.
    """

    def __init__(self, **kwargs):
        """
        Instantiate Mux and populate private members.

        :param kwargs: keyword arguments forwarded to the parent constructor
        """
        super().__init__(**kwargs)
        # Variable name -> metadata dict ('val', 'units', 'desc').
        self._vars = {}
        # Variable name -> ordered list of the input names feeding it.
        self._input_names = {}

    def initialize(self):
        """
        Declare options.
        """
        # NOTE(review): with these defaults the muxed array has
        # (20 - 1) + 20 = 39 entries while 'size_output' defaults to 2, so the
        # two defaults look mutually inconsistent - confirm that callers
        # always override them together.
        self.options.declare('size_inputs', default=np.array([20, 20]),
                             desc='Size of input arrays to be muxed')
        self.options.declare('size_output', default=2,
                             desc='Size of the muxed array')

    def _muxed_lengths(self):
        """
        Return how many leading entries of each input end up in the output.

        :return: one length per entry of ``size_inputs``; ``size - 1`` for
            every input but the last, ``size`` for the last one
        :rtype: list
        """
        size_inputs = self.options['size_inputs']
        last = len(size_inputs) - 1
        return [int(size) if i == last else int(size) - 1
                for i, size in enumerate(size_inputs)]

    def add_var(self, name: str, val=1.0, units=None, desc=''):
        """
        Add variable to the mux component.

        Registers one input per entry of ``size_inputs``, declares the sparse
        partials of the output with respect to each of them and finally adds
        the output itself, allocated with ``size_output`` entries.

        :param name: variable name
        :type name: str
        :param val: variable value
        :type val: np.ndarray
        :param units: variable units
        :type units: str
        :param desc: variable description
        :type desc: str
        :return: None
        """
        size_inputs = self.options['size_inputs']
        size_output = self.options['size_output']

        meta = {'val': val, 'units': units, 'desc': desc}
        self._vars[name] = meta
        self._input_names[name] = []

        # Running row offset of the current segment inside the output.
        offset = 0
        for i, length in enumerate(self._muxed_lengths()):
            in_name = f'{name}_{i}'
            self._input_names[name].append(in_name)

            # The input keeps its full declared size; only the first
            # ``length`` entries are copied to the output.
            self.add_input(name=in_name, shape=(size_inputs[i],), **meta)

            # Output row ``offset + k`` depends only on input entry ``k``.
            self.declare_partials(of=name, wrt=in_name,
                                  rows=np.arange(offset, offset + length),
                                  cols=np.arange(length))
            offset += length

        self.add_output(name=name, val=np.zeros(int(size_output)),
                        units=units, desc=desc)

    def compute(self,
                inputs: openmdao.vectors.default_vector.DefaultVector,
                outputs: openmdao.vectors.default_vector.DefaultVector):
        """
        Concatenate the inputs of every registered variable into its output.

        :param inputs: input vector, indexed by input name
        :param outputs: output vector, indexed by variable name
        :return: None
        """
        lengths = self._muxed_lengths()
        for var in self._vars:
            in_names = self._input_names[var]
            # Keep the leading entries of each input, then stack the pieces.
            pieces = [inputs[in_name][:length]
                      for in_name, length in zip(in_names, lengths)]
            outputs[var] = np.hstack(pieces)

    def compute_partials(self,
                         inputs: openmdao.vectors.default_vector.DefaultVector,
                         partials):
        """
        Fill the declared sparse partials with ones.

        The output entries are plain copies of input entries, so every
        declared non-zero of the Jacobian equals one.

        :param inputs: input vector (unused, the partials are constant)
        :param partials: sub-Jacobian container indexed by ``(of, wrt)``
        :return: None
        """
        lengths = self._muxed_lengths()
        for var in self._vars:
            in_names = self._input_names[var]
            for in_name, length in zip(in_names, lengths):
                partials[var, in_name] = np.ones(length)