"""
The sensor module contains classes to define point or camera sensors
that can either be stationary and mobile.
.. rubric:: Contents
.. autosummary::
Sensor
Position
Stationary
Mobile
Detector
Point
Camera
"""
from __future__ import print_function, division
import pandas as pd
import numpy as np
from scipy.interpolate import griddata
from scipy.spatial.distance import cdist
from scipy import integrate
from scipy import ndimage as sn
[docs]class Sensor(object):
def __init__(self, position=None, detector=None):
"""
Defines a sensor object and methods to calculate detection.
Parameters
----------
position : chama Position
Sensor position
detector : chama Detector
Sensor detector, determines the method used to calculate detection
"""
self.name = None
if position is None or not isinstance(position, Position):
raise ValueError('Must specify a Position object to create a '
'Sensor')
self.position = position
if detector is None or not isinstance(detector, Detector):
raise ValueError('Must specify a Detector object to create a '
'Sensor')
self.detector = detector
[docs] def get_detected_signal(self, signal, interp_method=None,
min_distance=10.0):
"""
Returns the detected signal.
"""
return self.detector.get_detected_signal(signal, self.position,
interp_method, min_distance)
[docs]class Position(object):
def __init__(self, location=None):
"""
Defines a sensor's position.
Parameters
----------
location : (x,y,z) tuple or index
The location of the Position object defined as an (x,y,z) tuple
or location index, which can be a string or integer
"""
self.location = location
def __call__(self, time):
"""
Returns the location at the specified time
Parameters
----------
time : int or float
Returns
-------
Location tuple (x,y,z) or (j)
"""
if isinstance(self.location, tuple):
return tuple(self.location)
else:
return tuple([self.location])
[docs]class Stationary(Position):
"""
Defines a stationary sensor's position.
"""
pass
[docs]class Mobile(Position):
"""
Defines a mobile sensor's position.
A mobile position moves according to defined waypoints and speed. The
mobile position is assumed to move in a straight line between waypoints
and will repeat its path if needed.
Parameters
----------
locations : list of (x,y,z) tuples
List of (x,y,z) tuples defining the waypoints of the mobile
sensor's path.
speed : int or float
The speed of the mobile sensor in units consistent
with the waypoints and sensor sample_times
repeat : bool
Boolean indicating if the path should repeat
"""
def __init__(self, locations=None, speed=1, start_time=0, repeat=False):
super(Mobile, self).__init__(locations)
self.speed = speed
self.start_time = start_time
self.repeat = repeat
self._d_btwn_locs = None
def __call__(self, time):
"""
Returns the position (x,y,z) at the specified time.
Parameters
----------
time : int or float
Returns
-------
tuple
The (x,y,z) location
"""
# Calculate distance traveled at specified time
delta_time = time - self.start_time
if delta_time < 0:
delta_time = 0
distance = self.speed * delta_time
temp_locs = [np.array(i) for i in self.location]
if self.repeat: # if path repeats
temp_locs.append(temp_locs[0])
if self._d_btwn_locs is None:
# Distances between consecutive points
self._d_btwn_locs = \
[np.linalg.norm(temp_locs[i] - temp_locs[i + 1])
for i in range(len(temp_locs) - 1)]
if self.repeat: # if path repeats
while distance > sum(self._d_btwn_locs):
distance -= sum(self._d_btwn_locs)
else:
if distance > sum(self._d_btwn_locs):
distance = sum(self._d_btwn_locs)
i = 0
# Figure out which line segment
for i, _ in enumerate(self._d_btwn_locs):
if sum(self._d_btwn_locs[:i + 1]) >= distance:
distance -= sum(self._d_btwn_locs[:i])
break
# The two waypoints defining the line segment
loc1 = temp_locs[i]
loc2 = temp_locs[i + 1]
location = loc1 + (loc2 - loc1) * (distance / self._d_btwn_locs[i])
return tuple(location)
[docs]class Detector(object):
"""
Defines a sensor's detector.
Parameters
----------
threshold : int
The minimum signal that can be detected by the sensor
sample_times : list of ints or floats
List of the sensor's sample/measurement times
"""
def __init__(self, threshold=None, sample_times=None):
self.threshold = threshold
self.sample_times = sample_times
self.sample_points = None
[docs] def get_sample_points(self, position):
"""
Returns the sensor sample points in the form (t,x,y,z) or (t,j)
Parameters
----------
position : chama Position
The position of the sensor
Returns
-------
A list of sample points in the form (t,x,y,z) or (t,j)
"""
if self.sample_points is None:
self.sample_points = [(t,) + position(t) for t in
self.sample_times]
return self.sample_points
[docs] def get_detected_signal(self, signal, position, interp_method,
min_distance):
"""
Returns the signal detected by the sensor.
Parameters
----------
signal : pandas DataFrame
DataFrame with the multi-index (T, X, Y, Z) or (T, Node) and columns
containing the concentrations for different scenarios
position : chama Position
The position of the sensor
interp_method : 'linear', 'nearest', or None
Method used to interpolate the signal if needed.
A value of 'linear' will use griddata to interpolate missing
sample points. A value of 'nearest' will set the sample point to
the nearest signal point within a minimum distance of min_distance.
If there are no signal points within this distance then the
signal will be set to zero at the sample point.
min_distance : float
The minimum distance when using the 'nearest' interp_method
Returns
-------
A pandas Series with multi-index (T, Scenario) and signal values above
the sensor threshold.
"""
pts = self.get_sample_points(position)
if len(pts) == 0:
return pd.Series()
signal_sample = self._get_signal_at_sample_points(signal, pts,
interp_method,
min_distance)
if len(signal_sample) == 0:
return pd.Series()
# Reset the index
signal_sample = signal_sample.reset_index()
# At this point we don't need the X,Y,Z or Node columns
if set(['X', 'Y', 'Z']) < set(list(signal_sample.columns)):
signal_sample.drop(['X', 'Y', 'Z'], inplace=True, axis=1)
elif set(['Node']) < set(list(signal_sample.columns)):
signal_sample.drop(['Node'], inplace=True, axis=1)
else:
raise ValueError('Unrecognized signal format')
return
# Set T as the index
signal_sample = signal_sample.set_index('T')
# Apply threshold
signal_sample = signal_sample[signal_sample >= self.threshold]
if len(signal_sample) == 0:
return pd.Series()
# Name the columns so that the index is labeled after stacking
signal_sample.columns.name = 'Scenario'
# Drop Nan and stack by index
return signal_sample.stack()
def _get_signal_at_sample_points(self, signal, sample_points,
interp_method, min_distance):
raise NotImplementedError()
[docs]class Point(Detector):
"""
Defines a point sensor.
"""
def _get_signal_at_sample_points(self, signal, sample_points,
interp_method, min_distance):
"""
Returns the signal at the sensor sample points. If a sample point
does not exist in the signal DataFrame then interpolate the signal.
Parameters
-----------
signal : pandas DataFrame
sample_points : list of tuples
interp_method : 'linear', 'nearest', or None
min_distance : float
Returns
---------
pandas DataFrame has a multi-index containing all of the
sample_points and columns for each scenario with the
concentration at each sample point
"""
# Get subset of signal. If a sample point is not in signal then NaN
# is inserted
try:
signal_subset = signal.loc[sample_points, :]
except: # Some sample points are not in the signal
if ['T', 'X', 'Y', 'Z'] == signal.index.names:
signal_subset = pd.DataFrame(data=np.nan, columns=signal.columns,
index=pd.MultiIndex.from_tuples(sample_points, names=['T', 'X', 'Y', 'Z']))
elif ['T', 'Node'] == signal.index.names:
signal_subset = pd.DataFrame(data=np.nan, columns=signal.columns,
index=pd.MultiIndex.from_tuples(sample_points, names=['T', 'Node']))
else:
raise ValueError('Unrecognized signal format')
sample_points_in_signal = list(set(sample_points).intersection(set(signal.index)))
signal_subset.loc[sample_points_in_signal, :] = signal.loc[sample_points_in_signal, :]
if interp_method is None:
return signal_subset
# Get the sample_points that need to be interpolated
temp = signal_subset.isnull().any(axis=1) # Get rows containing NaN
interp_points = list(signal_subset[temp].index) # Get their index
if len(interp_points) == 0:
return signal_subset
# TODO: Revisit the distance calculation.
# Scaling issue by including both time and xyz location in distance
# calculation. Manually select the signal times bordering
# interp_point times BEFORE calculating the distance? Or include a
# time scaling parameter as an additional input?
# get the distance between the signal points and the interp_points
signal_points = list(signal.index)
distdata = cdist(signal_points, interp_points)
# Might not want to build this data frame when signal is very large
dist = pd.DataFrame(data=distdata, index=signal.index)
if interp_method == 'linear':
# Loop over interp_points
for i in range(len(dist.columns)):
temp = dist.iloc[:, i]
# Get the rows within dist_factor of the minimum distance
dist_factor = 2
temp2 = temp[temp < temp.min() * dist_factor]
# Ensures that we get enough points to do the interpolation
while len(temp2) < 100:
dist_factor += 1
temp2 = temp[temp < temp.min() * dist_factor]
temp_signal = signal.loc[temp2.index, :]
# Loop over scenarios
for j in signal.columns:
interp_signal = griddata(list(temp_signal.index),
list(temp_signal.loc[:, j]),
interp_points[i],
method=interp_method,
rescale=True)
if np.isnan(interp_signal):
raise ValueError('Trying to interpolate a sample '
'point outside of the signal grid. '
'Make sure that all sensor '
'locations are contained in the '
'area spanned by the signal data.')
signal_subset.at[interp_points[i], j] = interp_signal
elif interp_method == 'nearest':
# Loop over interp_points
for i in range(len(dist.columns)):
temp = dist.iloc[:, i]
if temp.min() > min_distance:
# Loop over scenarios
for j in signal.columns:
interp_signal = 0.0
signal_subset.at[interp_points[i], j] = interp_signal
else:
temp2 = temp[temp < min_distance]
temp_signal = signal.loc[temp2.index, :]
# Loop over scenarios
for j in signal.columns:
interp_signal = griddata(list(temp_signal.index),
list(temp_signal.loc[:, j]),
interp_points[i],
method=interp_method,
rescale=True)
signal_subset.at[interp_points[i], j] = interp_signal
else:
raise ValueError('Unrecognized or unsupported interpolation method'
' "%s" was specified. Only "linear" or "nearest" '
'interpolations are supported' % interp_method)
return signal_subset
[docs]class Camera(Detector):
"""
Defines a camera sensor.
Parameters
----------
threshold : int
The minimum number of pixels that must detect something in order for
the camera to detect.
sample_times : list of ints or floats
List of the sensor's sample/measurement times
direction : (x, y, z) tuple
Tuple representing the direction that the camera is pointing in (x,
y, z) coordinates relative to the origin
**kwds : dictionary
Keyword arguments for setting parameter values in the camera model
"""
def __init__(self, threshold=None, sample_times=None,
direction=(1, 1, 1), **kwds):
super(Camera, self).__init__(threshold, sample_times)
# Direction of the camera relative to the origin
self.direction = direction
# Set default camera properties
# Transmission coefficient of air
self.tau_air = kwds.pop('tau_air', 1)
# Maximum distance that the camera can detect in (m)
self.dist = kwds.pop('dist', 500.0)
# TODO: Get descriptions of these from Arvind
self.netd = kwds.pop('netd', 0.015)
self.f_number = kwds.pop('f_number', 1.5)
self.e_a = kwds.pop('e_a', 0.1)
self.e_g = kwds.pop('e_g', 0.5)
self.T_g = kwds.pop('T_g', 300)
self.T_plume = kwds.pop('T_plume', 300)
self.lambda1 = kwds.pop('lambda1', 3.2E-6)
self.lambda2 = kwds.pop('lambda2', 3.4E-6)
self.fov1 = kwds.pop('fov1', 24 * np.pi / 180)
self.fov2 = kwds.pop('fov2', 18 * np.pi / 180)
self.a_d = kwds.pop('a_d', 9.0E-10)
self.Kav = kwds.pop('Kav', 2.191e-20)
# Constants used in the camera model
self.NA = 6.02E23 # Avogadro's number
self.h = 6.626e-34 # Planck's constant [J-s]
self.SIGMA = 5.67e-8 # Stefan-Boltzmann constant [W/m^2-K^4]
self.c = 3.0e8 # Speed of light [m/s]
self.k = 1.38e-23 # Boltzmann's constant [J/K]
def _get_signal_at_sample_points(self, signal, sample_points,
interp_method, min_distance):
"""
Defines detection as seen by a camera sensor. Not just
selecting/interpolating a subset of the signal DataFrame. We are using
the CONCENTRATION signal DataFrame to calculate the PIXEL signal at the
sample points.
Parameters
-----------
signal : pandas DataFrame
DataFrame has a multi-index with (T, X, Y, Z) points
and each column in the frame contains concentration
values at those points for one scenario
sample_points : list of tuples (t,x,y,z)
Returns
---------
pandas DataFrame has a multi-index with the sensor's sample_points
(T,X,Y,Z) and each column contains the number of pixels that
detected something for one scenario
"""
# TODO: Add option to specify a different camera direction at each
# sample point
CamDir = self.direction
# Reset the index and set it to T
allConc = signal.reset_index().set_index('T')
# Create DataFrame to be returned
newidx = pd.MultiIndex.from_tuples(sample_points,
names=('T', 'X', 'Y', 'Z'))
detected_pixels = pd.DataFrame(None, index=newidx,
columns=signal.columns)
# Check if all the sample times are time points included in the
# signal dataframe
signal_t = set(allConc.index)
sample_t = set([p[0] for p in sample_points])
if not sample_t.issubset(signal_t):
raise ValueError('All sampling times for a camera sensor must be '
'contained in the signal data')
# print(' Calculating camera signal detection')
# TODO: Add interpolation for non-gridded or sparse signal data
for point in sample_points:
time = point[0]
# print(' Time: ', time)
CamLoc = point[1:]
# We assume that every sample time is in the signal DataFrame
# Extract the rows at the sample time
Conc = allConc.loc[time, :]
# Might want to move the below calculations to a new function to
# avoid deeply nested for-loops. Any way to vectorize??
# No longer need T column
Conc = Conc.reset_index(drop=True)
# Set and sort the index so that we can guarantee the order of
# the rows and use numpy reshape to do the conversion to a 3D array
Conc = Conc.set_index(['X', 'Y', 'Z'])
Conc = Conc.sort_index()
# Get all the unique X, Y, and Z grid points
gridpoints = list(Conc.index)
groupedpoints = list(zip(*gridpoints))
X = np.unique(groupedpoints[0])
Y = np.unique(groupedpoints[1])
Z = np.unique(groupedpoints[2])
# Check if signal is on a regular grid by looking at the number
# of rows
nx = len(X)
ny = len(Y)
nz = len(Z)
if nx * ny * nz != Conc.shape[0]:
raise ValueError('The camera sensor requires signal data '
'to be on a regular grid')
# Check to make sure X,Y,Z points are equally spaced
xdiff = np.unique(X[1:] - X[:-1])
ydiff = np.unique(Y[1:] - Y[:-1])
zdiff = np.unique(Z[1:] - Z[:-1])
if len(xdiff) > 1 or len(ydiff) > 1 or len(zdiff) > 1:
raise ValueError('The camera sensor requires signal data to '
'be equally spaced over a particular '
'spatial axis (i.e. X, Y, and Z)')
# Calculate angles (horizontal and vertical) associated with the
# camera orientation. The vertical angle is complemented due to
# spherical coordinate convention.
dir1 = np.array(CamDir)
dir2 = dir1 / (np.sqrt(dir1[0] ** 2 + dir1[1] ** 2 + dir1[2] ** 2))
horiz = np.arccos(dir2[0])
vert = np.arccos(dir2[2])
# The camera has 320 X 240 pixels. To speed up computation, this
# has been reduced proportionally to 80 X 60. The horizontal (vert)
# field of view is divided equally among the 80 (60) horizontal
# (vert) pixels
# TODO: convert horiz/vert field of view degrees to parameters
theta_h = np.linspace(horiz - np.pi / 15, horiz + np.pi / 15, 80)
theta_v = np.linspace(vert - np.pi / 20, vert + np.pi / 20, 60)
# factor_x, factor_y, factor_z are used later for
# concentration-pathlength (CPL) calculations. Extrapolation to
# calculate CPL happens in pixel-coordinates rather than real-life
# coordinates. 'dist' is the maximum distance that the IR
# camera can see
Xstep, Ystep, Zstep = X[1] - X[0], Y[1] - Y[0], Z[1] - Z[0]
factor_x = int(self.dist / Xstep)
factor_y = int(self.dist / Ystep)
factor_z = int((self.dist / 5) / Zstep)
p, q = len(theta_h), len(theta_v)
x_end = np.zeros((p, q))
y_end = np.zeros((p, q))
z_end = np.zeros((p, q))
# Calculate the real-life coordinate of a point 'dist' m away for
# each pixel orientation. This is used to calculate CPL. If 'dist'
# goes outside the grid boundary the concentration is set to 0.
for i in range(0, p):
for j in range(0, q):
x_end[i, j] = factor_x * np.cos(theta_h[i]) * \
np.sin(theta_v[j])
y_end[i, j] = factor_y * np.sin(theta_h[i]) * \
np.sin(theta_v[j])
z_end[i, j] = factor_z * np.cos(theta_v[j])
# Because calculations happen in pixel coordinates, the
# location of the camera (start of calculation) and the
# location of far-away point (end of calculation) is converted
# to pixel coordinates.
x_start = (CamLoc[0] - np.min(X)) / Xstep
y_start = (CamLoc[1] - np.min(Y)) / Ystep
z_start = (CamLoc[2] - np.min(Z)) / Zstep
x_end += x_start
y_end += y_start
z_end += z_start
# Calculate camera properties
nep, tec = self._pixelprop()
for scen in Conc.columns:
# Extract the concentration values as a numpy array
ppm = Conc.loc[:, scen].values
# Reshape the concentration column as a 3D array
ppm = ppm.reshape(nx, ny, nz)
IntConc = np.zeros((p, q))
CPL = np.zeros((p, q))
# TODO: Convert this to vector operation to remove for-loops??
for i in range(0, len(theta_h)):
for j in range(0, len(theta_v)):
# Calculate concentration pathlength (CPL)
IntConc[i, j] = self._pathlength(x_start, y_start,
z_start,
x_end[i, j],
y_end[i, j],
z_end[i, j], ppm)
CPL[i, j] = IntConc[i, j] * self.dist
# Convert CPL to image contrast and compare it to nep
# 1e-4 is conversion factor
attn = CPL * self.Kav * self.NA * 1e-4
temp = 1 - 10 ** (-attn)
contrast = temp * np.abs(tec) * self.tau_air
# Count the number of pixels with a contrast greater than nep
pixels = sum(sum(contrast >= nep))
# Camera pixels were truncated to 80 x 60 px, convert pixel
# count back to the original scale
pixel_final = 16 * pixels
detected_pixels.at[point, scen] = pixel_final
# print(detected_pixels)
return detected_pixels
def _pathlength(self, x0, y0, z0, x1, y1, z1, data):
num = 201 # number of points in extrapolation
x = np.linspace(x0, x1, num)
y = np.linspace(y0, y1, num)
z = np.linspace(z0, z1, num)
concs = sn.map_coordinates(data, np.vstack((x, y, z)), order=1)
# CPL as a fraction of total number of points in extrapolation
avgconc = sum(concs) / num
return avgconc
def _pixelprop(self):
"""
Calculates camera properties
Returns
-------
nep : float
Noise-equivalent power
tec : float
Temperature-emissivity contrast
"""
T_a = self.T_g - 20
w1g = self.h * self.c / (self.lambda2 * self.k * self.T_g)
w2g = self.h * self.c / (self.lambda1 * self.k * self.T_g)
n1 = 2 * np.pi * self.k ** 4 * self.T_g ** 3 / \
(self.h ** 3 * self.c ** 2)
temp_y1 = -np.exp(-w1g) * (720 + 720 * w1g + 360 * w1g ** 2 +
120 * w1g ** 3 + 30 * w1g ** 4 +
6 * w1g ** 5 + w1g ** 6)
temp_y2 = -np.exp(-w2g) * (720 + 720 * w2g + 360 * w2g ** 2 +
120 * w2g ** 3 + 30 * w2g ** 4 +
6 * w2g ** 5 + w2g ** 6)
y1 = temp_y2 - temp_y1
y = y1 * n1
nep = y * self.netd * self.a_d / (4 * self.f_number ** 2)
ppixelg = self._pixel_power(self.T_g)
ppixelp = self._pixel_power(self.T_plume)
ppixela = self._pixel_power(T_a)
tec = ppixelp - self.e_g * ppixelg \
- self.e_a * (1 - self.e_g) * ppixela
return nep, tec
def _pixel_power(self, temp):
"""
Calculates the the power incident on a pixel from an infinite blackbody
emitter at a given temperature.
Parameters
-----------
temp : float
Temperature of the emitter (K)
Returns
---------
Power incident on the pixel (W)
"""
# Calculate the nondimensional frequency limits of the sensor
w1 = self.h * self.c / (self.lambda2 * self.k * temp)
w2 = self.h * self.c / (self.lambda1 * self.k * temp)
# Integrate the blackbody radiation over the frequency range
temp_int = integrate.quad(lambda x: x ** 3 / (np.exp(x) - 1), w1, w2)
# calculate the power incident on one camera pixel
frac = temp_int[0] / (np.pi ** 4 / 15)
sblaw = self.SIGMA * temp ** 4 * self.a_d
power = (4 / np.pi) * sblaw * np.tan(self.fov1 / 2) * \
np.tan(self.fov2 / 2)
pixel_power = power * frac
return pixel_power