57 static time_t _dsproc_get_next_split_time(
61 int split_mode = ds->split_mode;
62 double split_start = ds->split_start;
63 double split_interval = ds->split_interval;
64 time_t next_start = 0;
65 time_t split_time = 0;
77 memset(&gmt, 0,
sizeof(
struct tm));
79 if (!gmtime_r(&prev_time, &gmt)) {
82 "Could not get next split time.\n"
83 " -> gmtime error: %s\n", strerror(errno));
99 if (split_interval > 0.0) {
100 interval = (time_t)(split_interval + 0.5);
108 gmt.tm_mon = (int)(split_start + 0.5) - 1;
111 if (gmt.tm_mon < 0 ||
117 split_time =
timegm(&gmt);
119 if (split_time > prev_time) {
123 next_start = split_time;
124 split_time =
timegm(&gmt);
129 next_start =
timegm(&gmt);
136 while (split_time <= prev_time) {
138 gmt.tm_mon += interval;
140 if (gmt.tm_mon > 11) {
146 split_time =
timegm(&gmt);
155 if (split_interval > 0.0) {
156 interval = (time_t)((split_interval * 86400.0) + 0.5);
165 split_time =
timegm(&gmt);
167 if (split_start > 0.0) {
168 split_time += (time_t)((split_start * 86400.0) + 0.5);
171 if (split_time > prev_time) {
181 next_start = split_time;
182 split_time =
timegm(&gmt);
184 if (split_start > 0.0) {
185 split_time += (time_t)((split_start * 86400.0) + 0.5);
190 if (gmt.tm_mon < 11) {
198 next_start =
timegm(&gmt);
200 if (split_start > 0.0) {
201 next_start += (time_t)((split_start * 86400.0) + 0.5);
207 while (split_time <= prev_time) {
208 split_time += interval;
215 if (split_interval > 0.0) {
216 interval = (time_t)((split_interval * 3600.0) + 0.5);
224 split_time =
timegm(&gmt);
226 if (split_start > 0.0) {
227 split_time += (time_t)((split_start * 3600.0) + 0.5);
230 next_start = split_time;
232 if (split_time > prev_time) {
236 next_start += 86400;;
241 while (split_time <= prev_time) {
242 split_time += interval;
246 if (next_start < split_time) {
268 int _dsproc_write_csv(
269 const char *full_path,
290 buffer = (
char *)calloc(buflen,
sizeof(
char));
291 skip = (
int *)calloc(dataset->
nvars,
sizeof(
int));
293 if (!skip || !buffer) {
296 "Could not create output CSV file: %s\n"
297 " -> memory allocation error creating buffer\n",
300 if (skip) free(skip);
301 if (buffer) free(buffer);
309 fp = fopen(full_path,
"w");
314 "Could not open output CSV file: %s\n"
316 full_path, strerror(errno));
324 nbytes = fprintf(fp,
"time");
325 if (nbytes < 0)
goto WRITE_ERROR;
327 for (vi = 0; vi < dataset->
nvars; ++vi) {
329 var = dataset->
vars[vi];
333 if (strcmp(var->
name,
"base_time") == 0 ||
334 strcmp(var->
name,
"time") == 0 ||
335 strcmp(var->
name,
"time_offset") == 0) {
343 if (var->
ndims > 1) {
347 "%s: Skipping multi-dimensional variable in CSV output: %s\n",
357 if (var->
ndims == 0 || strcmp(var->
dims[0]->
name,
"time") != 0) {
360 "%s: Skipping non-time-varying variable in CSV output: %s\n",
371 strcmp(att->
value.
cp,
"unitless") != 0) {
373 nbytes = fprintf(fp,
", %s (%s)", var->
name, att->
value.
cp);
376 nbytes = fprintf(fp,
", %s", var->
name);
379 if (nbytes < 0)
goto WRITE_ERROR;
382 nbytes = fprintf(fp,
"\n");
383 if (nbytes < 0)
goto WRITE_ERROR;
389 for (ti = 0; ti < ntimes; ++ti) {
394 if (nbytes < 0)
goto WRITE_ERROR;
398 for (vi = 0; vi < dataset->
nvars; ++vi) {
400 var = dataset->
vars[vi];
401 if (skip[vi])
continue;
406 if (var->
ndims == 1) {
407 nbytes = fprintf(fp,
", %c", var->
data.
cp[ti]);
412 if (length >= buflen) {
415 buffer = (
char *)realloc(buffer, buflen *
sizeof(
char));
420 "Could not write to output CSV file: %s\n"
421 " -> memory allocation error resizing buffer\n",
433 chrp = &var->
data.
cp[ti*length];
435 for (ci = 0; ci < length; ++ci) {
436 buffer[ci] = chrp[ci];
440 if (!strchr(buffer,
',')) {
441 nbytes = fprintf(fp,
", %s", buffer);
443 else if (!strchr(buffer,
'"')) {
444 nbytes = fprintf(fp,
", \"%s\"", buffer);
446 else if (!strchr(buffer,
'\'')) {
447 nbytes = fprintf(fp,
", '%s'", buffer);
450 while ((chrp = strchr(buffer,
','))) {
457 nbytes = fprintf(fp,
", %d", var->
data.
bp[ti]);
460 nbytes = fprintf(fp,
", %d", var->
data.
sp[ti]);
463 nbytes = fprintf(fp,
", %d", var->
data.
ip[ti]);
466 nbytes = fprintf(fp,
", %.7g", var->
data.
fp[ti]);
469 nbytes = fprintf(fp,
", %.15g", var->
data.
dp[ti]);
472 nbytes = fprintf(fp,
", NaN");
476 if (nbytes < 0)
goto WRITE_ERROR;
479 nbytes = fprintf(fp,
"\n");
480 if (nbytes < 0)
goto WRITE_ERROR;
483 if (fclose(fp) != 0) {
500 "Could not write to output CSV file: %s\n"
502 full_path, strerror(errno));
529 for (ds_id = 0; ds_id <
_DSProc->ndatastreams; ds_id++) {
531 ds =
_DSProc->datastreams[ds_id];
596 DataStream *ds =
_DSProc->datastreams[ds_id];
597 const char *ds_path = ds->dir->path;
598 time_t current_time = time(NULL);
603 CDSGroup *out_dataset = ds->out_cds;
610 DSFile **dsfiles = (DSFile **)NULL;
611 DSFile *dsfile = (DSFile *)NULL;
615 time_t split_time = 0;
618 int filtered_first_sample;
623 char *time_desc = (
char *)NULL;
626 char full_path[PATH_MAX];
636 char current_ts[32], begin_ts[32], end_ts[32];
646 "%s: Attempt to store NULL dataset\n", ds->name);
655 if (out_ntimes == 0) {
658 "%s: No data found in output dataset\n", ds->name);
660 _dsproc_free_datastream_out_cds(ds);
668 out_begin = out_times[0];
675 time_desc = (
char *)NULL;
680 time_desc_att =
cds_get_att(time_var,
"long_name");
685 time_desc = strdup(time_desc_att->
value.
cp);
689 "Could not store data for: %s\n"
690 " -> memory allocation error\n",
704 if (!_dsproc_filter_duplicate_samples(
705 &out_ntimes, out_times, out_dataset)) {
750 status = _dsproc_custom_qc_hook(ds_id, out_dataset);
757 if (out_times) free(out_times);
758 if (time_desc) free(time_desc);
759 _dsproc_free_datastream_out_cds(ds);
773 out_begin = out_times[0];
774 out_end = out_times[out_ntimes - 1];
780 snprintf(full_path, PATH_MAX,
"%s/%s.%s.%s",
781 ds_path, ds->name, timestamp, ds->extension);
791 "%s: Writing file for record set:\n"
792 " - times: '%s' to '%s'\n"
794 ds->name, begin_ts, end_ts, full_path);
797 if (!_dsproc_write_csv(
798 full_path, out_dataset, out_ntimes, out_times)) {
808 goto UPDATE_DS_STATS;
830 if (!_dsproc_filter_stored_samples(
831 ds, &out_ntimes, out_times, out_dataset)) {
841 "%s: All data was filtered from the dataset\n", ds->name);
843 if (out_times) free(out_times);
844 if (time_desc) free(time_desc);
845 _dsproc_free_datastream_out_cds(ds);
850 filtered_first_sample = (
TV_EQ(out_times[0], out_begin)) ? 0 : 1;
852 out_begin = out_times[0];
853 out_end = out_times[out_ntimes - 1];
863 "%s: Validating time range ['%s' -> '%s']\n",
864 ds->name, begin_ts, end_ts);
874 if (out_end.tv_sec > current_time) {
879 "Could not store data for: %s\n"
880 " -> end time '%s' is in the future (current time is '%s')\n",
881 ds->name, end_ts, current_ts);
895 if (filtered_first_sample) {
900 if (base_time == midnight) {
904 if (midnight != base_time) {
914 dsfile = (DSFile *)NULL;
916 if (!async_mode && !newfile &&
923 status = _dsproc_find_dsfiles(ds->dir, NULL, &out_begin, &dsfiles);
939 dsfile_end = dsfile->timevals[dsfile->ntimes - 1];
941 if (
TV_LTEQ(out_begin, dsfile_end)) {
944 "%s: Overlapping records found with previously stored data\n"
945 " -> '%s' to '%s': output dataset overlaps file: %s\n",
946 ds->name, begin_ts, end_ts, dsfile->name);
959 split_time = _dsproc_get_next_split_time(ds, dsfile_end.tv_sec);
961 if (split_time <= out_begin.tv_sec) {
962 dsfile = (DSFile *)NULL;
973 dsfile_dod = _dsproc_fetch_dsfile_dod(dsfile);
980 "%s: Checking for DOD metadata changes\n",
997 dsfile = (DSFile *)NULL;
1000 "%s: Forcing NetCDF file split at: %s\n",
1001 ds->name, begin_ts);
1014 "Storing: %s data from '%s' to '%s': %d records\n",
1015 ds->name, begin_ts, end_ts, out_ntimes);
1019 for (si = 0, ei = 0; si < (int)out_ntimes; si = ei + 1) {
1026 split_time = _dsproc_get_next_split_time(ds, out_times[si].tv_sec);
1030 split_timeval.tv_sec = split_time;
1033 out_ntimes, out_times, split_timeval,
CDS_LT);
1036 ei = out_ntimes - 1;
1040 count = ei - si + 1;
1054 "%s: Appending record set to existing file:\n"
1055 " - times: '%s' to '%s'\n"
1057 ds->name, begin_ts, end_ts, dsfile->full_path);
1060 if (!_dsproc_open_dsfile(dsfile, NC_WRITE)) {
1064 ncid = dsfile->ncid;
1065 nc_start = dsfile->ntimes;
1067 if (!_dsproc_update_stored_metadata(out_dataset, ncid)) {
1093 snprintf(full_path, PATH_MAX,
"%s/%s.%s.%s",
1094 ds_path, ds->name, timestamp, ds->extension);
1104 "%s: Creating new file for record set:\n"
1105 " - times: '%s' to '%s'\n"
1107 ds->name, begin_ts, end_ts, full_path);
1110 if (reproc_mode || async_mode) {
1120 "Could not create file: %s\n",
1134 "Could not write static data to file: %s\n",
1152 "Could not write data records to file: %s\n",
1157 "Could not write data records to file: %s\n",
1174 "Could not sync data to file: %s\n",
1184 dsfile->stats.st_mtime = 0;
1188 dsfile = (DSFile *)NULL;
1195 "Could not close file: %s\n",
1205 ds->dir->stats.st_mtime = 0;
1217 ds_id, out_ntimes, &out_begin, &out_end);
1219 if (out_times) free(out_times);
1220 if (time_desc) free(time_desc);
1221 _dsproc_free_datastream_out_cds(ds);
1222 return((
int)out_ntimes);
1228 if (out_times) free(out_times);
1229 if (time_desc) free(time_desc);
1230 _dsproc_free_datastream_out_cds(ds);
1235 "FORCE: Forcing ingest to skip output dataset for: %s\n",