Commit 15dee06d authored by doetschj

Implementation of parallel event processing using Celery and Redis.

parent c1aeb116
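For context, the pattern this commit introduces is standard Celery producer/worker dispatch over a Redis broker: the trigger loop enqueues one task per detected event, and a pool of worker processes runs the event processing concurrently. A minimal, self-contained sketch of that pattern (module and task names here are illustrative, not part of this commit):

```python
# minimal_sketch.py -- illustrative only; assumes a local redis-server is running
from celery import Celery

app = Celery('sketch', broker='redis://', backend='redis://')

@app.task
def process_event(event_id):
    # Stand-in for the real event_processing() call.
    return 'processed event %d' % event_id

# Producer side: .delay() enqueues the task and returns immediately; a worker
# started with `celery -A minimal_sketch worker` picks it up asynchronously.
# result = process_event.delay(42)
# print(result.get(timeout=10))  # -> 'processed event 42'
```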
@@ -78,9 +78,12 @@ Trigger:
spread_int : 1e-4
noise_vis : 'True'
noise_vis_interval : 60 # in seconds
time_between_events: 0.0001
Processing:
parallel_processing: 'True'
number_workers : 5
vp : 2600.0 # velocity in m/s
vs : 2000.0 # velocity in m/s
bandpass_f_min : 1000
@@ -95,7 +98,7 @@ Processing:
lt_window : 700 #in samples #4.2
Locate:
algorithm : 'hom_iso' # other options: hom_iso, hom_aniso, nonlinloc
algorithm : 'hom_iso' # options: hom_iso, hom_aniso
min_picks : 6
damping : 0.01
......
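The new Processing keys control the dispatch mode: parallel_processing toggles Celery-based execution and number_workers sets the size of the worker pool. As with noise_vis above it, the flag is a quoted YAML string that the code compares against 'True' (see processing.py further down).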
@@ -6,3 +6,14 @@ seismicity during rock-laboratory experiments.
# Copyright (c) 2018 by SCCER-SoE and SED at ETHZ
"""
try:
    from celery import Celery
    celery_app = Celery(main='dug_seis')
    # Load the Celery settings from dug_seis/celeryconfig.py.
    celery_app.config_from_object('dug_seis.celeryconfig')
except ImportError:
    # Celery is optional; serial processing works without it.
    pass
......
# -*- coding: utf-8 -*-

# Broker settings.
broker_url = "redis://"

# Use Redis to store task state and results as well.
result_backend = "redis://"

# Modules to import when a worker starts (note the trailing comma:
# without it this would be a plain string, not a one-element tuple).
imports = ("dug_seis.processing.celery_tasks",)

accept_content = ["pickle"]
task_serializer = "pickle"
result_serializer = "pickle"
task_ignore_result = False
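A note on the serializer settings above: Celery defaults to JSON, but the task arguments here include objects that JSON cannot encode (presumably the ObsPy UTCDateTime trigger times), hence pickle for both task and result payloads; pickle should only be used with a trusted, access-controlled broker. A quick sanity check that the config is picked up (assumes the dug_seis package and Celery are importable; no running Redis is needed just to read the config):

```python
# sanity_check.py -- verifies that celery_app loaded dug_seis.celeryconfig
from dug_seis import celery_app

assert celery_app.conf.task_serializer == 'pickle'
print(celery_app.conf.broker_url)  # "redis://" resolves to redis://localhost:6379/0
```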
@@ -44,10 +44,12 @@ def cli(ctx, cfg, verbose, mode, log):
seismicity during rock-laboratory experiments
"""
# kill leftover celery workers
# os.system("pkill -9 -f 'celery worker'")
# Setup logging. By default we log to stdout with ERROR level and to a log
# file (if specified) with INFO level. Setting verbose logs to both
# handlers with DEBUG level.
logger = logging.getLogger()
logger = logging.getLogger('dug-seis')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)-7s %(message)s')
ch = logging.StreamHandler()
@@ -68,7 +70,7 @@ def cli(ctx, cfg, verbose, mode, log):
exit(-1)
with open(cfg_path) as f:
try:
param = yaml.load(f)
param = yaml.load(f, Loader=yaml.FullLoader)
if len(param['General']['sensor_coords'])/3 != param['General']['sensor_count']:
logger.error('Number of coordinates does not match \"sensor_count\"')
if len(param['Acquisition']['asdf_settings']['station_naming']) != param['General']['sensor_count']:
......
# Helper file to collect all Celery task functions.
import logging
from logging.handlers import RotatingFileHandler

import celery
from celery.utils.log import get_task_logger

from dug_seis import celery_app
from dug_seis.processing.event_processing import event_processing


@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
    # Attach a rotating file handler so every worker logs event messages
    # to dug-seis-events.log.
    logger = get_task_logger('dug-seis-events')
    log = 'dug-seis-events.log'
    fh = RotatingFileHandler(log)
    fh.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s %(levelname)-7s %(message)s')
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.propagate = True


@celery_app.task()
def event_processing_celery(param, load_file, trig_time, event_id, classification):
    # Thin Celery wrapper around the serial event_processing() function.
    logger = get_task_logger('dug-seis-events')
    event_processing(param, load_file, trig_time, event_id, classification, logger)
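Usage sketch for the task defined above: calling .delay() pickles the arguments, pushes them to the Redis queue, and returns an AsyncResult immediately; a worker process then runs event_processing() and logs to dug-seis-events.log. The variables below come from the trigger loop in processing.py (fragment, not runnable on its own):

```python
# Illustrative dispatch; mirrors the call added to processing.py further down.
from dug_seis.processing.celery_tasks import event_processing_celery

async_result = event_processing_celery.delay(
    param, load_file, trig_time, event_id, classification)
# The trigger loop does not block on async_result; task state is still written
# to the Redis backend because task_ignore_result is False.
```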
@@ -11,18 +11,19 @@ Version 0.0, 23.10.2018, Joseph Doetsch (doetschj)
"""
from matplotlib import pyplot as plt
from obspy.core.event.event import Event as ObsPyEvent
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
from obspy.core.event import WaveformStreamID, Pick, Origin, Arrival, Magnitude
import numpy as np
import pyproj
import logging
import pandas as pd
import matplotlib
matplotlib.use("Agg")
from matplotlib import pyplot as plt
class Event(ObsPyEvent):
def __init__(self, param, wf_stream, event_id, classification):
def __init__(self, param, wf_stream, event_id, classification, logger):
super().__init__(resource_id='smi:%s/event/%d' % (param['General']['project_name'], event_id))
self.param = param
@@ -34,12 +35,13 @@ class Event(ObsPyEvent):
self.dist = []
self.tcalc = []
self.loc_ind = []
self.logger = logger
if self.classification != 'noise':
t_start_plot = self.wf_stream.traces[0].stats["starttime"]
logging.info('Event ' + str(self.event_id) + ': Time ' + str(t_start_plot)[11:-1] + ', processing started.')
self.logger.info('Event ' + str(self.event_id) + ': Time ' + str(t_start_plot)[11:-1] + ', processing started.')
else:
t_start_plot = self.wf_stream.traces[0].stats["starttime"]
logging.info('Noise Visualisation at ' + str(t_start_plot)[11:-1] + ', processing started.')
self.logger.info('Noise Visualisation at ' + str(t_start_plot)[11:-1] + ', processing started.')
def bandpass(self):
# bandpass filter using same parameters as for triggering
@@ -64,7 +66,7 @@
waveform_id=WaveformStreamID(network_code='GR', station_code='%03i' % station_id,
channel_code='001', location_code='00'),
method_id='recursive_sta_lta'))
logging.info('Event ' + str(self.event_id) + ': ' + str(len(self.picks)) + ' picks.')
self.logger.info('Event ' + str(self.event_id) + ': ' + str(len(self.picks)) + ' picks.')
def locate(self):
lparam = self.prparam['Locate']
@@ -118,7 +120,7 @@
loc = loc + dm[0:3]
t0 = t0 + dm[3]
else:
logging.warning('Location algorithm not implemented. Location not estimated.')
self.logger.warning('Location algorithm not implemented. Location not estimated.')
x = loc[0] + self.param['General']['origin_ch1903_east']
y = loc[1] + self.param['General']['origin_ch1903_north']
@@ -129,7 +131,7 @@
cols = pd.Index(['Event_id', 'datenum', 'x', 'y', 'z'], name='cols')
outdata = [self.event_id] + [t0.matplotlib_date] + [x] + [y] + [z]
df = pd.DataFrame(data=[outdata], columns=cols)
df.to_csv('events.csv', mode='a', header=False, index=False)
df.to_csv('event_loc.csv', mode='a', header=False, index=False)
# convert coordinates to WGS84
proj_ch1903 = pyproj.Proj(init="epsg:21781")
@@ -161,7 +163,7 @@
self.dist.append(dist)
self.tcalc.append(tcalc)
self.loc_ind.append(trig_ch)
logging.info('Event ' + str(self.event_id) + ': Location %3.2f %3.2f %3.2f; %i iterations, rms %4.3f ms'
self.logger.info('Event ' + str(self.event_id) + ': Location %3.2f %3.2f %3.2f; %i iterations, rms %4.3f ms'
% (loc[0], loc[1], loc[2], nit, rms))
def est_magnitude(self, origin_number=np.inf):
@@ -169,7 +171,7 @@
if origin_number > len(self.origins):
origin_number = len(self.origins)
elif origin_number < 1:
logging.warning('Origin number does not exist.')
self.logger.warning('Origin number does not exist.')
origin_number = origin_number - 1
trig_ch = []
@@ -215,7 +217,7 @@
# calculate relative magnitude for each receiver/event recording
average_magnitude = np.log10(np.mean(mag_exp))
logging.info('Event ' + str(self.event_id) + ': Relative magnitude %4.2f.' % average_magnitude)
self.logger.info('Event ' + str(self.event_id) + ': Relative magnitude %4.2f.' % average_magnitude)
if(average_magnitude < np.inf):
self.magnitudes.append(Magnitude(mag=average_magnitude,
resource_id='%s/magnitude/%d' % (
@@ -273,8 +275,7 @@
'r-', linewidth=1.0)
if len(self.origins):
loc_ind_trig = self.loc_ind[-1][
:] # why was this? if this line gives error switch between ind_trig[0] and self.loc_ind[-1][:]
loc_ind_trig = self.loc_ind[-1][:] # why was this? if this line gives error switch between ind_trig[0] and self.loc_ind[-1][:]
for m in range(len(loc_ind_trig)):
axs[loc_ind_trig[m]].plot(
(self.tcalc[len(self.origins) - 1][m], self.tcalc[len(self.origins) - 1][m]),
@@ -318,15 +319,37 @@
if save_fig:
fig.savefig(fparam['plot_folder_'+self.classification] + "/event-" + str(self.event_id) + '_' + t_start_plotn + ".png",
dpi=100)
logging.info('Event ' + str(self.event_id) + ': Plotted, Figure saved.')
self.logger.info('Event ' + str(self.event_id) + ': Plotted, Figure saved.')
else:
logging.info('Event ' + str(self.event_id) + ': Plotted, Figure not saved.')
self.logger.info('Event ' + str(self.event_id) + ': Plotted, Figure not saved.')
else:
if save_fig:
fig.savefig(fparam['noise_vis_folder'] + "/noise vis-" + t_start_plotn + ".png", dpi=100)
logging.info('Noise Visualisation: Plotted, Figure saved.')
self.logger.info('Noise Visualisation: Plotted, Figure saved.')
else:
logging.info('Noise Visualisation: Plotted, Figure not saved.')
self.logger.info('Noise Visualisation: Plotted, Figure not saved.')
        plt.close(fig)

    def event_save_csv(self, filename='events.csv'):
        # Append one summary row per event (CH1903 coordinates) to a CSV file.
        cols = pd.Index(['Event_id', 'datenum', 'x', 'y', 'z', 'Mag', 'loc_rms', 'npicks'], name='cols')
        if len(self.t0):
            t0 = self.t0[-1].matplotlib_date
            x = self.extra['x']['value']
            y = self.extra['y']['value']
            z = self.extra['z']['value']
            rms = self.origins[-1].time_errors['uncertainty']
        else:
            t0 = ' '
            x, y, z, rms = 0, 0, 0, 0
        if len(self.magnitudes):
            Mag = self.magnitudes[-1]['mag']
        else:
            Mag = -99  # sentinel for "no magnitude estimated"
        npicks = len(self.picks)
        outdata = [self.event_id] + [t0] + [x] + [y] + [z] + [Mag] + [rms] + [npicks]
        df = pd.DataFrame(data=[outdata], columns=cols)
        df.to_csv(filename, mode='a', header=False, index=False)
        self.logger.info('Event ' + str(self.event_id) + ': Info saved to %s.', filename)
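This summary file is presumably the reason the per-origin CSV in locate() was renamed to event_loc.csv above: events.csv now holds one row per processed event. For illustration, an appended row for a located event with a magnitude might look like this (values invented; columns are Event_id, datenum, x, y, z, Mag, loc_rms, npicks):

```
57,737081.4273,667412.35,158921.88,1730.52,-2.85,0.0021,9
```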
@@ -12,16 +12,14 @@ Version 0.0, 23.10.2018, Joseph Doetsch (doetschj)
"""
import logging
from dug_seis.processing.event import Event
from dug_seis.processing.get_waveforms import get_waveforms
def event_processing(param, load_file, trig_time, event_id, classification):
logging.debug('Starting processing for event %d' % event_id)
def event_processing(param, load_file, trig_time, event_id, classification, logger):
wf_stream = get_waveforms(param, load_file, trig_time)
event = Event(param, wf_stream, event_id, classification)
event = Event(param, wf_stream, event_id, classification, logger)
if classification == 'noise':
event.event_plot(save_fig=True)
elif classification == 'electronic':
@@ -46,3 +44,5 @@ def event_processing(param, load_file, trig_time, event_id, classification):
elif classification == 'active':
event.write('%s/hammer%s.xml' % (param['Processing']['Folders']['quakeml_folder'], event_id), 'quakeml',
validate=True)
event.event_save_csv('events.csv')
logger.info('Finished event processing for event %d' % event_id)
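Passing the logger in explicitly, rather than relying on the module-level logging calls that this commit removes, is what lets event_processing() run unchanged in both modes: the serial path hands in the CLI's 'dug-seis' logger, while the Celery wrapper hands in the 'dug-seis-events' task logger with its rotating file handler.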
@@ -21,8 +21,13 @@ from dug_seis.processing.event_processing import event_processing
def processing(param):
logger = logging.getLogger('dug-seis')
logger.info('Starting trigger module')
tparam = param['Trigger']
logging.info('Starting trigger module')
if param['Processing']['parallel_processing'] == 'True':
from dug_seis.processing.celery_tasks import event_processing_celery
os.system('redis-server > redis.log &')
os.system('celery -A dug_seis worker --loglevel=debug --concurrency=%i &> celery.log &' % param['Processing']['number_workers'])
# create folders for processing
if not os.path.exists(param['Processing']['Folders']['quakeml_folder']):
@@ -52,9 +57,8 @@ def processing(param):
processed_files = []
while 1:
if len(new_files):
# print(new_files)
current_file = new_files.pop(0)
logging.info('Working on file ' + current_file)
logger.info('Working on file ' + current_file)
sta = Stream()
ds = pyasdf.ASDFDataSet(asdf_folder + '/' + current_file, mode='r')
@@ -70,7 +74,7 @@ def processing(param):
tr.stats.delta = sta_total[0].stats.delta
sta_total.merge(method=1, interpolation_samples=0)
logging.debug(sta_total)
logger.debug(sta_total)
trigger_out, event_nr = dug_trigger(sta_total, tparam, event_nr, event_nr_s) # run trigger function
overlap = tparam['starttime'] + tparam['endtime'] #+ tparam['endtime']['sta_lta']['lt_window']/sta_total.stats.sampling_rate
@@ -79,7 +83,7 @@ def processing(param):
sta_overlap = sta.trim(t_end - overlap, t_end)
#trigger_out = trigger_out[trigger_out['Classification'] == 'passive']
logging.debug(trigger_out)
logger.debug(trigger_out)
event_id = [i for i in trigger_out['Event_id']]
classification = [i for i in trigger_out['Classification']]
@@ -92,7 +96,11 @@ def processing(param):
load_file = [current_file]
for l in range(0, len(trig_time)):
event_processing(param, load_file, trig_time[l], event_id[l], classification[l]) # run processing for each event
if param['Processing']['parallel_processing'] == 'True':
event_processing_celery.delay(param, load_file, trig_time[l], event_id[l], classification[l])
logger.info('Event ' + str(event_id[l]) + ' at ' + str(trig_time[l]) + ' sent to parallel worker.')
else:
event_processing(param, load_file, trig_time[l], event_id[l], classification[l], logger) # run processing for each event
# noise visualization
if tparam['noise_vis']=='True':
@@ -103,7 +111,11 @@ def processing(param):
load_file = [processed_files[-1], current_file]
else:
load_file = [current_file]
event_processing(param, load_file, next_noise_vis, 0, 'noise') # start processing to visualize noise
if param['Processing']['parallel_processing'] == 'True':
event_processing_celery.delay(param, load_file, next_noise_vis, 0, 'noise') # start processing to visualize noise
logger.info('Noise at ' + str(next_noise_vis) + ' sent to parallel worker.')
else:
event_processing(param, load_file, next_noise_vis, 0, 'noise', logger) # start processing to visualize noise
next_noise_vis += tparam['noise_vis_interval']
processed_files.append(current_file)
@@ -112,5 +124,5 @@ def processing(param):
flist = sorted([f for f in os.listdir(asdf_folder) if f.endswith('.h5')])
new_files = [f for f in flist if f not in processed_files]
if not len(new_files):
logging.info('Waiting for new files.')
logger.info('Waiting for new files.')
time.sleep(1)
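One design note on the worker startup earlier in this file: os.system with a trailing '&' detaches redis-server and the Celery workers, leaving no process handles for shutdown (hence the commented-out pkill in cli.py), and the '&>' redirection is bash syntax while os.system invokes /bin/sh. A sketch of an alternative using subprocess.Popen, which keeps handles for a clean teardown (hypothetical, not part of this commit):

```python
# Hypothetical startup helper; 'param' is the parsed YAML config dict.
import subprocess

def start_workers(param):
    redis_proc = subprocess.Popen(['redis-server'],
                                  stdout=open('redis.log', 'w'),
                                  stderr=subprocess.STDOUT)
    celery_proc = subprocess.Popen(
        ['celery', '-A', 'dug_seis', 'worker', '--loglevel=debug',
         '--concurrency=%i' % param['Processing']['number_workers']],
        stdout=open('celery.log', 'w'), stderr=subprocess.STDOUT)
    return redis_proc, celery_proc

# On shutdown: celery_proc.terminate(); redis_proc.terminate()
```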
@@ -24,7 +24,8 @@ if sys.version_info[:2] < (3, 6):
_authors = [
'Joseph Doetsch',
'Linus Villiger',
'Thomas Haag']
'Thomas Haag',
'Sem Demir']
_authors_email = [
'joseph.doetsch@erdw.ethz.ch',
'linus.villiger@sed.ethz.ch']
@@ -37,7 +38,9 @@ _install_requires = [
"pyyaml",
"pyasdf",
"pandas",
"pyproj",]
"pyproj",
"celery",
"redis",]
setup(
name='DUG-Seis',
......