data_to_asdf.py 7.4 KB
Newer Older
thaag's avatar
thaag committed
1
2
3
4
5
"""Read data from an array to a ASDF file.
Create a new file when needed.
"""
import time
import logging
thaag's avatar
thaag committed
6
import os
7
import io
thaag's avatar
thaag committed
8

thaag's avatar
thaag committed
9
from ctypes import c_int32
thaag's avatar
thaag committed
10
11
12

from math import floor
from obspy.core import Stream, Trace, UTCDateTime
13
from obspy.core.inventory import Longitude, Latitude
thaag's avatar
thaag committed
14
15
16

import pyasdf

17
18
from dug_seis.acquisition.flat_response_stationxml import get_flat_response_inventory

thaag's avatar
thaag committed
19
20
21
22
23

class DataToASDF:

    def __init__(self, param):
        self.folder = param['Acquisition']['asdf_settings']['data_folder']
thaag's avatar
thaag committed
24
        self.folder_tmp = param['Acquisition']['asdf_settings']['data_folder_tmp']
25
26
27
28
        if self.folder[len(self.folder)-1] is not "/":
            self.folder += "/"
        if self.folder_tmp[len(self.folder_tmp)-1] is not "/":
            self.folder_tmp += "/"
thaag's avatar
thaag committed
29
30
        self.filename = param['General']['project_name']
        self.compression = param['Acquisition']['asdf_settings']['compression']
thaag's avatar
thaag committed
31
        self.file_length_sec = param['Acquisition']['asdf_settings']['file_length_sec']
32
        self.station_naming = param['Acquisition']['asdf_settings']['station_naming']
thaag's avatar
thaag committed
33
34
        self.l_notify_size = c_int32(param['Acquisition']['bytes_per_transfer'])

35
36
37
38
        self.stats = {'network': param['General']['stats']['network'],
                      'station': '099',
                      'location': '00',
                      'channel': '001',
thaag's avatar
thaag committed
39
                      'starttime': UTCDateTime().timestamp,
40
41
                      'sampling_rate': param['Acquisition']['hardware_settings']['sampling_frequency'],
                      'gain': '0'
thaag's avatar
thaag committed
42
43
44
                      }

        self._sampling_rate = param['Acquisition']['hardware_settings']['sampling_frequency']
45
        self._gain_selection = param['Acquisition']['hardware_settings']['gain_selection']
46
47
        self._sensor_coords = param['General']['sensor_coords']
        self._sensor_count = param['General']['sensor_count']
thaag's avatar
thaag committed
48
        self._nr_of_datapoints = floor(self.l_notify_size.value / 16 / 2)   # nr of channels & 16 bit = 2 bytes
thaag's avatar
thaag committed
49
50
        self._file_handle = None
        self._time_age_of_file = 0      # keeps track internally how old the file is
thaag's avatar
thaag committed
51
        self._last_used_file_name = None
52
        self._really_verbose_timing_output = False
thaag's avatar
thaag committed
53
54
55
56

    def _check_if_folders_exist_create_if_needed(self):
        if not os.path.isdir(self.folder):
            os.makedirs(self.folder)
57
            logging.info("creating folder: {}".format(self.folder))
thaag's avatar
thaag committed
58
59
60

        if not os.path.isdir(self.folder_tmp):
            os.makedirs(self.folder_tmp)
61
            logging.info("creating folder_tmp: {}".format(self.folder_tmp))
thaag's avatar
thaag committed
62
63
64
65

    def _create_new_file(self):
        """Creates a new file.
        With parameters of the DataToAsdf class. Sets the age of the file to time.time()."""
thaag's avatar
thaag committed
66

thaag's avatar
thaag committed
67
68
        file_name = "{0}_{1}.h5".format(UTCDateTime(), self.filename)
        file_name = file_name.replace(":", "_")
thaag's avatar
thaag committed
69
        folder_file_name = "{0}{1}".format(self.folder_tmp, file_name)
70
        # print("_create_new_file with folder_file_name = {0}".format(folder_file_name))
71
        logging.info("_create_new_file with folder_file_name = {0}".format(folder_file_name))
thaag's avatar
thaag committed
72
73

        self._time_age_of_file = time.time()
thaag's avatar
thaag committed
74
75
76
77
78
79
        # logging.info("self.compression = {}, type = {}".format(self.compression, type(self.compression)))
        if self.compression == 'None':
            # logging.info("if self.compression = None: -> true")
            self._file_handle = pyasdf.ASDFDataSet(folder_file_name, compression=None)
        else:
            self._file_handle = pyasdf.ASDFDataSet(folder_file_name, compression=self.compression)
thaag's avatar
thaag committed
80

thaag's avatar
thaag committed
81
82
83
84
        if self._last_used_file_name is not None:
            os.rename(self.folder_tmp + self._last_used_file_name, self.folder + self._last_used_file_name)
        self._last_used_file_name = file_name

85
86
        self._add_all_station_xml_s(self._file_handle)

thaag's avatar
thaag committed
87
    def _create_new_file_if_needed(self):
thaag's avatar
thaag committed
88
        # check if file_handle exists
thaag's avatar
thaag committed
89
        if self._file_handle is None:
90
            # logging.info("no file found creating new file.\n")
thaag's avatar
thaag committed
91
            self._check_if_folders_exist_create_if_needed()
thaag's avatar
thaag committed
92
93
94
95
            self._create_new_file()

        # check if file is too old
        if self._time_age_of_file + self.file_length_sec < time.time():
96
            # logging.info("file too old, creating new file.\n")
thaag's avatar
thaag committed
97
98
            self._create_new_file()

99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
    def _add_all_station_xml_s(self, ds):
        for i in range(self._sensor_count):
            ds.add_stationxml(self._create_station_xml(i))

    def _create_station_xml(self, channel_nr):
        # print("channel_nr = {}, len(self._sensor_coords) = {}".format(channel_nr, len(self._sensor_coords)))

        inv = get_flat_response_inventory(
            sensitivity_value=self._gain_selection[channel_nr] * 2 / 65536,     # conversion 16bit int to mV
            sensitivity_frequency=1.0,
            input_units="M/S",                                                  # ?
            output_units="Counts",                                              # ?
            creation_date=UTCDateTime(self._time_age_of_file),
            network_code=self.stats['network'],
            station_code=self._get_station_name(channel_nr),
            location_code=self.stats['location'],
            channel_code=self.stats['channel'],
            sampling_rate=self.stats['sampling_rate'],
            latitude=Latitude(self._sensor_coords[channel_nr][1]),             # from here on ?
            longitude=Longitude(self._sensor_coords[channel_nr][0]),
            depth=self._sensor_coords[channel_nr][2],
            elevation=0.0,
            azimuth=0.0,
            dip=0.0)

        # Test if the response makes up a valid StationXML file.
        with io.BytesIO() as buf:
            inv.write(buf, format="stationxml", validate=True)

        return inv

    def _get_station_name(self, channel_nr):
        return str(self.station_naming[channel_nr]).zfill(3)

thaag's avatar
thaag committed
133
    def data_to_asdf(self, np_data_list):
thaag's avatar
thaag committed
134
135
136
137
138
139

        time_start_buffer_data_available = time.time()

        stream = Stream()

        card_nr = 0
thaag's avatar
thaag committed
140
        for np_data in np_data_list:
thaag's avatar
thaag committed
141
142
143
            time_tmp = time.time()

            for i in range(16):
144
                self.stats['station'] = self._get_station_name(i + 16 * card_nr)
145
                stream += Trace(np_data[i], header=self.stats)      # without transpose np_data[:, i]
146
                # logging.info("{}, {}\n".format(self.stats['station'], self.stats['starttime']))
thaag's avatar
thaag committed
147

thaag's avatar
thaag committed
148
            del np_data
149
150
            if self._really_verbose_timing_output:
                logging.debug("stream += Trace: {0:.3f} sec, ".format(time.time() - time_tmp))
thaag's avatar
thaag committed
151

thaag's avatar
thaag committed
152
153
            card_nr += 1

thaag's avatar
thaag committed
154
155
        time_tmp = time.time()
        self._create_new_file_if_needed()
156
157
        if self._really_verbose_timing_output:
            logging.debug("_create_new_file_if_needed(): {0:.3f} sec, ".format(time.time() - time_tmp))
thaag's avatar
thaag committed
158
159
        time_tmp = time.time()

thaag's avatar
thaag committed
160
        self._file_handle.append_waveforms(stream, tag="raw_recording")
161
162
        if self._really_verbose_timing_output:
            logging.debug("append_waveforms: {0:.3f} sec".format(time.time() - time_tmp))
thaag's avatar
thaag committed
163
164
165

        # starttime for next segment
        self.stats['starttime'] = UTCDateTime(self.stats['starttime']) +\
thaag's avatar
thaag committed
166
            self._nr_of_datapoints / self._sampling_rate
167
        # logging.info("self.stats[ starttime ] = {}".format(self.stats['starttime']))
thaag's avatar
thaag committed
168
169

        del stream
170
171
172
        if self._really_verbose_timing_output:
            logging.debug("end of data_to_asdf.py cycle used time: {0:.3f} sec"
                          .format(time.time() - time_start_buffer_data_available))