libdsproc3  2.0
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups
dsproc_dataset_store.c
Go to the documentation of this file.
1 /*******************************************************************************
2 *
3 * COPYRIGHT (C) 2010 Battelle Memorial Institute. All Rights Reserved.
4 *
5 ********************************************************************************
6 *
7 * Author:
8 * name: Brian Ermold
9 * phone: (509) 375-2277
10 * email: brian.ermold@pnl.gov
11 *
12 ********************************************************************************
13 *
14 * REPOSITORY INFORMATION:
15 * $Revision: 81662 $
16 * $Author: ermold $
17 * $Date: 2017-10-27 16:09:46 +0000 (Fri, 27 Oct 2017) $
18 *
19 ********************************************************************************
20 *
21 * NOTE: DOXYGEN is used to generate documentation for this file.
22 *
23 *******************************************************************************/
24 
25 /** @file dsproc_dataset_store.c
26  * Dataset Store Functions.
27  */
28 
29 #include <string.h>
30 #include <unistd.h>
31 
32 #include "dsproc3.h"
33 #include "dsproc_private.h"
34 
35 extern DSProc *_DSProc; /**< Internal DSProc structure */
36 
37 /** @privatesection */
38 
39 /*******************************************************************************
40  * Static Functions Visible Only To This Module
41  */
42 
/**
 * Static: Get the next time the output file should be split at.
 *
 * The next split time is derived from the datastream's split_mode,
 * split_start, and split_interval settings:
 *
 * - SPLIT_ON_STORE / SPLIT_NONE: no time-based split (returns 0)
 * - SPLIT_ON_MONTHS: split_start is the month of year (1-12),
 *                    split_interval is in months
 * - SPLIT_ON_DAYS:   split_start is the day of month (1-based),
 *                    split_interval is in days
 * - default (SPLIT_ON_HOURS): split_start is the hour of day,
 *                    split_interval is in hours
 *
 * If an error occurs in this function it will be appended to the log and
 * error mail messages, and the process status will be set appropriately.
 *
 * @param ds - pointer to the DataStream structure
 * @param prev_time - time of the previously stored data record
 *
 * @return
 * - next split time (in seconds since 1970)
 * - 0 if the file should be split now (i.e. SPLIT_ON_STORE is set)
 * - -1 if an error occurred
 */
static time_t _dsproc_get_next_split_time(
    DataStream *ds,
    time_t      prev_time)
{
    int    split_mode     = ds->split_mode;
    double split_start    = ds->split_start;
    double split_interval = ds->split_interval;
    time_t next_start = 0;
    time_t split_time = 0;
    time_t interval   = 0;
    struct tm gmt;

    /* These modes never split on a time boundary */

    if (split_mode == SPLIT_ON_STORE ||
        split_mode == SPLIT_NONE) {

        return(0);
    }

    /* Get the tm structure for the specified time */

    memset(&gmt, 0, sizeof(struct tm));

    if (!gmtime_r(&prev_time, &gmt)) {

        /* NOTE(review): the ERROR() reporting call on this path was lost in
         * the extraction of this file; only its argument list remains. */
        "Could not get next split time.\n"
        " -> gmtime error: %s\n", strerror(errno));

        return(-1);
    }

    /* Split boundaries fall on a midnight-UTC-based grid, so truncate
     * the time-of-day fields before computing boundaries. */

    gmt.tm_hour = 0;
    gmt.tm_min = 0;
    gmt.tm_sec = 0;

    /* Get the starting split time */

    if (split_mode == SPLIT_ON_MONTHS) {

        /* get the split interval (in months, rounded to nearest int) */

        if (split_interval > 0.0) {
            interval = (time_t)(split_interval + 0.5);
        }
        else {
            interval = 1;
        }

        /* get the starting split time: first day of the configured
         * start month (split_start is 1-based, tm_mon is 0-based) */

        gmt.tm_mon = (int)(split_start + 0.5) - 1;
        gmt.tm_mday = 1;

        if (gmt.tm_mon < 0 ||
            gmt.tm_mon > 11) {

            gmt.tm_mon = 0;
        }

        split_time = timegm(&gmt);

        if (split_time > prev_time) {

            /* start month of this year is already past prev_time,
             * so anchor the search one year earlier */

            gmt.tm_year--;

            next_start = split_time;
            split_time = timegm(&gmt);
        }
        else {
            gmt.tm_year++;

            next_start = timegm(&gmt);

            gmt.tm_year--;
        }

        /* find the next split time by stepping forward in months */

        /* NOTE(review): a split_interval in (0.0, 0.5) rounds interval
         * down to 0 above, which would make this loop spin forever --
         * presumably the value is validated upstream; confirm. */

        while (split_time <= prev_time) {

            gmt.tm_mon += interval;

            if (gmt.tm_mon > 11) {

                gmt.tm_mon -= 12;
                gmt.tm_year++;
            }

            split_time = timegm(&gmt);
        }
    }
    else if (split_mode == SPLIT_ON_DAYS) {

        /* convert 1-based day-of-month to a 0-based day offset */

        split_start -= 1.0;

        /* get the split interval (in seconds) */

        if (split_interval > 0.0) {
            interval = (time_t)((split_interval * 86400.0) + 0.5);
        }
        else {
            interval = 86400;
        }

        /* get the starting split time: start-of-month plus day offset */

        gmt.tm_mday = 1;
        split_time = timegm(&gmt);

        if (split_start > 0.0) {
            split_time += (time_t)((split_start * 86400.0) + 0.5);
        }

        if (split_time > prev_time) {

            /* anchor the search in the previous month */

            if (gmt.tm_mon) {
                gmt.tm_mon--;
            }
            else {
                gmt.tm_mon = 11;
                gmt.tm_year--;
            }

            next_start = split_time;
            split_time = timegm(&gmt);

            if (split_start > 0.0) {
                split_time += (time_t)((split_start * 86400.0) + 0.5);
            }
        }
        else {

            /* next_start is the anchor day in the following month */

            if (gmt.tm_mon < 11) {
                gmt.tm_mon++;
            }
            else {
                gmt.tm_mon = 0;
                gmt.tm_year++;
            }

            next_start = timegm(&gmt);

            if (split_start > 0.0) {
                next_start += (time_t)((split_start * 86400.0) + 0.5);
            }
        }

        /* find the next split time */

        while (split_time <= prev_time) {
            split_time += interval;
        }
    }
    else { /* default: SPLIT_ON_HOURS */

        /* get the split interval (in seconds); note the fallback when
         * no interval is configured is daily (86400s), not hourly */

        if (split_interval > 0.0) {
            interval = (time_t)((split_interval * 3600.0) + 0.5);
        }
        else {
            interval = 86400;
        }

        /* get the starting split time: midnight plus hour offset */

        split_time = timegm(&gmt);

        if (split_start > 0.0) {
            split_time += (time_t)((split_start * 3600.0) + 0.5);
        }

        next_start = split_time;

        if (split_time > prev_time) {
            split_time -= 86400;
        }
        else {
            /* NOTE(review): stray second ';' below is harmless */
            next_start += 86400;;
        }

        /* find the next split time */

        while (split_time <= prev_time) {
            split_time += interval;
        }
    }

    /* return whichever boundary comes first */

    if (next_start < split_time) {
        return(next_start);
    }

    return(split_time);
}
252 
/**
 * Static: Write an output csv file.
 *
 * Writes one header row ("time" plus one column per storable variable,
 * with units appended when available), then one row per sample time.
 * Variables that are skipped (and flagged in the skip[] array):
 * the time coordinate variables, multi-dimensional variables other
 * than 2-D character arrays (i.e. string-per-sample), and variables
 * not dimensioned by time.
 *
 * NOTE(review): several reporting calls (ERROR/WARNING) were lost in the
 * extraction of this file; only their argument lists remain below.
 *
 * NOTE(review): despite the "Static:" doc tag, this function is declared
 * with external linkage -- confirm whether it should be static.
 *
 * If an error occurs in this function it will be appended to the log and
 * error mail messages, and the process status will be set appropriately.
 *
 * @param full_path - full path to the output file
 * @param dataset - output dataset
 * @param ntimes - number of times in the output dataset
 * @param times - array of times in the output dataset
 *
 * @return
 * - 1 if successful
 * - 0 if an error occurred
 */
int _dsproc_write_csv(
    const char *full_path,
    CDSGroup *dataset,
    size_t ntimes,
    timeval_t *times)
{
    FILE *fp;
    CDSVar *var;
    CDSAtt *att;
    int *skip;      /* per-variable flags: 1 = omit from CSV output */
    int buflen;     /* current allocated size of buffer */
    char *buffer;   /* scratch buffer for 2-D char (string) samples */
    char *chrp;
    char ts[32];
    int nbytes;
    int length;
    int ci, vi;
    size_t ti;

    /* Allocate memory for buffers */

    buflen = 256;
    buffer = (char *)calloc(buflen, sizeof(char));
    skip = (int *)calloc(dataset->nvars, sizeof(int));

    if (!skip || !buffer) {

        "Could not create output CSV file: %s\n"
        " -> memory allocation error creating buffer\n",
        full_path);

        if (skip) free(skip);
        if (buffer) free(buffer);

        return(0);
    }

    /* Open the output file */

    fp = fopen(full_path, "w");

    if (!fp) {

        "Could not open output CSV file: %s\n"
        " -> %s\n",
        full_path, strerror(errno));

        /* NOTE(review): this path returns without freeing skip and
         * buffer, leaking both allocations -- verify and fix. */
        return(0);
    }

    /* Print the header */

    nbytes = fprintf(fp, "time");
    if (nbytes < 0) goto WRITE_ERROR;

    for (vi = 0; vi < dataset->nvars; ++vi) {

        var = dataset->vars[vi];

        /* Skip time variables */

        if (strcmp(var->name, "base_time") == 0 ||
            strcmp(var->name, "time") == 0 ||
            strcmp(var->name, "time_offset") == 0) {

            skip[vi] = 1;
            continue;
        }

        /* Skip multi-dimensional variables (except 2-D char arrays,
         * which are treated as one string per sample) */

        if (var->ndims > 1) {
            if (!(var->ndims == 2 && var->type == CDS_CHAR)) {

                "%s: Skipping multi-dimensional variable in CSV output: %s\n",
                dataset->name, var->name);

                skip[vi] = 1;
                continue;
            }
        }

        /* Skip variables that are not dimensioned by time */

        if (var->ndims == 0 || strcmp(var->dims[0]->name, "time") != 0) {

            "%s: Skipping non-time-varying variable in CSV output: %s\n",
            dataset->name, var->name);

            skip[vi] = 1;
            continue;
        }

        /* Print column name using variable name and units */

        att = cds_get_att(var, "units");
        if (att && att->type == CDS_CHAR && att->value.cp &&
            strcmp(att->value.cp, "unitless") != 0) {

            nbytes = fprintf(fp, ", %s (%s)", var->name, att->value.cp);
        }
        else {
            nbytes = fprintf(fp, ", %s", var->name);
        }

        if (nbytes < 0) goto WRITE_ERROR;
    }

    nbytes = fprintf(fp, "\n");
    if (nbytes < 0) goto WRITE_ERROR;

    /* Print data rows */

    /* NOTE(review): buflen is reset to 0 here although buffer still
     * holds the 256 bytes allocated above, so the first 2-D char
     * sample always triggers a realloc -- harmless but wasteful. */

    buflen = 0;

    for (ti = 0; ti < ntimes; ++ti) {

        /* Print record time */

        nbytes = fprintf(fp, "%s", format_timeval(&times[ti], ts));
        if (nbytes < 0) goto WRITE_ERROR;

        /* Print column values */

        for (vi = 0; vi < dataset->nvars; ++vi) {

            var = dataset->vars[vi];
            if (skip[vi]) continue;

            switch (var->type) {

                case CDS_CHAR:
                    if (var->ndims == 1) {
                        nbytes = fprintf(fp, ", %c", var->data.cp[ti]);
                    }
                    else {
                        /* 2-D char: copy this sample's characters into
                         * buffer as a NUL-terminated string */

                        length = cds_var_sample_size(var);

                        if (length >= buflen) {

                            buflen = length + 1;
                            buffer = (char *)realloc(buffer, buflen * sizeof(char));

                            if (!buffer) {

                                "Could not write to output CSV file: %s\n"
                                " -> memory allocation error resizing buffer\n",
                                full_path);

                                /* NOTE(review): buffer is NULL here, so
                                 * free(buffer) is a no-op and the block
                                 * previously held by buffer is leaked
                                 * (classic p = realloc(p, ...) bug). */
                                fclose(fp);
                                free(skip);
                                free(buffer);

                                return(0);
                            }
                        }

                        chrp = &var->data.cp[ti*length];

                        for (ci = 0; ci < length; ++ci) {
                            buffer[ci] = chrp[ci];
                        }
                        buffer[ci] = '\0';

                        /* Quote or sanitize the value so embedded commas
                         * do not break the CSV row */

                        if (!strchr(buffer, ',')) {
                            nbytes = fprintf(fp, ", %s", buffer);
                        }
                        else if (!strchr(buffer, '"')) {
                            nbytes = fprintf(fp, ", \"%s\"", buffer);
                        }
                        else if (!strchr(buffer, '\'')) {
                            nbytes = fprintf(fp, ", '%s'", buffer);
                        }
                        else {
                            /* NOTE(review): this branch replaces commas
                             * with ';' but never prints the buffer, so
                             * the column is silently dropped for this
                             * row (misaligning the CSV) and nbytes keeps
                             * its stale value -- verify and fix. */
                            while ((chrp = strchr(buffer, ','))) {
                                *chrp = ';';
                            }
                        }
                    }
                    break;
                case CDS_BYTE:
                    nbytes = fprintf(fp, ", %d", var->data.bp[ti]);
                    break;
                case CDS_SHORT:
                    nbytes = fprintf(fp, ", %d", var->data.sp[ti]);
                    break;
                case CDS_INT:
                    nbytes = fprintf(fp, ", %d", var->data.ip[ti]);
                    break;
                case CDS_FLOAT:
                    nbytes = fprintf(fp, ", %.7g", var->data.fp[ti]);
                    break;
                case CDS_DOUBLE:
                    nbytes = fprintf(fp, ", %.15g", var->data.dp[ti]);
                    break;
                default:
                    nbytes = fprintf(fp, ", NaN");
                    break;
            }

            if (nbytes < 0) goto WRITE_ERROR;
        }

        nbytes = fprintf(fp, "\n");
        if (nbytes < 0) goto WRITE_ERROR;
    }

    /* fclose flushes buffered output, so its return value must be
     * checked; fp is NULLed first so WRITE_ERROR does not close twice */

    if (fclose(fp) != 0) {
        fp = (FILE *)NULL;
        goto WRITE_ERROR;
    }

    free(skip);
    free(buffer);

    return(1);

WRITE_ERROR:

    if (fp) fclose(fp);
    free(skip);
    free(buffer);

    "Could not write to output CSV file: %s\n"
    " -> %s\n",
    full_path, strerror(errno));

    return(0);
}
507 
508 /** @publicsection */
509 
510 /*******************************************************************************
511  * Internal Functions Visible To The Public
512  */
513 
/**
 * Store all output datasets.
 *
 * Loops over every datastream owned by the process and calls
 * dsproc_store_dataset() (with newfile == 0) for each output
 * datastream that has an output dataset attached. Processing stops
 * at the first datastream that fails to store.
 *
 * If an error occurs in this function it will be appended to the log and
 * error mail messages, and the process status will be set appropriately.
 *
 * @return
 * - 1 if successful
 * - 0 if an error occurred
 */
/* NOTE(review): the function signature line (presumably
 * "int dsproc_store_all_datasets(void)") was lost in the extraction
 * of this file; only the body remains below. */
{
    DataStream *ds;
    int ds_id;

    for (ds_id = 0; ds_id < _DSProc->ndatastreams; ds_id++) {

        ds = _DSProc->datastreams[ds_id];

        /* only output datastreams with a dataset to store */

        if (ds->role == DSR_OUTPUT && ds->out_cds) {

            /* dsproc_store_dataset() returns -1 on error; a return of
             * 0 (nothing stored) is not treated as a failure here */

            if (dsproc_store_dataset(ds_id, 0) < 0) {
                return(0);
            }
        }
    }

    return(1);
}
543 
/**
 * Store an output dataset.
 *
 * This function will:
 *
 * - Filter out duplicate records in the dataset, and verify that the
 * record times are in chronological order. Duplicate records are
 * defined has having identical times and data values.
 *
 * - Filter all NaN and Inf values for variables that have a missing value
 * defined for datastreams that have the DS_FILTER_NANS flag set. This
 * should only be used if the DS_STANDARD_QC flag is also set, or for
 * datasets that do not have any QC variables defined. This is the default
 * for a and b level datastreams.
 * (see the dsproc_set_datastream_flags() function).
 *
 * - Apply standard missing value, min, max, and delta QC checks for
 * datastreams that have the DS_STANDARD_QC flag set. This is the default
 * for b level datastreams.
 * (see the dsproc_set_datastream_flags() function).
 *
 * - Filter out all records that are duplicates of previously stored
 * data, and verify that the records do not overlap any previously
 * stored data. This check is skipped if we are in asynchronous processing
 * mode. This check is also currently being skipped if we are in
 * reprocessing mode and the file splitting mode is SPLIT_ON_STORE
 * (the default for VAPs).
 *
 * - Verify that none of the record times are in the future.
 *
 * - Merge datasets with existing files and only split on defined intervals
 * or when metadata values change. The default for VAPs is to create a new
 * file for every dataset stored, and the default for ingests is to create
 * daily files that split at midnight UTC
 * (see the dsproc_set_datastream_split_mode() function).
 *
 * If an error occurs in this function it will be appended to the log and
 * error mail messages, and the process status will be set appropriately.
 *
 * @param ds_id - datastream ID
 * @param newfile - specifies if a new file should be created
 *
 * @return
 * - number of data samples stored.
 * - 0 if no data was found in the dataset, or if all the data
 * samples were duplicates of previously stored data.
 * - -1 if an error occurred
 */
/* NOTE(review): the signature line (presumably "int dsproc_store_dataset(")
 * and many reporting-call lines (ERROR/WARNING/LOG/DEBUG_LV1,
 * dsproc_set_status, cds_find_timeval_index, and the datastream-stats
 * update call) were lost in the extraction of this file; only their
 * argument lists remain, marked below where relevant. */
    int ds_id,
    int newfile)
{
    DataStream *ds = _DSProc->datastreams[ds_id];
    const char *ds_path = ds->dir->path;
    time_t current_time = time(NULL);
    int reproc_mode = dsproc_get_reprocessing_mode();
    int force_mode = dsproc_get_force_mode();
    int async_mode = dsproc_get_asynchrounous_mode();

    CDSGroup *out_dataset = ds->out_cds;
    DSFormat out_format = ds->format;
    timeval_t *out_times = (timeval_t *)NULL;   /* owned; freed on all exits */
    size_t out_ntimes;
    timeval_t out_begin;
    timeval_t out_end;

    DSFile **dsfiles = (DSFile **)NULL;
    DSFile *dsfile = (DSFile *)NULL;    /* non-NULL => append to this file */
    timeval_t dsfile_end;
    CDSGroup *dsfile_dod;

    time_t split_time = 0;
    timeval_t split_timeval = { 0, 0 };

    int filtered_first_sample;
    time_t base_time;
    time_t midnight;
    CDSVar *time_var;
    CDSAtt *time_desc_att;
    char *time_desc = (char *)NULL;     /* owned; freed on all exits */

    char timestamp[32];
    char full_path[PATH_MAX];
    int ncid;

    size_t ds_start;
    size_t nc_start;
    size_t count;
    int si, ei;     /* start/end sample indexes of the current record set */

    int last_errno;
    int status;
    char current_ts[32], begin_ts[32], end_ts[32];


    /************************************************************
    *  Make sure we have an output dataset with data in it
    *************************************************************/

    if (!out_dataset) {
        "%s: Attempt to store NULL dataset\n", ds->name);
        return(0);
    }

    out_ntimes = 0;
    out_times = dsproc_get_sample_timevals(out_dataset, 0, &out_ntimes, NULL);

    if (!out_times) {

        /* NULL with out_ntimes == 0 means "no data" (not an error) */

        if (out_ntimes == 0) {

            "%s: No data found in output dataset\n", ds->name);

            _dsproc_free_datastream_out_cds(ds);
            return(0);
        }
        else {
            goto ERROR_EXIT;
        }
    }

    out_begin = out_times[0];

    /************************************************************
    *  Get the long_name of the time variable. This may be needed
    *  later if the base time needs to be adjusted.
    *************************************************************/

    time_desc = (char *)NULL;
    time_var = cds_get_var(out_dataset, "time");

    if (time_var) {

        time_desc_att = cds_get_att(time_var, "long_name");

        if (time_desc_att &&
            time_desc_att->type == CDS_CHAR) {

            time_desc = strdup(time_desc_att->value.cp);
            if (!time_desc) {

                "Could not store data for: %s\n"
                " -> memory allocation error\n",
                ds->name);

                goto ERROR_EXIT;
            }
        }
    }

    /************************************************************
    *  Filter out duplicate samples in the output dataset, and
    *  verify all samples are in chronological order.
    *************************************************************/

    if (!_dsproc_filter_duplicate_samples(
        &out_ntimes, out_times, out_dataset)) {

        goto ERROR_EXIT;
    }

    /************************************************************
    *  Set cell boundary data variables.
    *************************************************************/

    if (!dsproc_set_bounds_data(out_dataset, 0, 0)) {
        goto ERROR_EXIT;
    }

    /************************************************************
    *  Set _ChunkSizes attribute values.
    *************************************************************/

    if (!dsproc_set_chunksizes(out_dataset, 0)) {
        goto ERROR_EXIT;
    }

    /************************************************************
    *  Filter Nan/Inf values
    *************************************************************/

    if (ds->flags & DS_FILTER_NANS) {
        if (!dsproc_filter_dataset_nans(out_dataset, 1)) {
            goto ERROR_EXIT;
        }
    }

    /************************************************************
    *  Apply Standard QC checks
    *************************************************************/

    if (ds->flags & DS_STANDARD_QC) {
        if (!dsproc_standard_qc_checks(ds_id, out_dataset)) {
            goto ERROR_EXIT;
        }
    }

    /************************************************************
    *  Apply Custom QC checks
    *************************************************************/

    status = _dsproc_custom_qc_hook(ds_id, out_dataset);

    if (status < 0) {
        goto ERROR_EXIT;
    }

    /* status == 0 from the hook means "skip this dataset" (not an error) */

    if (status == 0) {
        if (out_times) free(out_times);
        if (time_desc) free(time_desc);
        _dsproc_free_datastream_out_cds(ds);
        return(0);
    }

    /************************************************************
    *  Hack for CSV output format.
    *
    *  The proper way to do this is to add a data file access
    *  layer to abstract the file format from the code logic...
    *  but this should be good enough for what is needed now...
    *************************************************************/

    if (out_format == DSF_CSV) {

        out_begin = out_times[0];
        out_end = out_times[out_ntimes - 1];

        /* Create file name */

        dsproc_create_timestamp(out_begin.tv_sec, timestamp);

        snprintf(full_path, PATH_MAX, "%s/%s.%s.%s",
            ds_path, ds->name, timestamp, ds->extension);

        /* Create the file */

        /* NOTE(review): the debug-level guard and DEBUG call opening this
         * block were lost in extraction; the '}' below closes that guard. */

        format_timeval(&out_begin, begin_ts);
        format_timeval(&out_end, end_ts);

        "%s: Writing file for record set:\n"
        " - times: '%s' to '%s'\n"
        " - file: %s",
        ds->name, begin_ts, end_ts, full_path);
        }

        if (!_dsproc_write_csv(
            full_path, out_dataset, out_ntimes, out_times)) {

            goto ERROR_EXIT;
        }

        /* Update datastream stats and return.
        *
        *  Yes, I'm using a goto... it's a hack...
        */

        goto UPDATE_DS_STATS;
    }

    /************************************************************
    *  Filter out samples in the output dataset that are duplicates
    *  of previously stored data, and verify that the remaining
    *  samples do not overlap any previously stored data.
    *
    *  This is skipped when running in asynchronous mode because the
    *  output files may not be created in chronological order, and
    *  may still be in the process of being created.
    *
    *  We also skip this if reprocessing mode is enabled and the
    *  split mode is SPLIT_ON_STORE. I don't really like doing this,
    *  but it was a requirement to maintain the behavior the VAP
    *  developers were used to when using the older libraries.
    *
    *************************************************************/

    if (!async_mode &&
        (!reproc_mode || ds->split_mode != SPLIT_ON_STORE)) {

        if (!_dsproc_filter_stored_samples(
            ds, &out_ntimes, out_times, out_dataset)) {

            goto ERROR_EXIT;
        }

        /* Check if all samples were filtered out */

        if (!out_ntimes) {

            "%s: All data was filtered from the dataset\n", ds->name);

            if (out_times) free(out_times);
            if (time_desc) free(time_desc);
            _dsproc_free_datastream_out_cds(ds);
            return(0);
        }
    }

    /* remember whether the first sample was removed by the filters;
     * this drives the base_time adjustment further down */

    filtered_first_sample = (TV_EQ(out_times[0], out_begin)) ? 0 : 1;

    out_begin = out_times[0];
    out_end = out_times[out_ntimes - 1];

    format_timeval(&out_begin, begin_ts);
    format_timeval(&out_end, end_ts);

    /************************************************************
    *  Validate the begin and end times
    *************************************************************/

    "%s: Validating time range ['%s' -> '%s']\n",
    ds->name, begin_ts, end_ts);

    /* Validate first sample time */

    if (!dsproc_validate_datastream_data_time(ds_id, &out_begin)) {
        goto ERROR_EXIT;
    }

    /* Validate last sample time */

    if (out_end.tv_sec > current_time) {

        format_secs1970(current_time, current_ts);

        "Could not store data for: %s\n"
        " -> end time '%s' is in the future (current time is '%s')\n",
        ds->name, end_ts, current_ts);

        goto ERROR_EXIT;
    }

    /************************************************************
    *  Check if the base time needs to be adjusted.
    *
    *  We only want to do this if the first sample of the dataset
    *  was filtered out, and the time of midnight is being used
    *  as the base_time.
    *************************************************************/

    if (filtered_first_sample) {

        base_time = dsproc_get_base_time(out_dataset);
        midnight = cds_get_midnight(base_time);

        if (base_time == midnight) {

            midnight = cds_get_midnight(out_begin.tv_sec);

            if (midnight != base_time) {
                dsproc_set_base_time(out_dataset, time_desc, midnight);
            }
        }
    }

    /************************************************************
    *  Check if we need to start a new file for this Dataset
    *************************************************************/

    dsfile = (DSFile *)NULL;

    if (!async_mode && !newfile &&
        ds->split_mode != SPLIT_ON_STORE) {

        /************************************************************
        *  Check for an existing file we should append this dataset to
        *************************************************************/

        status = _dsproc_find_dsfiles(ds->dir, NULL, &out_begin, &dsfiles);

        if (status < 0) {
            goto ERROR_EXIT;
        }

        if (status > 0) {

            /* only the first (most recent candidate) file is used */

            dsfile = dsfiles[0];
            free(dsfiles);

            /************************************************************
            *  Make sure the begin time of the output dataset is after
            *  the end time of the previous dataset.
            *************************************************************/

            dsfile_end = dsfile->timevals[dsfile->ntimes - 1];

            if (TV_LTEQ(out_begin, dsfile_end)) {

                "%s: Overlapping records found with previously stored data\n"
                " -> '%s' to '%s': output dataset overlaps file: %s\n",
                ds->name, begin_ts, end_ts, dsfile->name);

                goto ERROR_EXIT;
            }

            /************************************************************
            *  Check for a split interval between the previously
            *  stored data and the output dataset.
            *************************************************************/

            if (ds->split_mode != SPLIT_NONE) {

                split_time = _dsproc_get_next_split_time(ds, dsfile_end.tv_sec);

                /* a split boundary falls before our first sample,
                 * so do not append to the existing file */

                if (split_time <= out_begin.tv_sec) {
                    dsfile = (DSFile *)NULL;
                }
            }

            if (dsfile) {

                /************************************************************
                *  Check for metadata changes between the previously
                *  stored data and the output dataset.
                *************************************************************/

                dsfile_dod = _dsproc_fetch_dsfile_dod(dsfile);

                if (!dsfile_dod) {
                    goto ERROR_EXIT;
                }

                "%s: Checking for DOD metadata changes\n",
                ds->name);

                status = dsproc_compare_dods(dsfile_dod, out_dataset, 1);

                if (status < 0) {
                    goto ERROR_EXIT;
                }

                if (status != 0) {

                    /* We do not want to split files only on dod_version changes,
                    *  but we do want to report if the changes found were a result
                    *  of a dod_version change. */

                    dsproc_compare_dod_versions(dsfile_dod, out_dataset, 1);

                    dsfile = (DSFile *)NULL;

                    "%s: Forcing NetCDF file split at: %s\n",
                    ds->name, begin_ts);
                }
            }

        } /* end if found previous datastream file */

    } /* end if (!newfile) */

    /************************************************************
    *  Store the Dataset
    *************************************************************/

    /* NOTE(review): this log fragment formats out_ntimes (size_t)
     * with %d -- should be %zu or an (int) cast; confirm. */
    "Storing: %s data from '%s' to '%s': %d records\n",
    ds->name, begin_ts, end_ts, out_ntimes);

    /* Loop over split intervals */

    for (si = 0, ei = 0; si < (int)out_ntimes; si = ei + 1) {

        /************************************************************
        *  Get the time index of the next file split,
        *  and set the sample count accordingly.
        *************************************************************/

        split_time = _dsproc_get_next_split_time(ds, out_times[si].tv_sec);

        if (split_time) {

            split_timeval.tv_sec = split_time;

            /* NOTE(review): the call line (presumably
             * "ei = cds_find_timeval_index(") was lost in extraction */
            out_ntimes, out_times, split_timeval, CDS_LT);
        }
        else {
            /* no time-based split: this record set runs to the end */
            ei = out_ntimes - 1;
        }

        ds_start = si;
        count = ei - si + 1;

        if (dsfile) {

            /************************************************************
            *  Append this record set to the existing file
            *************************************************************/

            /* NOTE(review): the debug-level guard opening this block was
             * lost in extraction; the '}' below closes that guard. */

            format_timeval(&out_times[si], begin_ts);
            format_timeval(&out_times[ei], end_ts);

            "%s: Appending record set to existing file:\n"
            " - times: '%s' to '%s'\n"
            " - file: %s",
            ds->name, begin_ts, end_ts, dsfile->full_path);
            }

            if (!_dsproc_open_dsfile(dsfile, NC_WRITE)) {
                goto ERROR_EXIT;
            }

            ncid = dsfile->ncid;
            nc_start = dsfile->ntimes;

            if (!_dsproc_update_stored_metadata(out_dataset, ncid)) {
                goto ERROR_EXIT;
            }
        }
        else {

            /************************************************************
            *  Check if we need to adjust the base_time metadata
            *  for the first time in this output record set.
            *************************************************************/

            if (si != 0) {

                midnight = cds_get_midnight(out_times[si].tv_sec);

                dsproc_set_base_time(out_dataset, time_desc, midnight);
            }

            /************************************************************
            *  Create the new file
            *************************************************************/

            /* Create file name */

            dsproc_create_timestamp(out_times[si].tv_sec, timestamp);

            snprintf(full_path, PATH_MAX, "%s/%s.%s.%s",
                ds_path, ds->name, timestamp, ds->extension);

            /* Create the file */

            /* NOTE(review): the debug-level guard opening this block was
             * lost in extraction; the '}' below closes that guard. */

            format_timeval(&out_times[si], begin_ts);
            format_timeval(&out_times[ei], end_ts);

            "%s: Creating new file for record set:\n"
            " - times: '%s' to '%s'\n"
            " - file: %s",
            ds->name, begin_ts, end_ts, full_path);
            }

            /* NC_NOCLOBBER protects existing files except when
             * reprocessing or running asynchronously */

            if (reproc_mode || async_mode) {
                ncid = ncds_create_file(out_dataset, full_path, 0, 0, 1);
            }
            else {
                ncid = ncds_create_file(out_dataset, full_path, NC_NOCLOBBER, 0, 1);
            }

            if (!ncid) {

                "Could not create file: %s\n",
                full_path);

                goto ERROR_EXIT;
            }

            /************************************************************
            *  Write the static data
            *************************************************************/

            if (!ncds_write_static_data(out_dataset, ncid)) {

                "Could not write static data to file: %s\n",
                full_path);

                goto ERROR_EXIT;
            }

            nc_start = 0;
        }

        /************************************************************
        *  Write the data records
        *************************************************************/

        if (!ncds_write_records(out_dataset, ds_start, ncid, nc_start, count)) {

            if (dsfile) {
                "Could not write data records to file: %s\n",
                dsfile->full_path);
            }
            else {
                "Could not write data records to file: %s\n",
                full_path);
            }

            goto ERROR_EXIT;
        }

        /************************************************************
        *  Flush data to disk
        *************************************************************/

        if (dsfile) {

            if (!ncds_sync(ncid)) {

                "Could not sync data to file: %s\n",
                dsfile->full_path);

                goto ERROR_EXIT;
            }

            /* Make sure the file times get reloaded
            *  if this file is accessed again */

            dsfile->stats.st_mtime = 0;

            /* We are done appending data to this file */

            dsfile = (DSFile *)NULL;
        }
        else {

            if (!ncds_close(ncid)) {

                "Could not close file: %s\n",
                full_path);

                goto ERROR_EXIT;
            }

            /* Make sure the directory listing gets reloaded
            *  if this directory is accessed again */

            ds->dir->stats.st_mtime = 0;
        }

    } /* end loop over split intervals */

    /************************************************************
    *  Update datastream stats and times
    *************************************************************/

UPDATE_DS_STATS:

    /* NOTE(review): the call line for the stats update (these are its
     * arguments) was lost in extraction */
    ds_id, out_ntimes, &out_begin, &out_end);

    if (out_times) free(out_times);
    if (time_desc) free(time_desc);
    _dsproc_free_datastream_out_cds(ds);
    return((int)out_ntimes);

ERROR_EXIT:

    /* preserve errno across the cleanup calls below */

    last_errno = errno;

    if (out_times) free(out_times);
    if (time_desc) free(time_desc);
    _dsproc_free_datastream_out_cds(ds);

    /* in force mode, non-fatal errors downgrade to "skip this dataset" */

    if (force_mode && !dsproc_is_fatal(last_errno)) {

        "FORCE: Forcing ingest to skip output dataset for: %s\n",
        ds->name);

        return(0);
    }

    return(-1);
}
1243 
1244 /*******************************************************************************
1245  * Public Functions
1246  */
1247