Commit 9ef94f77 authored by sarsonl's avatar sarsonl

Start of full implementation of HYDWS after handover damb->sarsonl

parent e57f4947
......@@ -2,8 +2,12 @@
HYDWS datamodel ORM representation.
"""
from operator import itemgetter
from sqlalchemy import Column, String, Boolean, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import select, func
from hydws.db.base import (ORMBase, CreationInfoMixin, RealQuantityMixin,
TimeQuantityMixin, EpochMixin,
......@@ -36,7 +40,47 @@ class Borehole(CreationInfoMixin('CreationInfo',
*Quantities* are implemented as `QuakeML
<https://quake.ethz.ch/quakeml>`_ quantities.
"""
_sections = relationship("BoreholeSection", back_populates="_borehole")
_sections = relationship("BoreholeSection", back_populates="_borehole",
order_by="BoreholeSection.m_topdepth_value",
cascade='all, delete-orphan')
    @hybrid_property
    def m_longitude(self):
        # min topdepth defines the top section
        return min(self._sections,
                   key=lambda s: s.m_topdepth_value).m_toplongitude_value

    @hybrid_property
    def m_latitude(self):
        # min topdepth defines the top section
        return min(self._sections,
                   key=lambda s: s.m_topdepth_value).m_toplatitude_value

    @hybrid_property
    def m_depth(self):
        # max bottomdepth defines the bottom section
        return max(s.m_bottomdepth_value for s in self._sections)
@hybrid_property
def m_injectionpoint(self):
"""
Injection point of the borehole. It is defined by the uppermost
section's bottom with casing and an open bottom.
.. note::
The implementation requires boreholes to be linear.
"""
isection = min([s for s in self._sections
if s.m_casingdiameter_value and not s._bottomclosed],
key=lambda x: x.m_bottomdepth_value, default=None)
if not isection:
raise ValueError('Cased borehole has a closed bottom.')
return (isection.bottomlongitude_value,
isection.bottomlatitude_value,
isection.bottomdepth_value)
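    # NOTE(editor): the hybrid properties above only evaluate on loaded Python
    # objects; filtering at SQL level (as filter_boreholes in resource.py does)
    # additionally needs class-level expressions. A minimal sketch for m_depth,
    # using the imported select/func and assuming `_oid` is the primary key
    # column provided by ORMBase; not part of this commit:
    @m_depth.expression
    def m_depth(cls):
        return (select([func.max(BoreholeSection.m_bottomdepth_value)])
                .where(BoreholeSection.m_borehole_oid == cls._oid)
                .label('m_depth'))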
class BoreholeSection(EpochMixin('Epoch', epoch_type='open',
......@@ -69,7 +113,8 @@ class BoreholeSection(EpochMixin('Epoch', epoch_type='open',
m_borehole_oid = Column(Integer, ForeignKey('borehole._oid'))
_borehole = relationship("Borehole", back_populates="_sections")
_hydraulics = relationship("HydraulicSample", back_populates="_section")
_hydraulics = relationship("HydraulicSample", back_populates="_section",
order_by="HydraulicSample.m_datetime_value")
class HydraulicSample(TimeQuantityMixin('m_datetime'),
......
......@@ -140,3 +140,84 @@ def make_response(obj, mimetype):
response = _make_response(obj)
response.headers['Content-Type'] = mimetype
return response
class DynamicQuery(object):
"""
    Dynamic filtering of a query.

    Example:
        dyn_query = DynamicQuery(query)
        dyn_query.filter_query(orm.HydraulicSample, query_params,
                               [('m_datetime', 'ge', 'starttime')])
"""
def __init__(self, query):
self.query = query
def operator_attr(self, column, op):
"""
        Return the column method associated with a comparison operator.
        Raises an Exception if none of op, op_ or __op__ exists on the column.

        :rtype: str
"""
try:
return list(filter(
lambda e: hasattr(column, e % op),
['%s', '%s_', '__%s__']))[0] % op
except IndexError:
raise Exception('Invalid filter operator: %s' % op)
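    # NOTE(editor): illustration, not part of this commit -- with SQLAlchemy's
    # standard column operators, operator_attr resolves e.g.:
    #   operator_attr(column, 'eq')   -> '__eq__'
    #   operator_attr(column, 'ge')   -> '__ge__'
    #   operator_attr(column, 'in')   -> 'in_'
    #   operator_attr(column, 'like') -> 'like'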
def filter_query(self, orm_class, query_params, filter_condition,
key_suffix="_value"):
"""
Update self.query based on filter_condition.
:param filter_condition: list, ie: [(key,operator,value)]
operator examples:
eq for ==
lt for <
ge for >=
in for in_
like for like
value can be list or a string.
key must belong in self.orm_class.
"""
        for f in filter_condition:
            try:
                key_basename, op, param_name = f
            except ValueError:
                raise Exception('Invalid filter input: %s' % f)

            if key_suffix:
                key = "{}{}".format(key_basename, key_suffix)
            else:
                key = key_basename

            # TODO: wrap in a try statement?
            param_value = query_params.get(param_name)
            if param_value is None:
                continue

            # TODO: check the column type against the value type and raise
            # if they don't match?
            try:
                column = getattr(orm_class, key)
            except AttributeError:
                raise Exception('Invalid filter column: %s' % key)

            if op == 'in':
                if isinstance(param_value, list):
                    filt = column.in_(param_value)
                else:
                    filt = column.in_(param_value.split(','))
            else:
                attr = self.operator_attr(column, op)
                #if param_value == 'null':
                #    param_value = None
                filt = getattr(column, attr)(param_value)

            self.query = self.query.filter(filt)
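# NOTE(editor): usage sketch, not part of this commit. `session` and
# `query_params` are assumed to come from the request context; the filter
# tuples mirror filter_hydraulics in resource.py:
#
#     dyn_query = DynamicQuery(session.query(orm.HydraulicSample))
#     dyn_query.filter_query(orm.HydraulicSample, query_params,
#                            [('m_toppressure', 'ge', 'mintoppressure'),
#                             ('m_toppressure', 'le', 'maxtoppressure')])
#     samples = dyn_query.query.all()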
......@@ -11,7 +11,8 @@ HYDWS_DEFAULT_SERVER_PORT = 5000
# ----------------------------------------------------------------------------
# service specific configuration parameters
HYDWS_PATH_BOREHOLES = '/boreholes'
HYDWS_PATH_SECTIONS = '/sections'
HYDWS_PATH_HYDRAULICS = '/hydraulics'
FDSN_DEFAULT_NO_CONTENT_ERROR_CODE = 204
FDSN_NO_CONTENT_CODES = (FDSN_DEFAULT_NO_CONTENT_ERROR_CODE, 404)
HYDWS_SERVICE_DOCUMENTATION_URI = 'http://URL/to/hydws/docs/'
......@@ -20,8 +21,8 @@ HYDWS_DEFAULT_OFORMAT = 'json'
HYDWS_OFORMATS = (HYDWS_DEFAULT_OFORMAT, 'xml')
HYDWS_DEFAULT_LEVEL = 'section'
HYDWS_LEVELS = (HYDWS_DEFAULT_LEVEL, 'borehole')
HYDWS_SECTION_LEVELS = (HYDWS_DEFAULT_LEVEL, 'borehole')
HYDWS_HYDRAULIC_LEVELS = (HYDWS_DEFAULT_LEVEL, 'borehole', 'hydraulic')
MIMETYPE_JSON = 'application/json'
MIMETYPE_TEXT = 'text/plain'
ERROR_MIMETYPE = MIMETYPE_TEXT
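# NOTE(editor): the path fragments above presumably compose into routes such
# as /boreholes/<borehole_id>/sections/<section_id>/hydraulics; the actual
# routing is not part of this hunk.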
......
"""
HYDWS datamodel ORM entity de-/serialization facilities.
"""
import logging
import datetime
......@@ -25,12 +26,13 @@ class QuakeMLQuantityField(fields.Field):
_CHECK_ATTRIBUTE = False # We generate the attribute dynamically
ATTRS = ('value', 'uncertainty', 'loweruncertainty', 'upperuncertainty',
'confidencelevel')
logger = logging.getLogger('hydws.server.v1.resource')
def _serialize(self, value, attr, obj, **kwargs):
retval = {}
for _attr in self.ATTRS:
key = f"{_ATTR_PREFIX}{attr}_{_attr}".lower()
value = get_value(obj, key, default=None)
if isinstance(value, datetime.datetime):
retval[_attr] = value.isoformat()
elif value is not None:
......@@ -43,7 +45,6 @@ class SchemaBase(Schema):
"""
Schema base class for object de-/serialization.
"""
publicid = fields.String()
def get_attribute(self, obj, key, default):
"""
......@@ -80,10 +81,11 @@ class HydraulicSampleSchema(SchemaBase):
fluidcomposition = fields.String()
class BoreholeSectionSchema(SchemaBase):
class SectionSchema(SchemaBase):
"""
Schema implementation of a borehole section.
"""
publicid = fields.String()
starttime = fields.DateTime(format='iso')
endtime = fields.DateTime(format='iso')
toplongitude = QuakeMLQuantityField()
......@@ -101,6 +103,8 @@ class BoreholeSectionSchema(SchemaBase):
casingtype = fields.String()
description = fields.String()
class SectionHydraulicSampleSchema(SectionSchema, SchemaBase):
hydraulics = fields.Nested(HydraulicSampleSchema, many=True,
attribute='_hydraulics')
......@@ -109,13 +113,18 @@ class BoreholeSchema(SchemaBase):
"""
Schema implementation of a borehole.
"""
# TODO(damb): Provide a hierarchical implementation of sub_types (e.g.
# CreationInfo, LiteratureSource etc.); create them dynamically (see: e.g.
# QuakeMLQuantityField)
# TODO(damb): Provide a hierarchical implementation of sub_types; create
# them dynamically (see: e.g. QuakeMLQuantityField)
publicid = fields.String()
longitude = QuakeMLQuantityField()
latitude = QuakeMLQuantityField()
depth = QuakeMLQuantityField()
bedrockdepth = QuakeMLQuantityField()
sections = fields.Nested(BoreholeSectionSchema, many=True,
class BoreholeSectionSchema(BoreholeSchema, SchemaBase):
sections = fields.Nested(SectionSchema, many=True,
attribute='_sections')
class BoreholeSectionHydraulicSampleSchema(BoreholeSchema, SchemaBase):
sections = fields.Nested(SectionHydraulicSampleSchema, many=True,
attribute='_sections')
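# NOTE(editor): illustration, not part of this commit -- the resources map the
# `level` query parameter onto these schemas roughly as follows (assuming a
# Borehole ORM instance loaded with its sections and hydraulic samples):
#
#     BoreholeSchema().dumps(borehole)                        # level=borehole
#     BoreholeSectionSchema().dumps(borehole)                 # level=section
#     BoreholeSectionHydraulicSampleSchema().dumps(borehole)  # level=hydraulic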
......@@ -17,11 +17,15 @@ Format = functools.partial(
validate=validate.OneOf(settings.HYDWS_OFORMATS))
Level = functools.partial(
LevelSection = functools.partial(
fields.String,
missing=settings.HYDWS_DEFAULT_LEVEL,
validate=validate.OneOf(settings.HYDWS_LEVELS))
validate=validate.OneOf(settings.HYDWS_SECTION_LEVELS))
LevelHydraulic = functools.partial(
fields.String,
missing=settings.HYDWS_DEFAULT_LEVEL,
validate=validate.OneOf(settings.HYDWS_HYDRAULIC_LEVELS))
NoData = functools.partial(
fields.Int,
......@@ -55,8 +59,74 @@ class GeneralSchema(Schema):
nodata = NoData()
format = Format()
class LocationConstraintsSchemaMixin(Schema):
"""
    Location-specific query parameters for borehole requests.
"""
minlatitude = fields.Float()
maxlatitude = fields.Float()
minlongitude = fields.Float()
maxlongitude = fields.Float()
# do validations on what is accepted as lat and lon
@validates_schema
def validate_lat_long_constraints(self, data):
"""
Validation of latitude and longitude constraints.
"""
maxlatitude = data.get('maxlatitude')
minlatitude = data.get('minlatitude')
maxlongitude = data.get('maxlongitude')
minlongitude = data.get('minlongitude')
        if maxlatitude is not None and maxlatitude > 90.0:
            raise ValidationError('maxlatitude greater than 90 degrees')
        if minlatitude is not None and minlatitude < -90.0:
            raise ValidationError('minlatitude less than -90 degrees')
        if maxlongitude is not None and maxlongitude > 180.0:
            raise ValidationError('maxlongitude greater than 180 degrees')
        if minlongitude is not None and minlongitude < -180.0:
            raise ValidationError('minlongitude less than -180 degrees')

        if maxlatitude is not None and minlatitude is not None:
            if maxlatitude < minlatitude:
                raise ValidationError('maxlatitude must be greater than '
                                      'minlatitude')
        if maxlongitude is not None and minlongitude is not None:
            if maxlongitude < minlongitude:
                raise ValidationError('maxlongitude must be greater than '
                                      'minlongitude')
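    # NOTE(editor): usage sketch, not part of this commit. Loading an invalid
    # bounding box through a schema that includes this mixin (e.g.
    # BoreholeListResourceSchema) is expected to surface a ValidationError,
    # raised directly or collected in the load errors, depending on the
    # marshmallow version:
    #
    #     BoreholeListResourceSchema().load(
    #         {'minlatitude': '10.0', 'maxlatitude': '5.0'})
    #     # -> 'maxlatitude must be greater than minlatitude'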
class HydraulicsSchemaMixin(Schema):
"""
Query parameters for hydraulics data.
"""
minbottomflow = fields.Float()
maxbottomflow = fields.Float()
mintopflow = fields.Float()
maxtopflow = fields.Float()
minbottompressure = fields.Float()
maxbottompressure = fields.Float()
mintoppressure = fields.Float()
maxtoppressure = fields.Float()
mintoptemperature = fields.Float()
maxtoptemperature = fields.Float()
minbottomtemperature = fields.Float()
maxbottomtemperature = fields.Float()
minfluiddensity = fields.Float()
maxfluiddensity = fields.Float()
minfluidviscosity = fields.Float()
maxfluidviscosity = fields.Float()
minfluidph = fields.Float()
maxfluidph = fields.Float()
limit = fields.Integer()
offset = fields.Integer()
    # XXX(sarsonl): TODO add constraints validation (a sketch follows below).
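    # NOTE(editor): a possible shape for the missing min/max validation,
    # modelled on the lat/lon checks above; not part of this commit.
    @validates_schema
    def validate_hydraulics_constraints(self, data):
        for quantity in ('bottomflow', 'topflow', 'bottompressure',
                         'toppressure', 'toptemperature', 'bottomtemperature',
                         'fluiddensity', 'fluidviscosity', 'fluidph'):
            min_v = data.get('min' + quantity)
            max_v = data.get('max' + quantity)
            if min_v is not None and max_v is not None and max_v < min_v:
                raise ValidationError(
                    'max{0} must be greater than min{0}'.format(quantity))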
class TimeConstraintsSchemaMixin(Schema):
"""
Schema for time related query parameters.
"""
starttime = FDSNWSDateTime(format='fdsnws')
start = fields.Str(load_only=True)
......@@ -116,6 +186,29 @@ class TimeConstraintsSchemaMixin(Schema):
ordered = True
class BoreholeHydraulicDataListResourceSchema(TimeConstraintsSchemaMixin,
class BoreholeHydraulicSampleListResourceSchema(TimeConstraintsSchemaMixin,
HydraulicsSchemaMixin,
GeneralSchema):
"""
    Handle optional query parameters for the call returning hydraulic sample
    data for a specified borehole id.
"""
level = LevelHydraulic()
class BoreholeListResourceSchema(TimeConstraintsSchemaMixin,
LocationConstraintsSchemaMixin,
GeneralSchema):
"""
    Handle optional query parameters for the call returning borehole data for
    a geographical area, optionally including section data.
"""
level = LevelSection()
class SectionHydraulicSampleListResourceSchema(TimeConstraintsSchemaMixin,
GeneralSchema):
"""
    Handle optional query parameters for the call returning hydraulic sample
    data for a specified borehole id and section id.
    """
......@@ -13,15 +13,52 @@ from hydws.db import orm
from hydws.server import db, settings
from hydws.server.errors import FDSNHTTPError
from hydws.server.misc import (with_fdsnws_exception_handling, decode_publicid,
make_response)
make_response, DynamicQuery)
from hydws.server.v1 import blueprint
from hydws.server.v1.ostream.schema import BoreholeSchema
from hydws.server.v1.parser import BoreholeHydraulicDataListResourceSchema
from hydws.server.v1.ostream.schema import (BoreholeSchema,
BoreholeSectionSchema,
BoreholeSectionHydraulicSampleSchema,
SectionHydraulicSampleSchema,
HydraulicSampleSchema)
from hydws.server.v1.parser import (
BoreholeHydraulicSampleListResourceSchema,
BoreholeListResourceSchema,
SectionHydraulicSampleListResourceSchema)
api_v1 = Api(blueprint)
# Mapping of ORM columns to comparison operators and query parameters:
# [(column basename, operator, query parameter name)]

# Filters on hydraulic sample fields:
filter_hydraulics = [
('m_datetime', 'ge', 'starttime'),
('m_datetime', 'le', 'endtime'),
('m_toptemperature', 'ge', 'mintoptemperature'),
('m_toptemperature', 'le', 'maxtoptemperature'),
('m_bottomtemperature', 'ge', 'minbottomtemperature'),
('m_bottomtemperature', 'le', 'maxbottomtemperature'),
('m_toppressure', 'ge', 'mintoppressure'),
('m_toppressure', 'le', 'maxtoppressure'),
('m_bottompressure', 'ge', 'minbottompressure'),
('m_bottompressure', 'le', 'maxbottompressure'),
('m_topflow', 'ge', 'mintopflow'),
('m_topflow', 'le', 'maxtopflow'),
('m_bottomflow', 'ge', 'minbottomflow'),
('m_bottomflow', 'le', 'maxbottomflow'),
('m_fluiddensity', 'ge', 'minfluiddensity'),
('m_fluiddensity', 'le', 'maxfluiddensity'),
('m_fluidviscosity', 'ge', 'minfluidviscosity'),
('m_fluidviscosity', 'le', 'maxfluidviscosity'),
('m_fluidph', 'ge', 'minfluidph'),
('m_fluidph', 'le', 'maxfluidph')]
filter_boreholes = [
('m_latitude', 'ge', 'minlatitude'),
('m_latitude', 'le', 'maxlatitude'),
('m_longitude', 'ge', 'minlongitude'),
('m_longitude', 'le', 'maxlongitude')]
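# NOTE(editor): illustration, not part of this commit -- with the "_value"
# suffix applied by DynamicQuery.filter_query, a tuple such as
# ('m_toppressure', 'ge', 'mintoppressure') translates roughly into:
#
#     query.filter(orm.HydraulicSample.m_toppressure_value >=
#                  query_params['mintoppressure'])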
class ResourceBase(Resource):
LOGGER = 'hydws.server.v1.resource'
......@@ -65,96 +102,66 @@ class ResourceBase(Resource):
class BoreholeListResource(ResourceBase):

    def get(self):
        pass

    LOGGER = 'hydws.server.v1.boreholelistresource'

    @with_fdsnws_exception_handling(__version__)
    @use_kwargs(BoreholeListResourceSchema(), locations=("query",))
    def get(self, **query_params):
        self.logger.debug(
            f"Received request: "
            f"query_params={query_params}")

        resp = self._process_request(db.session,
                                     **query_params)

        if not resp:
            self._handle_nodata(query_params)

        # TODO(damb): Serialize according to query_param format=JSON|XML
        # format response
        level = query_params.get('level')
        if level == 'borehole':
            resp = BoreholeSchema(many=True).dumps(resp)
        elif level == 'section':
            resp = BoreholeSectionSchema(many=True).dumps(resp)

        return make_response(resp, settings.MIMETYPE_JSON)

    def _process_request(self, session,
                         **query_params):

        level = query_params.get('level')
        query = session.query(orm.Borehole)
        if level == 'section':
            query = query.join(orm.BoreholeSection)

        dynamic_query = DynamicQuery(query)

        # XXX(damb): Emulate QuakeML type Epoch (though on DB level it is
        # defined as QuakeML type OpenEpoch)

        # XXX(lsarson): Should there be functionality to add OR queries?
        # If so, another method should be added to DynamicQuery.

        # XXX(lsarson): Filter None first or the query will fail due to type
        # differences.
        dynamic_query.filter_query(orm.Borehole, query_params,
                                   filter_boreholes)

        try:
            return dynamic_query.query.all()
        except NoResultFound:
            return None


class BoreholeResource(ResourceBase):

    def get(self, borehole_id):
        pass


class DynamicFilter(ResourceBase):
    """
    Dynamic filtering of query.

    Example:
        dyn_query = DynamicFilter(query, orm.BoreholeSection)
        dyn_query.filter_query([('m_starttime', 'eq', datetime(...))])
    """

    def __init__(self, query, orm_class):
        self.query = query
        self.orm_class = orm_class

    def operator_attr(self, column, op):
        """
        Returns method associated with an comparison operator.
        If op, op_ or __op__ does not exist, Exception returned.

        :returns type: str.
        """
        try:
            return list(filter(
                lambda e: hasattr(column, e % op),
                ['%s', '%s_', '__%s__']))[0] % op
        except IndexError:
            raise Exception('Invalid filter operator: %s' % op)

    def filter_query(self, filter_condition):
        """
        Update self.query based on filter_condition.
        :param filter_condition: list, ie: [(key,operator,value)]
            operator examples:
                eq for ==
                lt for <
                ge for >=
                in for in_
                like for like
            value can be list or a string.
            key must belong in self.orm_class.
        """
        for f in filter_condition:
            try:
                key, op, value = f
            except ValueError:
                raise Exception('Invalid filter input: %s' % f)
            column = getattr(self.orm_class, key)
            if not column:
                raise Exception('Invalid filter column: %s' % key)
            if op == 'in':
                if isinstance(value, list):
                    filt = column.in_(value)
                else:
                    filt = column.in_(value.split(','))
            else:
                attr = self.operator_attr(self, column, op)
                if value == 'null':
                    value = None
                print(column, attr, value)
                filt = getattr(column, attr)(value)
            self.query = self.query.filter(filt)

    def return_query(self):
        return self.query


class BoreholeHydraulicDataListResource(ResourceBase):
class BoreholeHydraulicSampleListResource(ResourceBase):

    LOGGER = 'hydws.server.v1.boreholehydraulicdatalistresource'
    LOGGER = 'hydws.server.v1.boreholehydraulicsamplelistresource'

    @with_fdsnws_exception_handling(__version__)
    @use_kwargs(BoreholeHydraulicDataListResourceSchema(),
    @use_kwargs(BoreholeHydraulicSampleListResourceSchema,
                locations=("query", ))
    def get(self, borehole_id, **query_params):
        print('get')
        borehole_id = decode_publicid(borehole_id)
        self.logger.debug(
......@@ -169,7 +176,129 @@ class BoreholeHydraulicDataListResource(ResourceBase):
        # TODO(damb): Serialize according to query_param format=JSON|XML
        # format response
        resp = BoreholeSchema().dumps(resp)
        level = query_params.get('level')
        self.logger.debug(f"Serializing response for level={level!r}")
        if level == 'borehole':
            resp = BoreholeSchema(many=True).dumps(resp)
        elif level == 'section':
            resp = BoreholeSectionSchema(many=True).dumps(resp)
        elif level == 'hydraulic':
            # 'hydraulic' matches settings.HYDWS_HYDRAULIC_LEVELS
            resp = BoreholeSectionHydraulicSampleSchema(many=True).dumps(resp)

        return make_response(resp, settings.MIMETYPE_JSON)

    def _process_request(self, session, borehole_id,
                         **query_params):

        if not borehole_id:
            raise ValueError(f"Invalid borehole identifier: {borehole_id!r}")

        level = query_params.get('level')
        query = session.query(orm.Borehole)
        if level == 'section':
            query = query.join(orm.BoreholeSection)
        elif level == 'hydraulic':
            # HydraulicSample is reached via BoreholeSection
            query = query.join(orm.BoreholeSection).join(orm.HydraulicSample)

        query = query.filter(orm.Borehole.m_publicid == borehole_id)

        dynamic_query = DynamicQuery(query)

        # XXX(damb): Emulate QuakeML type Epoch (though on DB level it is
        # defined as QuakeML type OpenEpoch)

        # XXX(lsarson): Should there be functionality to add OR queries?
        # If so, another method should be added to DynamicQuery.

        # XXX(lsarson): Filter None first or the query will fail due to type
        # differences.
        if level == 'hydraulic':
            dynamic_query.filter_query(orm.HydraulicSample, query_params,
                                       filter_hydraulics)

        try:
            return dynamic_query.query.all()
        except NoResultFound:
            return None